Py-解决粘包现象,tcp实现并发,tcp实现传输文件的程序,校验思路,线程与进程

黏包现象

TCP粘包就是指发送方发送的若干包数据到达接收方时粘成了一包,从接收缓冲区来看,
后一包数据的头紧接着前一包数据的尾,出现粘包的原因是多方面的,可能是来自发送
方,也可能是来自接收方
TCP接收到数据包时,并不会马上交到应用层进行处理,或者说应用层并不会立即处理。
实际上,TCP将接收到的数据包保存在接收缓存里,然后应用程序主动从缓存读取收到
的分组。这样一来,如果TCP接收数据包到缓存的速度大于应用程序从缓存中读取数据
包的速度,多个包就会被缓存,应用程序就有可能读取到多个首尾相接粘到一起的包。

UDP不粘包(因为udp是面向消息的,采用数据报的形式,每个udp包里有消息头,消
息来源地址,端口等信息)。但是不可靠,因为它不需要连接,不管服务端在不在它都
只管发。

解决tcp粘包的问题:

由于tcp是流式传输,没有数据头等,接收不知道接收多少个字节才会出现粘包。
粘包出现类型1:时间太短导致算法自动把他们弄到一起
粘包出现类型2:数据量大超过buffsize导致接收没有接收完

解决粘包版tcp

服务端程序:

from socket import *
import subprocess
import struct
ip_port=('127.0.0.1',5038)
devicenum=5
buffer_size=1024
tcp_serve=socket(AF_INET,SOCK_STREAM)#第一个参数代表基于网络通信,第二个参数代表基于tcp协议
tcp_serve.bind(ip_port)
tcp_serve.listen(devicenum)
while 1:
conn,addr=tcp_serve.accept()
print('双向链接时:',conn)
print('客户端地址:',addr)
while 1:
try:
msg=conn.recv(buffer_size)
print('客户端发来的消息是',msg) #msg为发过来的命令
if not msg: break #这里用于如果客户端强行断开不断发空就跳出
res=subprocess.Popen(msg.decode('gbk'),shell=True, #使用cmd执行
# 把命令执行的结果放进管道
stderr=subprocess.PIPE, #出错的放进管道stderr
stdout=subprocess.PIPE, #结果输出口stdout
stdin=subprocess.PIPE #结果放进管道
)
err=res.stderr.read() #查看有没有出错
if err:
cmd_res=err #如果有错就是接收err的内容
else:
cmd_res=res.stdout.read()#没错就读取程序在cmd运行的结果
if not cmd_res: #由于有时候正常执行没有返回值,返回空发送会卡住
cmd_res='执行成功'.encode('gbk') #所以如果为空就把他变成执行成功
length=len(cmd_res)
data_length=struct.pack('i',length) #i代表整形,把length这个整形数据(字节长度)封装成4个字节
conn.send(data_length) #发送即将发送的数据的字节长度
conn.send(cmd_res) #注意发送和接收的数据必须是二进制的数据
except Exception:
break #用于如果客户端强行断掉不会报错
#以上为发送消息
conn.close()
tcp_serve.close()
客户端程序:
from socket import *
import struct
ip_port=('127.0.0.1',5038)
devicenum=5
buffer_size=1024
tcp_client=socket(AF_INET,SOCK_STREAM)#第一个参数代表基于网络通信,第二个参数代表基于tcp协议
tcp_client.connect(ip_port)
while 1:
neirong=input('请输入发送内容')
if neirong =='': #防止输入为空,导致程序卡死
continue
if neirong =='quit': break
tcp_client.send(bytes(neirong,encoding='gbk'))
print('客户端已经发送消息')
data_length=tcp_client.recv(4)
length=struct.unpack('i',data_length)[0] #恢复出数据的长度
# print('数据长度为',length)
recv_size=0 #默认接收到的长度为0
recv_msg=b'' #默认接收到的二进制内容为空
while recv_size<length: #当接收到的数据的长度小于(原数据长度时)
recv_msg=recv_msg+tcp_client.recv(buffer_size) #给接收到的二进制内容加进东西
recv_size=len(recv_msg) #然后将接收到的数据长度变为接收到的二进制内容的长度
print('服务端发来的消息是',recv_msg.decode('gbk'))
tcp_client.close()
#输入的内容可以是dir(查看当前目录下的内容)
#ipconfig(查看ip参数)

偏函数partial

from functools import partial
def add(x,y):
return x+y
#要使它自增1可以使用partial,前面写结果,后面写要自增的数值
func=partial(add,1)
print(func(5))
print(func(6))

server类:处理链接

BaseServer
TCPServer
UDPServer

request类:处理通信

BaseRequestHandler
StreamRequestHandler
DatagramRequestHandler

tcp并发的实现方法

服务端程序

import socketserver
class Myserver(socketserver.BaseRequestHandler):
def handle(self):
print('conn is:',self.request)
print('addr is:', self.client_address)
while True:
try:
#收消息
data=self.request.recv(1024)
if not data:break
print('收到的客户端消息是',data)
#发消息
self.request.sendall(data.upper())
except Exception as e:
print(e)
break
if __name__=='__main__':
s=socketserver.ThreadingTCPServer(('127.0.0.1',5038),Myserver) #threading是多线程
#myserver是通信循环
s.serve_forever() #链接循环

