由于时间问题,没有实现并发上传下载的功能。后面再完善。
import select,socket,queue,json,os,threading,time,gevent,hashlib from gevent import monkey monkey.patch_all() class MyselectTCP(object): ''' 用select 实现的socket 服务端,没能实现并发 ''' socket_familly = socket.AF_INET socket_type = socket.SOCK_STREAM socket_request_numb = 1000 inputs = [] outputs = [] readerable, writeable, exceptinal = None,None,None msg_dict ={} BASEADDR = os.path.abspath("..\log\home") def __init__(self,ip,port): ''' 实例化一个socket服务端 :param ip: :param port: ''' self.sever = socket.socket(self.socket_familly,self.socket_type) self.sever.bind((ip,port)) self.sever.listen(self.socket_request_numb) self.sever.setblocking(False) self.inputs.append(self.sever) self.connection() def com_get(self,w_conn,bcak_dict): ''' 客户端请求下载处理方法 :param w_conn: :param bcak_dict: :return: ''' filename = bcak_dict["file"] fileaddr = self.BASEADDR + '\\%s' % filename #判断下载文件是否存在 if os.path.isfile(fileaddr): bcak_dict["file"] = fileaddr bcak_dict["size"] = str(os.stat(fileaddr).st_size) else: bcak_dict["error"] = '777' print("back data is [%s]"%bcak_dict) #发送文件信息给客户端 w_conn.send(json.dumps(bcak_dict).encode()) md5 = hashlib.md5() #开始发送 if os.path.isfile(fileaddr): if w_conn.recv(2) == b'ok': file_obj = open(fileaddr,'rb') for line in file_obj: md5.update(line) w_conn.send(line) time.sleep(1) w_conn.send(md5.hexdigest().encode()) def com_put(self,w_conn,bcak_dict): ''' 客户端请求上传处理方法 :param w_conn: :param bcak_dict: :return: ''' filename = bcak_dict["file"] #格式和文件地址 fileaddr = self.BASEADDR + '\\%s' % filename #获取文件大小 filesize = int(bcak_dict["size"]) #是否重命名 if os.path.isfile(fileaddr): fileaddr += '_new' size = 0 recv_size = 0 file_obj = open(fileaddr,'wb') #开始接收 w_conn.send(b'ok') md5 = hashlib.md5() #开始接收文件 while recv_size <filesize: if filesize - recv_size >1024: size = 1024 else: size = filesize - recv_size recv_date = w_conn.recv(size) recv_size += len(recv_date) file_obj.write(recv_date) file_obj.flush() md5.update(recv_date) less = int(float(recv_size / filesize) * 100) print("file get now %s " % str(less)) # time.sleep(5) else: # 文件校验,检查是否接收出错 check_md5 = w_conn.recv(1024) print("\033[31;1m [%s] \033[0m \033[30;1m [%s] \033[0m" % (md5.hexdigest(), check_md5)) if md5.hexdigest() == check_md5.decode(): print("put the file successful!") file_obj.close() def send(self): ''' 响应方法,处理接收数据并作出动作 :return: ''' for w_conn in self.writeable: try: try: recv_date = self.msg_dict[w_conn].get_nowait() except queue.Empty as e: print('{} is no data'.format(w_conn.getpeername())) print("**********************************************") self.outputs.remove(w_conn) else: try: if recv_date.get("action") is not None: if hasattr(self, 'com_%s'%recv_date['action']): func = getattr(self, 'com_%s'%recv_date['action']) # threading.Thread(target= func ,args=(w_conn,recv_date)).start() # gevent.spawn(func,w_conn,recv_date) func(w_conn,recv_date) except Exception as e: w_conn.send(recv_date[0].upper().encode()) except ConnectionResetError as e: print(e) if w_conn in self.inputs: self.inputs.remove(w_conn) if w_conn in self.outputs: self.outputs.remove(w_conn) del self.msg_dict[w_conn] w_conn.close() def connection(self): ''' 等待文件描述符,活动的文件描述符会被监听 :return: ''' while True: self.readerable, self.writeable, self.exceptinal = select.select(self.inputs, self.outputs, self.inputs) for r_conn in self.readerable: if r_conn is self.sever: conn, addr = self.sever.accept() print('new connect [%s]'%str(addr)) self.inputs.append(conn) self.msg_dict[conn] = queue.Queue() else: try: recv_data = r_conn.recv(1024) #收到数据 if recv_data: print("recv data \033[31;1m [%s] \033[0m from [%s]" % (recv_data, r_conn.getpeername())) recv_data = json.loads(recv_data.decode()) self.msg_dict[r_conn].put(recv_data) #加入输出列表 if r_conn not in self.outputs: self.outputs.append(r_conn) else: print("client lost") self.inputs.remove(r_conn) if r_conn in self.outputs: self.outputs.remove(r_conn) del self.msg_dict[r_conn] r_conn.close() except ConnectionResetError as e: print(e) if r_conn in self.inputs: self.inputs.remove(r_conn) if r_conn in self.outputs: self.outputs.remove(r_conn) del self.msg_dict[r_conn] r_conn.close() self.send() for e in self.exceptinal: if e in self.outputs: self.outputs.remove(e) self.inputs.remove(e) del self.msg_dict[e] print("err", e) e.close() if __name__ =="__main__": tcp = MyselectTCP('localhost',9999)