一.发布hello world
首先我们看一个最简单的消息队列系统
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: zengchunyun
"""
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel() channel.queue_declare(queue="hello") # 在发送之前,我们要确保这个队列存在,如果我们发送一个消息到不存在的队列,rabbitmq会认为这个是垃圾消息
# 我们已经创建了一个队列名为hello,待会我们的消息都会发送到这个队列 # rabbitmq不允许我们直接将消息发送到队列,而是通过一个交换器,现在我们使用一个特殊对交换器,它是一个空对字符串标识对交换器,它能确保我们对消息该放到哪个
# 队列,这个队列需要特殊对rouning_key
channel.basic_publish(exchange="",
routing_key="hello",
body="hello world",) print(" [x] Sent 'hello world") # 在退出之前,我们需要确保网络缓冲区已经清空,且我们对消息的确已经发送到rabbitMQ,我们可以关闭这个连接
connection.close()
发布者
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: zengchunyun
"""
import pika
import time connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel() channel.queue_declare(queue="hello") # 为了确保每次订阅的队列都存在,我们先声明一个队列 # 接收队列消息需要通过回调函数来接收 def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(3)
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue="hello",
no_ack=True) # 这里我们需要告诉rabbitMQ这个回调函数会从我们的hello队列接收消息,关闭消息确认标记,
# 那么当worker工作中异常,如没有完成任务就关闭了连接,可能会丢失任务,使用no_ack=True默认只要rabbit分配任务给该worker了,就会将任务从队列删除 print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming() # 我们这里进入了一个永不终止的循环等待数据状态
订阅者
上面这个系统只是单一的消息队列,