我正在构建一个基于python的接口,用于通过TCP从仪器中提取数据.数据流是作为特定事件而来的,并且时间安排不是很稳定:我会突发数据,然后出现缓慢的周期.它们是小的数据包,因此为简单起见,假定它们是完整的数据包.
这是我从套接字获得的行为:
>发送事件1:socket.recv返回事件1
>发送事件2:socket.recv返回事件2
>快速发送事件3-50:socket.recv仅返回事件3-30(返回27次)
>缓慢发送事件#51:套接字return.recv事件#31
>缓慢发送事件#52:套接字return.recv事件#32
没有数据丢失.但是显然在某个地方有一个缓冲区已被填充,套接字现在正在返回旧数据.但是,不应该只是继续返回直到缓冲区为空?取而代之的是,尽管已建立了数据包缓冲区,但它仅在收到新数据包时才返回.奇怪的!
这是代码的本质(这是用于非阻塞的,我也已经用recv完成了阻塞-同样的结果).为了简单起见,我剥离了所有的包装重组材料.我已经仔细地将其追溯到套接字,所以我知道这不应该受到指责.
class mysocket:
def __init__(self,ip,port):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((ip,port))
self.keepConn = True
self.socket.setblocking(0)
threading.Thread(target = self.rcvThread).start()
threading.Thread(target = self.parseThread).start()
def rcvThread(self):
while self.keepConn:
readable,writable,inError = select([self.socket],[self.socket],[],.1)
if readable:
packet = self.socket.recv(4096)
self.recvqueue.put_nowait(packet)
try:
xmitmsg = self.sendqueue.get_nowait()
except Queue.Empty:
pass
else:
if writable:
self.socket.send(xmitmsg)
def parseThread(self,rest = .1):
while self.keepConn:
try:
output = self.recvqueue.get_nowait()
eventnumber = struct.unpack('<H',output[:2]
print eventnumber
except Queue.Empty:
sleep(rest)
为什么我无法让套接字将所有数据转储到其缓冲区中?我永远追不上!这个太奇怪了.有人有指针吗?
我是一个业余爱好者,但我确实对此做完作业,完全感到困惑.
解决方法:
packet = self.socket.recv(4096)
self.recvqueue.put_nowait(packet)
TCP是基于流的协议,而不是基于消息的协议.它不保留消息边界.这意味着您不能期望每条消息有一个recv()调用.如果您以突发方式发送数据,则Nagle’s algorithm会将数据合并为一个TCP数据包.
您的代码假定每个recv()调用都返回一个“数据包”,并且解析线程将打印每个“数据包”中的第一个数字.但是recv()不返回数据包,而是从TCP流返回数据块.这些块可以包含一个消息或多个消息,甚至可以包含部分消息.不能保证前两个字节始终是事件号.
通常,从TCP连接读取数据涉及多次调用recv()并将获取的数据存储在缓冲区中.一旦收到一条完整的消息,便从缓冲区中删除适当数量的字节并进行处理.
如果您有可变长度的消息,则需要自己跟踪消息边界. TCP不会像UDP那样为您做这件事.这意味着将包含消息长度的标头添加到每个消息的开头.
try:
xmitmsg = self.sendqueue.get_nowait()
except Queue.Empty:
pass
else:
if writable:
self.socket.send(xmitmsg)
另一方面,该代码似乎有错误.无论套接字是否可写,它都会从发送队列中删除消息.如果套接字不可写,它将无声地丢弃消息.