RabbitMQ的安装与基本使用

  运行环境:https://oneinstack.com/install/

在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。如发送短信、邮件、过滤非法关键字等等。它还可以用于RPC。

先看一张官方图:

RabbitMQ的安装与基本使用

 

一、概念:

     Broker:简单来说就是消息队列服务器实体。
   Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
   Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
   Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
   Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
   vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
   producer:消息生产者,就是投递消息的程序。
   consumer:消息消费者,就是接受消息的程序。
   channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
二、安装RabbitMQ
     Ubuntu:
          sudo apt-get install erlang
          sudo apt-get install rabbitmq-server
     CentOS:
          1.先安装erlang
               yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel 
               yum -y install ncurses-devel 
               yum install ncurses-devel 
    cd /usr/local
               wget http://www.erlang.org/download/otp_src_17.5.tar.gz
               tar -xzvf otp_src_17.5.tar.gz 
               cd otp_src_17.5
               ./configure --without-javac
               make && make install
               测试一下是否安装成功,在控制台输入命令erl
    安装完后输入“erl”以下提示即为安装成功
[root@cloud bin]# erl
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false] Eshell V5.10.3 (abort with ^G)
1>
          2.安装rabbitmq
               cd /usr/local
               tar -zxvf rabbitmq-server-3.5.1.tar.gz 
               cd rabbitmq-server-3.5.1
               make
               make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install
               报错处理:
                    /bin/sh: xmlto: command not found
                    /bin/sh: line 2: xmlto: command not found
                   解决:yum install xmlto
三、管理命令
  切换到安装目录,sbin文件下才能执行命令,如:cd /usr/local/rabbitmq/sbin/
       启动:./rabbitmq-server start
       后台启动:./rabbitmq-server -detached
       关闭:./rabbitmqctl stop
       状态:./rabbitmqctl status
四、插件
    启动web管理插件,切换到安装目录,sbin文件下才能执行命令,如:cd /usr/local/rabbitmq/sbin/
        ./rabbitmq-plugins enable rabbitmq_management
        错误解决:
            Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins",            enoent}
            mkdir /etc/rabbitmq
            重新启动输入地址:localhost:15672,帐号默认为guest,密码guest,此帐号默认只能在本机访问。不建议打开远程访问。你可以创建一个帐户,并设置可以远程访问的角色进行访问。
            如:./rabbitmqctl add_user luo 123456
            ./rabbitmqctl  set_user_tags  luo administrator
五、用户管理
     默认的guest帐户相当于root帐户
     rabbitmqctl add_user username password 添加帐户
     rabbitmqctl change_password username newpassword 修改密码
     rabbitmqctl delete_user username 删除帐户
     rabbitmqctl list_users 列出所有帐户
     rabbitmqctl  set_user_tags  User  Tag 设置角色(administrator、monitoring、policymaker、management、其它)
     立即生效,不需重启
 六、安装PHP扩展(rabbitmq-c,amqp)
    
 1.安装rabbitmq-c
 cd /usr/local
    wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz 
  tar zxvf rabbitmq-c-0.8.0.tar.gz
    cd  rabbitmq-c-0.8.0
    autoreconf -i
    ./configure --prefix=/usr/local/rabbitmq-c
最后显示一下内容表示正常
rabbitmq-c build options:
Host: x86_64-unknown-linux-gnu
Version: 0.4.1
SSL/TLS: openssl
Tools: yes
Documentation: no
Examples: yes

然后进行make和安装了.
  make && make install

   2.安装amqp
    cd /usr/local
    wget https://pecl.php.net/get/amqp-1.9.1.tgz
    tar -zxvf amqp-1.9.1.tgz 
    cd amqp-1.9.1
     /usr/local/php/bin/phpize( 可使用find / -name phpize查找phpize路径 )
   ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c(可使用find / -name php-config查找php-config路径)
make && make install
   在php.ini中extenstion部分写入extension=amqp.so
   重启php,/etc/init.d/php-fpm restart
    最后phpinfo检查搜索amqp是否成功
七、使用PHP与之交互
     Produce端:

Consumer端:

//设置连接属性
$connArgs = array(
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
); //创建RabbitMQ连接
$conn = new AMQPConnection($connArgs);
if (!$conn->connect()) {
echo 'cannot connect to the broker';
} //创建channel
$channel = new AMQPChannel($conn); //设置队列名称
$exchangeName = 'exchange';
$routingKey = 'key_test';
$queueName = 'queue_test_1';
$queue = new AMQPQueue($channel);
$queue->setName($queueName);//设置名称
$queue->setFlags(AMQP_DURABLE);//持久化队列,当代理重启动后依然存在,并包括它们中的完整数据
$queue->declare();//声明此队列
$queue->bind('exchange',$routingKey);//使用某交换机,并绑定某路由关键字 //消费数据方式一:非阻塞方式
// while(true)
// {
// sleep(1);
// $envelope = $queue->get(AMQP_AUTOACK) ;//第一个参数表示自动ACK应答
// if ($envelope){
// $messages = $envelope->getBody();
// echo $messages;
// }
// } //消费数据方式二:以阻塞模式消费数据(推荐)
while(true)
{
$queue->consume('processMessage');//第一个参数表示要回调的方法名,第二个参数设置为AMQP_AUTOACK,表示自动ACK应答
}
/**
* 定义回调方法
*/
function processMessage($envelope, $queue)
{
$messages = $envelope->getBody();#获取消息数据
echo $messages;
$queue->ack($envelope->getDeliveryTag()); //处理成功后,手动发送ACK应答
//$queue->nack($envelope->getDeliveryTag()); //处理不成功,手动发送NO-ACK应答,放回队列中
}

八:特别说明:

     1.如果某一次消费数据没有ACK,则此条消息会记录为Unacknowledged,如果对于某一节点有连续三条则RabbitMQ认为此节点有故障,则不会再对它进行分发.当它断开后或一定时间后Unacknowledged状态消息会重新放回交换机中。手动no-ack后此Message将会放回队列中且不会再转发给此节点。
     2.对于计算密集型的工作,我们需要建立多个Consumer,对于多个Consumer,默认的分发机制是“公平分发”,将第n个发给第n个Consumer,超过个数取n的模。
     3.Exchange中的类型有:direct, topic 和fanout。
          direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息。
              topic :所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,direct的区别是它的routingkey可 以模糊匹配,#代表一个或多个字符,*代表任何字符。一般为确保程序严谨性而使用direct。注意当使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
          fanout:是广播模式,所有bind到此exchange的queue都可以接收消息,即该一个Message可以对应多个Consumer,应用场景举例:生产了了添加到购物车的Message,一个Consumer写推荐商品日志,一个Consumer写购物车表。
     4.两个不相同的queue名的队列绑定到同一exchange和rotingky上,此时相当于同一queue名的fanout广播模式,两个queue都会得都到

所有Message。如图所示:

RabbitMQ的安装与基本使用
5.RPC的实现流程:
RabbitMQ的安装与基本使用
上一篇:简单的面向过程的Redis存储加入购物车


下一篇:yii框架场景的用法