项目名:多用户在线FTP程序
一、需求
1、用户加密认证
2、允许同时多用户登录
3、每个用户有自己的家目录,且只能访问自己的家目录
4、对用户进行磁盘配额,每个用户的可用空间不同
5、允许用户在ftp server上随意切换目录
6、允许用户查看当前目录下文件
7、允许上传和下载文件,保证文件一致性
8、文件传输过程中显示进度条
9、支持文件的断点续传
二、代码结构
服务端:
客户端:
具体代码:
服务端:
server.py
# -*- coding: utf-8 -*-
import socket, os, json, re, struct, threading, time
from lib import commons
from conf import settings
from core import logger


class Server(object):
    """TCP front end: binds the listening socket and serves each client on its own thread."""

    def __init__(self):
        """Create the data directories, bind/listen on the configured address, then serve.

        The constructor never returns normally because it ends by calling run().
        """
        self.init_dir()
        self.sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.sock.bind((settings.server_bind_ip, settings.server_bind_port))
        self.sock.listen(settings.server_listen)
        print("\033[42;1mserver started sucessful!\033[0m")
        self.run()

    @staticmethod
    def init_dir():
        """Create logs/, db/ and home/ under settings.base_path when they are missing."""
        for name in ('logs', 'db', 'home'):
            # exist_ok avoids the check-then-create race of exists() + mkdir().
            os.makedirs(os.path.join(settings.base_path, name), exist_ok=True)

    def run(self):
        """Accept connections forever and hand each one to a fresh TCPHandler thread."""
        while True:
            self.conn, self.cli_addr = self.sock.accept()
            thread = threading.Thread(target=TCPHandler().handle, args=(self.conn, self.cli_addr))
            thread.start()


