Day30--Python--struct, socketserver

1. struct 
struct.pack 打包
def pack(fmt, *args): # known case of _struct.pack
    """
    pack(fmt, v1, v2, ...) -> bytes

    Return a bytes object containing the values v1, v2, ... packed according
    to the format string fmt. See help(struct) for more on format strings.

    NOTE: this is only a signature stub shown for reference; it always
    returns an empty bytes object rather than packing anything.
    """
    return b""
    struct.unpack 解包
def unpack(fmt, string): # known case of _struct.unpack
    """
    unpack(fmt, buffer) -> (v1, v2, ...)

    Return a tuple containing values unpacked according to the format string
    fmt. The buffer's size in bytes must be calcsize(fmt). See help(struct)
    for more on format strings.

    NOTE: this is only a signature stub shown for reference; the body does
    nothing and returns None.
    """
    pass

fmt 长度表

Day30--Python--struct, socketserver


# 粘包解决方案_3_服务端     struct.pack  打包

import socket
import struct
import subprocess
import time

# Sticky-packet demo, server side: every reply is framed as a 4-byte length
# header followed by the payload.  The header is packed with the native 'i'
# format because the matching client unpacks it with the same format.
HEADER_FORMAT = 'i'

server = socket.socket()
ip_port = ('192.168.15.87', 8001)
server.bind(ip_port)
server.listen(3)

while 1:
    print('等待连接中...')
    conn, addr = server.accept()
    print('连接成功!')
    try:
        while 1:
            print('等待接收命令...')
            cmd = conn.recv(1024).decode('utf-8')
            # recv() returns b'' once the peer has closed the connection;
            # without this check the loop would keep spawning empty commands.
            if not cmd or cmd == 'exit':
                break
            # NOTE(security): this runs whatever the remote peer sends through
            # the shell.  That is the purpose of the demo, but it must never be
            # exposed to an untrusted network.
            sub_obj = subprocess.Popen(cmd, shell=True,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            # communicate() drains stdout and stderr together; reading them one
            # after the other can deadlock when the second pipe fills up first.
            content, error = sub_obj.communicate()
            # Send stdout when there is any, otherwise the error text.
            payload = content if content else error
            # sendall() keeps writing until everything is sent, whereas send()
            # may transmit only part of the data.
            conn.sendall(struct.pack(HEADER_FORMAT, len(payload)))
            print('数据长度发送成功!')
            conn.sendall(payload)
            print('数据发送成功!')
    finally:
        # Release the connection even if the exchange above raised.
        conn.close()
    print('连接已断开!')
    time.sleep(3)
# 粘包解决方案_3_客户端        struct.unpack 解包

import socket
import struct

# Sticky-packet demo, client side: every reply from the server is a 4-byte
# length header (native 'i' format, matching the server) followed by exactly
# that many payload bytes.
HEADER_FORMAT = 'i'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)


def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``.

    A single recv() call may return fewer bytes than requested, so keep
    reading until the requested amount has arrived.

    Raises:
        ConnectionError: the peer closed the connection before ``size``
            bytes were received.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        # Never ask for more than is still missing, so bytes belonging to a
        # later message are not consumed by accident.
        chunk = sock.recv(min(1024, remaining))
        if not chunk:
            raise ConnectionError(
                'connection closed with %d of %d bytes still missing'
                % (remaining, size))
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


client = socket.socket()
serverip_port = ('192.168.15.87', 8001)
client.connect(serverip_port)

try:
    while 1:
        cmd = input('请输入命令>>>')
        if cmd == '':
            # Sending zero bytes puts nothing on the wire, so the server would
            # never answer and the recv below would block forever.
            print('内容不能为空!')
            continue
        client.sendall(cmd.encode('utf-8'))
        if cmd == 'exit':
            break
        # First the fixed-size header, then exactly the announced payload.
        msg_len_return = _recv_exact(client, HEADER_SIZE)
        # struct.unpack returns a tuple such as (431,); [0] is the length.
        msg_return_unpacked = struct.unpack(HEADER_FORMAT, msg_len_return)[0]
        total_data = _recv_exact(client, msg_return_unpacked)
        # The payload is decoded as GBK, the encoding the author observed for
        # the server's console output.
        print(total_data.decode('gbk'))
finally:
    client.close()

2. FTP 简单上传
   127.0.0.1 本机回环地址
# 服务端

import json
import os
import socket
import struct

# Directory in which uploaded files are stored.
file_storage_catalogue = r"D:\python_work\Day030 struct, socketserver, ftp上传文件\uploads"


def _recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``.

    A single recv() call may return fewer bytes than requested, so keep
    reading until the requested amount has arrived.

    Raises:
        ConnectionError: the peer closed the connection early.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(min(1024, remaining))
        if not chunk:
            raise ConnectionError(
                'connection closed with %d of %d bytes still missing'
                % (remaining, size))
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


