子进程回收资源两种方式
- 1) join让主进程等待子进程结束,并回收子进程资源,主进程再结束并回收资源。
- 2) 主进程 “正常结束” ,子进程与主进程一并被回收资源。
from multiprocessing import Process
import time # 任务
def task():
print('start....')
time.sleep(2)
print('end......') if __name__ == '__main__':
p = Process(target=task) # 告诉操作系统帮你开启子进程
p.start()
# p.join() time.sleep(3) # 主进程结束
print('主进程结束....')
2.僵尸进程与孤儿进程(了解)
'''
僵尸进程 (有坏处):
- 在子进程结束后,主进程没有正常结束, 子进程PID不会被回收。 缺点:
- 操作系统中的PID号是有限的,如有子进程PID号无法正常回收,则会占用PID号。
- 资源浪费。
- 若PID号满了,则无法创建新的进程。 孤儿进程(没有坏处): - 在子进程没有结束时,主进程没有“正常结束”, 子进程PID不会被回收。
- 操作系统优化机制(孤儿院):
当主进程意外终止,操作系统会检测是否有正在运行的子进程,会他们放入孤儿院中,让操作系统帮你自动回收。 # 进程的其他属性与方法
''' from multiprocessing import Process
from multiprocessing import current_process
# 在子进程中调用,可以拿到子进程对象.pid可以pid号
# 在主进程中调用,可以拿到主进程对象.pid可以pid号
import os import time # 任务
def task():
print(f'start....{current_process().pid}')
time.sleep(1000)
print(f'end......{os.getpid()}')
print('子进程结束啦啊....~~~') if __name__ == '__main__':
p = Process(target=task) # 告诉操作系统帮你开启子进程
p.start()
# p.join() print(f'进入主进程的IO-->{current_process().pid}')
time.sleep(4)
print(f'进入主进程的IO-->{os.getpid()}')
# 主进程结束
print('主进程结束....')
print(f'查看主主进程{os.getppid()}')
f = open('tank.txt') # 强制让主进程报错
3.守护进程:
当主进程结束时,子进程也必须结束,并回收。
from multiprocessing import Process import time # 任务
def demo(name):
print(f'start....{name}')
time.sleep(1000)
print(f'end......{name}')
print('子进程结束啦啊....~~~') if __name__ == '__main__':
p = Process(target=demo, args=('童子军jason1号', )) # 守护进程必须在p.start()调用之前设置
p.daemon = True # 将子进程p设置为守护进程 # 告诉操作系统帮你开启子进程
p.start()
# p.join() time.sleep(1)
print('皇帝驾崩啦啊~~~')
4.进程间数据是隔离的
from multiprocessing import Process
import time
'''
进程间数据是隔离。
'''
number = 10 def func():
global number
number = 100 def func2(number):
number += 100 if __name__ == '__main__':
p_obj = Process(target=func)
p_obj2 = Process(target=func2, args=(number, ))
p_obj.start()
p_obj2.start()
p_obj2.join()
p_obj.join()
time.sleep(1)
print(number) #
5.进程互斥锁
互斥锁是一把锁,用来保证数据读写安全的。
- 抢票例子
from multiprocessing import Process
from multiprocessing import Lock # ---》 进程互斥锁
import random
import time
import json
# 抢票例子: # 1.查看余票
def search(name):
# 1.读取data.json文件中的数据
with open('data.json', 'r', encoding='utf-8') as f:
data_dic = json.load(f)
print(f'用户【{name}】查看余票,余票还剩: {data_dic.get("number")}!') # 2.若有余票,购买成功,票数会减少
def buy(name): # buy() # 网络延时
with open('data.json', 'r', encoding='utf-8') as f:
data_dic = json.load(f) # 进入这一步证明最先抢到票
if data_dic.get('number') > 0:
data_dic['number'] -= 1
time.sleep(random.randint(1, 3))
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data_dic, f)
print(f'用户【{name}】, 抢票成功!') else:
print(f'用户【{name}】, 抢票失败!') def run(name, lock):
# 1.假设1000个用户过来都可以立马查看余票
search(name) lock.acquire() # 加锁
buy(name)
lock.release() # 释放锁 if __name__ == '__main__':
lock = Lock()
# 开启多进程: 实现并发
for line in range(10):
p_obj = Process(target=run, args=(f'jason{line}', lock))
p_obj.start()
6.队列: (FIFO)先进先出 进----》 [3, 2, 1] ----》 出 1, 2, 3
- 先存放的数据,就先取出来。
相当于一个第三方的管道,可以存放数据。
附:堆栈(FILO)
应用: 让进程之间数据进行交互。
from multiprocessing import Queue # multiprocessing提供队列 先进先出
from multiprocessing import JoinableQueue # 基于 Queue 封装的队列 先进先出
import queue # python内置的队列 先进先出 # 第一种
Queue(5) #指的是队列中只能存放5份数据 如果像q=Queue(),括号内没有指定最大接受消息数量,或者数量为负数,代表可接受的消息数量没有上限,知道内存尽头
q_obj1 = Queue(5) # q_obj1队列对象
# 添加数据到队列中
q_obj1.put('jason')
print('添加1个')
q_obj1.put('hcy')
print('添加1个')
q_obj1.put('hb')
print('添加1个')
q_obj1.put('zsb')
print('添加1个')
q_obj1.put('lh')
print('添加1个')
#
# put: 只要队列满了,会进入阻塞
# q_obj1.put('sean')
# print('sean into ') # put_nowait: 只要队列满了,就会报错
# q_obj1.put_nowait('sean') # get: 只要队列中有数据,就能获取数据,若没有则会进入阻塞
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
# print(q_obj1.get())
# get_nowait: 若队列中没有数据获取则会报错
# print(q_obj1.get_nowait()) # 第二种
q_obj1 = JoinableQueue(5) # q_obj1队列对象
# 添加数据到队列中
q_obj1.put('jason')
print('添加1个')
q_obj1.put('hcy')
print('添加1个')
q_obj1.put('hb')
print('添加1个')
q_obj1.put('zsb')
print('添加1个')
q_obj1.put('lh')
print('添加1个') # put: 只要队列满了,会进入阻塞
# q_obj1.put('sean')
# print('sean into ') # put_nowait: 只要队列满了,就会报错
# q_obj1.put_nowait('sean') # get: 只要队列中有数据,就能获取数据,若没有则会进入阻塞
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get()) # 第三种
q_obj1 = queue.Queue(5) # q_obj1队列对象
# 添加数据到队列中
q_obj1.put('jason')
print('添加1个')
q_obj1.put('hcy')
print('添加1个')
q_obj1.put('hb')
print('添加1个')
q_obj1.put('zsb')
print('添加1个')
q_obj1.put('lh')
print('添加1个') # put: 只要队列满了,会进入阻塞
# q_obj1.put('sean')
# print('sean into ') # put_nowait: 只要队列满了,就会报错
# q_obj1.put_nowait('sean') # get: 只要队列中有数据,就能获取数据,若没有则会进入阻塞
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
print(q_obj1.get())
7.IPC机制 (进程间实现通信) 面试问: 什么是IPC机制?
进程间数据是相互隔离的,若想实现进程间通信,可以利用队列.
from multiprocessing import Process
from multiprocessing import JoinableQueue
import time def task1(q):
x = 100
q.put(x)
print('添加数据') time.sleep(3)
print('p1获取数据:',q.get()) def task2(q):
# 想要在task2中获取task1的x
res = q.get()
print(f'p2获取的数据是{res}')
q.put(9527) if __name__ == '__main__':
# 产生队列
q=JoinableQueue(10)
# 产生两个不同的子进程
p1 = Process(target=task1,args=(q,))
p2 = Process(target=task2,args=(q,))
p1.start()
p2.start()
8.生产者与消费者
- 生产者: 生产数据的
- 消费者: 使用数据的
- 生产油条的有人总比吃油条的人少 ---> 生产数据跟不上 使用数据的人 ---》 供需不平衡
- 吃油条的人比生产的油条要少 ---> 使用数据的速度 跟不上 生产数据的速度
- 通过队列来实现,解决供需不平衡问题
from multiprocessing import JoinableQueue
from multiprocessing import Process
import time # 生产者: 生产数据 ---》 队列
def producer(name, food, q):
msg = f'{name} 生产了 {food} 食物'
# 生产一个食物,添加到队列中
q.put(food)
print(msg) # 消费者: 使用数据 《--- 列队
def customer(name, q):
while True:
try:
time.sleep(0.5)
# 若报错,则跳出循环
food = q.get_nowait()
msg = f'{name} 吃了 {food} 食物!'
print(msg) except Exception:
break if __name__ == '__main__':
q = JoinableQueue() # 创建两个生产者
for line in range(10):
p1 = Process(target=producer, args=('tank1', f'Pig饲料{line}', q))
p1.start() # 创建两个消费者
c1 = Process(target=customer, args=('jason', q))
c2 = Process(target=customer, args=('sean', q))
c1.start()
c2.start()
9.线程
线程不需要像进程,使用if __name__ == '__main__':
10.守护线程
'''
线程:
1.什么是线程?
进程: 资源单位。
线程: 执行单位。 线程与进程都是虚拟的概念,为了更好表达某种事物。 注意: 开启一个进程,一定会自带一个线程,线程才是真正的执行者。 2.为什么要使用线程?
节省资源的占用。 - 开启进程:
- 1) 会产生一个内存空间,申请一块资源。
- 2) 会自带一个主线程
- 3) 开启子进程的速度要比开启子线程的速度慢 - 开启线程
- 1) 一个进程内可以开启多个线程,从进程的内存空间中申请执行单位。
- 2) 节省资源。 - 开启三个进程:
- 占用三份内存资源 - 开启三个线程:
- 从一个内存资源中,申请三个小的执行单位 - IO密集型用(多核): 多线程
- IO(时间由用户定):
- 阻塞: 切换 + 保存状态 - 计算密集型用(多核): 多进程
- 计算(时间由操作系统定):
- 计算时间很长 ---> 切换 + 保存状态 注意: 进程与进程之间数据是隔离的,线程与线程之间的数据是共享的。 3.怎么使用线程? - 守护线程 '''
from threading import Thread
import time number = 1000 # 启动线程的方式一:
# 任务1:
def task():
global number
number = 100
print('start...')
time.sleep(1)
print('end...') if __name__ == '__main__':
# 开启一个子线程
t = Thread(target=task)
t.start()
# t.join()
print('主进程(主线程)...')
print(number) # 启动线程的方式二:
class MyThread(Thread):
def run(self):
print('start...')
time.sleep(1)
print('end...') if __name__ == '__main__':
# 开启一个子线程
t = MyThread()
t.start()
# t.join()
print('主进程(主线程)...')
# 守护线程
from threading import current_thread number = 1000
def task():
global number
number = 100
print(f'start...{current_thread().name}')
time.sleep(3)
print(f'end...{current_thread().name}') if __name__ == '__main__':
# 开启一个子线程
for line in range(10):
t = Thread(target=task)
# 加上守护线程: 主进程结束,代表主线程也结束,子线程有可能未被回收。
t.daemon = True
t.start() # t.join()
print(f'主进程(主线程)...{current_thread().name}')
print(number)
线程互斥锁
from threading import Lock
from threading import Thread
import time
lock = Lock() # 开启10个线程,对一个数据进行修改
number = 100 def task():
global number # lock.acquire()
number2 = number
# time.sleep(1)
number = number2 - 1
# lock.release() if __name__ == '__main__': list1 = []
for line in range(100000000):
t = Thread(target=task)
t.start()
list1.append(t) for t in list1:
t.join() print(number) #
threading.local
# 为每个线程创建一个独立的空间,是的线程对自己的空间中的数据进行操作(数据隔离)
import threading
from threading import local
import time obj = local() def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx) for i in range(10): # 开启10个线程
t = threading.Thread(target=task,args=(i,))
t.start()
11.线程池
- 线程池是用来限制 创建的线程数
from concurrent.futures import ThreadPoolExecutor
import time # pool只能创建10个线程
pool = ThreadPoolExecutor(10) # 只能同时运行10个线程 def task(line):
print(line)
time.sleep(10) if __name__ == '__main__':
for line in range(100):
pool.submit(task, line)
回调函数 add_done_callback
from concurrent.futures import ThreadPoolExecutor
# pool线程池对象
pool = ThreadPoolExecutor(50) # 并发执行的任务
def task():
return '刺客伍六七' # 自定义的回调函数
def callback(res_obj): # res_obj {'result()': '刺客伍六七'}
result = res_obj.result() # 刺客伍六七 # submit(函数名, 函数接收的参数1, 参数2...) ===== Thread(target=函数名, args=(函数接收的参数1, 参数2...))
# pool.submit(函数名).add_done_callback(回调函数的名字) pool.submit(task).add_done_callback(callback)
# add_done_callback
from concurrent.futures import ThreadPoolExecutor
import time
# 池子对象: 内部可以帮你提交50个启动进程的任务
p_pool = ThreadPoolExecutor(50) def task1(n):
print(f'from task1...{n}')
time.sleep(5)
return 'tank' def get_result(obj):
# print(obj.__dict__)
# print(obj._result)
result = obj.result()
print(result) if __name__ == '__main__':
n = 1
while True:
# 参数1: 函数名
# 参数2: 函数的参数1
# 参数3: 函数的参数2
# submit(参数1, 参数2, 参数3)
# add_done_callback(参数1),会将submit提交的task1执行的结果,传给get_result中的第一个参数,第一个参数是一个对象。
p_pool.submit(task1, n).add_done_callback(get_result)
n += 1
TCP服务端实现并发
#server
import socket
from concurrent.futures import ThreadPoolExecutor server = socket.socket() server.bind(
('127.0.0.1', 9000)
) server.listen(5) # 1.封装成一个函数
def run(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
print(data.decode('utf-8'))
conn.send(''.encode('utf-8')) except Exception as e:
break conn.close() if __name__ == '__main__':
print('Server is run....')
pool = ThreadPoolExecutor(50)
while True:
conn, addr = server.accept()
print(addr)
pool.submit(run, conn)
#client
import socket client = socket.socket() client.connect(
('127.0.0.1', 9000)
) print('Client is run....')
while True:
msg = input('客户端>>:').encode('utf-8')
client.send(msg) data = client.recv(1024)
print(data)