class TCPHandler(object):
    """Per-connection handler for the FTP-like command protocol.

    Every message on the wire is: a 4-byte struct 'i' header length, a JSON
    header ({'cmd', 'info_size', 'md5'}), then info_size bytes of JSON body.
    Each command method receives (header_dic, req_dic) and answers through
    response().
    """

    STATUS_CODE = {
        200: 'Passed authentication!',
        201: 'Wrong username or password!',
        202: 'Username does not exist!',
        300: 'cmd successful , the target path be returned in returnPath',
        301: 'cmd format error!',
        302: 'The path or file could not be found!',
        303: 'The dir is exist',
        304: 'The file has been downloaded or the size of the file is exceptions',
        305: 'Free space is not enough',
        401: 'File MD5 inspection failed',
        400: 'File MD5 inspection success',
    }

    # Only these method names may be reached through a client-supplied command;
    # dispatching on hasattr() would expose every attribute of the handler.
    COMMANDS = frozenset(('register', 'login', 'query_quota', 'cd', 'ls', 'rm', 'mkdir', 'get', 'put'))
    # Upper bound for client-declared header/body sizes, to cap memory use.
    MAX_META_SIZE = 1024 * 1024
    # Chunk size used when streaming file data.
    CHUNK_SIZE = 64 * 1024

    def __init__(self):
        self.server_logger = logger.logger('server')
        self.server_logger.debug("server TCPHandler started successful!")

    def handle(self, request, address):
        """Serve one client until it disconnects, times out or sends a malformed request.

        :param request: the connected socket
        :param address: the (ip, port) tuple of the client
        """
        self.conn = request
        self.cli_addr = address
        self.server_logger.info('client[%s:%s] is conecting'% (self.cli_addr[0], self.cli_addr[1]))
        print('client[%s:%s] is conecting'% (self.cli_addr[0], self.cli_addr[1]))
        try:
            while True:
                try:
                    # 1. receive the command header and its JSON body
                    header_dic, req_dic = self.recv_request()
                    if not header_dic:
                        break
                    cmd = header_dic.get('cmd')
                    if not cmd or not isinstance(cmd, str):
                        break
                    print('收到客户端ftp指令:%s'%cmd)
                    # 2. dispatch on the first word, e.g. ['get', 'a.txt']
                    cmds = cmd.split()
                    if cmds and cmds[0] in self.COMMANDS:
                        self.server_logger.info('interface:[%s], request:{client:[%s:%s] action:[%s]}'% (
                            cmds[0], self.cli_addr[0], self.cli_addr[1], cmd))
                        getattr(self, cmds[0])(header_dic, req_dic)
                    else:
                        # Answer instead of staying silent so the client does not block forever.
                        self.response(resultCode=301)
                except (ConnectionResetError, ConnectionAbortedError):
                    break
                except socket.timeout:
                    print('time out %s'%((self.cli_addr,)))
                    break
                except (KeyError, TypeError, ValueError):
                    # The stream position is unknown after a malformed request, so drop the client.
                    self.server_logger.exception('client[%s:%s] sent a malformed request'% (
                        self.cli_addr[0], self.cli_addr[1]))
                    break
        finally:
            # Always release the socket, even when a command raised unexpectedly.
            self.conn.close()
        self.server_logger.info('client %s is disconect'% ((self.cli_addr,)))
        print('client[%s:%s] is disconect'% (self.cli_addr[0], self.cli_addr[1]))

    def _recv_exact(self, size):
        """Read exactly size bytes; return None if the peer closes the connection first."""
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.conn.recv(min(remaining, self.CHUNK_SIZE))
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def unpack_header(self):
        """Read one message header.

        :return: the header dict, or None when the peer closed the connection
                 or announced an invalid header size
        """
        pack_obj = self._recv_exact(4)
        if pack_obj is None:
            return None
        header_size = struct.unpack('i', pack_obj)[0]
        if not 0 < header_size <= self.MAX_META_SIZE:
            return None
        header_bytes = self._recv_exact(header_size)
        if header_bytes is None:
            return None
        header_dic = json.loads(header_bytes.decode('utf-8'))
        return header_dic if isinstance(header_dic, dict) else None

    def unpack_info(self, info_size):
        """Read the info_size-byte JSON body that follows a header.

        :return: tuple (decoded body, md5 hex digest of the raw body bytes)
        :raises ValueError: if info_size is not a sane non-negative integer
        :raises ConnectionAbortedError: if the peer closes before the body is complete
        """
        if not isinstance(info_size, int) or not 0 <= info_size <= self.MAX_META_SIZE:
            raise ValueError('invalid info_size: %r' % (info_size,))
        info_bytes = self._recv_exact(info_size)
        if info_bytes is None:
            raise ConnectionAbortedError('peer closed the connection while sending the request body')
        # An empty body is treated as an empty dict rather than a JSON error.
        info_dic = json.loads(info_bytes.decode('utf-8')) if info_bytes else {}
        info_md5 = commons.getStrsMd5(info_bytes)
        return info_dic, info_md5

    def recv_request(self):
        """Receive one full request.

        :return: (header_dic, req_dic), or (None, None) when no header could be read
        """
        header_dic = self.unpack_header()
        if not header_dic:
            return None, None
        req_dic, info_md5 = self.unpack_info(header_dic.get('info_size'))
        if not isinstance(req_dic, dict):
            req_dic = {}
        if header_dic.get('md5'):
            # Compare the digest announced in the header with the body actually received.
            if info_md5 == header_dic['md5']:
                print('\033[42;1m请求内容md5校检结果一致\033[0m')
            else:
                print('\033[31;1m请求内容md5校检结果不一致\033[0m')
        return header_dic, req_dic

    def response(self, **kwargs):
        """Send kwargs to the client as a JSON body preceded by the standard header."""
        rsp_bytes = commons.getDictBytes(kwargs)
        md5 = commons.getStrsMd5(rsp_bytes)
        header_size_pack, header_bytes = commons.make_header(info_size=len(rsp_bytes), md5=md5)
        self.conn.sendall(header_size_pack)
        self.conn.sendall(header_bytes)
        self.conn.sendall(rsp_bytes)

    @staticmethod
    def _valid_username(username):
        """Return True if username is safe to embed in a file name (no separators, no dot-dirs)."""
        return (isinstance(username, str)
                and username not in ('', '.', '..')
                and not re.search(r'[\\/:\x00]', username))

    @staticmethod
    def _client_cur_path(req_dic):
        """Return the cur_path the client sent inside user_info, or None if absent."""
        credentials = req_dic.get('user_info') if isinstance(req_dic, dict) else None
        return credentials.get('cur_path') if isinstance(credentials, dict) else None

    @staticmethod
    def _is_within(path, home, allow_home=True):
        """Return True if path is home itself (when allow_home) or lies below it.

        Both paths are resolved first, so '..' segments, symlinks and look-alike
        prefixes ('home/bob' vs 'home/bobby') cannot escape the home directory.
        """
        real_path = os.path.normcase(os.path.realpath(path))
        real_home = os.path.normcase(os.path.realpath(home))
        if real_path == real_home:
            return allow_home
        return real_path.startswith(real_home + os.sep)

    @staticmethod
    def _basename(path):
        """Return the last component of path, accepting both '/' and '\\' as separators."""
        return re.split(r'[\\/]', path)[-1]

    def _resolve(self, req_path, cur_path):
        """Return req_path resolved against cur_path as a single path string."""
        return os.path.join('', *self.parse_file_path(req_path, cur_path))

    def _session(self, req_dic):
        """Authenticate a file command.

        :return: (status_code, user_info, home, cur_path); the last three are
                 None unless status_code is 200
        """
        status_code, user_info = self.auth(req_dic)
        if status_code != 200:
            return status_code, None, None, None
        home = os.path.join(settings.user_home_dir, user_info['username'])
        cur_rel = self._client_cur_path(req_dic)
        if not isinstance(cur_rel, str) or not cur_rel:
            cur_rel = user_info['username']
        # cur_path is client-controlled; every command re-checks containment with _is_within().
        return status_code, user_info, home, os.path.join(settings.user_home_dir, cur_rel)

    def register(self, header_dic, req_dic):
        """Create a user record and home directory; respond with resultCode 0 (ok) or 1 (failed)."""
        credentials = req_dic.get('user_info')
        username = credentials.get('username') if isinstance(credentials, dict) else None
        password = credentials.get('password') if isinstance(credentials, dict) else None
        if not self._valid_username(username) or not isinstance(password, str) or not password:
            resultCode = 1
            resultDesc = '用户名或密码不合法,注册失败'
        elif os.path.isfile(os.path.join(settings.db_file, '%s.json'%username)):
            resultCode = 1
            resultDesc = '该用户已存在,注册失败'
        else:
            user_home = os.path.join(settings.user_home_dir, username)
            user_info = {
                'username': username,
                'password': password,
                'home': user_home,
                'quota': settings.user_quota*(1024*1024),
            }
            commons.save_to_file(user_info, os.path.join(settings.db_file, '%s.json'%username))
            os.makedirs(user_home, exist_ok=True)
            resultCode = 0
            resultDesc = None
        if resultCode == 0:
            self.server_logger.info('client[%s:%s] 注册用户[%s]成功'% (self.cli_addr[0], self.cli_addr[1], username))
        else:
            self.server_logger.warning('client[%s:%s] 注册用户[%s]失败:%s'% (self.cli_addr[0], self.cli_addr[1],
                                                                       username, resultDesc))
        self.response(resultCode=resultCode, resultDesc=resultDesc)

    @staticmethod
    def auth(req_dic):
        """Check the credentials in req_dic['user_info'] against the user database.

        :return: (200, user record) on success, (202, None) for an unknown user,
                 (201, None) for a wrong password or a malformed request
        """
        try:
            credentials = req_dic['user_info']
            req_username = credentials['username']
            req_password = credentials['password']
        except (KeyError, TypeError):
            return 201, None
        # Reject names that could escape the db directory before touching the disk.
        if not TCPHandler._valid_username(req_username):
            return 202, None
        db_file = os.path.join(settings.db_file, '%s.json'%req_username)
        if not os.path.isfile(db_file):
            return 202, None
        with open(db_file, 'r') as f:
            user_info_db = json.load(f)
        if 'password' in user_info_db and user_info_db['password'] == req_password:
            return 200, user_info_db
        return 201, None

    def login(self, header_dic, req_dic):
        """Authenticate and return the user record plus the auth status code."""
        status_code, user_info = self.auth(req_dic)
        self.response(user_info=user_info, resultCode=status_code)

    def query_quota(self, header_dic, req_dic):
        """Report the user's total quota and the bytes currently used in the home directory."""
        used_quota = None
        total_quota = None
        status_code, user_info = self.auth(req_dic)
        if status_code == 200:
            used_quota = commons.getFileSize(user_info['home'])
            total_quota = user_info['quota']
        self.response(resultCode=status_code, total_quota=total_quota, used_quota=used_quota)

    @staticmethod
    def parse_file_path(req_path, cur_path):
        """Resolve req_path relative to cur_path.

        Both '/' and '\\' are accepted as separators in either argument, a
        leading '~' component of req_path is dropped, '.' is ignored and '..'
        removes the previous component (never going above the root).

        :param req_path: path typed by the client; always treated as relative
        :param cur_path: server-side path of the client's current directory
        :return: list of components suitable for os.path.join('', *result); for
                 an absolute cur_path the first element is the root anchor
                 ('C:\\' style on Windows, os.sep elsewhere)
        """
        drive, tail = os.path.splitdrive(cur_path)
        anchor = drive + os.sep if tail[:1] in ('\\', '/') else drive
        cur_parts = [part for part in re.split(r'[\\/]+', tail) if part]
        req_parts = [part for part in re.split(r'[\\/]+', req_path) if part]
        if req_parts and req_parts[0] == '~':
            del req_parts[0]
        resolved = []
        for part in cur_parts + req_parts:
            if part == '.':
                continue
            if part == '..':
                # A '..' at the very top is ignored instead of wrapping around.
                if resolved:
                    resolved.pop()
                continue
            resolved.append(part)
        return ([anchor] if anchor else []) + resolved

    def cd(self, header_dic, req_dic):
        """Change directory; respond with the new path relative to the home root in returnPath."""
        cmds = header_dic['cmd'].split()
        status_code, user_info, home, cur_path = self._session(req_dic)
        # Default answer: leave the client where it already is.
        returnPath = self._client_cur_path(req_dic)
        if status_code != 200:
            resultCode = status_code
        elif len(cmds) < 2:
            resultCode = 301
        else:
            cd_path = self._resolve(cmds[1], cur_path)
            print('cd解析后的路径:', cd_path)
            if os.path.isdir(cd_path) and self._is_within(cd_path, home):
                resultCode = 300
                returnPath = os.path.relpath(os.path.realpath(cd_path),
                                             os.path.realpath(settings.user_home_dir))
            else:
                resultCode = 302
        print('cd发送给客户端的路径:', returnPath)
        self.response(resultCode=resultCode, returnPath=returnPath)

    def ls(self, header_dic, req_dic):
        """List a directory inside the user's home; the listing text is sent in returnFilenames."""
        cmds = header_dic['cmd'].split()
        status_code, user_info, home, cur_path = self._session(req_dic)
        returnFilenames = None
        if status_code != 200:
            resultCode = status_code
        elif len(cmds) > 2:
            resultCode = 301
        else:
            ls_path = self._resolve(cmds[1] if len(cmds) == 2 else '.', cur_path)
            print('ls解析后的路径:', ls_path)
            if os.path.isdir(ls_path) and self._is_within(ls_path, home):
                returnCode, returnFilenames = commons.getFile(ls_path, home)
                resultCode = 300
            else:
                resultCode = 302
        self.response(resultCode=resultCode, returnFilenames=returnFilenames)

    def rm(self, header_dic, req_dic):
        """Delete a file or directory tree strictly below the user's home directory."""
        cmds = header_dic['cmd'].split()
        status_code, user_info, home, cur_path = self._session(req_dic)
        if status_code != 200:
            resultCode = status_code
        elif len(cmds) != 2:
            resultCode = 301
        else:
            rm_file = self._resolve(cmds[1], cur_path)
            print('rm解析后的文件或文件夹:', rm_file)
            # allow_home=False: the home directory itself must never be removed.
            if os.path.exists(rm_file) and self._is_within(rm_file, home, allow_home=False):
                commons.rmdirs(rm_file)
                resultCode = 300
            else:
                resultCode = 302
        self.response(resultCode=resultCode)

    def mkdir(self, header_dic, req_dic):
        """Create a directory (with parents) inside the user's home directory."""
        cmds = header_dic['cmd'].split()
        status_code, user_info, home, cur_path = self._session(req_dic)
        if status_code != 200:
            resultCode = status_code
        elif len(cmds) != 2:
            resultCode = 301
        else:
            mkdir_path = self._resolve(cmds[1], cur_path)
            print('mkdir解析后的文件夹:', mkdir_path)
            if os.path.exists(mkdir_path):
                resultCode = 303
            elif self._is_within(mkdir_path, home):
                os.makedirs(mkdir_path)
                resultCode = 300
            else:
                resultCode = 302
        self.response(resultCode=resultCode)

    def get(self, header_dic, req_dic):
        """Send a file to the client, optionally resuming from req_dic['position']."""
        cmds = header_dic['cmd'].split()  # ['get', 'a.txt', 'download']
        get_file = None
        FileSize = None
        FileMd5 = None
        # Resume information: only honoured when position is a positive integer.
        position = 0
        requested = req_dic.get('position')
        if req_dic.get('resume') and isinstance(requested, int) and requested > 0:
            position = requested
        status_code, user_info, home, cur_path = self._session(req_dic)
        if status_code != 200:
            resultCode = status_code
        elif not 1 < len(cmds) < 4:
            resultCode = 301
        else:
            get_file = self._resolve(cmds[1], cur_path)
            print('get解析后的路径:', get_file)
            if os.path.isfile(get_file) and self._is_within(get_file, home):
                FileSize = commons.getFileSize(get_file)
                if position >= FileSize != 0:
                    # Nothing left to send: the client already has the whole file.
                    resultCode = 304
                else:
                    resultCode = 300
                    FileMd5 = commons.getFileMd5(get_file)
            else:
                resultCode = 302
        self.response(resultCode=resultCode, FileSize=FileSize, FileMd5=FileMd5)
        if resultCode == 300:
            with open(get_file, 'rb') as f:
                f.seek(position)
                # Fixed-size chunks: iterating a binary file by "line" can load it whole.
                for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                    self.conn.sendall(chunk)

    def put(self, header_dic, req_dic):
        """Receive a file from the client, enforcing the quota and verifying its MD5.

        :raises ConnectionAbortedError: if the client disconnects mid-upload
                (the partial file is removed first)
        """
        cmds = header_dic['cmd'].split()  # ['put', 'download/a.txt', 'video']
        put_file = None
        file_size = req_dic.get('FileSize')
        status_code, user_info, home, cur_path = self._session(req_dic)
        if status_code != 200:
            resultCode = status_code
        elif (not 1 < len(cmds) < 4 or not isinstance(file_size, int)
              or isinstance(file_size, bool) or file_size < 0):
            resultCode = 301
        else:
            filename = self._basename(cmds[1])
            target_dir = self._resolve(cmds[2] if len(cmds) == 3 else '.', cur_path)
            put_file = os.path.join(target_dir, filename)
            print('put解析后的文件:', put_file)
            if (filename in ('', '.', '..') or not os.path.isdir(target_dir)
                    or os.path.isdir(put_file) or not self._is_within(put_file, home)):
                resultCode = 302
            elif file_size + commons.getFileSize(user_info['home']) > user_info['quota']:
                resultCode = 305
            else:
                resultCode = 300
        self.response(resultCode=resultCode)
        if resultCode != 300:
            return
        # Receive exactly file_size bytes; never read past the end of the upload.
        remaining = file_size
        with open(put_file, 'wb') as f:
            while remaining > 0:
                file_data = self.conn.recv(min(self.CHUNK_SIZE, remaining))
                if not file_data:
                    break
                f.write(file_data)
                remaining -= len(file_data)
        if remaining:
            os.remove(put_file)
            raise ConnectionAbortedError('client closed the connection during upload')
        if commons.getFileMd5(put_file) == req_dic.get('FileMd5'):
            resultCode = 400
            print('\033[42;1m文件md5校检结果一致\033[0m')
            print('\033[42;1m文件上传成功,大小:%d,文件名:%s\033[0m'% (file_size, put_file))
        else:
            os.remove(put_file)
            resultCode = 401
            print('\033[31;1m文件md5校检结果不一致\033[0m')
            print('\033[42;1m文件上传失败\033[0m')
        # Tell the client whether the upload was accepted.
        self.response(resultCode=resultCode)
