一.僵尸进程与孤儿进程
1.1 僵尸进程
进程代码运行结束之后并没有直接结束而是需要等待回收子进程资源才会结束
1.2 孤儿进程
主程序已经死亡,子程序还在运行
二.守护进程
守护进程即守护者某个进程,一旦该进程结束那么也随之结束
from multiprocessing import Process import time def test(name): print('总管:%s is running' % name) time.sleep(3) print('总管:%s is over' % name) if __name__ == '__main__': p = Process(target=test, args=('jason',)) p.daemon = True # 设置为守护进程(一定要放在start语句上方) p.start() print("皇帝jason寿终正寝") time.sleep(0.1)
三.互斥锁
问题:并发情况下操作同一份数据,极其容易造成数据错乱
解决措施:将并发变成串行,虽然降低了效率,但是提升了数据安全。
锁就可以实现将并发变成串行的效果
锁分为:①行锁 ②表锁
import json import time import random from multiprocessing import Process, Lock # 查票 def find(name): with open(r'data.txt', 'r', encoding='utf8') as f: data_dict = json.load(f) ticket_num = data_dict.get('ticket_num') print('您好,%s,目前剩余票量为%s' % (name, ticket_num)) # 买票 def buy(name): with open(r'data.txt', 'r', encoding='utf8') as f: data_dict = json.load(f) ticket_num = data_dict.get('ticket_num') # 创建一个延迟 time.sleep(1) if ticket_num > 0: data_dict['ticket_num'] -= 1 with open(r'data.txt', 'w', encoding='utf8') as f: json.dump(data_dict, f) print('%s购票成功' % name) else: print('没票了') def run(name): find(name) mutex.acquire() buy(name) mutex.release() if __name__ == '__main__': mutex = Lock() for i in range(1, 11): p = Process(target=run, args=('用户%s' % i,)) p.start()
四.消息队列
队列就是先进先出
from multiprocessing import Queue q = Queue(5) # put存放数据 q.put(3333) q.put(111) q.put(22) # get拿取数据 顺序就是先进先出 print(q.get()) print(q.get()) print(q.get()) print(q.full()) # 判断队列中数据是否满了 try: print(q.get_nowait()) # 判断队列中是否还有数据 没有数据立即报错 except Exception as e: print(e)
五.ipc机制
from multiprocessing import Queue, Process def producer(q): q.put("子进程p放的数据") def consumer(q): print("子进程c取的数据", q.get()) if __name__ == '__main__': q = Queue() # 开一个进程 p = Process(target=producer, args=(q,)) c = Process(target=consumer, args=(q,)) p.start() c.start() # p.join() # c.join()
六.生产者消费者模型
生产者
负责产生数据(做包子的)
消费者
负责处理数据(吃包子的)
该模型需要解决恭喜不平衡现象
""" 生产者 负责产生数据(做包子的) 消费者 负责处理数据(吃包子的) 该模型需要解决恭喜不平衡现象 """ from multiprocessing import Queue, Process, JoinableQueue import time import random def producer(name, food, q): for i in range(10): print('%s 生产了 %s' % (name, food)) q.put(food) time.sleep(random.random()) def consumer(name, q): while True: data = q.get() print('%s 吃了 %s' % (name, data)) q.task_done() if __name__ == '__main__': # q = Queue() q = JoinableQueue() p1 = Process(target=producer, args=('大厨jason', '玛莎拉', q)) p2 = Process(target=producer, args=('印度阿三', '飞饼', q)) p3 = Process(target=producer, args=('泰国阿人', '榴莲', q)) c1 = Process(target=consumer, args=('班长阿飞', q)) p1.start() p2.start() p3.start() c1.daemon = True c1.start() p1.join() p2.join() p3.join() q.join() # 等待队列中所有的数据被取干净 print('主')
七.线程理论
什么是线程?
进程其实是一个资源单位 真正被CPU执行的其实是进程里面的线程
"""
进程类似于是工厂 线程类似于是工厂里面的一条条流水线
所有的进程肯定含有最少一个线程
"""
进程间数据默认是隔离的 但是同一个进程内的多个线程数据是共享的
八.开设线程的两种方式
from threading import Thread import time # 第一种 def test(name): start_time = time.time() print('%s哈哈哈' % name) time.sleep(2) # io操作 print('%s哈哈哈,结束了' % name) t = Thread(target=test, args=('jason',)) t.start() print('主') # 第二种 class MyClass(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s哈哈哈' % self.name) time.sleep(2) # io操作 print('%s哈哈哈,结束了' % self.name) a = MyClass('jason') a.start() a.join() print('主')
九.守护线程
""" 主线程的结束意味着整个进程的结束 所以主线程需要等待里面所有非守护线程的结束才能结束 """ from threading import Thread from multiprocessing import Process import time def foo(): print(123) time.sleep(3) print("end123") def bar(): print(456) time.sleep(1) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")
十.线程数据共享
from threading import Thread money = 100 def test(): global money money = 999 t = Thread(target=test) t.start() t.join() print(money)
十一.线程互斥锁
from threading import Thread, Lock from multiprocessing import Lock import time num = 100 def test(mutex): global num mutex.acquire() # 先获取num的数值 tmp = num # 模拟延迟效果 time.sleep(0.1) # 修改数值 tmp -= 1 num = tmp mutex.release() t_list = [] mutex = Lock() for i in range(100): t = Thread(target=test, args=(mutex,)) t.start() t_list.append(t) # 确保所有的子线程全部结束 for t in t_list: t.join() print(num)
十二.tcp服务端实行多线程并发
服务端
import socket from threading import Thread from multiprocessing import Process server = socket.socket() server.bind(('127.0.0.1', 8081)) server.listen(5) # 半连接池 def talk(sock): while True: try: data = sock.recv(1024) # 接收消息 if len(data) == 0: continue else: print(data.decode('utf8')) sock.send(data + b'jack, my_lover') except ConnectionResetError as e: print(e) break while True: sock, address = server.accept() print(address) t = Thread(target=talk, args=(sock,)) t.start()View Code
客户端
import socket from threading import Thread from multiprocessing import Process client = socket.socket() client.connect(('127.0.0.1', 8081)) while True: msg = input('你输入发送内容>>>:').strip() if len(msg) == 0: print('格式错误,重新输入') continue client.send(msg.encode('utf8')) # 发送消息 data = client.recv(1024) # 接受消息 print(data.decode('utf8'))View Code