我们有一个充当服务器的Java应用程序.客户端应用程序(用C#编写)正在使用ZeroMQ与之通信.我们(主要是)遵循“懒惰海盗”模式.
该服务器具有一个路由器套接字,实现如下(使用JeroMQ):
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");
客户端连接并发送消息,如下所示:
ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));
当多个客户端同时发送消息时,我们会遇到丢失消息的情况.对于单个客户端,似乎没有任何问题.
我们是否为多客户端单服务器设置实施了正确的方法?
更新:示例客户端和服务器表现出这种行为:
服务器:
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
public class SimpleServer
{
public static void main(String[] args) throws InterruptedException
{
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.setRouterMandatory(true);
socket.bind("tcp://*:5559");
PollItem pollItem = new PollItem(socket, Poller.POLLIN);
int messagesReceived = 0;
int pollCount = 0;
while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
{
messagesReceived += pollCount;
for (int i = 0 ; i < pollCount ; i++)
{
ZMsg msg = ZMsg.recvMsg(socket);
System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
}
if (pollCount == 0)
{
System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
}
}
}
}
客户:
using NetMQ;
using System;
using System.Text;
namespace SimpleClient
{
class Program
{
static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);
static void Main(string[] args)
{
for (int i = 0; i < 100; i++)
{
SendMessage();
}
}
private static void SendMessage()
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRequestSocket())
{
socket.Options.Identity = identity;
socket.Connect("tcp://localhost:5559");
socket.Send(Encoding.UTF8.GetBytes("hello!"));
}
}
}
}
}
如果我运行服务器和单个客户端,则可以看到所有100条消息都到达了.如果我同时运行5个客户端,那么我只能得到200个左右. 300条消息到达,而不是全部500条消息.顺便说一句,关闭客户端中的套接字似乎以某种方式阻止了服务器上的路由器套接字短暂接收消息,尽管这只是一个理论.
解决方法:
第1部分-民意调查可能会返回多个事件
ZMQ.poll()返回发现的事件数:
int rc = ZMQ.poll(new PollItem[]{pollItem}, 3000);
您目前假设民意测验的回报是一项事件.相反,您应该遍历ZMsg msg = ZMsg.recvMsg(socket); ZMQ.Poll()返回所指示的事件数.
/**
* Polling on items. This has very poor performance.
* Try to use zmq_poll with selector
* CAUTION: This could be affected by jdk epoll bug
*
* @param items
* @param timeout
* @return number of events
*/
public static int zmq_poll(PollItem[] items, long timeout)
{
return zmq_poll(items, items.length, timeout);
}
第2部分-ZMsg.receive()可能返回多个帧
当您从ZMsg msg = ZMsg.recvMsg(socket);收到ZMsg时,ZMsg可能包含多个ZFrame,每个ZFrame都包含客户端数据.
从ZMsg
class in JeroMQ’s source的评论中:
* // Receive message from ZMQSocket "input" socket object and iterate over frames
* ZMsg receivedMessage = ZMsg.recvMsg(input);
* for (ZFrame f : receivedMessage) {
* // Do something with frame f (of type ZFrame)
* }
第3部分-消息可以拆分为多个ZFrame
From ZFrame’s source in JeroMQ:
* The ZFrame class provides methods to send and receive single message
* frames across 0MQ sockets. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
* When you read a frame from a socket, the more() method indicates if the frame is part of an
* unfinished multipart message.
如果我正确理解这一点,那么对于每个事件,您可能会获得多个帧,并且一条客户端消息可能会映射到1..N帧(如果消息很大?).
总结一下:
>民意测验的一次回报可能表示多个事件.
>一个事件,因此一个ZMsg.receive()可能包含多个帧
>一帧可以包含一条完整的客户消息或仅一部分客户消息;一条客户消息映射到1..N帧.