Day11-协程/异步IO/RabbitMQ

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销  
  • "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

之前讲过的yield实现协程

import time
import queue def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name, new_baozi)) def producer():
r = con.__next__()
r = con2.__next__()
n = 0
while n < 50000:
n += 1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" % n) if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()

协程定义:

网络编程,之前都是用的线程,并发量5000,线程切换耗资源,用协程的话

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程

Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

from greenlet import greenlet

def test1():
print(12)
gr2.switch()
print(34)
gr2.switch() def test2():
print(56)
gr1.switch()
print(78) gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

但是这个模块并没有解决遇到IO自动切换到其它协程

gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的import gevent


import gevent

def func1():
print('12')
gevent.sleep(1)
print('34') def func2():
print('56')
gevent.sleep(1)
print('78') def func3():
print('9') gevent.joinall([gevent.spawn(func1),
gevent.spawn(func2),
gevent.spawn(func3)])

gevent.joinall([gevent.spawn(函数,参数)])

上面的代码输出结果:12、56、9同时输出,1秒后输出34、78

同步与异步的性能区别

import gevent

def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(0.5)
print('Task %s done' % pid) def synchronous():
for i in range(1,10):
task(i) def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]#列表生成式
gevent.joinall(threads) print('Synchronous:')
synchronous() print('Asynchronous:')
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

遇到IO阻塞时会自动切换任务

from gevent import monkey; monkey.patch_all()
import gevent
from urllib.request import urlopen def f(url):
print('GET: %s' % url)
resp = urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])

执行时间应该为url响应时间最长的那个

通过gevent实现单线程下的多socket并发

server

import sys
import socket
import time
import gevent from gevent import monkey monkey.patch_all() def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.joinall([gevent.spawn(handle_request, cli)]) def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR) except Exception as ex:
print(ex)
finally:
conn.close() if __name__ == '__main__':
server(8001)

client

import socket
import threading def sock_conn(): client = socket.socket() client.connect(("localhost",8001))
count = 0
while True:
#msg = input(">>:").strip()
#if len(msg) == 0:continue
client.send( ("hello %s" %count).encode("utf-8")) data = client.recv(1024) print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
count +=1
client.close() for i in range(100):
t = threading.Thread(target=sock_conn)
t.start()

论事件驱动与异步IO

通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求;
(2)每收到一个请求,创建一个新的线程,来处理该请求;
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
上面的几种方式,各有千秋,
第(1)中方法,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
第(2)种方式,由于要涉及到线程的同步,有可能会面临死锁等问题。
第(3)种方式,在写应用程序代码时,逻辑比前面两种都复杂。
综合考虑各方面因素,一般普遍认为第(3)种方式是大多数网络服务器采用的方式(如nginx)

 

Select\Poll\Epoll异步IO

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

用户空间是不能访问内核空间的

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:
1. 保存处理机上下文,包括程序计数器和其他寄存器。
2. 更新PCB信息。

3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。
4. 选择另一个进程执行,并更新其PCB。
5. 更新内存管理的数据结构。
6. 恢复处理机上下文。

总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

注:进程控制块(Processing Control Block),是操作系统核心中一种数据结构,主要表示进程状态。其作用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。 PCB通常是系统内存占用区中的一个连续存区,它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息 

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

二 IO模式

同步、异步:同步和异步关注的是消息通知机制,被调用者是如何把调用完成的结果返回给调用者。阻塞io,非阻塞io,io多路复用都属于同步io,事件驱动io和异步io属于异步io。

阻塞、非阻塞:关注的是调用者等待的状态。

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

Day11-协程/异步IO/RabbitMQ

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

Day11-协程/异步IO/RabbitMQ

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

Day11-协程/异步IO/RabbitMQ

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步 I/O(asynchronous IO)

linux下的asynchronous IO其实用得很少。先看一下它的流程:

Day11-协程/异步IO/RabbitMQ

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

 同步IO、异步IO

阻塞IO、非阻塞IO、IO多路复用都是同步IO

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

IO多路复用主要用select和selector模块

select

