生产者
$connConfig = array(
'host' => $this->host,
'port' => $this->port,
'login' => $this->user,
'password' => $this->password,
'vhost'=>'/' //mq的虚拟机 类似mysql里面的数据库
);
//建立RabbitMQ连接
$conn = new \AMQPConnection($connConfig);
if(!$conn->connect()){
echo '链接失败';
exit();
}
echo "连接名 ".$this->host." 成功建立连接!\n";
//创建channel(信道或者叫通道)
$channel = new \AMQPChannel($conn);
echo "创建通道完毕!\n";
//创建一个交换机 消息是根据交换机发送的mq中的
$ex = new \AMQPExchange($channel);
//声明一个路由键
$routingKey = $this->routing_key;
//声明交换机名称
$exchangeName = $this->exchange_name;
//设置交换机名称
$ex->setName($exchangeName);
//设置交换机类型 一共四种类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
//设置交换机持久类型 1:不持久在磁盘 2:持久化在磁盘
$ex->setFlags(AMQP_DURABLE);
//交换机设置完后需要一个代码
$ex->declareExchange();
echo "创建通道完毕!\n";
//推送消息 推送10条
for ($i=1;$i<=10;$i++){
echo '插入消息成功:'."\n";
$msg = [
'data'=>'消息:'.$i,
];
$ex->publish(json_encode($msg),$routingKey,AMQP_NOPARAM,['delivery_mode'=>2]);
}
在浏览器或者命令行执行该方法
消费者 目前有个小问题一直没有解决 如果有小伙伴知道解决方法可以联系我 大家一起进步 感谢~!
/**
* @Notes (备注) : 消费者
* @Author (作者) : Seven
* @Date (开发时间) : 2021/10/20 16:57
* @Interface (方法名称) : consumption
*/
public function consumption(){
$connConfig = array(
'host' => $this->host,
'port' => $this->port,
'login' => $this->user,
'password' => $this->password,
'vhost'=>'/' //mq的虚拟机 类似mysql里面的数据库
);
//建立RabbitMQ连接
$conn = new \AMQPConnection($connConfig);
if(!$conn->connect()){
echo '链接失败';
exit();
}
echo "连接名 ".$this->host." 成功建立连接!\n";
//创建channel(信道或者叫通道)
$channel = new \AMQPChannel($conn);
echo "创建通道完毕!\n";
//创建一个交换机 消息是根据交换机发送的mq中的
$ex = new \AMQPExchange($channel);
//声明一个路由键
$routingKey = $this->routing_key;
//声明交换机名称
$exchangeName = $this->exchange_name;
//设置交换机名称
$ex->setName($exchangeName);
//设置交换机类型 一共四种类型
$ex->setType(AMQP_EX_TYPE_DIRECT);
//设置交换机持久类型 1:不持久在磁盘 2:持久化在磁盘
$ex->setFlags(AMQP_DURABLE);
//交换机设置完后需要一个代码
$ex->declareExchange();
echo "创建通道完毕!\n";
//创建消息队列 参数是通道的返回的对象
$query = new \AMQPQueue($channel);
//设置消息队列名称
$query->setName($this->queue_name);
//设置队列的持久
$query->setFlags(AMQP_DURABLE);
//队列设置完后需要一个代码
$query->declareQueue();
//把交换机绑定在路由键里面 参数1:交换机返回的name 参数2:路由键
$query->bind($ex->getName(),$routingKey);
echo "创建消息队列完毕!\n";
//网上资料都是第一个方法,但是我在执行的时候会返回找不到该方法所以用的第二个方法,但是第二个返回我无法打印出返回的结果
//方法一:参数为回调方法
$query->consume('receive');
//方法二:队列 consume 监听的意思 参数为方法名 消费者代码
$query->consume(function($envelope,$queue){
sleep(1);
//消息接收器 监听消息并进行处理 回调方法
echo $envelope->getBody()."\n";
$this->recordErrorLog(json_encode($envelope->getBody()));
});
}
//消息接收器 监听消息并进行处理 回调方法
public function receive($envelope){
echo $envelope->getBody()."\n";
}
/*
* 将异常写入日志
*/
private function recordErrorLog($msg)
{
Log::init([
'type' => 'File',
'path' => LOG_PATH,
'level' => ['error'],
]);
Log::record([
'错误消息' => $msg,
], 'error');
}