server = socket.socket()
ip_port = ('127.0.0.1', 8001)
server.bind(ip_port)
server.listen()
try:
    print('等待连接中...')
    conn, addr = server.accept()
    print('连接成功!')
    try:
        # Wire format: 4-byte length header, JSON metadata, raw file bytes.
        file_info_len = struct.unpack('i', _recv_exact(conn, 4))[0]
        file_info = _recv_exact(conn, file_info_len).decode('utf-8')
        file_info = json.loads(file_info)
        print(file_info)
        print(type(file_info))

        # The file name comes from the client: keep only its last path
        # component so that a name such as '..\\..\\x' cannot escape the
        # upload directory.
        safe_name = os.path.basename(file_info['file_name'].replace('\\', '/'))
        full_file_path = os.path.join(file_storage_catalogue, safe_name)

        remaining = file_info['file_size']
        with open(full_file_path, 'wb') as f:
            while remaining > 0:
                data_slice = conn.recv(min(1024, remaining))
                if not data_slice:
                    # Peer vanished mid-transfer; the partial file stays on
                    # disk, but the loop no longer spins forever.
                    raise ConnectionError(
                        'upload aborted with %d bytes still missing' % remaining)
                f.write(data_slice)
                remaining -= len(data_slice)

        conn.sendall('文件上传成功!'.encode('utf-8'))
    finally:
        conn.close()
finally:
    server.close()
# 客户端

import json
import os
import socket
import struct

client = socket.socket()
server_ip_port = ('127.0.0.1', 8001)
client.connect(server_ip_port)
try:
    read_size = 1024
    file_info = {
        'file_path': r'D:\老男孩IT\课件\day30 粘包解决方案2+ftp上传+socketserver\视频\03 socketserver模块.mp4',
        'file_name': '03 socketserver模块.mp4',
        'file_size': None
    }
    file_info['file_size'] = os.path.getsize(file_info['file_path'])
    print('上传文件大小为:%sb' % file_info['file_size'])

    # Wire format: 4-byte length header, JSON metadata, raw file bytes.
    file_info_json = json.dumps(file_info, ensure_ascii=False).encode('utf-8')
    len_of_info = len(file_info_json)
    info_len_packed = struct.pack('i', len_of_info)
    # sendall() keeps writing until everything is sent; send() may transmit
    # only part of the data and silently drop the rest.
    client.sendall(info_len_packed)
    print(file_info_json)
    client.sendall(file_info_json)

    remaining = file_info['file_size']
    with open(file_info['file_path'], 'rb') as f:
        while remaining > 0:
            # Send exactly the announced number of bytes, even if the file
            # grew after its size was measured.
            data_slice = f.read(min(read_size, remaining))
            if not data_slice:
                # The file shrank after its size was announced; fail loudly
                # instead of looping forever on empty reads.
                raise OSError(
                    'file ended with %d announced bytes still unsent' % remaining)
            client.sendall(data_slice)
            remaining -= len(data_slice)

    ack = client.recv(1024).decode('utf-8')
    print(ack)
finally:
    client.close()
3. socketserver 套接字服务器
# 服务端同时接待多个客户端

import socketserver


class MyServer(socketserver.BaseRequestHandler):
    """Chat handler: one instance is created for every connected client."""

    def handle(self):
        """Echo-style chat loop for a single client connection.

        Overrides BaseRequestHandler.handle().  ``self.request`` is the
        connected socket (the ``conn`` of a hand-written server) and
        ``self.client_address`` is the peer's (host, port) pair.
        """
        print('Connected From:', self.client_address)
        while 1:
            data_received = self.request.recv(1024)
            if data_received == b'':
                # recv() returns b'' only when the peer has closed its side.
                # A peer that merely "sends" an empty string puts nothing on
                # the wire, so that case never reaches this point.
                self.request.close()
                break
            print('妲己宝宝说:%s' % data_received.decode('utf-8'))
            reply = input('亚瑟说>>>')
            # sendall() keeps writing until the whole reply has been sent.
            self.request.sendall(reply.encode('utf-8'))


if __name__ == '__main__':
    ip_port = ('127.0.0.1', 8001)
    socketserver.TCPServer.allow_reuse_address = True
    socketserver.TCPServer.request_queue_size = 10  # listen backlog; the default is 5
    # Bind the address and serve each connection with MyServer in its own thread.
    server = socketserver.ThreadingTCPServer(ip_port, MyServer)
    server.serve_forever()  # keeps accepting connections until shutdown
    print('这里是测试')
# 客户端

import socket

