php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

1、AMQP_EX_TYPE_DIRECT:直连型

直连型又包括: 1对1 和1对N(N对1、 N对N)

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三) 

接收端receive.php代码如下
<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName(‘exchange‘);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName(‘logs‘);
$queue->declare();

$queue->bind(‘exchange‘, ‘logs‘);

while (true) {
    $queue->consume(‘callback‘);
}

$connection->close();

function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    $queue->nack($envelope->getDeliveryTag());
}

发送端send.php代码如下

<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName(‘exchange‘);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$exchange->publish(‘direct type test‘,‘logs‘);
var_dump("Send Message OK");

$connect->disconnect();

运行结果如图所示

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)



php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)
创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看
receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端
<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName(‘exchange‘);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName(‘logs‘);
@$queue->declare();

$queue->bind(‘exchange‘, ‘logs‘);

while (true) {
    $queue->consume(‘callback‘);
}

$connection->close();

function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    $queue->nack($envelope->getDeliveryTag());
}




send.php
<?php

$connect = new AMQPConnection();
$connect->connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);
$exchange->setName(‘exchange‘);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

for ($index = 1; $index < 5; $index++) {
    $exchange->publish($index,‘logs‘);
    var_dump("Send:$index");
}

$exchange->delete();
$connect->disconnect();

运行结果如下
php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

列队会把消息分配给每一个接收端分配处理这里看似完美但是如果想要更好的处理不同的任务就需要 公平调度

比如当1、3处理的都是简单的人 2、4都是处理的复杂的任务 如果任务过多时 receive_one.php是空闲的而receive_two.php是任务繁重的
我们进行如下测试
send.php改成5改成50
for ($index = 1; $index < 50; $index++) {
    $exchange->publish($index,‘logs‘);
    var_dump("Send:$index");
}

receive_two.php 加上 sleep(3)
function callback($envelope, $queue) {
    var_dump($envelope->getBody());
    sleep(3);
    $queue->nack($envelope->getDeliveryTag());
}

我们运行程序结果如下

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

receive_one全部运行完而receive_two才运行一个 之后receive_one一直空闲
我们可以通过 在接收端设置
$channel->setPrefetchCount(1);
任务没人完成前不接收新的消息把消息发送给其他接收端

如下receive_one.php 和 receive_two.php
$channel = new AMQPChannel($connect);
改成如下
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);




php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

上一篇:开源项目Elastic Shell发布,基本管理


下一篇:vim 常用