本文来源于i春秋学院,未经允许严禁转载。
最近打算更新微信机器人,发现机器人的作者将代码改进了很多,但去掉了sqlite数据库,需要自己根据需求设计数据库,跟作者沟通得到的建议是为了防止消息并发导致数据库死锁,建议另开一个进程读写数据库,将消息加入一个队列中,因为对Python了解有限,队列和多线程更不是我擅长的内容,于是最近疯狂Google、百度,探索着实现了此功能。写此文记录下基本概念和实现方法
0x00 Python队列
队列是线程中交换数据的形式。
创建一个队列对象
import Queue q = Queue.Queue(maxsize = ) #maxsize是队列长度,不限制长度可以不不赋值
将一个值放入队列
q.put()
将一个值从队列取出
q.get()
三种队列及构造函数
- FIFO队列先进先出: class Queue.Queue(maxsize)
- LIFO类似于堆,即先进后出: class Queue.LifoQueue(maxsize)
- 优先级队列: class Queue.PriorityQueue(maxsize)
队列的常用方法
q.qsize() #返回队列的大小 q.empty() #如果队列为空,返回True,反之False q.full() #如果队列满了,返回True,反之False q.full #与 maxsize 大小对应 q.get([block[, timeout]]) #获取队列,block:是否阻塞等待,timeout等待时间 q.get_nowait() #相当q.get(False) q.put(item[, block[, timeout]) #非阻塞写入队列,block:是否阻塞等待,timeout等待时间 q.put_nowait(item) #相当q.put(item, False) q.task_done() #在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 q.join() #等到队列为空,再执行别的操作
几个例子
一个线程往队列里写入随机数,另一个线程从队列里取数字(阻塞等待)
#!/usr/bin/env python #coding:utf8 import random,threading,time from Queue import Queue #Producer thread class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): for i in range(): #随机产生10个数字 ,可以修改为任意大小 randomnum=random.randint(,) print "%s: %s 生成了一个数字 %d 并把它扔进了队列!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum) #将数据依次存入队列 time.sleep() print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_all(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue def run(self): while : try: # print self.data val_even = self.data.get() print "%s: %s 从队列里取出了 %d !" % (time.ctime(), self.getName(), val_even) time.sleep() except Exception, e: print '取数据失败' continue #Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer_all = Consumer_all('Con_all.', queue) producer.start() consumer_all.start() if __name__ == '__main__': main()
运行结果 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. finished!
如果将入队的时间间隔修改,出队的程序将阻塞运行,将“ time.sleep(1)”改为“ time.sleep(10)”,再次运行
Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! Wed Aug :: : Pro. finished!
会发现当队列没有值的时候程序会阻塞等待队列有值才继续运行。
稍微修改程序,一个线程往队列里写入随机数,另一个线程从队列里取数字(非阻塞等待)
#!/usr/bin/env python #coding:utf8 import random,threading,time from Queue import Queue #Producer thread class Producer(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): for i in range(): #随机产生10个数字 ,可以修改为任意大小 randomnum=random.randint(,) print "%s: %s 生成了一个数字 %d 并把它扔进了队列!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum) #将数据依次存入队列 time.sleep() print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_all(threading.Thread): def __init__(self, t_name, queue): threading.Thread.__init__(self, name=t_name) self.data = queue def run(self): while : try: # print self.data val_even = self.data.get(,) # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒 print "%s: %s 从队列里取出了 %d !" % (time.ctime(), self.getName(), val_even) time.sleep() except Exception, e: # 等待输入,超过5秒 就报异常 print '取数据失败' continue #Main thread def main(): queue = Queue() producer = Producer('Pro.', queue) consumer_all = Consumer_all('Con_all.', queue) producer.start() consumer_all.start() if __name__ == '__main__': main()
Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. 生成了一个数字 并把它扔进了队列! Wed Aug :: : Con_all. 从队列里取出了 ! 取数据失败 Wed Aug :: : Pro. finished! 取数据失败 取数据失败 取数据失败
可以看出,取数据的线程在超过设定的等待时间后会抛出异常并继续往下执行。
0x01 使用队列将微信机器人消息存入MongoDB
使用上面的例子,稍作修改,将接收到的消息扔进队列,开启另一个线程取数据,取到后将消息格式化并存入数据库
#!/usr/bin/env python #coding:utf8 import threading,time,pymongo from pymongo import MongoClient from Queue import Queue #消息入队 class MsgInQueue(): def __init__(self, queue): threading.Thread.__init__(self) self.data=queue def putmsgqueue(self,msg): self.data.put(msg) print 'put msg in queue success:'+msg['Content']
###消息出队并存入数据库
class MsgOutQueue2db(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.data = queue #建立MongoDB连接 self.conn = MongoClient() #数据库 self.db = self.conn.wechatRobot #数据表 self.messages = self.db.messages def run(self): while : try: # print self.data #从队列里取消息 msg = self.data.get(, ) # get(self, block=True, timeout=None) ,1就是阻塞等待,5是超时5秒 print "%s: %s get %s from queue !" % (time.ctime(), self.getName(), msg['Content'].encode('utf-8')) try: #格式化消息数据 m = dict(groupname=msg['FromUserName'].encode('utf-8'), time=msg['CreateTime'], username=msg['ActualUserName'], usernickname=msg['ActualNickName'].encode('utf-8'), message=msg['Content'].encode('utf-8'), messagetype=msg['MsgType'] ) print m #存入数据库 self.db.messages.insert(m) time.sleep() except Exception, e: print e continue except Exception, e: continue
主线程
def complex_reply(): queue = Queue() outqueue = MsgOutQueue2db(queue)#实例化出队入库类 outqueue.start()#开启线程 @itchat.msg_register('Text', isGroupChat = True) def text_reply(msg): # print itchat.__client.storageClass.groupDict print itchat.__client.storageClass.chatroomList print msg inqueue=MsgInQueue(queue)#实例化入队类 inqueue.putmsgqueue(msg)#消息入队 if msg['isAt']: print msg itchat.send(u'@%s\u2005I received: %s'%(msg['ActualNickName'], msg['Content']), msg['FromUserName'])
消息存入数据库:
通过多线程和MongoDB的结合,有效防止消息过多导致数据库死锁的问题,也更加模块化,可以根据真实需求更换其他数据库。后面我将结合MongoDB插入、更新数据快的特点写一下我如何设计群聊统计功能,在Python方面和MongoDB方面我都是小白,如有更好的建议请多指教,我们共同学习有关Python多线程的课程请参考《python安全编程入门》