一、线程
第一个线程
import threading #导入线程模块 def f1(arg): print(arg) t = threading.Thread(target=f1,args=(123,)) #定义一个线程任务,对象为f1,传入参数123 t.start() #执行线程任务
基本使用
Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。
更多方法:
start 线程准备就绪,等待CPU调度
setName 为线程设置名称
getName 获取线程名称
setDaemon 设置为后台线程或前台线程(默认)
如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
run 线程被cpu调度后自动执行线程对象的run方法
自定义线程
import threading # class MyThread(threading.Thread): #继承线程方法 def __init__(self, func, args): #重新定义init方法 self.func = func self._args = args super(MyThread, self).__init__() #执行自定义的init方法 def run(self): self.func(self._args) def f2(arg): #自定义任务 print(arg) obj = MyThread(f2, 123,) obj.start()
线程锁
import threading import time NUM = 10 def func(l): global NUM #上锁 l.acquire() NUM -= 1 time.sleep(2) print(NUM) #开锁 l.release() lock = threading.Lock() #单次 #lock = threading.RLock() #支持多次,多种锁 for i in range(10): t = threading.Thread(target=func,args=(lock,)) t.start()
信号量(Semaphore)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
自定义线程池
import queue #导入队列 import threading #导入线程 import time #导入时间 class ThreadPool: def __init__(self, maxsize=5): #定义默认最大5个线程任务 self.maxsize = maxsize self._q = queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) # 【threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread】 def get_thread(self): #获取队列任务 return self._q.get() def add_thread(self): #增加线程 self._q.put(threading.Thread) pool = ThreadPool(5) #实例化 def task(arg,p): #定义任务方法 print(arg) time.sleep(1) p.add_thread() for i in range(100): #假设设100个任务过来 # threading.Thread类 t = pool.get_thread() obj = t(target=task,args=(i,pool,)) #定义任务 obj.start() #启动任务
信号量
import threading,time def run(n): semaphore.acquire() #互斥锁 time.sleep(1) print("run the thread: %s" %n) semaphore.release() if __name__ == '__main__': num= 0 semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 for i in range(20): t = threading.Thread(target=run,args=(i,)) t.start()
事件(event)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
clear:将“Flag”设置为False
set:将“Flag”设置为True
import threading def func(i, e): print(i) e.wait() print(i + 100) event = threading.Event() for i in range(10): t = threading.Thread(target=func, args=(i,event,)) t.start() event.clear()#设置成红灯,停止 inp = input('>>>>') if inp == "1": event.set() #设置成绿灯,执行
Timer
定时器,指定n秒后执行某操作
from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
生产者消费者模型(队列)
二、进程
from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i,arg): arg.put(i) print('say hi',i,arg.qsize()) if __name__ == "__main__": # li = [] li = queues.Queue(20,ctx=multiprocessing) for i in range(10): p = Process(target=foo,args=(i,li,)) #p.daemon = True p.start() #p.join()
默认数据不共享,可以使用下面的三种方法进行进程数据共享
queues
from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i,arg): arg.put(i) print('say hi',i,arg.qsize()) if __name__ == "__main__": # li = [] li = queues.Queue(20,ctx=multiprocessing) for i in range(10): p = Process(target=foo,args=(i,li,)) #p.daemon = True p.start() #p.join()
array
from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock
import time
def foo(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi',lis[0])
lc.release()
if __name__ == "__main__":
# li = []
li = Array('i', 1)
li[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=foo,args=(i,li,lock))
p.start()
p = Process(target=foo,args=(i,li,lock))
p.start()
Manager.dict
from multiprocessing import Process from multiprocessing import Manager def foo(i,arg): arg[i] = i + 100 print(arg.values()) if __name__ == "__main__": obj = Manager() li = obj.dict() for i in range(10): p = Process(target=foo,args=(i,li,)) p.start() p.join()
进程池
from multiprocessing import Pool import time def f1(arg): print(arg,'b') time.sleep(5) print(arg,'a') if __name__ == "__main__": pool = Pool(5) for i in range(30): # pool.apply(func=f1,args=(i,)) pool.apply_async(func=f1,args=(i,)) # pool.close() # 所有的任务执行完毕 time.sleep(2) pool.terminate() # 立即终止 pool.join()
PS:
IO密集型-多线程
计算密集型 - 多进程
三、协程
原理:利用一个线程,分解一个线程成为多个“微线程”==》程序级别
greenlet
import gevent def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again') def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
gevent安装:
pip3 install gevent