client = socket.socket()
server_ip_port = ('127.0.0.1', 8001)
client.connect(server_ip_port)
try:
    while 1:
        msg = input('发送消息, 按Q退出>>>')
        if msg.upper() == 'Q':
            break
        elif msg == '':
            # An empty string puts no bytes on the wire: the server receives
            # nothing, never replies, and the recv() below would block
            # forever.  Reject empty input up front.
            print('内容不能为空!')
            continue
        # sendall() keeps writing until the whole message has been sent.
        client.sendall(msg.encode('utf-8'))
        reply_bytes = client.recv(1024)
        if not reply_bytes:
            # recv() returns b'' once the server has closed the connection.
            break
        reply = reply_bytes.decode('utf-8')
        print('亚瑟说: %s' % reply)
finally:
    client.close()
class TCPServer(BaseServer) 中的类变量,可以修改
request_queue_size = 5
allow_reuse_address = False
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
ThreadingTCPServer(ip_port, MyServer) # 要找__init__
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass # defines no __init__ of its own, so the lookup follows the MRO into the parent classes
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
class TCPServer(BaseServer): # ThreadingMixIn has no __init__, so the lookup continues into TCPServer and finds it here
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # these parameters receive ip_port and MyServer
        """Constructor. May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass) # delegates to BaseServer.__init__, which takes the same two arguments
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind() # calls server_bind() below
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket. May be overridden. """
        if self.allow_reuse_address: # only when allow_reuse_address is True
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address) # bind the IP address and port
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server. May be overridden. """
        self.socket.listen(self.request_queue_size) # start listening; request_queue_size is a class variable and can be changed

    def server_close(self):
        """Called to clean-up the server. May be overridden. """
        self.socket.close() # close the listening socket
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
serve_forever() ==> _handle_request_noblock(self)
class BaseServer:
def _handle_request_noblock(self):
"""Handle one request, without blocking. I assume that selector.select() has returned that the socket is
readable before this function was called, so there should be no risk of
blocking in get_request().
"""
try:
request, client_address = self.get_request() # 同conn, addr 此处get_request()相当于accept()
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
class BaseRequestHandler: # subclasses redefine handle(); the server side then calls that handle()
    def handle(self):
        pass

View 梳理流程

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        # setup() runs outside the try block, so finish() is only guaranteed
        # once setup() has completed; after that finish() runs even when
        # handle() raises.
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        # Hook called before handle(); does nothing by default.
        pass

    def handle(self):
        # Hook that subclasses override to service the request.
        pass

    def finish(self):
        # Hook called after handle(); does nothing by default.
        pass

View class BaseRequestHandler

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            # The request is shut down whether or not handling succeeded.
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

View class ThreadingMixIn

class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for selector

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    # Passed to socket.listen() in server_activate().
    request_queue_size = 5

    # When true, server_bind() sets SO_REUSEADDR before binding.
    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                # Close the socket on any failure, then re-raise unchanged.
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        # Store the address the socket actually reports after binding.
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by selector.

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except OSError:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()

View TCPServer

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for selector

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - service_actions()
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        # Event set by serve_forever() on exit; shutdown() waits on it.
        self.__is_shut_down = threading.Event()
        # Flag polled by the serve_forever() loop; set by shutdown().
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            # Reset the flag and signal any thread blocked in shutdown().
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def service_actions(self):
        """Called by the serve_forever() loop.

        May be overridden by a subclass / Mixin to implement any code that
        needs to be run during the loop.
        """
        pass

    # The distinction between handling, getting, processing and finishing a
    # request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls selector.select(),
    #   get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process or create a
    #   new thread to finish the request
    # - finish_request() instantiates the request handler class; this
    #   constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        if timeout is not None:
            deadline = time() + timeout

        # Wait until a request arrives or the timeout expires - the loop is
        # necessary to accommodate early wakeups due to EINTR.
        with _ServerSelector() as selector:
            selector.register(self, selectors.EVENT_READ)

            while True:
                ready = selector.select(timeout)
                if ready:
                    return self._handle_request_noblock()
                else:
                    if timeout is not None:
                        # Recompute the time left until the deadline.
                        timeout = deadline - time()
                        if timeout < 0:
                            return self.handle_timeout()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                # Anything not derived from Exception: shut the request down,
                # then re-raise.
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print('-'*40, file=sys.stderr)
        print('Exception happened during processing of request from',
            client_address, file=sys.stderr)
        import traceback
        traceback.print_exc()
        print('-'*40, file=sys.stderr)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Leaving the with-block closes the server.
        self.server_close()

View BaseServer

上一篇:jboss as7 o.h.c.s.c.i.BroadcastGroupImpl Network is unreachable


下一篇:微软职位内部推荐-Senior Software Engineer-News