线程、进程、协程

 

一、线程

第一个线程

import threading   #导入线程模块

def f1(arg):
    print(arg)

t = threading.Thread(target=f1,args=(123,))    #定义一个线程任务,对象为f1,传入参数123
t.start()       #执行线程任务

基本使用

Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

更多方法:

start            线程准备就绪,等待CPU调度

setName      为线程设置名称

getName      获取线程名称

setDaemon   设置为后台线程或前台线程(默认)
                   如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                    如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

join              逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

run              线程被cpu调度后自动执行线程对象的run方法

 

 

自定义线程

 

import threading   #

class MyThread(threading.Thread):   #继承线程方法
    def __init__(self, func, args):        #重新定义init方法
        self.func = func
        self._args = args
        super(MyThread, self).__init__()  #执行自定义的init方法

    def run(self):
        self.func(self._args)

def f2(arg):      #自定义任务
    print(arg)

obj = MyThread(f2, 123,)
obj.start()

 

线程锁

import threading
import time

NUM = 10


def func(l):
    global NUM
    #上锁
    l.acquire()
    NUM -= 1
    time.sleep(2)
    print(NUM)
    #开锁

    l.release()

lock = threading.Lock()      #单次
#lock = threading.RLock()   #支持多次,多种锁

for i in range(10):
    t = threading.Thread(target=func,args=(lock,))
    t.start()

 

 

信号量(Semaphore)

互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。


自定义线程池

 

import queue  #导入队列
import threading  #导入线程
import time  #导入时间


class ThreadPool:
    def __init__(self, maxsize=5):     #定义默认最大5个线程任务
        self.maxsize = maxsize
        self._q = queue.Queue(maxsize)
        for i in range(maxsize):
            self._q.put(threading.Thread)
        # 【threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread】
    def get_thread(self):    #获取队列任务
        return self._q.get()

    def add_thread(self):    #增加线程
        self._q.put(threading.Thread)

pool = ThreadPool(5)   #实例化

def task(arg,p):    #定义任务方法
    print(arg)
    time.sleep(1)
    p.add_thread()

for i in range(100):        #假设设100个任务过来
    # threading.Thread类
    t = pool.get_thread()     
    obj = t(target=task,args=(i,pool,))   #定义任务
    obj.start()   #启动任务

        

 

 

信号量

import threading,time
 
def run(n):
    semaphore.acquire()   #互斥锁
    time.sleep(1)
    print("run the thread: %s" %n)
    semaphore.release()
 
if __name__ == '__main__':
 
    num= 0
    semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()

 

事件(event)

python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为False

set:将“Flag”设置为True

 

import threading


def func(i, e):
    print(i)
    e.wait()
    print(i + 100)

event = threading.Event()

for i in range(10):
    t = threading.Thread(target=func, args=(i,event,))
    t.start()

event.clear()#设置成红灯,停止

inp = input('>>>>')
if inp == "1":
    event.set()  #设置成绿灯,执行

 

Timer

定时器,指定n秒后执行某操作

from threading import Timer


def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

 

 

生产者消费者模型(队列)


二、进程

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())


if __name__ == "__main__":
    # li = []
    li = queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        #p.daemon = True
        p.start()
        #p.join()

 

默认数据不共享,可以使用下面的三种方法进行进程数据共享

queues

from multiprocessing import Process
from multiprocessing import queues
import multiprocessing

def foo(i,arg):
    arg.put(i)
    print('say hi',i,arg.qsize())


if __name__ == "__main__":
    # li = []
    li = queues.Queue(20,ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo,args=(i,li,))
        #p.daemon = True
        p.start()
        #p.join()

 


array

from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock
import time

def foo(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi',lis[0])
lc.release()

if __name__ == "__main__":
# li = []
li = Array('i', 1)
li[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=foo,args=(i,li,lock))
p.start()

p = Process(target=foo,args=(i,li,lock))
p.start()

Manager.dict

from multiprocessing import Process
from multiprocessing import Manager

def foo(i,arg):

    arg[i] = i + 100
    print(arg.values())

if __name__ == "__main__":

    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo,args=(i,li,))

        p.start()
        p.join()

 


进程池

from multiprocessing import Pool
import time
def f1(arg):
    print(arg,'b')
    time.sleep(5)
    print(arg,'a')
if __name__ == "__main__":
    pool = Pool(5)

    for i in range(30):
        # pool.apply(func=f1,args=(i,))
        pool.apply_async(func=f1,args=(i,))

    # pool.close() # 所有的任务执行完毕
    time.sleep(2)
    pool.terminate() # 立即终止
    pool.join()

 


PS:
IO密集型-多线程
计算密集型 - 多进程

三、协程

 


原理:利用一个线程,分解一个线程成为多个“微线程”==》程序级别
greenlet

import gevent
 
def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')
 
def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')
 
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

 


gevent安装:

pip3 install gevent

上一篇:flask 3 上下文管理、threading.local相关、基础知识


下一篇:python编程技巧之python多任务并发,并行与线程介绍与使用(详细版!!!)