server.py
logger.py
# -*- coding: utf-8 -*-
import logging
from logging import handlers
from conf import settings


def logger(log_type):
    """Return the logger named log_type, configuring it on first use.

    The first call for a given name attaches a daily-rotating file handler
    (3 backups) writing to <base_path>/logs/<log_type>.log at the level named
    by settings.log_level. Later calls return the already configured logger
    without opening another file handle.

    :param log_type: logger name, also used as the log file's base name
    :return: the logging.Logger instance
    """
    levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
    }
    # Any unrecognised level name falls back to CRITICAL.
    level = levels.get(settings.log_level, logging.CRITICAL)
    log = logging.getLogger(log_type)
    log.setLevel(logging.DEBUG)
    if not log.handlers:
        # Build the handler only when needed: constructing it opens the log file.
        fh = handlers.TimedRotatingFileHandler(filename='%s/logs/%s.log'% (settings.base_path, log_type),
                                               when='D', interval=1, backupCount=3)
        fh.setLevel(level)
        fh.setFormatter(logging.Formatter(fmt='%(asctime)s %(name)s [%(levelname)s] %(message)s', datefmt=None))
        log.addHandler(fh)
    return log
logger.py
commons.py
# -*- coding: utf-8 -*-
import subprocess, hashlib, struct, json, os


