【RabbitMQ 服务器】
1
2
3
4
5
6
|
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定 Broker: 192.168.0.xx virtual host: vhosttest Exchange: exchangetest Queue: queuetest Routing key: rkeytest |
【Python 环境】
1
2
3
|
OS: Windows 10 Python: 3.6.3 x64 pika: 0.11.2 |
【查看队列状态】
1
2
3
4
5
6
7
8
9
|
# 通过浏览器查看队列状态 http: //192 .168.0.xx:15672 /api/queues/vhosttest/queuetest # 通过命令行查看队列状态 curl -u user:password http: //192 .168.0.xx:15672 /api/queues/vhosttest/queuetest | jq
# 通过命令行查看队列长度 curl -s -u user:password http: //192 .168.0.xx:15672 /api/queues/vhosttest/queuetest | \
jq '.backing_queue_status.len'
|
【send.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#encoding: utf-8 #author: walker #date: 2018-01-31 #summary: 发送方/生产者 import os, sys, time
import pika
def Main():
credentials = pika.PlainCredentials( "test" , "test" )
parameters = pika.ConnectionParameters(host = "192.168.0.xx" ,
virtual_host = 'vhosttest' ,
credentials = credentials)
connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ
channel = connection.channel() # 创建频道
queue = channel.queue_declare(queue = 'queuetest' ) # 声明或创建队列
while True : # 循环向队列中发送信息
message = time.strftime( '%H:%M:%S' , time.localtime())
channel.basic_publish(exchange = 'exchangetest' ,
routing_key = 'rkeytest' ,
body = message)
print ( 'send message: %s' % message)
while True :
# 检查队列,以重新得到消息计数
queue = channel.queue_declare(queue = 'queuetest' , passive = True )
messageCount = queue.method.message_count
print ( 'messageCount: %d' % messageCount)
if messageCount < 100 :
break
time.sleep( 1 )
# 关闭连接
connection.close()
if __name__ = = '__main__' :
Main()
|
【recv.py】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
#encoding: utf-8 #author: walker #date: 2018-01-31 #summary: 接收方/消费者 import os, sys, time
import pika
# 接收处理消息的回调函数 def ConsumerCallback (channel, method, properties, body):
print ( "Received %s" % body)
def Main():
credentials = pika.PlainCredentials( "test" , "test" )
parameters = pika.ConnectionParameters(host = "192.168.0.xx" ,
virtual_host = 'vhosttest' ,
credentials = credentials)
connection = pika.BlockingConnection(parameters) # 连接 RabbitMQ
channel = connection.channel() # 创建频道
queue = channel.queue_declare(queue = 'queuetest' ) # 声明或创建队列
# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
channel.basic_consume(ConsumerCallback, queue = 'queuetest' , no_ack = True )
print ( 'Wait Message ...' )
channel.start_consuming()
if __name__ = = '__main__' :
Main()
|
【相关阅读】
本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/2067244如需转载请自行联系原作者
RQSLT