客户端程序

from socket import *
import struct
ip_port=('127.0.0.1',5038)
devicenum=5
buffer_size=1024
tcp_client=socket(AF_INET,SOCK_STREAM)#第一个参数代表基于网络通信,第二个参数代表基于tcp协议
tcp_client.connect(ip_port)
while 1:
neirong=input('请输入发送内容')
if neirong =='': #防止输入为空,导致程序卡死
continue
if neirong =='quit': break
tcp_client.send(bytes(neirong,encoding='gbk'))
print('客户端已经发送消息')
data=tcp_client.recv(buffer_size)
print('收到服务端发来的消息是',data.decode('utf8'))
#输入的内容可以是dir(查看当前目录下的内容)
#ipconfig(查看ip参数)
客户端2程序
from socket import *
import struct
ip_port=('127.0.0.1',5038)
devicenum=5
buffer_size=1024
tcp_client=socket(AF_INET,SOCK_STREAM)#第一个参数代表基于网络通信,第二个参数代表基于tcp协议
tcp_client.connect(ip_port)
while 1:
neirong=input('请输入发送内容')
if neirong =='': #防止输入为空,导致程序卡死
continue
if neirong =='quit': break
tcp_client.send(bytes(neirong,encoding='gbk'))
print('客户端已经发送消息')
data=tcp_client.recv(buffer_size)
print('收到服务端发来的消息是',data.decode('utf8'))
#输入的内容可以是dir(查看当前目录下的内容)
#ipconfig(查看ip参数)
 

tcp实现传输文件:

服务端程序:
import socketserver
import json
import os
class Myserver(socketserver.BaseRequestHandler):
def handle(self):
print('conn is:',self.request)
print('addr is:', self.client_address)
while True:
try:
has_receive=0
#收消息
data=self.request.recv(1024)
if not data:break
data=json.loads(data)
file_name=data.get('file_name') #从发过来的json序列中获取文件名
file_size = data.get('file_size')#从发过来的json序列中获取文件大小
targetpath=data.get('targetpath')#从发过来的json序列中获取绝对路径
print('从json中提取出文件名%s,文件大小%s,接受路径绝对地址%s' %(file_name,file_size,targetpath))
abpath=os.path.join(targetpath,file_name)
print('拼接出接收地址的绝对路径',abpath)
#接下来是判断文件状况
if os.path.exists(abpath): #判断接收路径下是否有同名文件
file_has_size=os.stat(abpath).st_size #判断绝对路径那里的那个同名文件的尺寸
if file_has_size<file_size: #如果它比表示里的文件长度小,说明传的不完整,要断点续传
print('进行断点续传')
self.request.sendall('filnotcomplete'.encode('utf8'))
choice=self.request.recv(1024).decode('utf8')
if choice =="yes": #续传的情况
self.request.sendall(str(file_has_size).encode('utf8'))
has_receive=has_receive+file_has_size #续传的时候就要把下面的has_receive变成同名文件已经有的size长度
f = open(abpath, 'ab')
else: #不续传的情况
continue
else: #两者相等说明文件完整
self.request.sendall('filcomplete'.encode('utf8'))
print('文件完整')
continue else:
print('没有同名文件,直接传新的')
self.request.sendall('notsimilar'.encode('utf8')) #如果没有同名文件发送'notsimilar'
f=open(abpath,'wb') while has_receive<file_size: #当已经接收到的字节数小于标识里的文件长度时
data1=self.request.recv(1024) #不断进行接收,一次1024就能收掉一个字节的数据
f.write(data1) #将接收到的写入
has_receive=has_receive+len(data1)
f.close() except Exception as e:
print(e)
break
if __name__=='__main__':
s=socketserver.ThreadingTCPServer(('127.0.0.1',5038),Myserver) #threading是多线程
#myserver是通信循环
s.serve_forever() #链接循环

客户端程序:

