RabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列
用rabbitmq实现一个简单的生产者消费者模型
发送端代码
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) channel = connection.channel() channel.queue_declare(queue="hello") channel.basic_publish(exchange='', routing_key = 'hello', body='hello world', ) print("Send hello world") connection.close()
接收端代码
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 4 channel = connection.channel() 5 channel.queue_declare(queue="hello") 6 7 def callback(ch,method,properties,body): 8 print(ch,method,properties) 9 print("received %s" %body) 10 11 channel.basic_consume(callback, 12 queue='hello', 13 no_ack=True) 14 15 print("waiting for messages to exit press 'CTRL+C'") 16 channel.start_consuming()
通过上述代码便可以实现一个简单的生产者消费者模型,但是现在的结果是:当开启多个消费者程序的时候,启动生产者发送消息,这个时候只有一个可以收到,并且再次启动,会下一个消费者收到,类似一个轮询的关系。
acknowledgment 消息不丢失(通过客户端设置实现)
通过no_ack = False参数设置,如果消费者遇到情况突然中断了没有收到,那么RabbitMQ会重新将任务添加到队列中
下面将接收端的代码进行更改:
#AUTHOR:FAN import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) channel = connection.channel() channel.queue_declare(queue="hello") def callback(ch,method,properties,body): print(ch,method,properties) time.sleep(10) print("received %s" %body) channel.basic_consume(callback, queue='hello', no_ack=False) print("waiting for messages to exit press 'CTRL+C'") channel.start_consuming()
标注的地方就是代码修改的地方,通过将no_ack更改为False,以及在callback回到函数这里让等待10s,这样启动接收端后,再启动发送算,在还没有打印数据的时候将客户端关闭,然后再启动,发现依然可以收到刚才发送端发送的数据。
但是这种方式只能实现客户端断开重新连接的时候数据不丢失,如果是rabbitmq挂了的情况如何解决?
durable消息不丢失(通过在服务端设置保证数据不丢失)
这个时候生产者和消费者的代码都需要改动
发送者代码
1 import pika 2 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6 7 channel.queue_declare(queue='fan',durable=True) 8 9 channel.basic_publish(exchange='', 10 routing_key='fan', 11 body='hello world', 12 properties = pika.BasicProperties( 13 delivery_mode=2 14 )) 15 16 print("send 'hello world'") 17 connection.close()
接收者的代码
1 import pika 2 import time 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6 7 channel.queue_declare(queue='fan',durable=True) 8 9 def callback(ch,method,properies,body): 10 print("received %s" %body) 11 time.sleep(10) 12 print("is ok") 13 ch.basic_ack(delivery_tag=method.delivery_tag) 14 15 channel.basic_consume(callback, 16 queue='fan', 17 no_ack=False) 18 19 print("waitting for messages.To exit press CTRL+C") 20 channel.start_consuming()
这样即使在接收者接收数据过程中rabbitmq服务器出现问题了,在服务恢复之后,依然可以收到数据
发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
通过exchange type = fanout参数实现
代码例子:
发布者:
1 #AUTHOR:FAN 2 3 import pika 4 import sys 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange="fan", 10 type='fanout') 11 12 message = ' '.join(sys.argv[1:]) or "info :hello world" 13 channel.basic_publish(exchange = 'fan', 14 routing_key='', 15 body=message) 16 17 print("send %s" %message) 18 connection.close()
订阅者:
#AUTHOR:FAN import pika connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103')) channel = connection.channel() channel.exchange_declare(exchange="fan", type='fanout') #随机生成队列名字 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #将exchange和队列绑定 channel.queue_bind(exchange='fan', queue=queue_name) print("waiting for fan ,To exit press CTRL+C") def callback(ch,method,proerties,body): print("---",body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
关键字发送
通过参数:exchange type = direct实现
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
代码例子如下:
消费者代码:
1 #AUTHOR:FAN 2 import pika 3 import sys 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6 channel.exchange_declare(exchange='direct_logs_1', 7 type='direct') 8 result = channel.queue_declare(exclusive=True) 9 queue_name = result.method.queue 10 11 severities = sys.argv[1:] 12 if not severities: 13 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 14 exit(1) 15 print(severities) 16 for severity in severities: 17 print(severity) 18 channel.queue_bind(exchange='direct_logs_1', 19 queue=queue_name, 20 routing_key=severity) 21 print("waiting for logs,To exit press CTRL+C") 22 def callback(ch,method,properties,body): 23 print("%s:%s" %(method.routing_key,body)) 24 25 channel.basic_consume(callback, 26 queue=queue_name, 27 no_ack=True) 28 channel.start_consuming()
生产者代码
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25')) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='direct_logs_1', 8 type='direct') 9 10 print(sys.argv) 11 severity = sys.argv[1] if len(sys.argv) >1 else "error" 12 message = ' '.join(sys.argv[2:]) or 'hello world' 13 channel.basic_publish(exchange='direct_logs_1', 14 routing_key = severity, 15 body = message) 16 print("send %s:%s" %(severity,message)) 17 connection.close()
模糊匹配
通过参数exchange type = topic实现
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词
--------------------还没有整理完
所有的努力都值得期许,每一份梦想都应该灌溉!