通过SSHClient 执行命令
"""通过用户名密码验证""" import paramiko # 创建 SSH 对象
ssh = paramiko.SSHClient()
# 自动添加key到 known_hosts
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname='10.211.55.5', port=22, username='root', password='')
# 执行命令
stdin, stdout, stderr = ssh.exec_command('ls')
# 获取命令结果
result = stdout.readlines()
print(result)
# 关闭连接
ssh.close()
"""通过秘钥验证""" import paramiko private_key = paramiko.RSAKey.from_private_key_file('/Users/wenchong/.ssh/id_rsa') ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 使用秘钥连接
ssh.connect(hostname='10.211.55.5', port=22, username='root', pkey=private_key)
stdin, stdout, stderr = ssh.exec_command('ls')
result = stdout.readlines()
print(result)
ssh.close()
"""SSHClient 封装 Transport""" import paramiko transport = paramiko.Transport(('10.211.55.5', 22))
# 通过密码验证
# transport.connect(username='root', password='111111') # 通过秘钥验证
private_key = paramiko.RSAKey.from_private_key_file('/Users/wenchong/.ssh/id_rsa')
transport.connect(username='root', pkey=private_key) ssh = paramiko.SSHClient()
ssh._transport = transport stdin, stdout, stderr = ssh.exec_command('ls')
result = stdout.readlines()
print(result)
transport.close()
通过 SFTPClient 上传下载文件
"""上传下载文件""" import paramiko transport = paramiko.Transport(('10.211.55.5', 22))
# 通过密码验证
# transport.connect(username='root', password='111111') # 通过秘钥验证
private_key = paramiko.RSAKey.from_private_key_file('/Users/wenchong/.ssh/id_rsa')
transport.connect(username='root', pkey=private_key) sftp = paramiko.SFTPClient.from_transport(transport)
# 将本地的 /tmp/local.py 上传到服务器,并重命名为 /tmp/remote.py
sftp.put('/tmp/local.py', '/tmp/remote.py')
# 从服务器的 /tmp/remote.py 下载文件到本地,并重命名为 /tmp/local.py
sftp.get('/tmp/remote.py', '/tmp/local.py') transport.close()
堡垒机的实现
1、在所有的服务器上创建账号,用户通过堡垒机管理该服务器
2、用户使用用户名密码登陆到堡垒机,并根据登陆的用户信息,在数据库中查找用户可管理的主机列表
3、用户选择服务器,并自动登陆
4、用户操作,并记录操作日志
在用户的 .bashrc 文件的最后一行执行该脚本,并在执行完成后退出shell。
echo -e "python3.5 demo.py\nlogout" .bashrc
# /user/bin/env python
__author__ = 'wenchong' """demo.py""" import getpass
import paramiko
import os
import sys
import socket
import logging from paramiko.py3compat import u # windows系统无 termios 模块
try:
import termios
import tty
has_termios = True
except ImportError:
has_termios = False # 成功登陆的用户名
USERNAME = None def log_write(msg):
"""记录日志"""
fh = logging.FileHandler(filename='log', mode='a', encoding='utf-8')
fmt = logging.Formatter(fmt='%(asctime)s - {} - %(name)s - %(message)s'.format(USERNAME))
fh.setFormatter(fmt)
logger = logging.Logger("Command", level=logging.DEBUG)
logger.addHandler(fh)
logger.info(msg) def interactive_shell(channel):
"""启动shell"""
if has_termios:
posix_shell(channel)
else:
windows_shell(channel) def login():
"""模拟登陆堡垒机"""
while True:
username = input("Username: ")
password = getpass.getpass("Password: ") if (username == 'wen' and password == '') or (username == 'chong' and password == ''):
global USERNAME
USERNAME = username
return username
else:
print("Username or Password is error. Please try again.") def select_host(username):
"""根据登陆的用户名列出主机并选择"""
hosts = {
'wen': [
'10.211.55.5',
'192.168.165.130',
],
'chong': [
'10.211.55.6',
'10.211.55.7',
]
} hosts_list = hosts.get(username) for index, host in enumerate(hosts_list, 1):
print(index, host) while True:
try:
user_input = input("Please select: ")
host = hosts_list[int(user_input) - 1]
return host
except KeyboardInterrupt as e:
exit("\n")
except Exception as e:
continue def posix_shell(channel):
"""启用 Linux shell"""
import select # 获取之前的 tty
fd = sys.stdin.fileno()
oldtty = termios.tcgetattr(fd) try:
tty.setraw(fd)
tty.setcbreak(fd) channel.settimeout(0.0) command_list = []
tab_flag = False while True:
r_list, w_list, e_list = select.select([channel, sys.stdin], [], [], 1)
if channel in r_list:
try:
x = u(channel.recv(1024))
if len(x) == 0:
print("\r\n*** EOF\r\n")
break # 输入 tab 后的返回值如果不换行则记录为命令[补全命令]
if tab_flag:
if not x.startswith("\r\n"):
command_list.append(x)
tab_flag = False sys.stdout.write(x)
sys.stdout.flush()
except socket.timeout:
pass if sys.stdin in r_list:
x = sys.stdin.read(1) if len(x) == 0:
break # 用户输入 tab 键
if x == "\t":
tab_flag = True
else:
command_list.append(x) if x == '\r':
command = ''.join(command_list)
# 发送的命令为空,即只有回车时忽略记录日志
if command != '\r':
log_write(command)
command_list.clear() channel.sendall(x) finally:
# 恢复之前的 tty
termios.tcsetattr(fd, termios.TCSADRAIN, oldtty) def windows_shell(channel):
"""windows shell, 未验证"""
import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock):
while True:
data = sock.recv(256)
if not data:
sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush() writer = threading.Thread(target=writeall, args=(channel,))
writer.start() try:
while True:
d = sys.stdin.read(1)
if not d:
break
channel.send(d)
except EOFError:
# user hit ^Z or F6
pass def login_server(host):
"""通过 key 认证登陆到远程服务器"""
transport = paramiko.Transport((host, 22))
transport.start_client() default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
private_key = paramiko.RSAKey.from_private_key_file(default_path)
transport.auth_publickey(username='root', key=private_key) # 打开一个通道
channel = transport.open_session()
# 获取一个终端
channel.get_pty()
# 激活器
channel.invoke_shell() return channel def main():
username = login()
if username:
host = select_host(username) channel = login_server(host) interactive_shell(channel) if __name__ == '__main__': main()
堡垒机脚本 demo.py