发布/订阅
在前几节教学中,我们创建了工作队列。工作队列之后的假设是每个任务都投递给一个特定的工作进程。在这一节中,我们做一些完全不同的事情。我们会分发一个消息给多个消费者。这个模式称为" publish/subscribe"。
为了说明这种模式,我们将创建一个简单的日志系统。它包括两部分程序:第一部分会发出日志消息,第二部分接收并打印它们。
在我们的日志系统中,接收器程序的每一个运行的副本都会收到消息。这样,我们就可以运行一个接收器,并将日志定向到磁盘;同时,我们将能够运行另一个接收器,并在屏幕上看到日志。
本质上,发布的日志消息将被广播给所有接收者。
交换机
在本教程的前几部分中,我们向队列发送消息和从队列接收消息。现在是时候介绍 Rabbit 中完整的消息传递模型了。
让我们快速回顾一下之前教程的内容:
- 生产者是一个用来发送消息的用户程序。
- 队列是存储消息的缓存区。
- 消费者是接收消息的用户程序。
RabbitMQ 消息模型的核心理念是生产者从来不会直接向队列发送消息。实际上,大多数情况生产者甚至都不需要知道一个消息要发送到哪一个队列。
相反,生产者只能向向交换机发送消息。一个交换机是非常简单的。在一头,它从生产者接收消息,另一头将消息推送到队列。交换机必须确切的知道如何处理收到的消息。它应该被附加到特定的队列吗?它应该附加到多个队列么?或者是应该丢弃它。其规则由 exchange type
定义。
交换机类型有如下几种:direct
,topic
,headers
和 fanout
(广播模式)。我们将集中讨论最后一个(广播模式)。让我们创建一个这样的类型,称其为 logs
:
$channel->exchange_declare('logs', 'fanout', false, false, false);
广播交换机非常简单。正如你可能从名字中猜到的,它只是把收到的所有消息广播给它知道的所有队列。这正是我们日志所需要的。
列出交换机
要列出服务器上的交换,您可以运行命令
rabbitmqctl
:
sudo rabbitmqctl list_exchanges
在这个列表中会有一些
amq.*
交换机和默认(未命名)交换机。这些是默认创建的,但是现在不太可能用得着。默认交换机
在本教程的前几部分,我们对交换机一无所知,但是仍然能够向队列发送消息。这是因为我们使用了默认交换机,它由空字符串("")标识。
回忆一下我们之前是如何发布消息的:
$channel->basic_publish($msg, '', 'hello');
这里我们使用了默认或匿名交换机:消息被路由到routing_key
指定的队列。该关键字在basic_publish
中的第三个参数指定。
现在,我们可以发布消息到刚才命名的交换机上:
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');
临时队列
你或许记得之前我们使用队列都指定了名字(如:hello
和 task_queue
?)。给队列命名对我们来说至关重要——我们需要将工作进程指向该一个队列。当您想要在生产者和消费者之间共享队列时,给队列命名也很重要。
但对于我们的日志系统来说,情况并非如此。我们希望收到所有日志消息,而不仅仅是其中的一部分。我们也只对当前的信息流感兴趣,而不是之前的。要解决这个问题,我们需要完成两部分。
第一部分,无论何时连接到 RabbitMQ 都需要一个新的、空的队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的是,让服务器为我们选择一个随机队列名称。
第二部分,一旦我们断开了消费者的连接,队列应该被自动删除。
在 php-amqplib
客户端中,当我们以空字符串的形式提供队列名称时,我们生成一个非持久化的自动生成名字的队列:
list($queue_name, ,) = $channel->queue_declare("");
当方法返回时,$queue_name
变量包含 RabbitMQ 生成的随机队列名。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
当声明它的连接关闭时,队列将被删除,因为它被声明为独占的。可以在 队列指南 中了解关于 exclusive
标志和其他队列属性的更多信息。
绑定(bindings)
我们已经创建了广播交换机和队列。现在,我们需要告诉交换机向我们的队列发送消息。交换机和队列之间的关系称为绑定。
$channel->queue_bind($queue_name, 'logs');
从现在开始,logs
交换机将把消息附加到我们的队列中。
绑定列表
您可以列出现有的绑定:
rabbitmqctl list_bindings
总结
发出日志消息的生产者程序看起来与上一个教程没有太大不同。最重要的变化是,我们现在想要将消息发布到日志交换机,而一个匿名的交换机。下面是emit_log.php
脚本的代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
建立连接后,我们声明交换机。此步骤是必要的,因为发布到不存在的交换机是禁止的。
如果还没有队列绑定到交换,消息将会丢失,但这对我们来说没关系;如果没有消费者监听,我们可以安全地丢弃这条信息。
receive_logs.php
代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('logs', 'fanout', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queue_name, 'logs');
echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
如果想要保存日志到文件,打开控制台输入:
php receive_logs.php > logs_from_rabbit.log
如果想要在屏幕上看到日志,生成一个新的终端,并运行:
php receive_logs.php
发出日志:
php emit_log.php
使用 rabbitmqctl list_bindings
,您可以验证代码是否确实按照我们的要求创建和绑定了队列。运行两个 receive_logs.php
程序时,应该会看到如下内容:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
对结果的解释很简单:来自 logs
交换机的数据会被送到两个队列中,这两个队列名都是由服务器分配的。这正是我们想要的效果。
要了解如何监听消息子集,让我们继续学习 教程 4。
翻译自:https://www.rabbitmq.com/tutorials/tutorial-three-php.html