011-Python-进程、线程于协程

1.进程与线程

  1. 进程:

    一个程序要运行时所需的所有资源的集合;

    一个进程至少需要一个线程,这个线程称为主线程,一个进程里可以包含多个线程;

    cpu 核数越多,代表着你可以真正并发的线程越多2个进程之间的数据是完全独立的,默认情况下相互不能访问;

  2. 线程:

    工作最小单元的是线程,一个应用程序至少有一个线程;多个线程在涉及修改同一个数据时一定要加锁;

  3. 应用场景:

    IO密集型:线程(IO的读写)

    计算密集型:进程(涉及到CPU运算)

  4. GIL,全局解释器锁:

    保证同一个进程中只有一个线程同时被调用;

2.多线程的格式:

模块:

threading # 线程模块

t = threading.Thread(target=run, args=(1,)) # 创建线程

t.start() # 启动线程

2.1将两个线程执行后,发现并不会sleep4秒而执行,而是同时执行2秒结束;

#导入模块threading
import threading
import time def run(n):
time.sleep(2)
print("这是一个线程", n)
#定义线程t:threading.Thread(target=函数名, args=(元组,)) # args 后面跟可迭代的列表,元组等
t = threading.Thread(target=run, args=(1,))
t2 = threading.Thread(target=run, args=(2,)) #启动线程 t.start()
t.start()
t2.start()

2.2循环的实现多线程:

import threading
import time def run(n):
time.sleep(2)
print("这是一个线程", n) for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()

2.3获取一共有多少线程数,以及线程名设置与获取;

import threading
import time def run(n):
time.sleep(2)
print("这是一个线程", n) for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
t.setName("t-%s" % i) # 修改线程名称
print(t.getName()) # 打印线程名称
print(threading.active_count()) # 统计一共有多少活跃(没有结束)的线程

2.4通过类的方式调用线程;

import threading
import time class abcd(threading.Thread):
def __init__(self, num):
super().__init__() # 继承父类
self.num = num def run(self):
print("运行这个数字:%s" % self.num)
time.sleep(3) if __name__ == '__main__':
t1 = abcd(1)
t2 = abcd(2) t1.start()
t2.start()

2.5主线程等待子线程运行完毕后,在运行主线程;

import time
import threading t_list = [] def run(n):
time.sleep(1)
print("运行这个数字", n) for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start() t_list.append(t) for t in t_list:
t.join() print("====最后这是一个主线程=====")

2.6检测,如果主线程结束了,子线程也跟着结束setDaemon(True);

import time
import threading def run(n):
time.sleep(0.01)
print("运行这个数字", n) for i in range(100):
t = threading.Thread(target=run, args=(i,))
t.setDaemon(True) # 设置为守护线程,如果主线程结束,守护线程也会结束;
t.start() print("====这是一个主线程=====")

3.线程锁

1.当涉及到对数据的同时修改操作时,就需要对数据进行加锁;以免数据的错乱;

2.格式:

lock = threading.Lock() # 创建锁

lock.acquire() # 申请使用锁 其他线程等待

lock.release() # 释放锁 下个进程继续

3.1实例:

3.1.1将进程1个进程涉及到修改数据的部分使用锁进行锁定(可用于修改)

import threading
import time v = 10
lock = threading.Lock() # 创建锁 def task(arg):
time.sleep(2)
lock.acquire() # 申请使用锁 其他线程等待 global v
v -= 1
print(v)
lock.release() # 释放锁 下个进程继续 for i in range(10):
t = threading.Thread(target=task, args=(i, ))
t.start()

3.1.2分批次操作限定进程数量时,限定同时有3个进程对文件读的操作;

import threading
import time v = 10
lock = threading.BoundedSemaphore(3) # 创建锁,但同时有3个线程可以突破锁 def task(arg): lock.acquire() # 申请使用锁 其他线程等待
time.sleep(2)
global v
v -= 1
print(v)
lock.release() # 释放锁 下个进程继续 for i in range(10):
t = threading.Thread(target=task, args=(i, ))
t.start()

3.1.3事件锁,满足某种需求后释放所有的锁;

import threading
import time lock = threading.Event() # 创建锁,但同时有3个线程可以突破锁 def task(arg):
time.sleep(1)
lock.wait() # wait锁住所有的线程等待
print(arg) for i in range(10):
t = threading.Thread(target=task, args=(i, ))
t.start() while True:
v = input(">>>")
if v == "1":
lock.set() # 释放锁,将所有线程释放

3.1.4事件锁,用户想释放几个释放几个;

import threading
import time lock = threading.Condition() # 创建锁,控制锁 def task(arg):
time.sleep(1)
lock.acquire() # 申请使用锁 其他线程等待 lock.wait() # wait锁住所有的线程等待 print("线程:", arg) lock.release() # 释放锁 下个进程继续 for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start() while True:
v = input(">>>")
lock.acquire() # 申请使用锁 其他线程等待
lock.notify(int(v))
lock.release() # 释放锁 下个进程继续

4.线程池

4.1通过线程池的方式,请求URL返回的状态;

from concurrent.futures import ThreadPoolExecutor
import time
import requests def task(arg):
data = requests.get(arg)
print("返回结果:", arg, data.status_code) pool = ThreadPoolExecutor(2) # 创建连接池 大小为 2
url_list = [
"http://www.baidu.com",
"http://www.jd.com",
"http://www.taobao.com",
] for i in url_list:
print("开始请求:", i)
pool.submit(task, i) # 去连接池中获取一个连接,并执行

4.2通过回调的方式,将获取的数据传送给txt函数;

