PHP RabbitMQ 使用

生产者

 $connConfig = array(
            'host' => $this->host,
            'port' => $this->port,
            'login' => $this->user,
            'password' => $this->password,
            'vhost'=>'/' //mq的虚拟机 类似mysql里面的数据库
        );
        //建立RabbitMQ连接
        $conn = new \AMQPConnection($connConfig);
        if(!$conn->connect()){
            echo '链接失败';
            exit();
        }
        echo "连接名 ".$this->host." 成功建立连接!\n";

        //创建channel(信道或者叫通道)
        $channel = new \AMQPChannel($conn);
        echo "创建通道完毕!\n";

        //创建一个交换机 消息是根据交换机发送的mq中的
        $ex = new \AMQPExchange($channel);
        //声明一个路由键
        $routingKey = $this->routing_key;
        //声明交换机名称
        $exchangeName = $this->exchange_name;
        //设置交换机名称
        $ex->setName($exchangeName);
        //设置交换机类型 一共四种类型
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        //设置交换机持久类型 1:不持久在磁盘 2:持久化在磁盘
        $ex->setFlags(AMQP_DURABLE);
        //交换机设置完后需要一个代码
        $ex->declareExchange();
        echo "创建通道完毕!\n";

        //推送消息 推送10条
        for ($i=1;$i<=10;$i++){
            echo '插入消息成功:'."\n";
            $msg = [
                'data'=>'消息:'.$i,
            ];
            $ex->publish(json_encode($msg),$routingKey,AMQP_NOPARAM,['delivery_mode'=>2]);
        }

在浏览器或者命令行执行该方法

PHP RabbitMQ 使用

 PHP RabbitMQ 使用

 

消费者 目前有个小问题一直没有解决 如果有小伙伴知道解决方法可以联系我 大家一起进步 感谢~!

 /**
     * @Notes (备注)        : 消费者
     * @Author (作者)       : Seven
     * @Date (开发时间)      : 2021/10/20 16:57
     * @Interface (方法名称) : consumption
     */
    public function consumption(){
        $connConfig = array(
            'host' => $this->host,
            'port' => $this->port,
            'login' => $this->user,
            'password' => $this->password,
            'vhost'=>'/' //mq的虚拟机 类似mysql里面的数据库
        );
        //建立RabbitMQ连接
        $conn = new \AMQPConnection($connConfig);
        if(!$conn->connect()){
            echo '链接失败';
            exit();
        }
        echo "连接名 ".$this->host." 成功建立连接!\n";

        //创建channel(信道或者叫通道)
        $channel = new \AMQPChannel($conn);
        echo "创建通道完毕!\n";

        //创建一个交换机 消息是根据交换机发送的mq中的
        $ex = new \AMQPExchange($channel);

        //声明一个路由键
        $routingKey = $this->routing_key;
        //声明交换机名称
        $exchangeName = $this->exchange_name;
        //设置交换机名称
        $ex->setName($exchangeName);
        //设置交换机类型 一共四种类型
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        //设置交换机持久类型 1:不持久在磁盘 2:持久化在磁盘
        $ex->setFlags(AMQP_DURABLE);
        //交换机设置完后需要一个代码
        $ex->declareExchange();
        echo "创建通道完毕!\n";

        //创建消息队列 参数是通道的返回的对象
        $query = new \AMQPQueue($channel);
        //设置消息队列名称
        $query->setName($this->queue_name);
        //设置队列的持久
        $query->setFlags(AMQP_DURABLE);
        //队列设置完后需要一个代码
        $query->declareQueue();
        //把交换机绑定在路由键里面 参数1:交换机返回的name 参数2:路由键
        $query->bind($ex->getName(),$routingKey);
        echo "创建消息队列完毕!\n";

        //网上资料都是第一个方法,但是我在执行的时候会返回找不到该方法所以用的第二个方法,但是第二个返回我无法打印出返回的结果
        //方法一:参数为回调方法
        $query->consume('receive');

        //方法二:队列 consume 监听的意思 参数为方法名 消费者代码
        $query->consume(function($envelope,$queue){
            sleep(1);
            //消息接收器 监听消息并进行处理 回调方法
            echo $envelope->getBody()."\n";
            $this->recordErrorLog(json_encode($envelope->getBody()));
        });
    }
    //消息接收器 监听消息并进行处理 回调方法
    public function receive($envelope){
       echo $envelope->getBody()."\n";
    }

    /*
     * 将异常写入日志
     */
    private function recordErrorLog($msg)
    {
        Log::init([
            'type' => 'File',
            'path' => LOG_PATH,
            'level' => ['error'],
        ]);
        Log::record([
            '错误消息' => $msg,
        ], 'error');
    }

上一篇:只允许在input框输入文字,不能输入数字和其他字符


下一篇:如何详细的处理异常