socketserver模块写的一个简单ftp程序

一坨需求...

  1. 用户加密认证
  2. 允许同时多用户登录
  3. 每个用户有自己的家目录 ,且只能访问自己的家目录
  4. 对用户进行磁盘配额,每个用户的可用空间不同
  5. 允许用户在ftp server上随意切换目录 (cd)
  6. 允许用户查看当前目录下文件 (ls)
  7. 允许上传和下载文件,保证文件一致性(get put)
  8. 文件传输过程中显示进度条
  9. 支持文件的断点续传

程序实现

1、README

 ### 作者信息
姓名: hexm
email: xiaoming.unix@gmail.com ### 实现功能
用户加密认证
允许同时多用户登录
每个用户有自己的家目录 ,且只能访问自己的家目录
对用户进行磁盘配额,每个用户的可用空间不同
允许用户在ftp server上随意切换目录 (cd)
允许用户查看当前目录下文件 (ls)
允许上传和下载文件,保证文件一致性(get put)
文件传输过程中显示进度条
支持文件的断点续传 ### 代码目录树
ftpserver/
├── bin
│   └── ftpserver.py
├── client
│   └── ftp.py
├── db
│   └── user.db
├── fstab
├── inittab
├── lib
│   ├── common.py
│   ├── __pycache__
│   │   ├── common.cpython-35.pyc
│   │   └── user_lib.cpython-35.pyc
│   └── user_lib.py
├── src
│   ├── ftpserver.py
│   └── __pycache__
│   └── ftpserver.cpython-35.pyc
└── yum.conf ### 功能介绍
默认有一个用户,hexm,密码也是hexm,家目录为/tmp,限额1M,
pwd 查看用户家目录
cd 进入家目录其他目录,并且不能进入其他目录
ls 列出家目录下内容
put 上传文件 支持断点续传
get 下载文件 支持断点续传
bye 退出 ### 操作步骤
* 启动服务端
python3 ftpserver/bin/ftpserver.py &
* 启动客户端
python3 ftpserver/client/ftp.py hexm@127.0.0.1 21
>> 输入ftp密码:hexm
认证成功 * ls 列出当前目录内容
列出当前目录内容
ftp> ls
etc/ fstab inittab pip-53mpg1m2-unpack/ pip-poq_f3ob-unpack/ yum.conf
列出某一目录内容
ftp> ls etc
fstab group inittab
ftp> ls /etc
fstab group inittab
显示多个文件详细信息
ftp> ls -l yum.conf fstab
-rw-r--r-- 1 root root 970 11月 20 11:46 /tmp/yum.conf
-rw-r--r-- 1 root root 396 11月 19 19:44 /tmp/fstab * cd 进入其他目录
查看当前目录
ftp> pwd
/tmp
进入家目录中/etc目录
ftp> cd /etc
ftp> pwd
/tmp/etc
返回上级目录
ftp> cd ..
ftp> pwd
/tmp * 上传文件 支持断点续传
上传bash文件,因为家目录限额为1M,不能上传
ftp> put /bin/bash
磁盘空间不足
上传文件不存在
ftp> put /bin/ifstat
文件不存在
正常上传
ftp> put /etc/fstab
开始上传[/etc/fstab]
====================================================================================================>100% * 下载文件,支持断点续传
发送并校验成功
ftp> get yum.conf
====================================================================================================>100%
* 退出
ftp> bye

2、目录树

socketserver模块写的一个简单ftp程序

3、bin/ftpserver.py

 #!/usr/bin/env python
# coding=utf-8 import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src import ftpserver as src_server if __name__ == '__main__':
src_server.run()

4、src/ftpserver.py

 #!/bin/python
