我在ZeroMQ上遇到问题,我相信这是因为我对它不是很熟悉.
我正在尝试建立一个非常简单的服务,其中多个客户端连接到服务器并发送查询.服务器响应此查询.
当我使用REQ-REP套接字组合(客户端使用REQ,服务器绑定到REP套接字)时,我能够在服务器端每秒收到近60,000条消息(当客户端和服务器位于同一台计算机上时).当跨机器分布时,不同机器上客户端的每个新实例线性增加服务器上每秒的消息数量,并通过足够的客户端实例轻松达到40,000个.
现在REP套接字已阻塞,因此我遵循ZeroMQ指南并使用了rrbroker模式(http://zguide.zeromq.org/cs:rrbroker):
REQ (client) <----> [server ROUTER -- DEALER --- REP (workers running on different threads)]
但是,这完全破坏了性能.跨机器运行时,服务器每秒只能收到4000条消息.不仅如此,在不同计算机上启动的每个新客户端都会降低其他每个客户端的吞吐量.
我敢肯定我在做一些愚蠢的事情.我想知道ZeroMQ专家是否可以指出任何明显的错误.谢谢!
编辑:根据建议添加代码.我正在使用clrzmq nuget包(https://www.nuget.org/packages/clrzmq-x64/)
这是客户代码.计时器计算每秒接收到多少响应.
for (int i = 0; i < numTasks; i++) { Task.Factory.StartNew(() => Client(), TaskCreationOptions.LongRunning); }
void Client()
{
using (var ctx = new Context())
{
Socket socket = ctx.Socket(SocketType.REQ);
socket.Connect("tcp://192.168.1.10:1234");
while (true)
{
socket.Send("ping", Encoding.Unicode);
string res = socket.Recv(Encoding.Unicode);
}
}
}
服务器-情况1:服务器跟踪每秒接收到多少个请求
using (var zmqContext = new Context())
{
Socket socket = zmqContext.Socket(SocketType.REP);
socket.Bind("tcp://*:1234");
while (true)
{
string q = socket.Recv(Encoding.Unicode);
if (q.CompareTo("ping") == 0) {
socket.Send("pong", Encoding.Unicode);
}
}
}
通过此设置,在服务器端,我每秒可以看到大约60,000个请求(当客户端在同一台计算机上时).在不同的计算机上时,每个新客户端都会按预期增加服务器上收到的请求数量.
服务器案例2:这实际上是ZMQ指南中的rrbroker.
void ReceiveMessages(Context zmqContext, string zmqConnectionString, int numWorkers)
{
List<PollItem> pollItemsList = new List<PollItem>();
routerSocket = zmqContext.Socket(SocketType.ROUTER);
try
{
routerSocket.Bind(zmqConnectionString);
PollItem pollItem = routerSocket.CreatePollItem(IOMultiPlex.POLLIN);
pollItem.PollInHandler += RouterSocket_PollInHandler;
pollItemsList.Add(pollItem);
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("{0}", ze.Message);
return;
}
dealerSocket = zmqContext.Socket(SocketType.DEALER);
try
{
dealerSocket.Bind("inproc://workers");
PollItem pollItem = dealerSocket.CreatePollItem(IOMultiPlex.POLLIN);
pollItem.PollInHandler += DealerSocket_PollInHandler;
pollItemsList.Add(pollItem);
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("{0}", ze.Message);
return;
}
// Start the worker pool; cant connect
// to inproc socket before binding.
workerPool.Start(numWorkers);
while (true)
{
zmqContext.Poll(pollItemsList.ToArray());
}
}
void RouterSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
RelayMessage(routerSocket, dealerSocket);
}
void DealerSocket_PollInHandler(Socket socket, IOMultiPlex revents)
{
RelayMessage(dealerSocket, routerSocket);
}
void RelayMessage(Socket source, Socket destination)
{
bool hasMore = true;
while (hasMore)
{
byte[] message = source.Recv();
hasMore = source.RcvMore;
destination.Send(message, message.Length, hasMore ? SendRecvOpt.SNDMORE : SendRecvOpt.NONE);
}
}
工作池的启动方法是:
public void Start(int numWorkerTasks=8)
{
for (int i = 0; i < numWorkerTasks; i++)
{
QueryWorker worker = new QueryWorker(this.zmqContext);
Task task = Task.Factory.StartNew(() =>
worker.Start(),
TaskCreationOptions.LongRunning);
}
Console.WriteLine("Started {0} with {1} workers.", this.GetType().Name, numWorkerTasks);
}
public class QueryWorker
{
Context zmqContext;
public QueryWorker(Context zmqContext)
{
this.zmqContext = zmqContext;
}
public void Start()
{
Socket socket = this.zmqContext.Socket(SocketType.REP);
try
{
socket.Connect("inproc://workers");
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("Could not create worker, error: {0}", ze.Message);
return;
}
while (true)
{
try
{
string message = socket.Recv(Encoding.Unicode);
if (message.CompareTo("ping") == 0)
{
socket.Send("pong", Encoding.Unicode);
}
}
catch (ZMQ.Exception ze)
{
Console.WriteLine("Could not receive message, error: " + ze.ToString());
}
}
}
}
解决方法:
您能否发布一些源代码,或者至少对测试用例进行更详细的说明?通常,构建设计的方法是一次进行一项更改,并针对每次更改进行度量.您可以始终从已知的工作设计逐步过渡到更复杂的设计.