目录
5.PHP中 register_shutdown_function 函数的基础介绍与用法详解
前言
昨天用树莓派搭建的rabbitmq今天就迫不及待的上手试试了!哈哈哈,(附上树莓派搭建步骤树莓派(Raspberry Pi)上安装RabbitMQ(一)_zk_jy520的博客-CSDN博客)赶紧熬夜把操作过程整理了一遍分享给大家,哪个小伙伴想试试的话,也可以按照这个这个步骤来,能帮到的话,记得点赞收藏哦,没有帮到,也可以点赞收藏,给我点鼓励呀!!!
composer安装扩展
1.composer安装
composer require php-amqplib/php-amqplib
遇到的问题
代码展示
1.api接口,进行消息发布
//RabbitMQ使用
public function test4(){
$client = Producer::getInstance();
$res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
dump($res);
}
发送成功
2.生产者类库
<?php
namespace services\rabbitmq;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
/**生产者类
* Class Producer
* @package services\rabbitmq
*/
class Producer
{
private $host;
private $port;
private $user;
private $pwd;
private $vhost;
private static $client;
private static $instance;
private function __construct($host,$port,$user,$pwd,$vhost)
{
if (empty($host) || empty($port) || empty($user) || empty($pwd) || empty($vhost)){
$info = config('config')['RabbitMQ'];
$this->host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
}else{
$this->host = $host;
$this->port = $port;
$this->user = $user;
$this->pwd = $pwd;
$this->vhost = $vhost;
}
self::$client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
}
public static function getInstance($host = '',$port = '',$user = '',$pwd = '',$vhost = '') {
if (!(self::$instance instanceof self)) {
self::$instance = new self($host,$port,$user,$pwd,$vhost);
}
return self::$instance;
}
public function publishMsg($exchange,$QueueArr,$msg,$message_id,$route_key = '',$expiration = 3600 * 90){
$channel = self::$client->channel();
/**交换机声明
* $exchange 交换机名称
* AMQPExchangeType::DIRECT 路由模式
* passive: false
* durable: true 持久化 交换器将在服务器重启后继续存在
* auto_delete: false 一旦通道关闭,交换器将不会被删除
*/
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, [], null);
//绑定多个对列
foreach ($QueueArr as $key=>$value){
/**声明队列(设置队列的时间必须设置一次,如要修改需要删除这个队列)new AMQPTable(['x-message-ttl'=>10000])
* $value 队列名称
* passive false
* 持久durable true 队列将在服务器重启后继续存在
* 互斥exclusive false 队列可以通过其他渠道访问
* auto_delete false 通道关闭后,队列不会被删除
*/
$channel->queue_declare($value,false,true,false,false,false);
//队列和交换机绑定
$channel->queue_bind($value, $exchange,$route_key);
}
//发送消息
$message = new AMQPMessage($msg,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>$expiration * 1000,'message_id'=>$message_id));
$channel->basic_publish($message,$exchange,$route_key);
$channel->close();
self::$client->close();
return true;
}
}
3.消费者类库
<?php
namespace services\rabbitmq;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Consumer
{
private $host;
private $port;
private $user;
private $pwd;
private $vhost;
private $client;
private $channel;
public function __construct()
{
$info = config('config')['RabbitMQ'];
$this->host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
$this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
$this->channel = $this->client->channel();
}
public function start(){
/**
*
* queue: queue_name // 被消费的队列名称
* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端
* no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
* no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费
* exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
* nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
* callback: $callback // 回调逻辑处理函数
*
*/
$this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']);
register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function shutdown($channel, $connection){
$channel->close();
$connection->close();
save_log('close');
}
public function process_message($message){
echo $message->body."\n";
//手动发送ack
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
}
4.目录结构
考虑到大家会不理解这个函数哈,下面这链接详解,通俗易懂
5.PHP中 register_shutdown_function 函数的基础介绍与用法详解
参考链接:https://www.jb51.net/article/129213.htm
windows上运行消费者类库
1.项目根目录下执行
php think make:command Consumer
2.执行后代码展示
<?php
declare (strict_types = 1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
/**项目根目录下执行 php think make:command Consumer 会在command目录下生成Consumer.php文件
* Class Consumer
* @package app\command
*/
class Consumer extends Command
{
protected function configure()
{
// 指令配置
$this->setName('consumer')
->setDescription('the consumer command');
}
protected function execute(Input $input, Output $output)
{
$consumer = new \services\rabbitmq\Consumer();
$consumer->start();
// 指令输出
$output->writeln('consumer');
}
}
3.修改下config/console.php
添加下面代码
<?php
// +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
// 指令定义
'commands' => [
'consumer' => 'app\command\Consumer',
],
];
4.项目根目录下执行
php think consumer
5.效果展示
6.tp6.0自定义指令
参考链接:自定义指令 · ThinkPHP6.0完全开发手册 · 看云