from concurrent.futures import ThreadPoolExecutor
import requests def txt(a):
# print(a.result().url, a.result().status_code)
download_future = a.result()
print(download_future.url, download_future.status_code) def download(arg):
data = requests.get(arg)
# print("返回结果:", arg, data.status_code) return data # 包含下载的所有内容 pool = ThreadPoolExecutor(2) # 创建连接池 大小为 2 url_list = [
"http://www.baidu.com",
"http://www.jd.com",
"http://www.taobao.com",
] for i in url_list:
print("开始请求:", i)
future = pool.submit(download, i) # 去连接池中获取一个连接,并执行
future.add_done_callback(txt)

5.进程

5.1基本使用

from multiprocessing import Process      # 导入进程模块
import time def task(arg):
time.sleep(1)
print(arg) if __name__ == '__main__': # 进程在windos上面需要使用if __name__ == '__main__'执行,mac和Linux无需
for i in range(10):
p = Process(target=task,args=(i,)) # 生成一个进程
# p.daemon = True # 守护进程,如果主进程结束子进程也结束
p.start() # 启动进程
# p.join(1) # 设置进程最多等待时间
print("主进程到最后了。。。")

5.2进程池:

from concurrent.futures import ProcessPoolExecutor

def call(arg):
data = arg.result()
print(data) def task(arg):
# print(arg)
return arg+100 if __name__ == '__main__': pool = ProcessPoolExecutor(5) # 创建进程池,大小为 5 for i in range(10):
obj = pool.submit(task, i)
obj.add_done_callback(call)

6.进程之间的数据共享

6.1默认情况下,进程之间的数据是不可以共享的,每个线程向v里面添加数据输出结果并不是追加形式;

from multiprocessing import Process

def task(num, v):
v.append(num)
print(v) if __name__ == '__main__':
v = []
for i in range(10):
p = Process(target=task, args=(i, v,))
p.start()

6.2通过#C语言中的Array进行实现数据共享:

from multiprocessing import Process, Array

def task(i, v):
v[i] = 1 # 第一次循环的时候v = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; v[0] = 1 ; v[1] = 1
print(list(v)) if __name__ == '__main__':
# v = Array("数据类型", "长度")
v = Array("i", 10)
# print(list(v)) # Array的值为 v = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] for i in range(10):
p = Process(target=task, args=(i, v,))
p.start()

6.3通过Manager实现进程之间的通信(常用)执行报错可以忽略,由于断开socket连接导致;

from multiprocessing import Process, Manager
# Manager是基于socket实现的进程之间的通信 def task(num, v):
v.append(num) # 每个进程向列表中追加值
print(v) if __name__ == '__main__':
obj = Manager() # 创建了一个对象
alist = obj.list() # 创建一个列表,或字典dic = obj.dict()
print(alist) # alist 只是一个空列表 [] for i in range(10):
p = Process(target=task, args=(i, alist,))
p.start()

7.协程greenlet

  1. 协程永远是一个线程在执行;对线程的一个分片处理;在线程空闲是的时候干点其他的事情;
  2. 协程只有来回切换的功能,其他功能是没有的,需要实现二次加工;
  3. 单独使用协程没有任何意义;
  4. 使用模块greenlet实现协程

7.1实现协程的二次加工有两种方式:

1.使用已有模块 gevent模块

  • gevent是在协程的基础上,对IO请求又做了一次封装;

**2.使用一个线程完成对数据的请求,以及接收(效果返回并不会安装顺序执行) **

# 根据协程二次开发: 协程 + IO
from gevent import monkey; monkey.patch_all() # gevent补丁包,涉及到IO操作的都给改为异步模式
import gevent
import requests def z(url):
data = requests.get(url)
print(data.url, data.status_code) gevent.joinall([
gevent.spawn(z, "http://www.baidu.com"),
gevent.spawn(z, "http://www.jd.com"),
gevent.spawn(z, "http://www.taobao.com"),
])

8.IO多路复用(自定义形式的协程)

  • 用于监听多个socket对象,是否有变化(可读,可写,发生错误)

1.通过同时监听多个端口,实现客户端的同时请求,一个进程完成请求之间的来回调用;基于select实现的“伪”多并发

# 服务端
import socket
import select s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 基于网络通信,是TCP协议的通讯
s1.bind(("127.0.0.1", 8080)) # 开启 绑定IP以及端口
s1.listen(5) # 监听;进程池的大小5 s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s2.bind(("127.0.0.1.", 8081))
s2.listen(5) inputs = [s1, s2]
while True:
# IO多路复用
# - select,主动循环:内部进行的循环操作(监听的socket个数是有限制的:1024)
# - poll, 主动循环:内部进行循环的操作(没有监听个数的限制)
# - epoll, 被动告知:通过异步回调(没有监听个数的限制)
r, w, e = select.select(inputs, [], [], 0.5)
# 如果有访问 8080 列表r 的值为r = [s2]
# 如果有访问 8081 列表r 的值为r = [s1]
# 如果同时访问 8080 ,8081 列表r 的值为r = [s1, s2] for i in r:
if i in [s1, s2]:
# 判断如果i 是服务端的 8080、8081 就是新连接进来了
print("新连接进来了!")
conn, addr = i.accept()
inputs.append(conn)
else:
# 否则有连接用户发送消息来了
print("有数据过来了!")
try: # 捕捉客户端如果突然断开的异常进行 处理
data = i.recv(1024)
except Exception as e:
data = ""
if data: # 如果data有数据,将输入返回给客户端
i.sendall(data.upper())
else: # 没有数据将 这个socket连接关闭,并移除inputs
i.close()
inputs.remove(i)
# 客户端
import socket
import select client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 8081,)) while True:
a = input(">>")
client.sendall(a.encode()) data = client.recv(1024)
print("来自服务端的数据", data)
上一篇:HTML5 Canvas画数字时钟


下一篇:学习笔记_J2EE_SpringMVC_03_注解配置_@RequestMapping用法