python之RabbitMQ

一、安装RabbitMQ

1. 安装erlang

1
2
3
4
tar xf otp_src_18.3.tar.gz
cd otp_src_18.3
./configure --prefix=/mapbar/app/erlang
make && make install

2. 安装rabbitMQ

1
2
3
4
5
6
tar xf rabbitmq-server-generic-unix-3.6.0.tar.xz
mv rabbitmq_server-3.6.0 /mapbar/app/
ln -s /mapbar/app/rabbitmq_server-3.6.0 /mapbar/app/rabbitmq
启动:
cd /mapbar/app/rabbitmq/sbin/
./rabbitmq-server -detached

3.安装API

1
pip install pika

二、Python操作RabbitMQ

1,基本用法

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mport pika
 
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131')) 
#创建一个链接对象,对象中绑定rabbitmq的IP地址
  
channel=connection.channel()        #创建一个频道
  
channel.queue_declare(queue='name1'#通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建
  
channel.basic_publish(exchange='',
                      routing_key='name1',   #指定队列名称
                      body='Hello World!')   #往该队列中发送一个消息
print(" [x] Sent 'Hello World!'")
connection.close()                           #发送完关闭链接

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pika
  
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.10.131'))
#创建一个链接对象,对象中绑定rabbitmq的IP地址
  
channel = connection.channel()         #创建一个频道
  
channel.queue_declare(queue='name1')   #通过这个频道来创建队列,如果MQ中队列存在忽略,没有则创建
  
def callback(ch, method, properties, body):   #callback函数负责接收队列里的消息
    print(" [x] Received %r" % body)
  
channel.basic_consume(callback,              #从队列里去消息
                      queue='name1',         #指定队列名
                      no_ack=True)
  
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2,发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。

在RabbitMQ中,所有生产者提交的消息都有Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储,RabbitMQ提供了四种Exchange:fanout、direct、topic、header。由于header模式在实际工作中用的比较少。

   发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

import pika
  
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
  
channel.exchange_declare(exchange='test_fanout',
                         type='fanout')
  
message = '4456'
channel.basic_publish(exchange='test_fanout',
                      routing_key='',
                      body=message)
print(' [x] Sent %r' % message)
connection.close()

订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import pika
  
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
  
channel.exchange_declare(exchange='test_fanout',        #创建一个exchange
                         type='fanout')                 #任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上
  
#随机创建队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
  
#绑定
channel.queue_bind(exchange='test_fanout',
                   queue=queue_name)                    #exchange绑定后端队列
  
print('<------------->')
  
def callback(ch,method,properties,body):
    print(' [x] %r' % body)
  
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()



上一篇:怎么去掉li标签前面的点??


下一篇:【RabbitMQ】RabbitMQ在Windows的安装和简单的使用