一、简介
消息队列就是基础数据结构中的“先进先出”的一种数据机构,
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、Java、JMS、C等,支持AJAX。
用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
二、RabbitMQ的作用
1.主要应用于应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。 当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户
的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性
2.流量削峰
举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统
是处理不了的,只能限制订单超过一万后不允许用户下单。 使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
3.消息分发
生产者只负责生产消息,消费者只要监听了生产者 ,那么生产者发的消息就能被接收。
4.异步消息
使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。 这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息
5.常见消息队列及比较
总结:
Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、
金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka
三、安装
服务端安装:
# 原生: # 安装配置epel源 # 安装erlang yum -y install erlang # 安装RabbitMQ yum -y install rabbitmq-server # 使用Docker docker pull rabbitmq:management docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
客户端:
pip3 install pika
四、使用
基本使用:
# 生产者 import pika # 拿到连接对象 # 有用户名密码的情况 # connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200')) credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) # 拿到channel对象 channel = connection.channel() # 声明一个队列 channel.queue_declare(queue='hello') # 指定队列名字 # 生产者向队列中放一条消息 channel.basic_publish(exchange='', routing_key='hello', body='hello world') print(" Sent 'Hello World!'") # 关闭连接 connection.close()
# 消费者 import pika def main(): # 无密码 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200')) credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) channel.start_consuming() if __name__ == '__main__': main()
消息确认(保障安全)
import pika # 无密码 # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) # 有密码 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='qjk') channel.basic_publish(exchange='', routing_key='qjk', # 消息队列名称 body='hello world') connection.close()生产者
import pika def main(): credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='qjk') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) # 真正的消息处理完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 不会自动回复确认消息, # auto_ack=True,队列收到确认,就会自动把消费过的消息删除 channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False) channel.start_consuming() if __name__ == '__main__': main()消费者
持久化
import pika # 无密码 # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) # 有密码 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='qjk', durable=True) # 指定队列持久化 channel.basic_publish(exchange='', routing_key='qjk', # 消息队列名称 body='hello world', properties=pika.BasicProperties( delivery_mode=2, ) # 指定消息持久化 ) connection.close()生产者
import pika def main(): credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='qjk', durable=True) # 指定队列持久化 def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) # 真正的消息处理完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 不会自动回复确认消息, # auto_ack=True,队列收到确认,就会自动把消费过的消息删除 channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False) channel.start_consuming() if __name__ == '__main__': main()消费者
闲置消费
import pika # 无密码 # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) # 有密码 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='qjk', durable=True) # 指定队列持久化 channel.basic_publish(exchange='', routing_key='qjk', # 消息队列名称 body='hello world', properties=pika.BasicProperties( delivery_mode=2, ) # 指定消息持久化 ) connection.close()生产者
import pika def main(): credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明一个队列(创建一个队列) channel.queue_declare(queue='qjk') def callback(ch, method, properties, body): print("消费者接受到了任务: %r" % body) # 真正的消息处理完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 不会自动回复确认消息, # auto_ack=True,队列收到确认,就会自动把消费过的消息删除 channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False) channel.basic_qos(prefetch_count=1) # 谁闲置谁获取,没必要按照顺序一个一个来 channel.start_consuming() if __name__ == '__main__': main()消费者
发布订阅
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明队列没有指定名字,指定了exchange channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()发布者
# 起多个都能收到消息 import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) # 随机的名字 queue_name = result.method.queue print(queue_name) # 绑定 channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()订阅者
发布订阅(高级之Routing按关键字匹配)
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明队列没有指定名字,指定了exchange channel.exchange_declare(exchange='qjk123', exchange_type='direct') message = "info: asdfasdfasdfsadfasdf World!" channel.basic_publish(exchange='qjk123', routing_key='bnb', body=message) # 指定关键字 print(" [x] Sent %r" % message) connection.close()发布者
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='qjk123', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()订阅者1
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='qjk123', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb') channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='bnb') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()订阅者2
发布订阅高级之Topic(按关键字模糊匹配)
* 只能加一个单词
# 可以加任意单词字符
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() # 声明队列没有指定名字,指定了exchange channel.exchange_declare(exchange='m3', exchange_type='topic') message = "info: asdfasdfasdfsadfasdf World!" channel.basic_publish(exchange='m3', routing_key='qjk.dd.dd', body=message) print(" [x] Sent %r" % message) connection.close()发布者
# 收不到消息 import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m3', exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.*') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()订阅者1
# 可以收到消息 import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.exchange_declare(exchange='m3', exchange_type='topic') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue print(queue_name) channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.#') print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()订阅者2
五、三种方式实现RPC
1.RabbitMQ实现RPC
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.credentials = pika.PlainCredentials("admin", "admin") self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=self.credentials)) self.channel = self.connection.channel() result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(20) # 外界看上去,就像调用本地的call()函数一样 print(" [.] Got %r" % response)客户端
import pika credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id= \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()服务端
2.内置包SimpleXMLRPCServer实现
from xmlrpc.client import ServerProxy # SimpleXMLRPCServer def xmlrpc_client(): print('xmlrpc client') c = ServerProxy('http://localhost:4242') data = {'client:' + str(i): i for i in range(100)} start = time.clock() for i in range(100): a = c.getObj() print(a) for i in range(100): c.sendObj(data) print('xmlrpc total time %s' % (time.clock() - start)) if __name__ == '__main__': xmlrpc_client()客户端
from xmlrpc.server import SimpleXMLRPCServer class RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) self.send_data = {'server:' + str(i): i for i in range(100)} self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data) # SimpleXMLRPCServer server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True) server.register_introspection_functions() server.register_instance(RPCServer()) server.serve_forever()服务端
3.ZeroRPC实现
安装:pip3 install zerorpc
import zerorpc import time # zerorpc def zerorpc_client(): print('zerorpc client') c = zerorpc.Client() c.connect('tcp://127.0.0.1:4243') data = {'client:' + str(i): i for i in range(100)} start = time.clock() for i in range(100): a = c.getObj() print(a) for i in range(100): c.sendObj(data) print('total time %s' % (time.clock() - start)) if __name__ == '__main__': zerorpc_client()客户端
import zerorpc class RPCServer(object): def __init__(self): super(RPCServer, self).__init__() print(self) self.send_data = {'server:' + str(i): i for i in range(100)} self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data) # zerorpc s = zerorpc.Server(RPCServer()) s.bind('tcp://0.0.0.0:4243') s.run()服务端