线程:
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
threading模块:
threading模块主要用于线程相关的操作。
线程方法:
start 线程准备就绪,等待CPU调度
setName 为线程设置名称
getName 获取线程名称
setDaemon 设置为后台线程或前台线程(默认), 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止。
join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
run 线程被cpu调度后自动执行线程对象的run方法
调用线程的两种方式:
1、直接调用
import threading
from time import sleep
def mythread(num):#定义每个线程要执行的操作(线程要做的事情)
print 'the thread num is %s' % num
sleep(3) if __name__ == '__main__':
t1 = threading.Thread(target=mythread,args=(1,))#实例化一个线程,执行mythread函数,传递参数1
t2 = threading.Thread(target=mythread,args=(2,))
t1.start()#启动线程1
t2.start()
print t1.getName()#获取线程1的名字
print t2.getName() 执行结果:
the thread num is 1
the thread num is 2
Thread-1
Thread-2
线程1和线程2同时执行完毕,然后sleep(3)。并不是执行完线程1后sleep(3),然后再执行线程2。因此两个线程是并行执行。
2、继承调用
import threading
from time import sleep class mythread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)#先继承父类中的构造方法(__init__)
self.num = num def run(self):#必须有一个run方法
print 'the thread num is %s ' % self.num
sleep(3) if __name__ == '__main__':
t1 = mythread(1)
t2 = mythread(2) t1.start()
t2.start()
print t1.getName()
print t2.getName()
执行结果同上。
主线程等待子线程:
以上两个例子中,主线程启动了两个子线程,并和两个子线程同时执行。如果让主线程等待子线程执行完毕后再执行,可以如下操作:
import threading
from time import sleep
def mythread(num):
print 'the thread num is %s' % num
sleep(3) if __name__ == '__main__':
t1 = threading.Thread(target=mythread,args=(1,))
t2 = threading.Thread(target=mythread,args=(2,))
t1.start()
t2.start()
print t1.getName()
print t2.getName() t1.join()#主线程等待t1执行完毕
t2.join()#主线程等待t2执行完毕 print '---main---'#两个子线程执行完,等待3秒,主线程执行。
同时启动10个线程,主线程与子线程同时执行:
import threading
from time import sleep
def mythread(num):
print 'the thread num is %s' % num
sleep(3) if __name__ == '__main__':
for i in range(10):
t = threading.Thread(target=mythread,args=(i,))
t.start()
print '---main---'
同时启动10个线程,主线程等待子线程执行完后再执行:
import threading
from time import sleep
def mythread(num):
print 'the thread num is %s' % num
sleep(3) t_list = [] if __name__ == '__main__':
for i in range(10):
t = threading.Thread(target=mythread,args=(i,))
t.start()
t_list.append(t)#将子线程加到列表中
for i in t_list:
t.join()#等待列表中的全部子线程执行完毕,主线程再执行
print '---main---'
守护进程:
setDaemon实例1:
import time
import threading def run(n):
print '%s running,,,' % n
time.sleep(2)
print 'done...' def main():
for i in range(5):
t = threading.Thread(target=run,args=(i,))
t.start()
t.join(1)
print 'start %s' % t.getName() m = threading.Thread(target=main,args=())
m.setDaemon(True)#将主线程设置为Daemon(守护)线程,它退出时,其它子线程会同时退出,不管是否执行完任务
m.start()
m.join(2)
print 'main thread done...' #执行结果:
0 running,,,
start Thread-2
1 running,,,
main thread done...
setDaemon实例2:
import time
import threading def run(n):
print '%s running,,,' % n
time.sleep(2)
print 'done...' def main():
for i in range(5):
t = threading.Thread(target=run,args=(i,))
t.start()
t.join(1)
print 'start %s' % t.getName() m = threading.Thread(target=main,args=())
m.setDaemon(True)
m.start()
#m.join(2)
print 'main thread done...' #执行结果:
main thread done...
GIL(Glogbal Interpreter Lock):
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
在 CPython中,某一时刻只有一个线程在运行。
首先需要明确的一点是GIL
并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL
归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
线程锁/互斥锁:
一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据。此时,如果多个线程同时要修改同一份数据,那么将会导致线程运算结果不准确。
eg:
import time
import threading def add():
global num
time.sleep(1)#打乱每个线程的顺序
num -=1
print num num = 10
for i in range(10):
t = threading.Thread(target=add,args=())
t.start() #执行结果:
9
8
6
7
5
4
3
2
1
由结果可以看出,多个线程同时操作一份共享数据,导致将结果不可预期,也称“线程不安全”。
要使结果准确,需要确保某一时刻只有一个线程可以对数据进行操作———线程锁。
eg:
import time
import threading def add():
global num
time.sleep(1)#打乱每个线程的顺序
lock.acquire()#添加一个进程锁
num -=1
print num
lock.release()#解除进程锁 lock = threading.Lock()#实例化一个lock
num = 10
for i in range(10):
t = threading.Thread(target=add,args=())
t.start() #运行结果:
9
8
7
6
5
4
3
2
1
递归锁:
在大锁中还要包含小锁:
def run1():
print("grab the first part data")
lock.acquire()
global num
num +=1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2+=1
lock.release()
return num2
def run3():
lock.acquire()#在运行run1和run2之前加锁,确保执行完run1后接着执行run2,并且在这过程中没有其他的线程来执行run1和run2
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res,res2) if __name__ == '__main__': num,num2 = 0,0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start() while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num,num2)
GIL(全局锁)与线程锁:
既然有GIL了,为什么还要线程锁呢???
GIL是防止底层多个C原生线程(CPython)同一时间修改同一数据,就是说python解释器不能并行的执行代码。线程锁是为了防止多个线程同时修改同一份数据。
多进程:
p2.start()
print '===main=== #执行结果:
===main===
hello ahaii
hello tom
主进程等待子进程执行完毕,再执行:
import multiprocessing
from time import sleep def say(name):
print 'hello %s' % name
sleep(2)
if __name__ == '__main__':
p1 = multiprocessing.Process(target=say,args=('ahaii',))
p2 = multiprocessing.Process(target=say,args=('tom',))
p1.start()
p2.start()
p1.join()
p2.join()
print '===main===' #执行结果:
hello ahaii
hello tom#执行完后,等待2s,执行主进程。
===main===
进程之间通信
线程之间可以相互访问,而不同进程之间内存不是共享的,不能相互访问。因此,要实现通信,需要通过第三方方法(管道)。
Queue队列
python中,队列是进程之间进行数据交换的主要形式。Queue模块提供了队列的操作。无论多少个线程向队列中放数据或者取数据,Queue同一时刻只允许一个线程在操作,即它自带锁。
Queue.put():向队列中放数据
Queue.get():从队列中取数据
Queue.qsize():查看队列中剩余数据个数
常用方法:
import Queue #q = Queue.Queue(maxsize=3)#最大长度
#print q.get()
#print q.get(timeout=2)
q = Queue.PriorityQueue(maxsize=3)#优先级队列 q.put((3,[1,2,3]))#第一组数据为优先级,第二组为数据
q.put((1,22))
q.put((2,33))
#q.put(5)#队列设置长度为3,第四次put时,由于队列已满,会发生阻塞。可以设置超时时间避免阻塞,如:q.put(5,timeout=2)
# print q.qsize()
# print q.full()#判断队列是否已满,False or True
# print q.empty()#判断队列是否已空,False or True
print q.get()
print q.get()
print q.get() #执行结果:
(1, 22)
(2, 33)
(3, [1, 2, 3])
默认Queue队列遵循先进先出的原则,如有特殊要求,可以使用优先级队列。在优先级队列中,优先级最小,权重越大,越现被取到。
eg:
from multiprocessing import Process,Queue def func(q):
q.put([123,'hello']) if __name__ == '__main__':
q = Queue()
p1 = Process(target=func,args=(q,))#子进程执行func函数,向队列中put数据
p1.start()
print q.get()#父进程向队列中get数据
p1.join() #执行结果:
[123, 'hello']
Queue队列在取数据时,时按照数据存放的先后顺序,每次只能存、取一个数据,顺序遵循“先进先出”的原则。
多次存、取数据:
eg:
from multiprocessing import Process,Queue def func(q):
q.put([123,'hello']) if __name__ == '__main__':
q = Queue()
p1 = Process(target=func,args=(q,))#子进程执行func函数,向队列中put数据
p2 = Process(target=func,args=(q,))#子进程执行func函数,向队列中put数据
p1.start()
p2.start()
print q.get()#父进程向队列中get数据
print q.get()
p1.join()
p2.join() #执行结果:
[123, 'hello']
[123, 'hello']
Managers:
from multiprocessing import Manager,Process def f(d,l):#传入一个字典和一个列表
d[1] = ''#修改字典中的值
d[''] = 2
d[0.25] = None
l.append(1)#列表追加1
print l if __name__ == '__main__':
with Manager() as manager:#这种写法好处是,manager执行完后自动销毁。
#manager = Manager()
d = manager.dict()#生成一个字典
l = manager.list(range(5))#生成一个列表
p_list = []
for i in range(10):#生成10个进程
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)
for res in p_list:
res.join()#等待10个线程执行完毕
print d#主进程执行
print l #执行结果:
[0, 1, 2, 3, 4, 1]
[0, 1, 2, 3, 4, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
{0.25: None, 1: '', '': 2}
[0, 1, 2, 3, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
由结果可以看出,每个子进程都向同一个列表中追加了1。
进程池:
由于进程比较消耗资源,因此当系统中运行多个进程时,需要设定一个限制。进程池就是设定系统中同一时刻,最多有多少个进程执行。
eg:
from multiprocessing import Process,Pool
import time def Foo(i):
time.sleep(2)
# print 'hello'
return i+100 def Bar(arg):
print('-->exec done:',arg) pool = Pool(3)#限制最多执行5个进程 for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)#异步,执行完Foo后,接着执行Bar(回调),
# pool.apply(func=Foo, args=(i,))#同步,每个进程串行 print('end')
pool.close()
pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