环境:
MacOS 10.14
Node.js 8.9.1
零、背景
目前有个上线应用会接受多个请求,且每个请求的处理时间可能很久,可能到数小时,所以就想采用异步机制,至于复杂的运算就用消息队列(MQ)去慢慢消化。
网上调研了一圈,遂采用RabbitMQ。
一、安装
1、安装
(1) MacOS
brew install rabbitmq
(2) CentOS (Linux)
https://tecadmin.net/install-rabbitmq-on-centos/
2、配置环境变量
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
3、使用
(1) 服务器端
rabbitmq-server
启动需要(默认)200 MB的磁盘空间,但可以通过配置文件里的
disk_free_limit
修改。
(2) 客户端
以 Node.js 为例:
npm i amqplib
https://www.npmjs.com/package/amqplib
var amqp = require('amqplib/callback_api');
4、用 rabbitmq management 进行后台管理
(1) 开启服务
rabbitmq-plugins enable rabbitmq_management
此时/etc/rabbitmq
下会多出enabled_plugins
文件,内容为:
[rabbitmq_management].
此时 rabbitmq management 的地址为http://localhost:15672
,默认用户密码为 guest/guest
。
但此时外网访问,登录时会提示 User can only log in via localhost 。
这是由于 rabbitmq 从 3.3.0 开始禁止使用 guest/guest 权限通过除 localhost 外的访问。
(2) 开启外网访问
1、rabbitmq 初始并没有创建配置文件,需要自行拷贝。
cp /usr/share/doc/rabbitmq-server-3.7.9/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
2、修改此配置文件rabbitmq.config
vim /etc/rabbitmq/rabbitmq.config
把 loopback_users 的注释解开:
%%{loopback_users, []}
-> {loopback_users, []}
这里请小心 {loopback_users, []} 后的逗号可能需要去掉,不然格式会报错。
3、重启服务
sudo systemctl restart rabbitmq-server
注:可能在访问的时候会报这样的错:
解决办法: 关闭全局 ss 代理
二、使用
1、connection —— 连接
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
// ……
// connection.close();
});
2、channel —— 通道
通道分为:
生产者(发送者)
消费者(接收者)
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
// channel ……
});
(1) queue —— 队列
队列里面塞入的是消息
。
生产者
var queue = 'queue_name';
# 创建or连上队列
channel.assertQueue(queue, {
durable: true # 队列持久化
});
# 临时队列(当前 connection 断掉后就会被删除)—— 队列名随机
channel.assertQueue('',{ exclusive:true });
# 将消息塞入队列
channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true # 消息持久化
});
关于持久化:
一个是防止服务器端的队列丢失,一个是防止服务器端的队列里的消息丢失。
但是这并不能避免:如果服务器端在RabbitMQ接受消息的过程中挂了导致的消息丢失。如果需要更强的保证,可以使用 发布者确认。
消费者
var queue = 'queue_name';
# 从队列取出消息
channel.consume(queue, function(msg) {
channel.ack(msg); # 发送确认信号
}, {
noAck: false
});
关于 ACK ( Acknowledgement )
noAck: true
则服务器端不会期望收到 ACK,也就是说,消息在被发送后会立即出列。
而 noAck: false
则需要消费者发送 ACK,即channel.ack(msg);
,但如果超时未回复 ACK,消息会重新排队(但如果同时有其他可用消费者,则会迅速安排过去)
查看当前有多少队列及各中有多少消息:
sudo rabbitmqctl list_queues
(2) prefetch —— 预取
channel.prefetch(1);
# 表示这个通道如果有{1}个未完成的消息,则不会接受新的消息
(3) exchange —— 交换
如果有多个队列,生产者的消息应该如何分配呢?这个时候就需要一个中间件——交换
其中交换类型
有四种:“”(默认交换), topic, headers, fanout
A、 “”(默认交换)
RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。所以不建议使用channel.sendToQueue(),此为 “”(默认交换)。
如果没有队列绑定到交换,消息将会丢失。
B、fanout(广播)
生产者
var exchange = 'logs';
# 创建or连上交换
channel.assertExchange(exchange, 'fanout', {
durable: false # 持久化
});
### 推消息给交换
channel.publish(exchange, '', Buffer.from(msg));
消费者
var exchange = 'logs';
# 创建or连上交换
channel.assertExchange(exchange, 'fanout', {
durable: false # 持久化
});
# ------------------------
# 绑定 交换+队列
channel.bindQueue('queue_1', exchange, '');
channel.bindQueue('queue_2', exchange, '');
channel.bindQueue('queue_3', exchange, '');
queue_1、queue_2、queue_3 都会收到相同的一条消息。
C、direct (直接)
生产者
var exchange = 'logs';
# 创建or连上交换
channel.assertExchange(exchange, 'fanout', {
durable: false # 持久化
});
### 推消息给交换
channel.publish(exchange, 'black', Buffer.from(msg));
消费者
var exchange = 'logs';
# 创建or连上交换
channel.assertExchange(exchange, 'fanout', {
durable: false # 持久化
});
# ------------------------
# 绑定 交换+队列
channel.bindQueue('queue_1', exchange, 'white');
channel.bindQueue('queue_2', exchange, 'black');
channel.bindQueue('queue_3', exchange, 'red');
只有 queue_2 才会收到消息。
D、topic
生产者
var exchange = 'logs';
# 创建or连上交换
channel.assertExchange(exchange, 'fanout', {
durable: false # 持久化
});
### 推消息给交换
channel.publish(exchange, 'kern.critical', Buffer.from(msg));
消费者
var exchange = 'logs';
# 创建or连上交换
channel.assertExchange(exchange, 'fanout', {
durable: false # 持久化
});
# ------------------------
# 绑定 交换+队列
channel.bindQueue('queue_1', exchange, '#');
channel.bindQueue('queue_2', exchange, "kern.*");
channel.bindQueue('queue_3', exchange, "*.critical");
- *(星号)可以替代一个单词。
- #(hash)可以替换零个或多个单词。
查看所有的 交换 及 交换绑定队列
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings
代码职责风格:
生产者只管发送消息就好 (比如发送消息给队列或者交换)
消费者要负责接受消息以外的更多事 (比如负责队列的 prefetch 设置,或者交换的绑定)
3、远程过程调用(RPC)
略
三、应用
例如可以用到日志系统中:对所有等级的日志都打印到控制台(即下面的队列),而 error 日志单独持久化到 disk(即上面的队列)。
四、MQ 的优缺点
优点:解耦、异步、削峰
缺点:系统可用性降低,系统复杂性增加
参考资料
1、官方RabbitMQ教程
https://www.rabbitmq.com/getstarted.html
2、amqp.node 参考API
https://www.squaremobius.net/amqp.node/channel_api.html#channel_ack