运行环境:https://oneinstack.com/install/
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。如发送短信、邮件、过滤非法关键字等等。它还可以用于RPC。
先看一张官方图:
一、概念:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
二、安装RabbitMQ
Ubuntu:
sudo apt-get install erlang
sudo apt-get install rabbitmq-server
CentOS:
1.先安装erlang
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
yum -y install ncurses-devel
yum install ncurses-devel
cd /usr/local
wget http://www.erlang.org/download/otp_src_17.5.tar.gz
tar -xzvf otp_src_17.5.tar.gz
cd otp_src_17.5
./configure --without-javac
make && make install
测试一下是否安装成功,在控制台输入命令erl
安装完后输入“erl”以下提示即为安装成功
[root@cloud bin]# erl
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false] Eshell V5.10.3 (abort with ^G)
1>
2.安装rabbitmq
cd /usr/local
tar -zxvf rabbitmq-server-3.5.1.tar.gz
cd rabbitmq-server-3.5.1
make
make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install
报错处理:
/bin/sh: xmlto: command not found
/bin/sh: line 2: xmlto: command not found
解决:yum install xmlto
三、管理命令
切换到安装目录,sbin文件下才能执行命令,如:cd /usr/local/rabbitmq/sbin/
启动:./rabbitmq-server start
后台启动:./rabbitmq-server -detached
关闭:./rabbitmqctl stop
状态:./rabbitmqctl status
四、插件
启动web管理插件,切换到安装目录,sbin文件下才能执行命令,如:cd /usr/local/rabbitmq/sbin/
./rabbitmq-plugins enable rabbitmq_management
错误解决:
Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins", enoent}
mkdir /etc/rabbitmq
重新启动输入地址:localhost:15672,帐号默认为guest,密码guest,此帐号默认只能在本机访问。不建议打开远程访问。你可以创建一个帐户,并设置可以远程访问的角色进行访问。
如:./rabbitmqctl add_user luo 123456
./rabbitmqctl set_user_tags luo administrator
五、用户管理
默认的guest帐户相当于root帐户
rabbitmqctl add_user username password 添加帐户
rabbitmqctl change_password username newpassword 修改密码
rabbitmqctl delete_user username 删除帐户
rabbitmqctl list_users 列出所有帐户
rabbitmqctl set_user_tags User Tag 设置角色(administrator、monitoring、policymaker、management、其它)
立即生效,不需重启
六、安装PHP扩展(rabbitmq-c,amqp)
1.安装rabbitmq-c
cd /usr/local
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
tar zxvf rabbitmq-c-0.8.0.tar.gz
cd rabbitmq-c-0.8.0
autoreconf -i
./configure --prefix=/usr/local/rabbitmq-c
最后显示一下内容表示正常
rabbitmq-c build options:
Host: x86_64-unknown-linux-gnu
Version: 0.4.1
SSL/TLS: openssl
Tools: yes
Documentation: no
Examples: yes
然后进行make和安装了.
make && make install
2.安装amqp
cd /usr/local
wget https://pecl.php.net/get/amqp-1.9.1.tgz
tar -zxvf amqp-1.9.1.tgz
cd amqp-1.9.1
/usr/local/php/bin/phpize( 可使用find / -name phpize查找phpize路径 )
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c(可使用find / -name php-config查找php-config路径)
make && make install
在php.ini中extenstion部分写入extension=amqp.so
重启php,/etc/init.d/php-fpm restart
最后phpinfo检查搜索amqp是否成功
七、使用PHP与之交互
Produce端:
Consumer端:
//设置连接属性
$connArgs = array(
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
); //创建RabbitMQ连接
$conn = new AMQPConnection($connArgs);
if (!$conn->connect()) {
echo 'cannot connect to the broker';
} //创建channel
$channel = new AMQPChannel($conn); //设置队列名称
$exchangeName = 'exchange';
$routingKey = 'key_test';
$queueName = 'queue_test_1';
$queue = new AMQPQueue($channel);
$queue->setName($queueName);//设置名称
$queue->setFlags(AMQP_DURABLE);//持久化队列,当代理重启动后依然存在,并包括它们中的完整数据
$queue->declare();//声明此队列
$queue->bind('exchange',$routingKey);//使用某交换机,并绑定某路由关键字 //消费数据方式一:非阻塞方式
// while(true)
// {
// sleep(1);
// $envelope = $queue->get(AMQP_AUTOACK) ;//第一个参数表示自动ACK应答
// if ($envelope){
// $messages = $envelope->getBody();
// echo $messages;
// }
// } //消费数据方式二:以阻塞模式消费数据(推荐)
while(true)
{
$queue->consume('processMessage');//第一个参数表示要回调的方法名,第二个参数设置为AMQP_AUTOACK,表示自动ACK应答
}
/**
* 定义回调方法
*/
function processMessage($envelope, $queue)
{
$messages = $envelope->getBody();#获取消息数据
echo $messages;
$queue->ack($envelope->getDeliveryTag()); //处理成功后,手动发送ACK应答
//$queue->nack($envelope->getDeliveryTag()); //处理不成功,手动发送NO-ACK应答,放回队列中
}
八:特别说明:
1.如果某一次消费数据没有ACK,则此条消息会记录为Unacknowledged,如果对于某一节点有连续三条则RabbitMQ认为此节点有故障,则不会再对它进行分发.当它断开后或一定时间后Unacknowledged状态消息会重新放回交换机中。手动no-ack后此Message将会放回队列中且不会再转发给此节点。
2.对于计算密集型的工作,我们需要建立多个Consumer,对于多个Consumer,默认的分发机制是“公平分发”,将第n个发给第n个Consumer,超过个数取n的模。
3.Exchange中的类型有:direct, topic 和fanout。
direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息。
topic :所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,direct的区别是它的routingkey可 以模糊匹配,#代表一个或多个字符,*代表任何字符。一般为确保程序严谨性而使用direct。注意当使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
fanout:是广播模式,所有bind到此exchange的queue都可以接收消息,即该一个Message可以对应多个Consumer,应用场景举例:生产了了添加到购物车的Message,一个Consumer写推荐商品日志,一个Consumer写购物车表。
4.两个不相同的queue名的队列绑定到同一exchange和rotingky上,此时相当于同一queue名的fanout广播模式,两个queue都会得都到
所有Message。如图所示:
5.RPC的实现流程: