# Install RabbitMQ and the Erlang runtime it depends on (Ubuntu/Debian, apt).
# NOTE(review): the dl.bintray.com host appears to have been discontinued;
# verify the key and repository URLs against the current RabbitMQ install
# guide before relying on them.

# Refresh the package index and install the tools needed to add an HTTPS repo.
sudo apt update
sudo apt install wget apt-transport-https -y

# Trust the RabbitMQ release signing key (fetched from both published URLs).
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -

# Register the Erlang 22.x repository for THIS release only. Using the host's
# codename (e.g. "focal" or "bionic") avoids writing one entry and then
# overwriting it with the entry for a different release.
echo "deb https://dl.bintray.com/rabbitmq-erlang/debian $(lsb_release -sc) erlang-22.x" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

# Re-read the package index so the newly added repository is actually used.
sudo apt update

# Install Erlang first, then the broker itself.
sudo apt install erlang -y
sudo apt install rabbitmq-server -y
# Install pika, the Python client library imported by the example scripts below.
pip install pika
实际应用的一个例子,可参考:
《RabbitMQ 简介及例子》,其中的例子可以运行。
send.py
#!/usr/bin/env python
"""Publish one 'Hello World!' message to the 'hello' queue on a local broker."""
import pika

# Open a blocking connection to the broker running on this machine.
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Declare the target queue before publishing to it.
channel.queue_declare(queue='hello')

# Publish through the '' exchange, using the queue name as the routing key.
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',
)
print(" [x] Sent 'Hello World!'")

connection.close()
receive.py
#!/usr/bin/env python
"""Consume messages from the 'hello' queue on a local broker and print them."""
import pika


def callback(ch, method, properties, body):
    """Print the body of each delivered message.

    Args:
        ch: channel the message was delivered on (unused).
        method: delivery metadata (unused).
        properties: message properties (unused).
        body: raw message payload.
    """
    # Wrap in a 1-tuple so %-formatting is safe whatever type `body` is.
    print(" [x] Received %r" % (body,))


def main():
    """Connect, subscribe to 'hello', and consume until interrupted."""
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declaring the queue here as well lets the receiver start before the sender.
    channel.queue_declare(queue='hello')

    # auto_ack=True: deliveries are acknowledged automatically, so the
    # callback does not need to ack them itself.
    channel.basic_consume(
        queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # CTRL+C is the documented way to exit: stop cleanly, no traceback.
        channel.stop_consuming()
    finally:
        # Skip close() if the connection already dropped, so a broker-side
        # failure is not masked by a second error raised here.
        if connection.is_open:
            connection.close()


if __name__ == '__main__':
    main()
参考《Python 使用 RabbitMQ(AMQP)极简例子》,可以通过网页来监视 RabbitMQ。
《Python 使用 RabbitMQ 实现 RPC 调用示例》,我已经验证通过;不过 rpcclient.py 在 Sublime 中运行正常,在命令行下运行会报错,暂时还没找到原因。
rpcserver.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""RPC server: answers Fibonacci requests arriving on the 'rpc_queue' queue."""
import pika


def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively, so the cost is linear in n and there is no
    recursion-depth limit.

    Raises:
        ValueError: if n is negative.
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %d" % n)
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return prev


def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib(int(body)) and reply to the caller.

    Args:
        ch: channel the request was delivered on; also used for the reply.
        method: delivery metadata; supplies the delivery tag to ack/reject.
        props: request properties; `reply_to` names the reply queue and
            `correlation_id` is echoed back so the caller can match the reply.
        body: request payload, expected to be the decimal text of an integer.
    """
    # Run the requested computation.
    try:
        n = int(body)
        print(" [.] fib(%s)" % n)
        response = fib(n)
    except ValueError as err:
        # A malformed request must not crash the server. Reject it without
        # requeueing so the same bad message is not delivered again.
        print(" [!] Rejecting invalid request %r: %s" % (body, err))
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    # Send the result back to the client.
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,  # queue the client is listening on
        body=str(response),
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        )
    )
    # Acknowledge the request delivery now that it has been handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == "__main__":
    params = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')  # queue that receives requests
    # Take at most one unacknowledged request at a time.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue='rpc_queue',
        on_message_callback=on_request
    )
    print(" [x] Awaiting RPC requests")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        # Allow a clean CTRL+C shutdown instead of a traceback.
        channel.stop_consuming()
    finally:
        if connection.is_open:
            connection.close()
rpcclient.py:
# coding=utf-8
"""RPC client: sends a Fibonacci request to 'rpc_queue' and waits for the reply."""
import uuid

import pika


class RPCClient:
    """Blocking RPC client that talks to the server over a private reply queue."""

    def __init__(self):
        """Connect to the local broker and subscribe to a fresh reply queue."""
        # Initialised here so on_response() never reads an unset attribute.
        self.response = None
        self.corr_id = None

        params = pika.ConnectionParameters(host='localhost')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()

        # Declare an exclusive, uniquely named queue to receive replies on.
        result = self.channel.queue_declare(
            queue=str(uuid.uuid4()),
            exclusive=True
        )
        self.callback_queue = result.method.queue

        # auto_ack=True: on_response() never acks manually, so without it
        # every reply would stay unacknowledged on the broker.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )

    def call(self, n):
        """Invoke the remote function with `n` and return its result as an int.

        Blocks until a reply with a matching correlation id has arrived.
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())  # unique id for this call
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # queue the server consumes from
            properties=pika.BasicProperties(
                correlation_id=self.corr_id,
                reply_to=self.callback_queue
            ),
            body=str(n)
        )
        # Wait for the reply. time_limit=None blocks until there is I/O to
        # process, rather than spinning the CPU with zero-timeout polls.
        while self.response is None:
            self.connection.process_data_events(time_limit=None)
        return int(self.response)

    def on_response(self, ch, method, props, body):
        """Store the reply body if it answers the call currently in flight."""
        # Only a reply carrying the id generated by call() is the one we want.
        if self.corr_id == props.correlation_id:
            self.response = body


if __name__ == "__main__":
    client = RPCClient()
    try:
        print(" [x] Requesting fib(7)")
        response = client.call(7)
        print(" [.] Got %r" % response)
    finally:
        # Always release the broker connection, even if the call failed.
        client.connection.close()