这里有一些我目前的设置.
> REST API将数据推送(队列)到队列中
> Queue有一个消费者,它始终在运行并生成到Exchange
> Exchange路由到其他几个队列(如20)
>每个(20)队列都执行特定任务(消费者总是运行)
> Cron作业运行以检查是否所有(20)任务都已完成并生成另一个队列
我不确定我喜欢消费者一直在运行,因为每个消费者使用大约300MB的Ram(我认为它是MB,现在不在我面前)而且我正在寻找另一个实现.
M <-- Message coming from REST API
|
|
+-First Queue
|
|
| <-- The Exchange
/|\
/ | \
/ | \ <-- bind to multiple queues ( 20+ )
Q1 Q2 Q3 <-- Each Queue is a task that must be completed
| <-- CRON runs to check if all queues above have completed
|
|
Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
|
C <-- Consumer
我在下面的相关问题建议使用RPC,但问题是RPC(据我所知)将有多个实例.这是一个资源紧张的过程,我认为通过添加RPC调用它只会使服务器陷入困境然后变得没有响应(如果我错了请纠正我).
另一种方法是使用聚合器模式
> http://www.eaipatterns.com/Aggregator.html
这看起来正是我需要的,但我发现文档有限.有人做过这种模式吗?
我的问题是我对它目前的实施方式不满意,我正在寻找改进流程的方法.我正在寻找摆脱CRON,实现一个新的模式,并没有让消费者一直运行.
该过程目前还仅支持每个消费者的单个实例.它可以有多个消费者但我们如何实现它我们当时只想要一个.
这是使用RabbitMQBundle在PHP,Symfony2 Framework中实现的
相关问题:
> RabbitMQ wait for multiple queues to finish
解决方法:
OldSound在这里,是RabbitMQ Bundle的创造者.
bundle本身不支持开箱即用的Aggregator模式,但你可以使用底层的php-amqplib来实现它.
要执行聚合,您需要发布具有相关ID的消息和沿处理链的id的线程.然后,聚合器将根据您必须处理该特定任务的不同工作者的数量等待X消息量.等待消息的一种方法是让一个数组保留它们,因为它们被相关id索引.
因此,无论何时有传入消息,您都可以:
$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;
然后你去的地方:
if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}
我现在正在为Symfony开发一个Workflow捆绑包,我计划很快就开源.该捆绑包可用于以非常简单的方式实现您呈现的用例(即,您只需要为每个任务提供服务).
现在我想知道为什么每个消费者需要300 MB的RAM?你需要用它们运行完整的堆栈框架吗?如果可能的话,为消费者应用程序创建一个新的Symfony内核,只加载你需要的内容以减少开销.