def save_to_file(info, db_filename):
    """Write info to db_filename as JSON, replacing any previous content."""
    with open(db_filename, 'w') as f:
        json.dump(info, f)


def getDictBytes(dic):
    """Return dic serialized as UTF-8 encoded JSON bytes."""
    return json.dumps(dic).encode('utf-8')


def make_header(info_size=0, cmd=None, md5=None):
    """Build a message header.

    :param info_size: length in bytes of the body that will follow
    :param cmd: command string, or None for responses
    :param md5: hex digest of the body, or None
    :return: tuple (4-byte struct 'i' length of the header, header JSON bytes)
    """
    header_dic = {
        'cmd': cmd,
        'info_size': info_size,
        'md5': md5,
    }
    header_bytes = json.dumps(header_dic).encode('utf-8')
    return struct.pack('i', len(header_bytes)), header_bytes


def getFileSize(path, size=0):
    """Return size plus the total number of bytes stored at path.

    Directories are summed recursively. Entries that cannot be read
    (permission denied, removed while scanning, broken links) are skipped,
    so the result is best-effort.

    :param path: file or directory
    :param size: starting size in bytes to add to
    :return: total size in bytes; size itself when path does not exist
    """
    if not os.path.exists(path):
        return size
    if not os.path.isdir(path):
        try:
            return size + os.path.getsize(path)
        except OSError:
            return size
    try:
        items = os.listdir(path)
    except OSError:
        return size
    for item in items:
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            size = getFileSize(item_path, size)
        else:
            try:
                size += os.path.getsize(item_path)
            except OSError:
                pass
    return size


def getFile(path, home):
    """Describe the files at path, ls-style.

    :param path: directory to list, or a single file
    :param home: directory that a single file's name is shown relative to
    :return: tuple (0, listing text), or (1, error text) when path is missing
    """
    if not os.path.exists(path):
        return 1, 'ls: error:没有那个文件或目录'
    lines = ['total %d\n'%getFileSize(path)]
    if os.path.isdir(path):
        try:
            items = sorted(os.listdir(path))
        except PermissionError:
            items = []
        for item in items:
            item_path = os.path.join(path, item)
            kind = 'd' if os.path.isdir(item_path) else 'f'
            lines.append('%s %s %s\n'% (kind, item, getFileSize(item_path)))
    else:
        # relpath works with either separator; the old backslash replace was Windows-only.
        lines.append('f %s %s\n'% (os.path.relpath(path, home), getFileSize(path)))
    return 0, ''.join(lines)


def getStrsMd5(*strs):
    """Return the hex MD5 digest of the concatenation of the given bytes objects."""
    md5 = hashlib.md5()
    for chunk in strs:
        md5.update(chunk)
    return md5.hexdigest()


def getFileMd5(filename):
    """Return the hex MD5 digest of a file's content, or None if it is not a regular file."""
    if not os.path.isfile(filename):
        return None
    md5 = hashlib.md5()
    with open(filename, 'rb') as f:
        # Fixed-size chunks keep memory bounded for binary files without newlines.
        for chunk in iter(lambda: f.read(65536), b''):
            md5.update(chunk)
    return md5.hexdigest()