from socket import *
import time
import json
import os
import struct
import sys
ip_port=('127.0.0.1',5038)
devicenum=5
buffer_size=1024 abpath=os.path.dirname(os.path.abspath(__file__)) #这里为客户端的绝对路径地址
targetpath=os.path.dirname(os.path.dirname(__file__))#目标接收文件的地址是客户端所在文件夹的上一级 tcp_client=socket(AF_INET,SOCK_STREAM)#第一个参数代表基于网络通信,第二个参数代表基于tcp协议
tcp_client.connect(ip_port) def show_progress(has,total): #打印进度条
rate=float(has)/float(total) #发送比例等于已经发送的除以完整的尺寸
rate_num=int(rate*100)
sys.stdout.write('%s%%\n' %rate_num) #打印百分之x
sys.stdout.flush() while 1:
has_send=0
wenjianming=input('请输入想发送的客户端绝对路径文件夹下的文件名:')
localpath=os.path.join(abpath,wenjianming) #要传输的文件的路径
if localpath =='': #防止输入为空,导致程序卡死
continue
if localpath =='quit': break
#___传输的文件的标识性内容——————
file_name=os.path.basename(localpath) #传输文件的文件名
file_size=os.stat(localpath).st_size #传输文件的大小
#____将标识性内容打包————————
data={
'file_name':file_name,
'file_size':file_size,
'targetpath':targetpath
}
tcp_client.send(json.dumps(data).encode('utf8')) #将标识性内容的字典json序列化然后传输
print('客户端已经发送标识性内容json序列',json.dumps(data).encode('utf8'))
#发送完标识内容之后需要在这等服务端的信息,看文件的状态,接收路径是否存在同名文件等
is_exist=tcp_client.recv(buffer_size)
print('收到服务端发来的消息是',is_exist.decode('utf8'))
if is_exist.decode('utf8')== 'filnotcomplete':
print('文件不完整')
choice=input('文件不完整,继续传输吗?[yes/no]')
if choice=='yes':
tcp_client.send('yes'.encode('utf8'))
continue_position=tcp_client.recv(1024).decode('utf8')
has_send=has_send+int(continue_position)
else:
continue
elif is_exist.decode('utf8')== 'filcomplete':
print('文件完整')
continue f=open(localpath,'rb') #打开本地的那个文件,用二进制来读取
while has_send<file_size:
data1=f.read(1024) #用二进制读出来然后进行发送
tcp_client.send(data1)
has_send=has_send+len(data1)
show_progress(has_send,file_size)
f.close()
print('传输成功') #输入的内容可以是dir(查看当前目录下的内容)
#ipconfig(查看ip参数)

打印某个文件夹下面的所有文件名

import os
path1='D:\study tool\python project'
file_list=os.listdir(path1)
print(file_list)

关于文件传输的校验思路:

在每次发送字节数据的时候加入对每个字节数据的哈希校验并发送去服务器端

如果服务器端校验出来和原来的哈希值不相等,就发送‘错误’之类的字符串返回

给客户端

进程与线程

进程:

本质上就是一段程序的运行过程(抽象的概念)
进程是一个程序在一个数据集上的一次动态执行过程
进程一般由程序,数据集,进程控制块组成
进程是一种管理工具,用于保存切换应用时的应用数据状态等,等切换回来的时候好重置回切换前的状态
cpu的切换有两种:1.时间轮循切换2.io阻塞切换
io阻塞切换:当某个进程有大量数据处理时会优先处理,但是当前进程空闲时cpu就去处理其他进程
时间轮循切换:多核cpu分工合作,每人偶尔抽出点时间来处理其他超出核数限制的进程。

线程:

本质是某个进程里面的小进程,线程们共用进程的同一份数据集。
线程切换,线程挂起的消耗比进程小很多
同一个进程中,多个线程可以并发。但是线程是不能够独立执行的,寄托在进程之中。

线程的简单示例:

import threading
import time def Hi(num):
print('hello %s' %num)
time.sleep(3)
print('ok') if __name__ =='__main__':
Hi(3) #可见平时是单线程往下执行
Hi(2)
Hi(1)
#接下来用线程,一号线程二号线程,主线程并发
t1=threading.Thread(target=Hi,args=(10,)) #开线程,线程目标函数是Hi函数,参数传入10
t1.start() #一号线程
t2=threading.Thread(target=Hi,args=(9,)) #开线程,线程目标函数是Hi函数,参数传入10
t2.start() #二号线程
Hi('这里是主线程') #这里是主线程

简单示例2:

import threading
import time def music():
print('begin to listen %s' %time.ctime())
time.sleep(2)
print('stop to listen %s' % time.ctime()) def lashi():
print('begin to lashi %s' %time.ctime())
time.sleep(4)
print('stop to lashi %s' % time.ctime()) if __name__ =='__main__':
t1=threading.Thread(target=music,args=()) #开线程,线程目标函数是Hi函数,参数传入10
t1.start() #一号线程
t2=threading.Thread(target=lashi,args=()) #开线程,线程目标函数是Hi函数,参数传入10
t2.start() #二号线程

等待线程执行完再往下走join:

import threading
import time def music():
print('begin to listen开始听音乐 %s' %time.ctime())
time.sleep(2)
print('stop to listen不听了 %s' % time.ctime()) def lashi():
print('begin to lashi开始拉屎 %s' %time.ctime())
time.sleep(4)
print('stop to lashi不拉了 %s' % time.ctime()) if __name__ =='__main__':
t1=threading.Thread(target=music,args=())
t2=threading.Thread(target=lashi,args=())
t1.start() #一号线程
t2.start() #二号线程
t1.join() #线程不执行完不往下走
t2.join() #线程不执行完不往下走
print('这里是主线程') #这里是主线程
time.sleep(1)
t3=threading.Thread(target=music,args=())
t4=threading.Thread(target=lashi,args=())
t3.start() #一号线程
t4.start() #二号线程
t3.join() #t3线程不执行完不往下走
print('这里是主线程') #这里是主线程
上一篇:spring-boot配置外部静态资源的方法


下一篇:AlwaysON同步性能监控的三板斧