import select
import socket
import queue
server = socket.socket()
server.bind(('localhost',9999)) server.listen(5)
server.setblocking(False)#非阻塞 inputs = [server] msg_queues = {}
outputs = []
while True:
r_list,w_list,exception_list = select.select(inputs,outputs,inputs)#inputs监测有没有消息过来,xlist异常,监测所有的链接
#r_list有数据,表示一定有数据
for s in r_list:#s socket对象
if s is server:#新链接进来了
conn,addr = s.accept()
print('got a new conn',conn ,addr)
inputs.append(conn)#不能在那里等着收数据啊,所以加入到inputs,让select去监测
msg_queues[conn] = queue.Queue()
else:
try:
data = s.recv(1024)
print('recv data from [%s]:[%s]'%(s.getpeername(),data.decode()))
msg_queues[s].put(data)#给用户返回的数据放在queue中
if s not in outputs:#等下次select的时候,确保w_list的数据能返回给客户端
outputs.append(s)
except ConnectionResetError as e:
print('conn closed',s.getpeername(),e)
inputs.remove(s)
if s in outputs:
outputs.remove(s)
del msg_queues[s] for s in w_list:
try:
data = msg_queues[s].get_nowait()
s.send(data.upper())
except queue.Empty as e:
outputs.remove(s)

selector

import selectors
#selectors是自适应的,先去找epoll,没有poll,select
import socket sel = selectors.DefaultSelector() sock = socket.socket()
sock.bind(('localhost',10001))
sock.listen(100)
#sock.setblocking(False) def accept(sock,mask):
conn,addr = sock.accept()
print('accepted',conn,'from',addr)
#conn.setblocking(False)
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
total_size = conn.recv(1024).decode()
total_size = int(total_size) received_size = 0
with open('recv.avi','wb') as f:
while received_size < total_size:
data = conn.recv(4096)
f.write(data)
received_size += len(data)
if data:
print('echoing',repr(data),'to',conn)
conn.send(data)
else:
print('closing',conn)
sel.unregister(conn)
conn.close()
sel.register(sock,selectors.EVENT_READ,accept)
#select.select(inputs,outputs,ex....) while True:
events = sel.select()
for key,mask in events:
callback = key.data#第一次建立连接时accept 发送数据就是read
callback(key.fileobj,mask)
#fileobj-conn

RabbitMQ

1、mac

  安装 http://www.rabbitmq.com/install-standalone-mac.html

  RabbitMQ安装需要依赖Erlang

 centos

  • 安装elang

  下载erlang 源(RabbitMQ是用erlang开发的)
  wget -O /etc/yum.repos.d/erlang_solutions.repo http://binaries.erlang-solutions.com/rpm/centos/erlang_solutions.repo
  yum install erlang

  • 安装 rabbitmq-server

  wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.3/rabbitmq-server-3.5.3-1.noarch.rpm

  rpm -ivh rabbitmq-server-3.5.3-1.noarch.rpm

  • rabbitmq-plugins enable rabbitmq_management 支持web页面管理

  直接访问ip:15672  默认用户名、密码guest此用户名密码只支持本机登录

  在搭建时windows一直访问不了页面,关闭防火墙和selinux还是不行,后来才发现浏览器用了代理,只能访问10.69网段的,而rabbitmq搭建在192网段,坑。。。。  

2、创建用户名、密码、授权

  rabbitmqctl add_user  testuser testuser

  rabbitmqctl set_user_tags  testuser administrator

  rabbitmqctl  set_permissions  -p /  testuser  ".*" ".*" ".*"

3、安装python rabbitMQ module

  pip install pika

实现最简单的队列通信

Day11-协程/异步IO/RabbitMQ

server

import pika

credentials = pika.PlainCredentials('admin', 'admin')#用户名、密码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.147.147',credentials=credentials))#rabbitmq-server ip
channel = connection.channel() # 声明queue
channel.queue_declare(queue='alex',durable=True)#durable消息持久化 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='alex', #send msg to this queue
body='Hello World!23',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent消息持久化
)
) print(" [x] Sent 'Hello World!2'")
connection.close()

client