def rmdirs(path):
    """Remove a file, a symbolic link, or a directory tree.

    Symbolic links are removed themselves; their targets are left untouched.

    :return: 0 when something was removed, 1 when path does not exist
    """
    if not os.path.lexists(path):
        return 1
    if os.path.isdir(path) and not os.path.islink(path):
        for sub_item in os.listdir(path):
            rmdirs(os.path.join(path, sub_item))
        os.rmdir(path)
    else:
        os.remove(path)
    return 0


# def exec_cmd(command):
# """该函数用于执行系统命令,并返回结果
# :param command:系统命令,str类型
# :return:返回tuple(b'正常命令执行输出结果', b'错误命令执行输出结果')
# """
# res = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
# res_out = res.stdout.read()
# res_err = res.stderr.read()
# return res_out, res_err # def make_file_header(getFile, filePermission):
# """该函数用于制作文件固定长度报文头
# :param getFile:客户端下载的文件
# :param filePermission:客户端下载的文件的权限
# :return:返回tuple(报文头长度包对象(struct_pack 'i'), 报文头内容bytes类型)
# """
# if filePermission:
# md5 = getFileMd5(getFile)
# if md5:
# resultCode = 0
# file_size = os.path.getsize(getFile)
# failReason = None
# else:
# resultCode = 1
# file_size = None
# failReason = 'The file could not be found'
# else:
# md5= None
# resultCode = 1
# file_size = None
# failReason = 'Permission Denied!'
# header_dic = {
# 'filename':getFile,
# 'md5':md5,
# 'file_size':file_size,
# 'resultCode':resultCode,
# 'failReason':failReason
# }
# header_json = json.dumps(header_dic)
# header_bytes = header_json.encode('utf-8')
# header_size_pack = struct.pack('i', len(header_bytes))
# return resultCode, header_size_pack, header_bytes # def make_str_header(*strs, cmd=None, resultCode=0, failReason=None):
# """该函数用于制作字符串固定长度报文头
# :param cmd:客户端请求的命令
# :param resultCode:返回结果
# :param failReason:失败原因说明
# :param strs:命令结果
# :return:返回tuple(报文头长度包对象(struct_pack 'i'), 报文头内容bytes类型)
# """
# md5 = getStrsMd5(*strs)
# total_size = 0
# for str in strs:
# total_size += len(str)
# header_dic = {
# 'cmd':cmd,
# 'md5':md5,
# 'total_size':total_size,
# 'resultCode':resultCode,
# 'failReason':failReason
# }
# header_json = json.dumps(header_dic)
# header_bytes = header_json.encode('utf-8')
# header_size_pack = struct.pack('i', len(header_bytes))
# return header_size_pack, header_bytes
commons.py
settings.py
# Author:ton
# -*- coding: utf-8 -*-
import sys, os

# Root directory for logs/, db/ and home/: the first entry of sys.path.
base_path = sys.path[0]
# Server socket configuration
server_bind_ip = '127.0.0.1'
server_bind_port = 8080
server_listen = 5
# Logging configuration
log_level = 'debug'
# Directory holding one <username>.json record per user
db_file = os.path.join(base_path, 'db')
# User configuration
user_home_dir = os.path.join(base_path, 'home')
user_quota = 10  # default quota, in MB
settings.py
客户端:
client.py
# -*- coding: utf-8 -*-
import socket, struct, json, os, time
from lib import commons
from conf import settings
from core import logger
from core import progressBar


