PHP + RabbitMQ + TP6实现,记录操作全过程

目录

前言

composer安装扩展

遇到的问题 

代码展示

1.api接口,进行消息发布

2.生产者类库

3.消费者类库

4.目录结构

5.PHP中 register_shutdown_function 函数的基础介绍与用法详解

windows上运行消费者类库

1.项目根目录下执行

2.执行后代码展示

3.修改下config/console.php

4.项目根目录下执行

5.效果展示

6.tp6.0自定义指令


前言

昨天用树莓派搭建的rabbitmq今天就迫不及待的上手试试了!哈哈哈,(附上树莓派搭建步骤树莓派(Raspberry Pi)上安装RabbitMQ(一)_zk_jy520的博客-CSDN博客)赶紧熬夜把操作过程整理了一遍分享给大家,哪个小伙伴想试试的话,也可以按照这个这个步骤来,能帮到的话,记得点赞收藏哦,没有帮到,也可以点赞收藏,给我点鼓励呀!!!

composer安装扩展

1.composer安装
composer require  php-amqplib/php-amqplib

遇到的问题 

代码展示

1.api接口,进行消息发布

    //RabbitMQ使用
    public function test4(){

        $client = Producer::getInstance();

        $res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
        dump($res);
    }

发送成功

PHP + RabbitMQ + TP6实现,记录操作全过程

2.生产者类库

<?php


namespace services\rabbitmq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

/**生产者类
 * Class Producer
 * @package services\rabbitmq
 */
class Producer
{

    private $host;
    private $port;
    private $user;
    private $pwd;
    private $vhost;

    private static $client;

    private static $instance;

    private function __construct($host,$port,$user,$pwd,$vhost)
    {
        if (empty($host) || empty($port) || empty($user) || empty($pwd) || empty($vhost)){
            $info = config('config')['RabbitMQ'];
            $this->host = $info['address'];
            $this->port = $info['port'];
            $this->user = $info['user'];
            $this->pwd = $info['pwd'];
            $this->vhost = $info['vhost'];
        }else{
            $this->host = $host;
            $this->port = $port;
            $this->user = $user;
            $this->pwd = $pwd;
            $this->vhost = $vhost;
        }
        self::$client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
    }

    public static function getInstance($host = '',$port = '',$user = '',$pwd = '',$vhost = '') {
        if (!(self::$instance instanceof self)) {
            self::$instance = new self($host,$port,$user,$pwd,$vhost);
        }
        return self::$instance;
    }

    public function publishMsg($exchange,$QueueArr,$msg,$message_id,$route_key = '',$expiration = 3600 * 90){
        $channel = self::$client->channel();

        /**交换机声明
         * $exchange 交换机名称
         * AMQPExchangeType::DIRECT 路由模式
         * passive: false
         * durable: true  持久化  交换器将在服务器重启后继续存在
         * auto_delete: false  一旦通道关闭,交换器将不会被删除
         */
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, [], null);

        //绑定多个对列
        foreach ($QueueArr as $key=>$value){

            /**声明队列(设置队列的时间必须设置一次,如要修改需要删除这个队列)new AMQPTable(['x-message-ttl'=>10000])
             * $value  队列名称
             * passive false
             * 持久durable true 队列将在服务器重启后继续存在
             * 互斥exclusive false  队列可以通过其他渠道访问
             * auto_delete false 通道关闭后,队列不会被删除
             */
            $channel->queue_declare($value,false,true,false,false,false);
            //队列和交换机绑定
            $channel->queue_bind($value, $exchange,$route_key);
        }

        //发送消息
        $message = new AMQPMessage($msg,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>$expiration * 1000,'message_id'=>$message_id));
        $channel->basic_publish($message,$exchange,$route_key);
        $channel->close();
        self::$client->close();
        return true;
    }


}

3.消费者类库

<?php


namespace services\rabbitmq;


use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer
{

    private $host;
    private $port;
    private $user;
    private $pwd;
    private $vhost;

    private $client;
    private $channel;

    public function __construct()
    {
        $info = config('config')['RabbitMQ'];
        $this->host = $info['address'];
        $this->port = $info['port'];
        $this->user = $info['user'];
        $this->pwd = $info['pwd'];
        $this->vhost = $info['vhost'];

        $this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
        $this->channel = $this->client->channel();
    }

    public function start(){

        /**
         *
         * queue: queue_name    // 被消费的队列名称
         * consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端
         * no_local: false      // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
         * no_ack: true         // 收到消息后,是否不需要回复确认即被认为被消费
         * exclusive: false     // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
         * nowait: false        // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
         * callback: $callback  // 回调逻辑处理函数
         *
         */
        $this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']);
        register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

    }

    public function shutdown($channel, $connection){
        $channel->close();
        $connection->close();
        save_log('close');
    }

    public function process_message($message){
        echo  $message->body."\n";
        //手动发送ack
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }


}

4.目录结构

PHP + RabbitMQ + TP6实现,记录操作全过程

考虑到大家会不理解这个函数哈,下面这链接详解,通俗易懂

5.PHP中 register_shutdown_function 函数的基础介绍与用法详解

参考链接:https://www.jb51.net/article/129213.htm

windows上运行消费者类库

1.项目根目录下执行

php think make:command Consumer

PHP + RabbitMQ + TP6实现,记录操作全过程

2.执行后代码展示

<?php
declare (strict_types = 1);

namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

/**项目根目录下执行 php think make:command Consumer 会在command目录下生成Consumer.php文件
 * Class Consumer
 * @package app\command
 */
class Consumer extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('consumer')
            ->setDescription('the consumer command');        
    }

    protected function execute(Input $input, Output $output)
    {

        $consumer = new \services\rabbitmq\Consumer();

        $consumer->start();

    	// 指令输出
    	$output->writeln('consumer');
    }
}

3.修改下config/console.php

添加下面代码

<?php
// +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
    // 指令定义
    'commands' => [
        'consumer' => 'app\command\Consumer',
    ],
];

4.项目根目录下执行

php think consumer

5.效果展示

PHP + RabbitMQ + TP6实现,记录操作全过程

6.tp6.0自定义指令

参考链接:自定义指令 · ThinkPHP6.0完全开发手册 · 看云 

上一篇:PHP tp6 订单页面


下一篇:tp6命令行生成模型(多应用)