# coding=utf-8 import socketserver
import json
import os
import subprocess
import re os.sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib.user_lib import User
from lib.common import bytes_dumps, str_loads, bytes_encoding, str_encoding, getdirsize class FtpServer(socketserver.BaseRequestHandler): def handle(self):
self.count = 0
while True:
recv_data = self.request.recv(1024)
if len(recv_data) == 0: break
task_data = str_loads(recv_data)
task_action = task_data.get('action')
# 根据客户端命令执行操作
if hasattr(self, task_action):
func = getattr(self, task_action)
func(task_data) def auth(self, *args, **kwargs):
"""
验证用户合法性
param auth_user: 用户名
param auth_pwd: 密码
param ret: 是否验证成功, 结果为True或False
param self.home_path: 用户家目录
param auth_pass_msg: 发送给客户端是否验证成功的信息
"""
auth_user = args[0].get('user')
auth_pwd = args[0].get('pwd')
auth = User(auth_user, auth_pwd)
ret = auth.login()
if ret: # 认证成功获取用户家目录
auth_pass_msg = {"status": "True"}
self.home_path = auth.home_path()
self.limit_home = auth.limit_home()
else:
auth_pass_msg = {"status": "False"}
self.request.send(bytes_dumps(auth_pass_msg)) def put(self, *args, **kwargs):
file_size = args[0].get('file_size')
file_name = args[0].get('file_name')
md5sum = args[0].get('md5sum')
abs_file = os.path.join(self.home_path, file_name)
if os.path.isfile(abs_file): # 上传的文件存在
if self._md5sum(abs_file) == md5sum: #上传的文件md5和本地的一致,不用重新上传
status_msg = {"status": "same"}
self.request.send(bytes_dumps(status_msg))
else: # 不一致断点续传
server_file_size = os.stat(abs_file).st_size
status_msg = {"status": "add", "from_size": server_file_size}
self.request.send(bytes_dumps(status_msg))
with open(abs_file, 'ab') as f:
recv_size = server_file_size
while recv_size < file_size:
data = self.request.recv(4096)
f.write(data)
recv_size += len(data)
else: #上传文件不存在,开始写入
# 上传的文件大小加上家目录大小 大于 限制的家目录大小,不允许上传
if getdirsize(self.home_path) + int(file_size) > int(self.limit_home.strip('M')) * 1024 * 1024:
status_msg = {"status": "less"}
self.request.send(bytes_dumps(status_msg))
else:
status_msg = {"status": "True"}
self.request.send(bytes_dumps(status_msg))
with open(abs_file, 'wb') as f:
recv_size = 0
while recv_size < file_size:
data = self.request.recv(4096)
f.write(data)
recv_size += len(data)
request_client_md5 = {"option": "md5"}
self.request.send(bytes_dumps(request_client_md5))
md5sum = self._md5sum(abs_file)
client_md5 = str_loads(self.request.recv(1024))
if client_md5['md5'] == md5sum:
check_msg = {"status": "True"}
else:
check_msg = {"status": "False"}
self.request.send(bytes_dumps(check_msg)) @staticmethod
def _md5sum(filename):
cmd = 'md5sum %s' % filename
ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
md5sum = ret.stdout.read().split()[0]
return str(md5sum, encoding='utf-8') def get(self, *args, **kwargs):
file_name = args[0].get('file_name')
abs_filename = os.path.join(self.home_path, file_name)
if os.path.isfile(abs_filename): # 文件存在的话,向客户端发送此文件状态信息
file_size = os.stat(abs_filename).st_size
md5sum = self._md5sum(abs_filename)
status_msg = {"status": "True", "file_size": file_size, "md5sum": md5sum}
self.request.send(bytes_dumps(status_msg)) # 客户端接收状态信息后,接收客户端回复信息
client_ack_msg = self.request.recv(1024)
client_ack_msg = str_loads(client_ack_msg)
if client_ack_msg["status"] == 'True': # 客户端没有这个文件,完整下载
with open(abs_filename, 'rb') as f:
while True:
filedata = f.read(4096)
if not filedata: break
self.request.send(filedata)
client_request_md5 = str_loads(self.request.recv(1024))
if client_request_md5['option'] == 'md5':
md5sum = self._md5sum(abs_filename)
md5_msg = {"md5": md5sum}
self.request.send(bytes_dumps(md5_msg))
elif client_ack_msg['status'] == 'same': # 客户端和服务端文件相同
pass
elif client_ack_msg['status'] == 'add': # 客户端有文件但不完整
from_size = int(client_ack_msg['from_size'])
with open(abs_filename, 'rb') as f:
f.seek(from_size)
while True:
filedata = f.read(4096)
if not filedata: break
self.request.send(filedata)
else:
status_msg = {"status": "False"}
self.request.send(bytes_dumps(status_msg)) def cd(self, *args, **kwargs):
if self.count == 0:
self.tmp_home_path = self.home_path cd_dir = args[0].get('cd_dir')
if cd_dir == '/':
self.home_path = self.tmp_home_path
status = {"status": "True"}
elif cd_dir.startswith('/'):
cd_dir = cd_dir[1:]
if os.path.isdir(os.path.join(self.home_path, cd_dir)):
self.home_path = os.path.join(self.home_path, cd_dir)
status = {"status": "True"}
else:
status = {"status": "False"}
elif cd_dir == '':
self.home_path = self.tmp_home_path
status = {"status": "True"}
else:
if os.path.isdir(os.path.join(self.home_path, cd_dir)):
self.home_path = os.path.join(self.home_path, cd_dir)
status = {"status": "True"}
else:
status = {"status": "False"}
status = bytes_dumps(status)
self.request.send(status) self.count += 1 def pwd(self, *args, **kwargs):
cmd = args[0].get('action')
res = subprocess.Popen(cmd, shell=True, cwd=self.home_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd_res = res.stdout.read()
cmd_res_len = len(cmd_res)
status_msg = {"status": "ready", "len_ret": cmd_res_len}
self.request.send(bytes_dumps(status_msg))
ack_msg = self.request.recv(1024)
if str_loads(ack_msg).get('status') == 'yes':
self.request.send(cmd_res) def ls(self, *args, **kwargs):
cmd = args[0].get('action')
args = args[0].get('args')
if len(args) == 0:
fnames = args = ''
else:
try:
args, fnames = re.search('(.*-\w+ ?)(.*)', args).groups()
except AttributeError as e:
fnames = args
args = ''
cmd_res = ''
if len(fnames.strip().split()) > 1:
for i in fnames.strip().split():
res = subprocess.Popen(cmd + ' ' + args + ' ' + self.home_path + '/' + i, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd_res += str(res.stdout.read(), encoding='utf-8')
cmd_res = bytes(cmd_res, encoding='utf-8')
else:
res = subprocess.Popen(cmd + ' ' + args + ' ' + self.home_path + '/' + fnames, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd_res = res.stdout.read()
if '-l' not in args:
cmd_res = str(cmd_res, encoding='utf-8')
tmp = []
for res in cmd_res.strip().split():
if os.path.isdir(os.path.join(self.home_path, res)):
res = res + '/'
tmp.append(res)
cmd_res = ' '.join(tmp)
cmd_res = bytes(cmd_res, encoding='utf-8') cmd_res_len = len(cmd_res)
if not cmd_res:
cmd_res = res.stderr.read()
cmd_res_len = len(cmd_res)
status_msg = {"status": "ready", "len_ret": cmd_res_len}
self.request.send(bytes_dumps(status_msg))
ack_msg = self.request.recv(1024)
if str_loads(ack_msg).get('status') == 'yes':
self.request.send(cmd_res) def _bytes_dumps(self, msg_data):
return bytes(json.dumps(msg_data), encoding='utf-8') def _str_loads(self, msg_data):
return json.loads(str(msg_data, encoding='utf-8')) def main():
server = socketserver.ThreadingTCPServer(('0.0.0.0', 21), FtpServer)
server.serve_forever() def run():
main() if __name__ == '__main__':
run()

5、client/ftp.py

 #!/usr/bin/env python
# coding=utf-8 import subprocess
import socket
import json
import sys
import os os.sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lib.common import bytes_dumps, str_loads, bytes_encoding, str_encoding, view_bar class FtpClient(object): def __init__(self, fuser, fpwd, fip, fport=21):
self.fuser = fuser
self.fpwd = fpwd
self.fip = fip
self.fport = int(fport)
self.s = socket.socket() def connect_socket(self):
self.s.connect((self.fip, self.fport)) def auth(self):
auth_msg = {"action": "auth", "user": self.fuser, "pwd": self.fpwd}
self.s.send(bytes(json.dumps(auth_msg), encoding='utf-8'))
server_ack_msg = self.s.recv(1024)
ack_data = json.loads(server_ack_msg.decode())
if ack_data['status'] == 'True':
return True
else:
return False def send_data(self):
while True:
send_data = input('ftp> ')
if len(send_data) == 0:
continue
cmd_list = send_data.strip().split()
if cmd_list == 0:
continue
cmd = cmd_list[0]
if hasattr(self, cmd):
func = getattr(self, cmd)
func(cmd_list)
else:
print('--暂时不支持[%s]命令' % cmd) def put(self, *args):
cmd = args[0][0]
abs_filename = args[0][-1]
if os.path.islink(abs_filename):
print('符号链接不能上传')
elif os.path.isfile(abs_filename):
file_size = os.stat(abs_filename).st_size
file_name = os.path.basename(abs_filename)
md5sum = self._md5sum(abs_filename)
msg_data = {"action": cmd, "file_name": file_name, "file_size":file_size, "md5sum": md5sum}
self.s.send(bytes_dumps(msg_data))
server_ack_msg = self.s.recv(1024)
ack_data = str_loads(server_ack_msg)
if ack_data['status'] == 'True':
print('开始上传[%s]' % abs_filename)
with open(abs_filename, 'rb') as f:
send_size = 0
while True:
filedata = f.read(8192)
send_size += len(filedata)
if not filedata: break
self.s.send(filedata)
view_bar(send_size, file_size)
sys.stdout.write('\n')
server_request_md5 = str_loads(self.s.recv(1024))
if server_request_md5['option'] == 'md5':
md5sum = self._md5sum(abs_filename)
md5_msg = {"md5": md5sum}
self.s.send(bytes_dumps(md5_msg))
md5_status = str_loads(self.s.recv(1024))
if md5_status['status'] == 'True':
print('发送并校验成功')
else:
print('上传文件失败')
elif ack_data['status'] == 'add':
with open(abs_filename, 'rb') as f:
send_size = f.seek(ack_data['from_size'])
print('开始上传[%s]' % abs_filename)
while True:
filedata = f.read(8192)
send_size += len(filedata)
if not filedata: break
self.s.send(filedata)
view_bar(send_size, file_size)
sys.stdout.write('\n')
print('增量上传成功')
elif ack_data['status'] == 'same':
print('开始上传[%s]' % abs_filename)
print('====================================================================================================>100%')
print('发送并校验成功')
elif ack_data['status'] == 'less':
print('磁盘空间不足')
else:
print('文件不存在') @staticmethod
def _md5sum(filename):
cmd = 'md5sum %s' % filename
ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
md5sum = ret.stdout.read().split()[0]
return str(md5sum, encoding='utf-8') def get(self, *args, **kwargs):
cmd = args[0][0]
abs_filename = args[0][-1]
file_name = os.path.basename(abs_filename)
msg_data = {"action": cmd, "file_name": file_name} self.s.send(bytes_dumps(msg_data)) server_status_msg = self.s.recv(1024)
status_msg = str_loads(server_status_msg) if status_msg['status'] == 'True': # 如果要下载的文件在服务端存在
file_size = status_msg['file_size'] if os.path.exists(file_name):
if self._md5sum(file_name) == status_msg['md5sum']: # 本地文件和服务端相同,不用重新下载
ack_msg = {"status": "same"}
self.s.send(bytes_dumps(ack_msg))
print('====================================================================================================>100%')
print('下载并校验成功')
else: # 文件存在,但校验和不同,说明没有下载完全
local_file_size = os.stat(abs_filename).st_size
ack_msg = {"status": "add", "from_size":local_file_size}
self.s.send(bytes_dumps(ack_msg))
with open(file_name, 'ab') as f:
recv_size = local_file_size
while recv_size < file_size:
data = self.s.recv(4096)
f.write(data)
recv_size += len(data)
view_bar(recv_size, file_size)
sys.stdout.write('\n')
else: # 本地没有这个文件,可以下载
ack_msg = {"status": "True"}
self.s.send(bytes_dumps(ack_msg))
with open(file_name, 'wb') as f:
recv_size = 0
while recv_size < file_size:
data = self.s.recv(4096)
f.write(data)
recv_size += len(data)
view_bar(recv_size, file_size)
sys.stdout.write('\n')
request_server_md5 = {"option": "md5"}
self.s.send(bytes_dumps(request_server_md5))
md5sum = self._md5sum(file_name)
server_md5 = str_loads(self.s.recv(1024))
if server_md5['md5'] == md5sum:
print('下载并校验成功')
else:
print('校验失败')
else:
print('服务端没有这个文件') def pwd(self, *args, **kwargs):
if len(args[0]) != 1:
print('错误的参数')
else:
cmd = args[0][0]
msg_data = {"action": cmd}
self.s.send(bytes_dumps(msg_data))
server_ack_msg = self.s.recv(1024)
ack_data = str_loads(server_ack_msg)
if ack_data['status'] == 'ready':
ack_msg = {"status": "yes"}
self.s.send(bytes_dumps(ack_msg))
cmd_recv_size = 0
cmd_ret = b''
while cmd_recv_size < ack_data['len_ret']:
cmd_recv = self.s.recv(1024)
cmd_ret += cmd_recv
cmd_recv_size += len(cmd_recv)
print(str(cmd_ret, encoding='utf-8')) def cd(self, *args, **kwargs):
if len(args[0]) > 2:
print('错误的参数')
elif len(args[0]) == 1:
cmd = args[0][0]
cd_dir = ''
else:
cmd = args[0][0]
cd_dir = args[0][1]
msg_data = {"action": cmd, "cd_dir": cd_dir}
self.s.send(bytes_dumps(msg_data))
status = self.s.recv(1024)
status = str_loads(status)
if status['status'] == 'False':
print('目录不存在') def bye(self, *args, **kwargs):
sys.exit(0) def ls(self, *args):
cmd = args[0][0]
args = ' '.join(args[0][1:])
msg_data = {"action": cmd, "args": args}
self.s.send(bytes_dumps(msg_data))
server_ack_msg = self.s.recv(1024)
ack_data = str_loads(server_ack_msg)
if ack_data['status'] == 'ready':
ack_msg = {"status": "yes"}
self.s.send(bytes_dumps(ack_msg))
cmd_recv_size = 0
cmd_ret = b''
while cmd_recv_size < ack_data['len_ret']:
cmd_recv = self.s.recv(1024)
cmd_ret += cmd_recv
cmd_recv_size += len(cmd_recv)
print(str(cmd_ret, encoding='utf-8')) def help(self, *args, **kwargs):
print('--支持如下命令: ls, cd, pwd, get, put, bye') def main(): user, ip = sys.argv[1].strip().split('@')
port = sys.argv[2]
pwd = input('>> 输入ftp密码:').strip() ftp = FtpClient(user, pwd, ip, port)
ftp.connect_socket()
ret = ftp.auth()
if ret:
print('认证成功')
ftp.send_data()
else:
print('认证失败')
ftp.bye()
ftp.send_data() def run():
main() if __name__ == '__main__':
run()

6、lib/common.py

#!/usr/bin/env python
# coding=utf-8 import json
import sys
import os def bytes_dumps( msg_data):
return bytes(json.dumps(msg_data), encoding='utf-8') def str_loads(msg_data):
return json.loads(str(msg_data, encoding='utf-8')) def bytes_encoding(msg_data):
return bytes(msg_data, encoding='utf-8') def str_encoding(msg_data):
return str(msg_data, encoding='utf-8') def view_bar(num, total):
rate = num / total
rate_num = int(rate * 100)
r = '\r%s>%d%%' % ("="*rate_num, rate_num, )
sys.stdout.write(r)
sys.stdout.flush() def getdirsize(dir):
size = 0
for root, dirs, files in os.walk(dir):
size += sum([os.path.getsize(os.path.join(root, name)) for name in files])
return size

7、lib/user_lib.py

#!/usr/bin/env python
# coding=utf-8 import hashlib
import re
import os BASEPATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
USERDB = os.path.join(BASEPATH, 'db', 'user.db') class User(object):
"""
实现登陆注册功能
""" def __init__(self, user, password):
self.username = user
self.password = password @staticmethod
def check(match, filedb):
"""
逐行匹配,查看是否用户名已经注册,不允许重复用户名
"""
with open(filedb, 'r') as fuser:
for line in fuser:
if re.match(match + '\\b', line):
return False
else:
continue
return True @staticmethod
def login_check(username, password, filedb):
"""
登陆验证
"""
with open(filedb, 'r') as fuser:
for line in fuser:
user, pwd, _, _ = line.strip().split()
if user == username and pwd == password:
return True
else:
continue
return False def home_path(self):
"""
返回用户家目录
"""
with open(USERDB, 'r') as fpath:
for line in fpath:
if line.startswith(self.username):
return line.strip().split()[-2]
else:
continue
return False def limit_home(self):
"""
返回用户家目录限额
"""
with open(USERDB, 'r') as flimit:
for line in flimit:
if line.startswith(self.username):
return line.strip().split()[-1]
else:
continue
return False def register(self):
"""
注册用户
"""
passobj = hashlib.md5(bytes(self.password, encoding='utf-8'))
passobj.update(bytes(self.password, encoding='utf-8'))
secure_password = passobj.hexdigest() if self.check(self.username, USERDB):
with open(USERDB, 'a') as fuser:
fuser.write(self.username + ' ' + secure_password + '\n')
return True
else:
print('用户名已存在') def login(self):
"""
用户登陆
"""
passobj = hashlib.md5(bytes(self.password, encoding='utf-8'))
passobj.update(bytes(self.password, encoding='utf-8'))
secure_password = passobj.hexdigest()
ret = self.login_check(self.username, secure_password, USERDB)
if ret:
return True
else:
return False def modify(self):
"""
修改密码
"""
pass

8、db/user.db

hexm 92df47e9074c048e0afe84ce0a5c407d /tmp 1M
xm 92df47e9074c048e0afe84ce0a5c407d /tmp/sb 5M
上一篇:MySQL 字符集和校对


下一篇:Client tried to access password protected page without proper authorization (status code 401) 无法发布SceneService的解决方法