class Client(object):
    """Interactive command-line client for the FTP-like server.

    Every message on the wire is: a 4-byte struct 'i' header length, a JSON
    header ({'cmd', 'info_size', 'md5'}), then info_size bytes of JSON body.
    """

    STATUS_CODE = {
        200: 'Passed authentication!',
        201: 'Wrong username or password!',
        202: 'Username does not exist!',
        300: 'cmd successful , the target path be returned in returnPath',
        301: 'cmd format error!',
        302: 'The path or file could not be found!',
        303: 'The dir is exist',
        304: 'The file has been downloaded or the size of the file is exceptions',
        305: 'Free space is not enough',
        401: 'File MD5 inspection failed',
        400: 'File MD5 inspection success',
    }

    # Commands a logged-in user may type; anything else is a syntax error.
    # Dispatching on hasattr() would also run internals such as run() or request().
    COMMANDS = frozenset(('ls', 'cd', 'get', 'put', 'rm', 'mkdir', 'listDownload', 'pwd'))
    # Chunk size used when streaming file data.
    CHUNK_SIZE = 64 * 1024

    def __init__(self):
        """Prepare local directories, connect to the server and run the menu loop."""
        self.user_info = {}
        self.client = None
        self.init_dir()
        self.client_logger = logger.logger('client')
        try:
            self.connect()
            self.run()
        except (ConnectionResetError, ConnectionRefusedError, ConnectionAbortedError):
            self.client_logger.info('Cannot find server, client will be closed...')
            print('\033[31;1mCannot find server, client will be closed...\033[0m')
        finally:
            # Also reached on SystemExit from the menu, so the socket is always released.
            if self.client is not None:
                self.client.close()

    @staticmethod
    def init_dir():
        """Create logs/ and downloadList/ under settings.base_path when missing."""
        for name in ('logs', 'downloadList'):
            os.makedirs(os.path.join(settings.base_path, name), exist_ok=True)

    def connect(self):
        """Open the TCP connection to the configured server address."""
        self.client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.client.connect((settings.client_connect_ip, settings.client_connect_port))
        self.client_logger.info('Has been successfully connected to the server[%s:%s]'%
                                (settings.client_connect_ip, settings.client_connect_port))
        print('Has been successfully connected to the server[%s:%s]'%
              (settings.client_connect_ip, settings.client_connect_port))

    def run(self):
        """Show the top-level menu (register / login / quit) until the user quits."""
        while True:
            if self.user_info:
                self.interactive()
            print('\n欢迎来到MyFTP程序\n'
                  '1、注册\n'
                  '2、登陆\n'
                  '3、退出')
            choice = input('>>').strip()
            if choice == '1':
                self.register('register')
            elif choice == '2':
                self.login('login')
            elif choice == '3':
                raise SystemExit('Bye')

    def _recv_exact(self, size):
        """Read exactly size bytes from the server.

        :raises ConnectionAbortedError: if the server closes the connection first
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.client.recv(min(remaining, self.CHUNK_SIZE))
            if not chunk:
                raise ConnectionAbortedError('server closed the connection')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def unpack_header(self):
        """Read one message header and return it as a dict."""
        header_size = struct.unpack('i', self._recv_exact(4))[0]
        header_bytes = self._recv_exact(header_size)
        return json.loads(header_bytes.decode('utf-8'))

    def unpack_info(self, info_size):
        """Read the info_size-byte JSON body.

        :return: tuple (decoded body, md5 hex digest of the raw body bytes)
        """
        info_bytes = self._recv_exact(info_size)
        # An empty body is treated as an empty dict rather than a JSON error.
        info_dic = json.loads(info_bytes.decode('utf-8')) if info_bytes else {}
        info_md5 = commons.getStrsMd5(info_bytes)
        return info_dic, info_md5

    def recv_response(self):
        """Receive one full response and return its body dict."""
        header_dic = self.unpack_header()
        rsp_dic, info_md5 = self.unpack_info(header_dic['info_size'])
        if header_dic.get('md5'):
            # Compare the digest announced in the header with the body actually received.
            if info_md5 == header_dic['md5']:
                print('\033[42;1m响应内容md5校检结果一致\033[0m')
            else:
                print('\033[31;1m响应内容md5校检结果不一致\033[0m')
        return rsp_dic

    def request(self, cmd, **kwargs):
        """Send cmd with kwargs as the JSON body (normally including user_info for auth)."""
        req_info_bytes = json.dumps(kwargs).encode('utf-8')
        md5 = commons.getStrsMd5(req_info_bytes)
        header_size_pack, header_bytes = commons.make_header(info_size=len(req_info_bytes), cmd=cmd, md5=md5)
        self.client.sendall(header_size_pack)
        self.client.sendall(header_bytes)
        self.client.sendall(req_info_bytes)

    def register(self, cmd):
        """Prompt for credentials and ask the server to create the account."""
        username = input('username:').strip()
        password = input('password:').strip()
        password = commons.getStrsMd5(password.encode('utf-8'))  # only the MD5 of the password is sent
        user_info = {'username': username, 'password': password}
        self.request(cmd, user_info=user_info)
        # The password hash is what authenticates the user, so it is kept out of the log.
        self.client_logger.info('向服务端发送注册请求,用户名:%s'%username)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] == 1:
            self.client_logger.info('%s'%rsp_dic['resultDesc'])
            print('\033[31;1m%s\033[0m'%rsp_dic['resultDesc'])
            return
        self.client_logger.info('用户[%s]注册成功'%username)
        print('\033[42;1m用户[%s]注册成功\033[0m'%username)

    def login(self, cmd):
        """Prompt for credentials; on success store the session in self.user_info."""
        username = input('username:').strip()
        password = input('password:').strip()
        password = commons.getStrsMd5(password.encode('utf-8'))  # only the MD5 of the password is sent
        user_info = {'username': username, 'password': password}
        self.request(cmd, user_info=user_info)
        # The password hash is what authenticates the user, so it is kept out of the log.
        self.client_logger.info('向服务端发送登陆请求,用户名:%s'%username)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 200:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
            return
        self.user_info = rsp_dic['user_info']
        self.user_info['password'] = password
        self.user_info['cur_path'] = username
        print('\033[42;1m用户[%s]登陆成功\033[0m'%username)

    def interactive(self):
        """Command loop for a logged-in user; returns on 'exit' or when the session is lost."""
        while self.user_info:
            self.query_quota()
            if not self.user_info:
                # query_quota() clears the session when authentication fails.
                break
            print('%dM/%dM'% (self.user_info['used_quota']/(1024*1024), self.user_info['total_quota']/(1024*1024)))
            cmd = input('%s>'%self.user_info['cur_path']).strip()  # e.g. 'get a.txt'
            if not cmd:
                continue
            if cmd == 'exit':
                self.user_info = {}
                break
            cmds = cmd.split()
            if cmds[0] in self.COMMANDS:
                self.client_logger.info('calling method:[%s], cmd:[%s]}'% (cmds[0], cmd))
                getattr(self, cmds[0])(cmd)
            else:
                print('语法错误,仅支持ls,cd,get(断点续传),put,rm,mkdir,listDownload(下载列表)')

    def query_quota(self):
        """Refresh total/used quota in self.user_info; clear the session if auth fails."""
        self.request('query_quota', user_info=self.user_info)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] == 200:
            self.user_info['total_quota'] = rsp_dic['total_quota']
            self.user_info['used_quota'] = rsp_dic['used_quota']
        else:
            print('\033[31;1m%s,pls login again!\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
            self.user_info = {}

    def cd(self, cmd):
        """Change the remote directory, e.g. 'cd music', 'cd ~', 'cd ..'."""
        cmds = cmd.split()
        if len(cmds) != 2:
            print("用法错误,example:'cd music'、'cd ~'、'cd .'、'cd ..'")
            return
        self.request(cmd, user_info=self.user_info)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 300:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
            return
        self.user_info['cur_path'] = rsp_dic['returnPath']

    def ls(self, cmd):
        """List a remote directory, e.g. 'ls', 'ls music', 'ls ~/music'."""
        cmds = cmd.split()
        if len(cmds) > 2:
            print("用法错误,example:'ls music'、'ls ~'、'ls ~/music'")
            return
        self.request(cmd, user_info=self.user_info)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 300:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
            return
        print(rsp_dic['returnFilenames'])

    def rm(self, cmd):
        """Delete a remote file or directory, e.g. 'rm music', 'rm 1.mp3'."""
        cmds = cmd.split()
        if len(cmds) != 2:
            print("用法错误,example:'rm music'、'rm 1.mp3'")
            return
        self.request(cmd, user_info=self.user_info)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 300:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
        else:
            print('删除成功')

    def mkdir(self, cmd):
        """Create a remote directory, e.g. 'mkdir music', 'mkdir music/1/11'."""
        cmds = cmd.split()
        if len(cmds) != 2:
            print("用法错误,example:'mkdir music'、'mkdir music/1/11'")
            return
        self.request(cmd, user_info=self.user_info)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 300:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
        else:
            print('创建目录成功')

    def listDownload(self, cmd):
        """Show the download list and resume the unfinished download the user picks."""
        download_dic = self.load_downList()
        if not download_dic:
            print('\033[31;1m下载列表为空,请先进行get下载\033[0m')
            return
        for key in download_dic:
            print('ori_file:%s local_file:%s Status:%s'% (key, download_dic[key]['local_file'], download_dic[key]['status']))
        choice = input('请输入需要继续下载源文件名:').strip()
        if not download_dic.get(choice):
            print('\033[31;1m请输入正确源文件名\033[0m')
        elif download_dic[choice]['status'] == 'finish':
            print('\033[31;1m文件已经下载完成\033[0m')
        else:
            self.get(cmd='get %s %s'% (choice, os.path.dirname(download_dic[choice]['local_file'])), resume_flag=True)

    def load_downList(self):
        """Return the current user's download list dict, or None if no list file exists."""
        dl_list_file = '%s/downloadList/%s.json'% (settings.base_path, self.user_info['username'])
        if not os.path.isfile(dl_list_file):
            return None
        with open(dl_list_file, 'r') as f:
            return json.load(f)

    def _save_downList(self, download_dic):
        """Persist download_dic as the current user's download list."""
        with open('%s/downloadList/%s.json'% (settings.base_path, self.user_info['username']), 'w') as f:
            json.dump(download_dic, f)

    def progressBar(self, total, cur=0):
        """Generator-based progress bar: prime it with next(), then send() the byte count so far.

        :param total: number of bytes expected; 0 is treated as an already complete transfer
        :param cur: number of bytes already transferred (used when resuming)
        """
        if total == 0:
            total = 1
            cur = 1
        last_percent = int(cur / total * 100)
        while True:
            cur = yield
            cur_percent = int(cur / total * 100)
            if cur_percent > last_percent:
                print('{0:3}%: '.format(cur_percent) + '#'*int(cur_percent/4), end='\r', flush=True)
                if cur == total:
                    print('\n')
            last_percent = cur_percent

    def get(self, cmd, resume_flag=False):
        """Download a remote file, e.g. 'get 1.mp3 download/'.

        :param cmd: full command line; the optional third word is the local target directory
        :param resume_flag: when True, append to the existing local file from its current size
        :raises ConnectionAbortedError: if the server disconnects mid-transfer; the
                partial file and its 'loading' list entry are kept so it can be resumed
        """
        cmds = cmd.split()
        if not 1 < len(cmds) < 4:
            print("\033[31;1m用法错误,example:'get 1.mp3 download/'\033[0m")
            return
        # Work out where the file is written locally.
        recv_file_path = cmds[2] if len(cmds) == 3 else settings.base_path
        recv_filename = os.path.join(recv_file_path, os.path.basename(cmds[1]))
        if not os.path.isdir(recv_file_path):
            print('\033[31;1m本地路径[%s]不存在\033[0m'%recv_file_path)
            return
        resume = False
        position = 0
        if resume_flag:
            resume = True
            position = commons.getFileSize(recv_filename)
        elif os.path.isfile(recv_filename):
            # A fresh download starts from an empty file.
            os.remove(recv_filename)
        # 1. send the command together with the resume information
        self.request(cmd, user_info=self.user_info, resume=resume, position=position)
        # 2. read the server's verdict
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 300:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
            return
        # Record the download first so an interrupted transfer can be resumed later.
        download_dic = self.load_downList() or {}
        download_dic[cmds[1]] = {'local_file': recv_filename, 'status': 'loading'}
        self._save_downList(download_dic)
        # 3. receive the file data
        total = rsp_dic['FileSize']
        recv_size = position
        with open(recv_filename, 'a+b') as f:
            progress_generator = self.progressBar(total=total, cur=recv_size)
            next(progress_generator)
            while recv_size < total:
                file_data = self.client.recv(min(self.CHUNK_SIZE, total - recv_size))
                if not file_data:
                    raise ConnectionAbortedError('server closed the connection during download')
                f.write(file_data)
                recv_size += len(file_data)
                progress_generator.send(recv_size)
        # 4. verify the complete local file against the server's digest
        if commons.getFileMd5(recv_filename) == rsp_dic['FileMd5']:
            download_dic[cmds[1]]['status'] = 'finish'
            self._save_downList(download_dic)
            print('\033[42;1m文件md5校检结果一致\033[0m')
            print('\033[42;1m文件下载成功,大小:%d,文件名:%s\033[0m'% (total, recv_filename))
        else:
            os.remove(recv_filename)
            download_dic[cmds[1]]['status'] = 'failed'
            self._save_downList(download_dic)
            print('\033[31;1m文件md5校检结果不一致\033[0m')
            print('\033[42;1m文件下载失败\033[0m')

    def put(self, cmd):
        """Upload a local file, e.g. 'put download/1.mp3 video'."""
        cmds = cmd.split()
        if not 1 < len(cmds) < 4:
            print("用法错误,example:'put download/1.mp3 video'")
            return
        if not os.path.isfile(cmds[1]):
            print('\033[31;1mThe file could not be found\033[0m')
            return
        FileSize = commons.getFileSize(cmds[1])
        FileMd5 = commons.getFileMd5(cmds[1])
        # 1. announce the upload (size and digest) and authenticate
        self.request(cmd, user_info=self.user_info, FileSize=FileSize, FileMd5=FileMd5)
        # 2. the server accepts or rejects (path, quota, ...)
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] != 300:
            print('\033[31;1m%s\033[0m'%self.STATUS_CODE.get(rsp_dic['resultCode']))
            return
        # 3. stream the file in fixed-size chunks
        progress_generator = self.progressBar(total=FileSize)
        next(progress_generator)
        send_size = 0
        with open(cmds[1], 'rb') as f:
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                self.client.sendall(chunk)
                send_size += len(chunk)
                progress_generator.send(send_size)
        # 4. wait for the server's MD5 verification result
        rsp_dic = self.recv_response()
        if rsp_dic['resultCode'] == 400:
            print('\033[42;1m文件上传成功,文件名:%s\033[0m'%os.path.basename(cmds[1]))
        else:
            print('文件上传失败[%s]'%self.STATUS_CODE.get(rsp_dic['resultCode']))

    def pwd(self, cmd):
        """Print the current remote directory."""
        print(self.user_info['cur_path'])
