温故而知新--day4
进程与线程
进程本质上就是一段程序的运行过程,由程序、数据集、进程控制块组成。每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据。操作系统管理所有的进程,并为他们合理分配资源。
线程是进程中的执行单元,可以共享进程中的资源。
进程之间是相互独立的,所以进程是最小的资源单位。
关于并行和并发
并行:系统能同时处理多个任务
并发:系统可以处理多个任务
线程
简单使用
import threading
import os
def work(num1, num2, name, **kwargs):
print(num1, num2) # 12 123
print(name) # lczmx
print(kwargs) # {'age': 20}
print("pid:", os.getpid()) # pid: 12932
if __name__ == "__main__":
t1 = threading.Thread(target=work, args=(12, 123),
kwargs={"name": "lczmx", "age": 20})
t2 = threading.Thread(target=work, args=(1, 3),
kwargs={"name": "xxx", "age": 20})
t1.start() # 开始线程活动
t1.join() # 等待,直到线程终结
t2.start()
t2.join()
print("pid:", os.getpid()) # pid: 12932
定义一个类,继承threading.Thread、重写run方法也可以
重写__init__
方法的话要super.__init__()
import threading
class MyThread(threading.Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
pass
def func2(self):
pass
if __name__ == '__main__':
t = MyThread()
t.start()
t.join()
Threading对象的方法
is_alive(): 返回线程是否活动的。
getName(): 返回线程名,也可以在创建时通过name参数指定。
setName(): 设置线程名。
守护线程
默认情况下,主线程会等到所有子线程执行完之后才会退出,但守护线程并不会。
守护线程就是跟随主线程一起结束的线程,守护线程通过setDaemon方法实现,其内部时设置daemon属性,可以被继承,所以daemon默认为False。
import threading
import time
def work(sleep_time=0.5):
time.sleep(sleep_time)
print("sleep time: ", sleep_time)
if __name__ == "__main__":
t1 = threading.Thread(target=work, args=(1,))
t1.setDaemon(True) # setDaemon要在start之前
t1.start()
print("exit")
注意以下例子
import threading
import time
def work(sleep_time=0.5):
time.sleep(sleep_time)
print("sleep time: ", sleep_time)
if __name__ == "__main__":
t1 = threading.Thread(target=work, args=(1,)) # 1秒
t2 = threading.Thread(target=work, args=(3,)) # 3秒
t1.setDaemon(True)
t2.setDaemon(True)
t1.start()
t2.start()
time.sleep(2) # 2秒
print(t1.is_alive()) # False
print(t2.is_alive()) # True
锁
锁主要时用来解决在cpu切换时造程序取得的数据不同步的问题。
比如这个例子:
from threading import Thread
import os
import time
def work():
global n
temp = n
time.sleep(0.01)
n = temp - 1
if __name__ == '__main__':
n = 100
l = []
for i in range(100):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) # 结果可能为99或98,但几乎不为零
为了解决这个问题,我们可以使用串行的方式让所有的代码按照顺序执行,但是这就失去了多线程的意义。那么只要串行部分代码就既能享受多线程的优势,又可以保证数据的安全了。也就是说,锁做的工作就是使操作数据的那部分代码串行。
互斥锁
使用threading.Lock
获取一把锁,它由一个acquire()
和release()
方法控制锁定和释放。
from threading import Thread, Lock
import os
import time
lock = Lock()
def work():
global n
lock.acquire()
temp = n
time.sleep(0.01)
n = temp - 1
lock.release()
if __name__ == '__main__':
n = 100
l = []
for i in range(100):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) # 现在结果为0
死锁
上面说过,锁就是把部分代码变为串行,只有当锁被释放后才能执行后面的代码。死锁的一个原因是互斥,还有可能是粗心大意,忘记release()了。
from threading import Thread, Lock
import time
lockA = Lock()
lockB = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
lockA.acquire()
print("%s获得锁A" % self.name)
lockB.acquire()
print("%s获得锁B" % self.name)
lockB.release()
print("%s释放锁B" % self.name)
lockA.release()
print("%s释放锁A" % self.name)
def func2(self):
lockB.acquire()
print("%s获得锁B" % self.name)
time.sleep(2)
lockA.acquire()
print("%s获得锁A" % self.name)
lockA.release()
print("%s释放锁A" % self.name)
lockB.release()
print("%s释放锁B" % self.name)
if __name__ == '__main__':
for i in range(10):
t = MyThread(name="线程%d" % i)
t.start()
"""
线程0获得锁A
线程0获得锁B
线程0释放锁B
线程0释放锁A
线程0获得锁B
线程1获得锁A
卡死了
"""
解决死锁的好方式就是用递归锁,而使用一般的锁的话可以用with
关键词,以防忘记释放锁了。
递归锁
递归锁也是锁,其内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import Thread, RLock
import time
lockA = lockB = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
lockA.acquire()
print("%s获得锁A" % self.name)
lockB.acquire()
print("%s获得锁B" % self.name)
lockB.release()
print("%s释放锁B" % self.name)
lockA.release()
print("%s释放锁A" % self.name)
def func2(self):
lockB.acquire()
print("%s获得锁B" % self.name)
time.sleep(2)
lockA.acquire()
print("%s获得锁A" % self.name)
lockA.release()
print("%s释放锁A" % self.name)
lockB.release()
print("%s释放锁B" % self.name)
if __name__ == '__main__':
for i in range(10):
t = MyThread(name="线程%d" % i)
t.start()
semaphore 信号量
信号量就是一把锁,之前我们说的threading.Lock
是互斥锁(Mutual exclusion,缩写 Mutex)实质上就是信号量为一的情景。信号量可以用来限定某些资源可以同时由几个线程访问,访问时同样要acquire,出来时同样也release。
import threading
import time
sm = threading.Semaphore(5)
def foo():
sm.acquire()
# 打印当前线程的名字
print("%s ..." % threading.current_thread().getName())
time.sleep(1)
sm.release()
if __name__ == "__main__":
for i in range(9):
t = threading.Thread(target=foo)
t.start()
GIL
关于GIL(global interpreter lock),点击这里。
线程间通信
-
event 同步条件
由于线程之间是相互独立的,彼此不能直接确认状态,为此python提供了threading.Event
对象,可以在不同线程间传递状态,其由一下方法:-
.wait(timeout=None)
,event变为True,timeout为None时,为阻塞;反之则为等待秒数(非阻塞) -
.set()
,设置event的值为True -
.clear()
,恢复event的状态值为False。 -
.is_set
,返回event状态值根据上述,可以把Event的情况分为以下几种:
import threading import time e = threading.Event() def foo(): print("event状态:", e.is_set()) print("等待。。。。") if e.wait(): # 默认阻塞 print("event状态:", e.is_set()) print("收到同步条件,ok") def bar(): time.sleep(2) e.set() if __name__ == '__main__': f = threading.Thread(target=foo) b = threading.Thread(target=bar) f.start() b.start()
-
-
queue 线程队列
线程队列特别适用于消息必须安全地在多线程间交换的线程编程,线程队列有三种类型,在实例化的时候根据需求指定:- 先进先出(FILO):
queue.Queue(maxsize=0)
- 后进先出(LIFO):
queue.LifoQueue(maxsize=0)
- 按优先级,使用heapq(堆队列算法),确定优先级:
queue.PriorityQueue(maxsize=0)
注:
maxsize
参数指定队列的大小,当maxsize
<= 0 时,队列的元素个数没有限制。这三者都返回
queue.Queue
对象的方法,因为LifoQueue
和PriorityQueue
都继承queue.Queue
,Queue对象拥有以下方法:方法 说明 .put(item, block=True, timeout=None)
将 item 放入队列,block默认为True,表示阻塞。优先级队列的item要包含优先级如: q.put([2, "abc"])
.get(block=True, timeout=None)
从队列中移除并返回一个项目。block默认为True,表示阻塞 .qsize()
返回队列的大致大小 .empty()
如果队列为空,返回 True ,否则返回 False .full()
如果队列是满的返回 True ,否则返回 False 。 .task_done()
完成一个任务后,向队列发信号( join()
用到)。.join()
阻塞到 队列中所有的元素 都 被 接 收 和 处 理 完毕(根据收到的task_done信号确定)。 关于task_done与join:
Queue内部有一个unfinished_tasks属性(默认为0),put时自增1,task_done调用时自减1
join的逻辑是while self.unfinished_tasks: self.all_tasks_done.wait()
,当unfinished_tasks为0的时候就跳出循环,停止阻塞状态。import threading import queue import time q = queue.Queue(5) # 只存5个元素 def worker(): while True: print("qsize: ", q.qsize()) item = q.get() # 队列为空时会阻塞 print(f'Working on {item}') time.sleep(0.5) # 模拟处理数据的时间 print(f'Finished {item}') q.task_done() # 已经执行 # 开启为worker线程,处理队列,设置为守护线程 threading.Thread(target=worker, daemon=True).start() print("队列是否为空:", q.empty()) # 往队列中添加元素 for item in range(10): if q.full(): print("已经满了,阻塞。。。。") q.put(item) # 队列满的时候会阻塞 print('全部元素已经放入队列中') # 会一直阻塞,知道unfinished_tasks为0 q.join() print('全部任务已完成')
PriorityQueue有点特殊,单独举例:
import queue q = queue.PriorityQueue() q.put([3, "c"]) q.put([1, "a"]) q.put([2, "b"]) print(q.get()) # [1, 'a'] print(q.get()) # [2, 'b'] print(q.get()) # [3, 'c']
- 先进先出(FILO):
进程
python中使用multiprocessing
模块来实现多进程。
简单使用
import multiprocessing
def work(num, name, age):
print(f"num: {num}, name: {name}, age: {age}")
if __name__ == '__main__': # 不要省略了这个,否则报错
p = multiprocessing.Process(target=work, args=(
1, ), kwargs={"name": "lczmx", "age": 22})
p.start()
p.join()
方法二
要重写__init__
方法的话要super.__init__()
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
self.func1()
self.func2()
def func1(self):
print("func1")
def func2(self):
print("func2")
if __name__ == "__main__":
p = MyProcess()
p.start()
一些常用方法
-
multiprocessing.set_start_method('spawn')
设置启动方法,关于启动方法类型及介绍见文档 -
Process对象.terminate()
立即终止进程 -
Process对象.pid
返回进程ID。在生成该进程之前,这将是None
-
Process对象.daemon
设置守护进程,和守护线程一样,可以在创建进程的时候通过daemon形参来设置。 -
Process对象.name
与threading类似 -
Process对象.is_alive
与threading类似 -
Process对象.join
与threading类似
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
进程间通信
使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用锁等同步原语。
- 进程队列 multiprocessing.Queue(maxsize=0)
进程队列的方法与queue.Queue的方法很像,常用的方法中就没有task_done
和join
(multiprocessing.JoinableQueue(maxsize=0)
有这两个方法,但要必须要手动调用task_done,否则用于统计未完成任务的信号量最终会溢出并抛出异常)from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # [42, None, 'hello'] p.join()
- 管道 multiprocessing.Pipe
conn1, conn2 = multiprocessing.Pipe([duplex])
conn1和conn2是一对 Connection 对象, 分别表示管道的两端。
如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。
Connection对象的常用方法:- send(obj)
将一个对象发送到连接的另一端,可以用 recv() 读取。
发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发 ValueError 异常。 - recv()
返回一个由另一端使用 send() 发送的对象。该方法会一直阻塞直到接收到对象。 如果对端关闭了连接或者没有东西可接收,将抛出 EOFError 异常。 - fileno()
返回由连接对象使用的描述符或者句柄。 - close()
关闭连接对象。当连接对象被垃圾回收时会自动调用。
更多方法详见文档
from multiprocessing import Process, Pipe def f(conn): print(conn.recv()) # [1, '12', True] if __name__ == '__main__': conn1, conn2 = Pipe() # 默认为双向 p = Process(target=f, args=(conn2,)) p.start() conn1.send([1, "12", True]) p.join()
- send(obj)
线程池和进程池
线程和进程的创建、切换、关闭都需要一定的成本,对于某些重复次数多且声明周期短的任务可以使用线/进程池,线/进程池的数量并不是越多越好,太多可能得不偿失,甚至导致python解释器崩溃。
使用线程池要用到concurrent.futures.ThreadPoolExecutor
使用进程池要用到concurrent.futures.ProcessPoolExecutor
线程池和进程池都提供了以下常用方法:
-
submit(fn, *args, **kwargs)
:将 fn 函数提交给线/进程池。
*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。 -
map(func, *iterables, timeout=None, chunksize=1)
:该函数类似于全局函数 map(func, *iterables)
该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。 -
shutdown(wait=True, *, cancel_futures=False)
当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。
python3.9才开始增加cancel_futures参数以线程池为例(与with搭配使用更好)
from concurrent.futures import ThreadPoolExecutor pool = ThreadPoolExecutor(8) def work(num, name="unknown"): print(name, num) for i in range(10): pool.submit(work, i, name="work-%s" % i) pool.map(work, [1, 2, 3, 45]) # pool.shutdown(wait=True)
Future对象是submit方法的返回值,其本身也有一些实用的方法:
-
result(timeout=None)
返回 函数的返回值。如果调用还没完成那么这个方法将等待 timeout 秒。超时则触发`concurrent.futures.TimeoutError -
exception(timeout=None)
返回由调用函数引发的异常。如果调用还没完成那么这个方法将等待 timeout 秒。超时则触发concurrent.futures.TimeoutError
-
add_done_callback(fn)
回调函数,将 fn 附加到future对象。当 future 对象被取消或完成运行时,将会调用 fn,而这个future 对象将作为它唯一的参数。import time from concurrent.futures import ThreadPoolExecutor def callback(future): """ 回调函数,future是concurrent.futures._base.Future对象 """ print("result", future.result()) # result 123 print("exception", future.exception()) # exception None def work(num): if not isinstance(num, int): raise TypeError return num with ThreadPoolExecutor(8) as executor: res = executor.submit(work, 123) res.add_done_callback(callback)
进程池
# ...略 def main(): with ProcessPoolExecutor() as executor: res = executor.submit(work, 123) res.add_done_callback(callback) if __name__ == '__main__': # 不要省略了这个,否则报错 main()
生产者、消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题,通过一个容器来解决生产者和消费者的强耦合问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。在一般的高并发程序中通常就会有这样的场景出现:生产多快,处理不过来;生产太慢,等半天没得处理。所以要引入生产者消费者模型:
- 生产者向阻塞队列中添加数据,队列为满的时候就等
- 消费者从阻塞队列中拿数据,队列为空的时候也要等
- 消费者一般是死循环处理数据,当不消费时,可以发送信号,让消费者退出
生产者与消费者模型
多线程实现:
import threading
import queue
import time
import random
class Consumer(threading.Thread):
"""
消费者
"""
def __init__(self, q, lock, name):
super().__init__()
self.q = q # 阻塞消息队列
self.lock = lock # 互斥锁
self.name = "消费者-" + str(name)
self.daemon = True # 设置为守护线程
def run(self):
while True:
item = self.q.get() # 有则取,无则阻塞
with self.lock: # 使用上下文管理协议使用锁
print(f"{self.name}: 处理{item}....")
time.sleep(random.uniform(1, 2)) # 模拟处理时间1~2的浮点数
self.q.task_done() # 调用task_done()
class Producer(threading.Thread):
"""
生产者
"""
def __init__(self, q, count, name):
super().__init__()
self.q = q # 阻塞队列
self.count = count # 生产几个数据
self.name = "生产者-" + str(name)
def run(self):
for num in range(self.count):
data = "data-%d" % num
print(f"{self.name}: 生成数据 {data}")
time.sleep(random.random()) # 模拟处理时间0~1的浮点数
self.q.put(data) # 添加数据,满则阻塞
# 因为消费者是守护线程,其是否可以退出要看生产者
self.q.join() # 等待所有的数据都处理完了,才退出
if __name__ == '__main__':
q = queue.Queue()
lock = threading.Lock()
# 使用map生成,并启动消费者
list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙", "丙"]))
# 生成者列表
producer_list = map(lambda name: Producer(q, 20, name), ["大厨", "小厨"])
for p in producer_list:
p.start()
for p in producer_list:
p.join()
# 等结束
多进程实现:
import multiprocessing
import time
import random
class Consumer(multiprocessing.Process):
"""
消费者
"""
def __init__(self, q, lock, name):
super().__init__()
self.q = q # 阻塞消息队列
self.lock = lock # 互斥锁
self.name = "消费者-" + str(name)
self.daemon = True # 设置为守护进程
def run(self):
while True:
item = self.q.get() # 有则取,无则阻塞
with self.lock: # 使用上下文管理协议使用锁
print(f"{self.name}: 处理{item}....")
time.sleep(random.uniform(1, 2)) # 模拟处理时间1~2的浮点数
self.q.task_done() # 调用task_done()
class Producer(multiprocessing.Process):
"""
生产者
"""
def __init__(self, q, count, name):
super().__init__()
self.q = q # 阻塞队列
self.count = count # 生产几个数据
self.name = "生产者-" + str(name)
def run(self):
for num in range(self.count):
data = "data-%d" % num
print(f"{self.name}: 生成数据 {data}")
time.sleep(random.random()) # 模拟处理时间0~1的浮点数
self.q.put(data) # 添加数据,满则阻塞
# 因为消费者是守护进程,其是否可以退出要看生产者
self.q.join() # 等待所有的数据都处理完了,才退出
if __name__ == '__main__':
q = multiprocessing.JoinableQueue()
lock = multiprocessing.Lock()
# 启动两个消费者进程
list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙"]))
# 生成者只开一个进程
p = Producer(q, 20, "大厨")
p.start()
p.join()
# 等结束
协程
aiohttp
aiohttp是一个基于asyncio实现对http协议支持的第三方库,点击查看如何使用aiohttp。
IO
进程的执行是要靠操作系统调度的,为了保证不影响后面程序的运行,所以在执行过程中遇到阻塞或超过时间轮询时cpu会切换不同的进程执行
当我们写的程序需要数据即有IO的时候可以使用以下的四种模式来解决问题。
IO模式
IO多路复用实现
不同平台有不同的实现IO多路复用的模块,windows下支持select
且仅适用于套接字;Linux下至此select
、poll
、epoll
函数的访问,这些函数在大多数操作系统中是可用的;在 Solaris下为devpoll
; BSD 上可用kqueue
;在这些操作系统上,适用于套接字和其他文件类型。
水平触发
对于读:只要缓冲内容不为空返回读就绪
对于写:只要缓冲区还不满返回写就绪
边缘触发
对于读:缓冲区由空变为不空 或 数据变多 等时候返回读就绪
对于写:缓冲区由满变为空 或 数据变少 等时候返回写就绪
select和poll都是使用的水平触发方式。epoll既支持水平触发也支持边缘触发,默认是水平触发。
在python中要实现IO多路复用,可以使用select
或selectors
,selectors
是对select
的进一步封装,使用selectors.DefaultSelector()
可以自动选择当前平台最高效的接口。所以推荐使用selectors
模块。
使用selectors
模块主要要用到以下几个方法:
-
selectors.DefaultSelector()
以kqueue > epoll > devpoll > poll > select等优先级返回选择器类 -
选择器类.register(fileobj, event, data=None)
注册一个用于选择的文件对象,在其上监视 I/O 事件。- fileobj 是要监视的文件对象。 它可以是整数形式的文件描述符或者具有 fileno方法(返回文件描述符)的对
- events 是要监视的事件的位掩码。指明哪些 I/O 事件要在给定的文件对象上执行等待(
selectors.EVENT_READ
:可读,selectors.EVENT_WRITE
:可写) - data 是一个不透明对象。
-
选择器类.unregister(fileobj)
注销对一个文件对象的选择,移除对它的监视。 在文件对象被关闭之前应当先将其注销。 -
选择器类.select(timeout=None)
等待直到有已注册的文件对象就绪,或是超过时限。
这将返回由 (key, events) 元组构成的列表,每项各表示一个就绪的文件对象。
key是namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
,fd是文件描述符,其他的都是register
函数的参数 -
选择器类.close()
关闭选择器。确保释放下层资源。
来自python官方文档的例子:
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # 等连接是读
print('accepted', conn, 'from', addr)
conn.setblocking(False) # 设置非阻塞
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # 接收消息是读
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data)
else: # 断开连接
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False) # 设置非阻塞
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events: # 一直阻塞,直到有数据来
print("event 循环")
# mask是位掩码EVENT_READ或EVENT_WRITE
# 得到回调函数,这里是read或accept
callback = key.data
callback(key.fileobj, mask)
运行这段代码,并用其它终端连接:
>>> import socket
>>> sock = socket.socket()
>>> sock.connect(("localhost", 1234))
>>> sock.send(b"hello world")
11