import pika
import time
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.147.147',credentials=credentials)) channel = connection.channel()
channel.queue_declare(queue='alex',durable=True) def callback(ch, method, properties, body):
print(ch, method, properties) print(" [x] Received %r" % body)
time.sleep(1) channel.basic_consume(callback,
queue='alex',
#no_ack=True
)
channel.basic_qos(prefetch_count=1)#消息公平分发
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消息公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

Publish\Subscribe(消息发布\订阅) 

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,

An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

   表达式符号说明:#代表一个或多个字符,*代表任何字符
      例:#.a会匹配a.a,aa.a,aaa.a等
          *.a会匹配a.a,b.a,c.a等
     注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue

fanout广播

server

import pika
import sys
credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.147.147',credentials=credentials))
channel = connection.channel() channel.exchange_declare(exchange='logs',type='fanout')
message = ''.join(sys.argv[1:]) or 'info:Hello' channel.basic_publish(exchange = 'logs',
routing_key = '',
body = message) print('[x] send %r'%message)

client

import pika

credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.147.147',credentials=credentials))
channel = connection.channel() channel.exchange_declare(exchange='logs',type='fanout') result = channel.queue_declare(exclusive=True)#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange='logs',queue=queue_name)
print('[*] waiting for logs') def callback(ch,method,properties,body):
print('[x] %r'%body) channel.basic_consume(callback,
queue = queue_name,) channel.start_consuming()

direct有选择的接收(参数随便定义,不止error,warning,info)

server

import pika
import sys credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.147.147',credentials=credentials))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

python3 rabbit_direct_send.py error test

client

import pika
import sys credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.147.147',credentials=credentials))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1) for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
) channel.start_consuming()

python3 rabbit_direct_recv.py error

topic

server

import pika
import sys credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.147.147',credentials=credentials))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
python3 rabbit_topic_send.py error.mysql fuck
 

client

import pika
import sys credentials = pika.PlainCredentials('admin','admin')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='192.168.147.147',credentials=credentials))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1) for severity in severities:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
) channel.start_consuming()
python3 rabbit_topic_recv.py error.*
 

Remote procedure call (RPC)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

Day11-协程/异步IO/RabbitMQ

server

import pika
import uuid class SSHRpcClient(object):
def __init__(self):
credentials = pika.PlainCredentials('alex', 'alex3714')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.184.128',credentials=credentials)) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) # 客户端的结果必须要返回到这个queue,生成一个随机队列,接收端退出,销毁临时产生的队列
self.callback_queue = result.method.queue  #callback_queue是生成的队列的队列名 self.channel.basic_consume(self.on_response,queue=self.callback_queue) #声明从这个queue里收结果,只有当下面channel.start.consuming时才会调用on_response函数 def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id: #任务标识符
self.response = body
print(body)
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4()) #唯一标识符
self.channel.basic_publish(exchange='',
routing_key='rpc_queue3',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n)) #
print("start waiting for cmd result ")
#self.channel.start_consuming()
count = 0
while self.response is None: #如果命令没返回结果
print("loop ",count)
count +=1
self.connection.process_data_events() #以不阻塞的形式去检测有没有新事件
#如果没事件,那就什么也不做, 如果有事件,就触发on_response事件 return self.response ssh_rpc = SSHRpcClient() print(" [x] sending cmd")
response = ssh_rpc.call("ipconfig") print(" [.] Got result ")
print(response.decode("gbk"))

client

import pika
import time
import subprocess credentials = pika.PlainCredentials('hp','')
connection = pika.BlockingConnection(pika.ConnectionParameters
(host='localhost',credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue3') def SSHRPCServer(cmd):
print("recv cmd:",cmd)
cmd_obj = subprocess.Popen(cmd.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) result = cmd_obj.stdout.read() or cmd_obj.stderr.read()
return result def on_request(ch, method, props, body):
#n = int(body) print(" [.] fib(%s)" % body)
response = SSHRPCServer(body) ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body=response) channel.basic_consume(on_request, queue='rpc_queue3') print(" [x] Awaiting RPC requests")
channel.start_consuming()
上一篇:K-means算法应用:图片压缩


下一篇:10 Golden Rules of Project Risk Management