client.py
progressBar.py
# -*- coding: utf-8 -*-
import sys


class ProgressBar:
    """Text progress bar written to sys.stdout."""

    def __init__(self, count=0, total=0, width=50):
        """
        :param count: number of steps completed so far
        :param total: number of steps in total
        :param width: width of the bar in characters
        """
        self.count = count
        self.total = total
        self.width = width

    def move(self):
        """Advance the bar by one step."""
        self.count += 1

    def over(self):
        """Mark the bar as complete."""
        self.count = self.total

    def log(self, s):
        """Write s followed by the current bar; a full bar ends with a newline."""
        sys.stdout.write('%s'%s)
        if self.total > 0:
            progress = int(self.width * self.count / self.total)
        else:
            # Nothing to do counts as finished; this also avoids dividing by zero.
            progress = self.width
        # Keep the bar inside its width even if count runs past total.
        progress = max(0, min(progress, self.width))
        sys.stdout.write('{0:3}%/{1:1}%: '.format(self.count, self.total))
        sys.stdout.write('#' * progress + '-' * (self.width - progress) + '\r')
        if progress == self.width:
            sys.stdout.write('\n')
        sys.stdout.flush()
progressBar.py
logger.py
# -*- coding: utf-8 -*-
import logging
from logging import handlers
from conf import settings


