RabbitMQ Part3 (Publish/Subscribe)

发布/订阅

在前几节教学中,我们创建了工作队列。工作队列之后的假设是每个任务都投递给一个特定的工作进程。在这一节中,我们做一些完全不同的事情。我们会分发一个消息给多个消费者。这个模式称为" publish/subscribe"。

为了说明这种模式,我们将创建一个简单的日志系统。它包括两部分程序:第一部分会发出日志消息,第二部分接收并打印它们。

在我们的日志系统中,接收器程序的每一个运行的副本都会收到消息。这样,我们就可以运行一个接收器,并将日志定向到磁盘;同时,我们将能够运行另一个接收器,并在屏幕上看到日志。

本质上,发布的日志消息将被广播给所有接收者。

交换机

在本教程的前几部分中,我们向队列发送消息和从队列接收消息。现在是时候介绍 Rabbit 中完整的消息传递模型了。

让我们快速回顾一下之前教程的内容:

  • 生产者是一个用来发送消息的用户程序。
  • 队列是存储消息的缓存区。
  • 消费者是接收消息的用户程序。

RabbitMQ 消息模型的核心理念是生产者从来不会直接向队列发送消息。实际上,大多数情况生产者甚至都不需要知道一个消息要发送到哪一个队列。

相反,生产者只能向向交换机发送消息。一个交换机是非常简单的。在一头,它从生产者接收消息,另一头将消息推送到队列。交换机必须确切的知道如何处理收到的消息。它应该被附加到特定的队列吗?它应该附加到多个队列么?或者是应该丢弃它。其规则由 exchange type 定义。  
RabbitMQ Part3 (Publish/Subscribe)
交换机类型有如下几种:directtopicheadersfanout(广播模式)。我们将集中讨论最后一个(广播模式)。让我们创建一个这样的类型,称其为 logs:

$channel->exchange_declare('logs', 'fanout', false, false, false);

广播交换机非常简单。正如你可能从名字中猜到的,它只是把收到的所有消息广播给它知道的所有队列。这正是我们日志所需要的。

列出交换机

要列出服务器上的交换,您可以运行命令 rabbitmqctl:

  • sudo rabbitmqctl list_exchanges

在这个列表中会有一些 amq.*交换机和默认(未命名)交换机。这些是默认创建的,但是现在不太可能用得着。

默认交换机

在本教程的前几部分,我们对交换机一无所知,但是仍然能够向队列发送消息。这是因为我们使用了默认交换机,它由空字符串("")标识。
回忆一下我们之前是如何发布消息的:

  • $channel->basic_publish($msg, '', 'hello');
    这里我们使用了默认或匿名交换机:消息被路由到 routing_key 指定的队列。该关键字在 basic_publish 中的第三个参数指定。

现在,我们可以发布消息到刚才命名的交换机上:

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

临时队列

你或许记得之前我们使用队列都指定了名字(如:hellotask_queue?)。给队列命名对我们来说至关重要——我们需要将工作进程指向该一个队列。当您想要在生产者和消费者之间共享队列时,给队列命名也很重要。

但对于我们的日志系统来说,情况并非如此。我们希望收到所有日志消息,而不仅仅是其中的一部分。我们也只对当前的信息流感兴趣,而不是之前的。要解决这个问题,我们需要完成两部分。

第一部分,无论何时连接到 RabbitMQ 都需要一个新的、空的队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的是,让服务器为我们选择一个随机队列名称。

第二部分,一旦我们断开了消费者的连接,队列应该被自动删除。

php-amqplib 客户端中,当我们以空字符串的形式提供队列名称时,我们生成一个非持久化的自动生成名字的队列:

list($queue_name, ,) = $channel->queue_declare("");

当方法返回时,$queue_name 变量包含 RabbitMQ 生成的随机队列名。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg

当声明它的连接关闭时,队列将被删除,因为它被声明为独占的。可以在 队列指南 中了解关于 exclusive 标志和其他队列属性的更多信息。

绑定(bindings)

RabbitMQ Part3 (Publish/Subscribe)
我们已经创建了广播交换机和队列。现在,我们需要告诉交换机向我们的队列发送消息。交换机和队列之间的关系称为绑定

$channel->queue_bind($queue_name, 'logs');

从现在开始,logs 交换机将把消息附加到我们的队列中。

绑定列表

您可以列出现有的绑定:
rabbitmqctl list_bindings

总结

RabbitMQ Part3 (Publish/Subscribe)
发出日志消息的生产者程序看起来与上一个教程没有太大不同。最重要的变化是,我们现在想要将消息发布到日志交换机,而一个匿名的交换机。下面是emit_log.php 脚本的代码:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'logs');

echo ' [x] Sent ', $data, "\n";

$channel->close();
$connection->close();

建立连接后,我们声明交换机。此步骤是必要的,因为发布到不存在的交换机是禁止的。

如果还没有队列绑定到交换,消息将会丢失,但这对我们来说没关系;如果没有消费者监听,我们可以安全地丢弃这条信息。

receive_logs.php 代码:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('logs', 'fanout', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

如果想要保存日志到文件,打开控制台输入:

php receive_logs.php > logs_from_rabbit.log

如果想要在屏幕上看到日志,生成一个新的终端,并运行:

 php receive_logs.php

发出日志:

php emit_log.php

使用 rabbitmqctl list_bindings,您可以验证代码是否确实按照我们的要求创建和绑定了队列。运行两个 receive_logs.php 程序时,应该会看到如下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:来自 logs 交换机的数据会被送到两个队列中,这两个队列名都是由服务器分配的。这正是我们想要的效果。

要了解如何监听消息子集,让我们继续学习 教程 4

翻译自:https://www.rabbitmq.com/tutorials/tutorial-three-php.html

上一篇:响应式web-mqtt应用


下一篇:larvel使用debugbar