Publish/Subscribe(发布/订阅模式)
(using php-amqplib)
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we‘ll do something completely different -- we‘ll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".
上一扒,我们建立一个工作队列。其隐含的假设是一个任务只派送给一个worker。这回我们会做一些完全不同的——派送信息到多个消费者,这种模式叫做“发布/订阅”。
To illustrate the pattern, we‘re going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.
为了阐述这种模式,我们来建立一个简单的日志系统。它由两个程序组成——第一个来发布日志消息,第二个来接收并打印这些消息。
In our logging system every running copy of the receiver program will get the messages. That way we‘ll be able to run one receiver and direct the logs to disk; and at the same time we‘ll be able to run another receiver and see the logs on the screen.
在我们的日志系统中,每个接收程序的运行时拷贝都可以接收到发送者(sender)发送的消息。那样呢我们就能运行一个接收程序把日志写入硬盘,同时运行另一个在屏幕查看消息。
Essentially, published log messages are going to be broadcast to all the receivers.
本质上,发布的消息将被广播到所有的接收程序(听众)。
Exchanges(交换器)
In previous parts of the tutorial we sent and received messages to and from a queue. Now it‘s time to introduce the full messaging model in Rabbit.
在之前的部分,我们从队列中发送和接收消息。现在是时候介绍一下RabbitMQ的完整的消息传递模式了。
Let‘s quickly go over what we covered in the previous tutorials:
我们快速复习一下之前涉及到的内容。
- A producer is a user application that sends messages.
- 生产者是一个发送消息的用户程序
- A queue is a buffer that stores messages.
- 队列是用来存储消息的缓冲区
- A consumer is a user application that receives messages.
- 消费者是一个接收消息的用户程序
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn‘t even know if a message will be delivered to any queue at all.
这种消息传递模式的核心思想是生产者从不直接给队列发送任何消息。实际上,很多时候生产者甚至都不知道消息是否会被
传递到队列。
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
反而,生产者仅仅是把消息发送到交换器(exchange)。交换器是个非常简单的东西。一方面它从生产者接收消息,另一方面
它将消息推送到队列。交换器必须清楚地知道如何处理收到的消息。是应该把这个消息追加到一个特定的队列?还是追加到多个队列?亦或者应将其丢弃。 要做到这个,我们可以通过交换器类型(exchange type)来定义规则。
There are a few exchange types available: direct, topic, headers and fanout. We‘ll focus on the last one -- the fanout. Let‘s create an exchange of this type, and call it logs:
集中可用的交换器类型:direct,topic,headers和fanout. 我们来看最后一个--fanout. 创建一个这种类型的交换器叫“logs".
$channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false);
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that‘s exactly what we need for our logger.
fanout交换器倍儿简单。从它的名字你可能已经猜到,它就是把所有收到的消息广播给所有已知队列。这恰好是我们日志系统想要的。
Listing exchanges(交换器列表)
To list the exchanges on the serve you can run the ever useful rabbitmqctl:
你还是可以运行rabbitmqctl来列出服务中的交换器(还可以看到类型哦)。
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you‘ll need to use them at the moment.
列表中有一些amq.*开头的交换器和默认(没名字内个)交换器。他们是默认创建的。但现在我们不太可能用的到。
Nameless exchange(无名氏交换器)
In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which is identified by the empty string ("").
之前那几扒,我们对交换器一无所知,但仍然能给队列发送消息(为喵?)。那很有可能我们是在用默认的交换器,就是用空字符串定义的那个啦。
Recall how we published a message before:
回忆一下我们之前咋发布消息的:
$channel->basic_publish($msg, ‘‘, ‘hello‘);Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish
我们在这用默认的或无名氏交换器:如果有routing_key,消息就会被路由到routing_key指定的队列,routing_key就是 basic_publish的第二个参数。
Now, we can publish to our named exchange instead:
现在嘞,我们发布个带名字的交换器。
$channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false); $channel->basic_publish($msg, ‘logs‘);
Temporary queues(临时队列)
As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.
你可能还记得,之前我们用过叫hello和task_queue的队列。会命名队列对我们很重要——我们得把worker指向相同的队列(同名字的)。当你想在生产者和消费者之间共享一个队列的时候,命名一个队列的名字就显得尤为重要。
But that‘s not the case for our logger. We want to hear about all log messages, not just a subset of them. We‘re also interested only in currently flowing messages not in the old ones. To solve that we need two things.
但那不是我们日志系统要用的情景。 我们是要收听到所有的日志消息,不是一部分。同样,我们只对当前最新的消息感兴趣,而不是那些陈旧的。两件事儿来解决这个问题。
Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.
首先,无论啥时候链接到Rabbit我们都需要一个全新的、空的队列。要这么做,我们可以创建随机名字的队列,或者,
更棒的是——让rabbitmq服务来为我们选择一个随机队列。
Secondly, once we disconnect the consumer the queue should be automatically deleted.
期次,一旦断开消费者的连接,队列应该被自动删除。
In the php-amqplib client, when we supply queue name as an empty string, we create a non-durable queue with a generated name:
在php-amqplib客户端中,每当创建一个名字为空字符的队列,就相当创建一个非持久的队列,随之系统分配一个名字。
list($queue_name, ,) = $channel->queue_declare("");
When the method returns, the $queue_name variable contains a random queue name generated by RabbitMQ. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
$queue_name接收一个由RabbitMQ随机生成的队列名。例如:可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg这个样子。
When the connection that declared it closes, the queue will be deleted because it is declared as exclusive.
当连接关闭时,由于队列声明为独有的,所以会被删除掉。
Bindings(捆绑/绑定)
We‘ve already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.
我们已经创建了一个fanout类型的交换器和一个队列。现在我们得告诉交换器向这个队列发送消息。这种交换器和队列之间的
关系称之为“捆绑”。
$channel->queue_bind($queue_name, ‘logs‘);
From now on the logs exchange will append messages to our queue.
从现在起logs交换器就会把信息追加到我们的队列中了。
Listing bindings(捆绑列表)
You can list existing bindings using, you guessed it, rabbitmqctl list_bindings.
已经猜到了吧,用rabbitmqctl list_bindings来列出现存的捆绑使用。
Putting it all together(合体!!!又来!!哈哈哈)
The producer program, which emits log messages, doesn‘t look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here goes the code for emit_log.php script:
发送日志消息的生产者程序看起来和之前的没太大不同。最大的不同是现在我们要发送消息到logs交换器,而不是无名氏那个。
当发送时,我们需要提供一个routing_key,但对于fanout交换器来讲,它的值是忽略的。来看看emit_log.php脚本:
<?php require_once __DIR__ . ‘/vendor/autoload.php‘; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPConnection(‘localhost‘, 5672, ‘guest‘, ‘guest‘); $channel = $connection->channel(); $channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false); $data = implode(‘ ‘, array_slice($argv, 1)); if(empty($data)) $data = "info: Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, ‘logs‘); echo " [x] Sent ", $data, "\n"; $channel->close(); $connection->close(); ?>
As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.
如亲所见,建立连接后我们声明了一个交换器。因为发布消息到不存在的交换器是禁止的,所以这步是必须的。
The messages will be lost if no queue is bound to the exchange yet, but that‘s okay for us; if no consumer is listening yet we can safely discard the message.
然而如果没有队列捆绑到交换器,消息将会丢失,但对我们来讲没什么关系。如果还没有消费者收听,我们可以安全地丢弃这条消息。
The code for receive_logs.php:(receive_logs.php代码)
<?php require_once __DIR__ . ‘/vendor/autoload.php‘; use PhpAmqpLib\Connection\AMQPConnection; $connection = new AMQPConnection(‘localhost‘, 5672, ‘guest‘, ‘guest‘); $channel = $connection->channel(); $channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, ‘logs‘); echo ‘ [*] Waiting for logs. To exit press CTRL+C‘, "\n"; $callback = function($msg){ echo ‘ [x] ‘, $msg->body, "\n"; }; $channel->basic_consume($queue_name, ‘‘, false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
If you want to save logs to a file, just open a console and type:
如果你想把日志保存到文件,就打开控制台然后输入:
$ php receive_logs.php > logs_from_rabbit.log
If you wish to see the logs on your screen, spawn a new terminal and run:
如果你想在屏幕上查看日志,打开一个新的终端然后运行:
$ php receive_logs.php
And of course, to emit logs type:
当然,发布消息请输入:
$ php emit_log.php
Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.rb programs running you should see something like:
使用rabbitmqctl list_bindings你可以验证代码有木有真的按照我们的期许创建捆绑和队列。运行俩receive_logs.php(这里官方文档copy有误,.rb的ruby文件扩展名还没改回来),有应该会看到像这样:
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that‘s exactly what we intended.
结果展示简单明了:数据从logs交换器进入到两个由rabbitmq服务命名的队列中。这个和我们打算的一模一样。
To find out how to listen for a subset of messages, let‘s move on to tutorial 4
要想知道怎么收听部分消息嘛,请听下回分解~~~~~~~~~~~ :)