def logger(log_type):
    """Return the logger named log_type, configuring it on first use.

    The first call for a given name attaches a daily-rotating file handler
    (3 backups) writing to <base_path>/logs/<log_type>.log at the level named
    by settings.log_level. Later calls return the same logger without adding
    another handler, so each record is written only once.

    :param log_type: logger name, also used as the log file's base name
    :return: the logging.Logger instance
    """
    levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
    }
    # Any unrecognised level name falls back to CRITICAL.
    level = levels.get(settings.log_level, logging.CRITICAL)
    log = logging.getLogger(log_type)
    log.setLevel(logging.DEBUG)
    if not log.handlers:
        fh = handlers.TimedRotatingFileHandler(filename='%s/logs/%s.log'% (settings.base_path, log_type),
                                               when='D', interval=1, backupCount=3)
        fh.setLevel(level)
        fh.setFormatter(logging.Formatter(fmt='%(asctime)s %(name)s [%(levelname)s] %(message)s', datefmt=None))
        log.addHandler(fh)
    return log
logger.py
commons.py
# -*- coding: utf-8 -*-
import subprocess, hashlib, struct, json, os, time


def getStrsMd5(*strs):
    """Return the hex MD5 digest of the concatenation of the given bytes objects."""
    md5 = hashlib.md5()
    for chunk in strs:
        md5.update(chunk)
    return md5.hexdigest()


def getDictBytes(dic):
    """Return dic serialized as UTF-8 encoded JSON bytes."""
    return json.dumps(dic).encode('utf-8')


def make_header(info_size=0, cmd=None, md5=None):
    """Build a message header.

    :param info_size: length in bytes of the body that will follow
    :param cmd: command string, or None
    :param md5: hex digest of the body, or None
    :return: tuple (4-byte struct 'i' length of the header, header JSON bytes)
    """
    header_dic = {
        'cmd': cmd,
        'info_size': info_size,
        'md5': md5,
    }
    header_bytes = json.dumps(header_dic).encode('utf-8')
    return struct.pack('i', len(header_bytes)), header_bytes


def getFileMd5(filename):
    """Return the hex MD5 digest of a file's content, or None if it is not a regular file."""
    if not os.path.isfile(filename):
        return None
    md5 = hashlib.md5()
    with open(filename, 'rb') as f:
        # Fixed-size chunks keep memory bounded for binary files without newlines.
        for chunk in iter(lambda: f.read(65536), b''):
            md5.update(chunk)
    return md5.hexdigest()


def getFileSize(path, size=0):
    """Return size plus the total number of bytes stored at path.

    Directories are summed recursively. Entries that cannot be read
    (permission denied, removed while scanning, broken links) are skipped,
    so the result is best-effort.

    :param path: file or directory
    :param size: starting size in bytes to add to
    :return: total size in bytes; size itself when path does not exist
    """
    if not os.path.exists(path):
        return size
    if not os.path.isdir(path):
        try:
            return size + os.path.getsize(path)
        except OSError:
            return size
    try:
        items = os.listdir(path)
    except OSError:
        return size
    for item in items:
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            size = getFileSize(item_path, size)
        else:
            try:
                size += os.path.getsize(item_path)
            except OSError:
                pass
    return size


# def make_file_header(filename):
# """该函数用于制作自定义应用层协议固定长度包头
# :param filename:'文件名'
# :return:返回tuple(报文头长度包对象(struct_pack 'i'), 报文头内容bytes类型)
# """
# md5 = getFileMd5(filename)
# if md5:
# resultCode = 0
# file_size = os.path.getsize(filename)
# failReason = None
# else:
# resultCode = 1
# file_size = None
# failReason = 'The file could not be found'
# header_dic = {
# 'filename':filename,
# 'md5':md5,
# 'file_size':file_size,
# 'resultCode':resultCode,
# 'failReason':failReason
# }
# header_json = json.dumps(header_dic)
# header_bytes = header_json.encode('utf-8')
# header_size_pack = struct.pack('i', len(header_bytes))
# return header_size_pack, header_bytes # def exec_cmd(command):
# """该函数用于执行系统命令,并返回结果
# :param command:系统命令,str类型
# :return:返回tuple(b'正常命令执行输出结果', b'错误命令执行输出结果')
# """
# res = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
# res_out = res.stdout.read()
# res_err = res.stderr.read()
# return res_out, res_err
commons.py
settings.py
# Author:ton
# -*- coding: utf-8 -*-
import sys

# Root directory for logs/ and downloadList/: the first entry of sys.path.
base_path = sys.path[0]
# server_bind_ip = '127.0.0.1'
# server_bind_port = 8080
# server_listen = 5
# Address of the FTP server to connect to
client_connect_ip = '127.0.0.1'
client_connect_port = 8080
# Logging configuration
log_level = 'debug'
settings.py
三、使用截图
PS:支持多用户同时在线上传、下载文件,支持文件MD5校验,支持断点续传。