一、安装RabbitMQ
1. 安装erlang
1
2
3
4
|
tar xf otp_src_18.3. tar .gz
cd otp_src_18.3
. /configure --prefix= /mapbar/app/erlang
make && make install
|
2. 安装rabbitMQ
1
2
3
4
5
6
|
tar xf rabbitmq-server-generic-unix-3.6.0. tar .xz
mv rabbitmq_server-3.6.0 /mapbar/app/
ln -s /mapbar/app/rabbitmq_server-3 .6.0 /mapbar/app/rabbitmq
启动: cd /mapbar/app/rabbitmq/sbin/
. /rabbitmq-server -detached
|
3.安装API
1
|
pip install pika
|
二、Python操作RabbitMQ
1,基本用法
生产者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
mport pika connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' ))
#创建一个链接对象,对象中绑定rabbitmq的IP地址 channel = connection.channel() #创建一个频道
channel.queue_declare(queue = 'name1' ) #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建
channel.basic_publish(exchange = '',
routing_key = 'name1' , #指定队列名称
body = 'Hello World!' ) #往该队列中发送一个消息
print ( " [x] Sent 'Hello World!'" )
connection.close() #发送完关闭链接
|
消费者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host= '192.168.10.131' ))
#创建一个链接对象,对象中绑定rabbitmq的IP地址 channel = connection.channel() #创建一个频道
channel.queue_declare(queue= 'name1' ) #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建
def callback(ch, method, properties, body): #callback函数负责接收队列里的消息
print( " [x] Received %r" % body)
channel.basic_consume(callback, #从队列里去消息
queue= 'name1' , #指定队列名
no_ack=True)
print( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming() |
2,发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。
在RabbitMQ中,所有生产者提交的消息都有Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储,RabbitMQ提供了四种Exchange:fanout、direct、topic、header。由于header模式在实际工作中用的比较少。
发布者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost' ))
channel = connection.channel() channel.exchange_declare(exchange= 'test_fanout' ,
type = 'fanout' )
message = '4456'
channel.basic_publish(exchange= 'test_fanout' ,
routing_key= '' ,
body=message)
print( ' [x] Sent %r' % message)
connection.close() |
订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host= 'localhost' ))
channel = connection.channel() channel.exchange_declare(exchange= 'test_fanout' , #创建一个exchange
type = 'fanout' ) #任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上
#随机创建队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #绑定 channel.queue_bind(exchange= 'test_fanout' ,
queue=queue_name) #exchange绑定后端队列
print( '<------------->' )
def callback(ch,method,properties,body): print( ' [x] %r' % body)
channel.basic_consume(callback, queue=queue_name,
no_ack=True)
channel.start_consuming() |