不同程序之间通讯
1.socket
2.disk硬盘文件
3.broker中间代理
python中:
threading Queue 线程之间通讯,不能跨进程
multiprocessing Queue 父进程与子进程进行交互,或同一个父进程下的多个子进程
RabbitMQ 消息队列
MQ全称为Message Queue,一种应用程序对应用程序的通信方法
RabbitMQ 官方地址:http://www.rabbitmq.com/
安装:
erlang http://www.erlang.org/
rabbitmq http://www.rabbitmq.com/
参考:
《Windows下RabbitMQ安装及入门》
http://blog.csdn.net/hzw19920329/article/details/53156015
启动服务(管理员模式):rabbitmq-service start 或者:services.msc
访问管理后台:
http://localhost:15672 用户名:guest,密码:guest
查看队列:rabbitmqctl list_queues
查看状态:rabbitmqctl status
rabbitMQ轮询分发
将消息依次分发给每个消费者
生产者 –> 队列 –> 消费者1, 消费者2, 消费者3
rabbitMQ消息持久化
服务器关闭,队列消失,可以设置持久化队列名,持久化消息
消费者控制接收数量
消费者可以按需获取信息,实现能者多劳
发布者订阅者
是即时发送接收
发布者 –> 交换机 –> 队列1, 队列2, 队列3 –> 订阅者1, 订阅者2, 订阅者3
广播模式
fanout广播模式 无选择接收
direct广播模式 有选择接收 队列绑定关键字
topic广播模式 消息过滤
遇到的问题
问题:无法访问Web管理页面
解决:
启动管理模块:
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop
参考:
《RabbitMQ无法访问Web管理页面》
http://blog.csdn.net/u011642663/article/details/54691788
问题:Error: unable to perform an operation
解决:
C:\Windows\System32\config\systemprofile.erlang.cookie
拷贝.erlang.cookie文件覆盖
C:\User\username.erlang.cookie
参考:
《Authentication failed》
http://blog.csdn.net/j_shine/article/details/78833456
代码实例
安装第三方库:
pip install pika
生产者消费者
# 发送端 import pika queue_name = "hello2" # 队列名称 connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() # 声明queue channel.queue_declare(queue=queue_name) # durable=True消息名称持久化 # 发送消息,需要通过路由器 channel.basic_publish(exchange="", routing_key=queue_name, body="hello, world!", # 消息持久化 make message persistent #properties=pika.BasicProperties(delivery_mode=2) ) print("send hello") connection.close() # 持久化之后,服务器重启不消失
# 接收端 import pika import time queue_name = "hello2" # 队列名称 connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() # 不知道客户端还是服务端先启动,为了确保这个队列存在,两端都需要声明 channel.queue_declare(queue=queue_name) channel.basic_qos(prefetch_count=1) # 最多处理一个信息,处理完再接收 def callback(ch, method, properties, body): # 回调函数 print("ch:", ch) print("method:", method) print("properties:", properties) print("接收到信息:", body) time.sleep(30) # ch.basic_ask(delivery_tag=method.delivery_tag) # 消息确认 channel.basic_consume(callback, # 如果收到消息就调用函数处理消息 queue=queue_name, no_ack=True) #acknowledgement确认 print("waiting for message, ctrl+c break") # 启动接收 channel.start_consuming()
1对多的发送广播
广播模式:fanout
# 发布者 import pika connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") message = "hello world" channel.basic_publish(exchange="logs", routing_key="", body=message) print("send ok") connection.close()
# 订阅者 import pika connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="fanout") result = channel.queue_declare(exclusive=True) # 随机分配队列名 queue_name = result.method.queue channel.queue_bind(exchange="logs", queue=queue_name) print("waiting for logs") def callback(ch, method, properties, body): print("body:", body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
消息过滤
有选择的接收消息,广播模式:direct
# 实现1对多的发送 # 发布者 import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") severity = sys.argv[1] if len(sys.argv)>1 else "info" # severity严重程度 message = " ".join(sys.argv[:2] or "hello world!") channel.basic_publish(exchange="direct_logs", routing_key=severity, body=message) print("send ok") connection.close()
# 订阅者 import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() channel.exchange_declare(exchange="direct_logs", exchange_type="direct") result = channel.queue_declare(exclusive=True) # 随机分配队列名 queue_name = result.method.queue # 获取参数 severities = sys.argv[1:] if not severities: sys.stderr.write("usage: %s [info] [warning] [error]" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange="direct_logs", routing_key=severity, queue=queue_name) print("waiting...") def callback(ch, method, properties, body): print("routing_key:", method.routing_key) print("body:", body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
通配符消息过滤
表达式符号说明:#代表一个或多个字符,*代表任何字符
广播模式:topic
# 实现1对多的发送 # 发布者 import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() channel.exchange_declare(exchange="topic_logs", exchange_type="topic") severity = sys.argv[1] if len(sys.argv)>1 else "info" # severity严重程度 message = " ".join(sys.argv[:2] or "hello world!") channel.basic_publish(exchange="topic_logs", routing_key=severity, body=message) print("send ok") connection.close()
# 订阅者 import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters("localhost") ) channel = connection.channel() channel.exchange_declare(exchange="topic_logs", exchange_type="topic") result = channel.queue_declare(exclusive=True) # 随机分配队列名 queue_name = result.method.queue # 获取参数 severities = sys.argv[1:] if not severities: sys.stderr.write("usage: %s [info] [warning] [error]" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange="topic_logs", routing_key=severity, queue=queue_name) print("waiting...") def callback(ch, method, properties, body): print("routing_key:", method.routing_key) print("body:", body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
Remote procedure call (RPC)
远程程序调用
# 服务器端 import pika import time connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): # 被调用函数 if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties( correlation_id= props.correlation_id ), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
# 客户端 import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost') ) self.channel = self.connection.channel() # 生成随机队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue ) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) # 用于区分发送指令和接收结果的一致性 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
参考文章:
《Python之路,Day9 - 异步IO\数据库\队列\缓存》