Python多线程编程

1.全局解释器锁定

    Python虚拟机使用GIL(Global Interpreter Lock,全局解释器锁定)来互斥线程对共享资源的访问,暂时无法利用多处理器的优势。虽然python解释器可以“运行”多个线程,但在任意时刻,不管有多少的处理器,任何时候都总是只有一个线程在执行。对于I/O密集型任务,使用线程一般是没有问题的,而对于涉及大量CPU计算的应用程序而言,使用线程来细分工作没有任何好处,用户最好使用子进程和消息传递。

2.threading

     python的threading模块提供Thread类和各种同步原语,用于编写多线程的程序。

2.1. Thread(target=None,name=None,args=(),kwargs={})

    此函数创建一个Thread实例。target是一个可调用函数,线程启动时,run()方法将调用此对象。name是线程的名称,默认是‘Thread-N‘的格式。args是传递给target函数的参数元组,kwargs是传递给target函数的关键字参数的字典。

    Thread实例t支持以下方法和属性:

  • t.start()                 启动线程,就是调用run()方法
  • t.run()                   可以在Thread的子类中重新定义
  • t.join([timeout])      阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。
  • t.is_live()                返回线程的活动状态
  • t.name                   线程的名称
  • t.ident                    线程标识符
  • t.daemon                设置线程是否为守护线程,必须在t.start()前设置。当设置为True时,主线程要退出时,不必等守护线程完成。‘

    创建线程有两种方法:

  1. 创建一个Thread实例,传递给它一个函数
1
2
3
4
5
6
7
8
9
import threading
import time
def clock(nsec):
    whhile True:
        print ‘Now is %s‘%time.ctime()
        time.sleep(nsec)
t=threading.Thread(target=clock,args=(5,))
t.daemon=True  #设置为守护线程
t.start()

     2. 从Thread派生出一个子类,然后创建一个子类的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
import time
class ClockThread(threading.Thread):
    def __init__(self,nsec):
        threading.Thread.__init__(self)
        self.daemon=True   #设置为守护线程
        self.nsec=nsec
    def run():
        while True:
            print ‘Now is s%‘%time.ctime()
            time.sleep(self.nsec)
t=ClockThread(5)
t.start()

  后一种方法比较python一点。

      由于线程会无限循环,所以设置daemonTrue,这样当进程结束时,线程也将被销毁。

      例如有个数数程序,一个线程从1数到9,另一个线程从a数到j,每个线程都要耗费9s,如果要顺序执行的话需耗费18s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading
import time
class CountThread(threading.Thread):
    def __init__(self,func,name):
        threading.Thread.__init__(self)
        self.name=str(name)
        self.func=func
    def run(self):
        apply(self.func)
def numcount():
    print threading.currentThread().name,‘start at : ‘,time.ctime()
    for i in range(10):
        print i
        time.sleep(1)
    print threading.currentThread().name,‘done at : ‘,time.ctime()
def alphacount():
    print threading.currentThread().name,‘start at : ‘,time.ctime()
    for i in range(97,107):
        print chr(i)
        time.sleep(1)
    print threading.currentThread().getName(),‘done at : ‘,time.ctime()
def main():
    funclist=[numcount,alphacount]
    threads=[]
    for i in funclist:
        t=CountThread(i,i.__name__)
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print ‘All done at :‘,time.ctime()
if __name__==‘__main__‘:
    main()

  结果:

    10s就完成了。

    举一个更清晰的看t.join()作用的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time
def join():
    print ‘in Threadjoin‘
    time.sleep(1)
    print ‘out Threadjoin‘
Threadjoin=threading.Thread(target=join,name=‘Threadjoin‘)
def context(Threadjoin):
    print ‘in Threadcontext‘
    Threadjoin.start()
    Threadjoin.join()   #Threadjoin线程开始阻塞,等待Threadjoin完成
    print ‘out Threadcontext‘
Threadcontext=threading.Thread(target=context,name=‘Threadcontext‘,args=(Threadjoin,))
Threadcontext.start()

  结果:

1
2
3
4
5
>>>
in Threadcontext
in Threadjoin
out Threadjoin
out Threadcontext

2.2. 线程的同步

    线程运行在创建它的进程内部,共享所有的数据和资源,但都有自己独立的栈和堆。编写并发编程的难点在于同步和访问共享数据。多个任务同时更新一个数据结构可能导致数据损坏和程序状态不一致(也就是竞争条件)。要解决这个问题,必须找出程序的关键代码段,并使用互斥锁和其它类似的同步手段保护他们。

2.2.1 Lock

    原语锁定(互斥锁定)是一个同步原语,状态是"已锁定"或"未锁定"之一。两个方法acquire()和release()用于修改锁定的状态。如果有多个线程在等待获取锁定,当锁定释放时,只有一个线程能获得它。

构造方法: 
Lock()  :创建新的Lock对象,初始状态为未锁定

实例方法: 
Lock.acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。 成功获取锁定返回True,无法获取锁定返回False。
Lock.release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

    Python多线程分块读取大文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import threading
import os
seekposition=0
blocksize=1000000
filesize=0
def getFilesize(filename):
    f=open(filename)
    f.seek(0,os.SSEK_END)
    filesize=f.tell()
    f.close()
    return filesize
 
def parsefile(filename):
    global seekposition,filesize
    f=open(filename)
     
    while True:
        lock.acquire()   #seekposition是线程共享的,修改时需要锁定
        startposition=seekposition
        endposition=(startposition+blocksize) if (startposition+blocksize)<filesize else filesize
        seekposition=endposition
        lock.release()
        if startposition==filesize:
            break
        elif startposition>0:
            f.seek(startposition)
            f.readline()    #分成的block第一行可能不是完整的一行,略掉不处理,而是作为上一个block的最后一行处理
        position=f.tell()
        outfile=open(str(endposition)+‘.txt‘,‘w‘)
        while position<=endposition:
            line=f.readline()
            outfile.write(line)
            position=f.tell()
        outfile.close()
    f.close()
 
def main(filename):
    global seekposition,filesize
    filesize=getFilesize(filename)
    lock=threading.Lock()
    threads=[]
    for i in range(4):
        t=threading.Thread(target=parsefile,args=(filename,))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
if __name__==‘__main__‘:
    filename=‘‘
    main(filename)

2.2.2 RLock

    多重锁定是一个类似于Lock对象的同步原语,但同一个线程可以多次获取它。这允许拥有锁定的线程执行嵌套的acquire()和release()操作。可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time
rlock=threading.RLock()
count=0
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        global count
        if rlock.acquire():
            count+=1
            print ‘%s set count : %d‘%(self.name,count)
            time.sleep(1)
            if rlock.acquire():
                count+=1
                print ‘%s set count : %d‘%(self.name,count)
                time.sleep(1)
                rlock.release()
            rlock.release()
if __name__==‘__main__‘:
    for i in range(5):
        t=MyThread()
        t.start()

2.2.3 信号量Semaphore

    信号量是一个基于计数器的同步原语,调用acquire()方法时此计数器减1,调用release()方法时此计数器加1.如果计数器为0,acquire()方法会被阻塞,直到其他线程调用release()为止。

    下面是一个说明信号量的好例子,引用自http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import threading
import time
  
semaphore = threading.Semaphore(2)   # 计数器初值为2
  
def func():
     
    # 请求Semaphore,成功后计数器-1;计数器为0时阻塞
    print ‘%s acquire semaphore...‘ % threading.currentThread().getName()
    if semaphore.acquire():
         
        print ‘%s get semaphore‘ % threading.currentThread().getName()
        time.sleep(4)
         
        # 释放Semaphore,计数器+1
        print ‘%s release semaphore‘ % threading.currentThread().getName()
        semaphore.release()
  
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t4 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
t4.start()
  
time.sleep(2)
  
# 没有获得semaphore的主线程也可以调用release
# 若使用BoundedSemaphore,t4释放semaphore时将抛出异常
print ‘MainThread release semaphore without acquire‘
semaphore.release()

2.2.4 Condition

    条件变量是构建在另一个锁定上的同步原语。典型的用法是生产者-使用者问题,其中一个线程生产的数据供另一个线程使用。

    构造方法:
    Condition([lock/rlock])

    实例方法: 
    acquire([timeout])/release(): 调用关联的锁的相应方法。 
    wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁,直到另一个线程在条件变量上执行notify()或notify_all()方法将其唤醒为止。使用前线程必须已获得锁定,否则将抛出异常。 
    notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
    notify_all(): 唤醒所有等待此条件的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
cv=threading.Condition()
alist=[]
 
def producer():
    global alist   
    cv.acquire()
    for i in range(10):
        alist.append(i)
    cv.notify()
    cv.release()
     
def consumer():
    cv.acquire()
    while alist is None:
        cv.wait()
    cv.release()
    print alist
 
tproducer = threading.Thread(target=producer)
tconsumer = threading.Thread(target=consumer)
tconsumer.start()
tproducer.start()

2.3 local()

    返回local对象,用于保存线程的数据,管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。 可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
mydata=threading.local()
mydata.number=42
mydata.color=‘red‘
print mydata.__dict__
log=[]
 
def foo():
    items=mydata.__dict__.items()  #在此线程中mydata属性字典为空,无number与color属性
    items.sort()
    log.append(items)
    mydata.number=11
    log.append(mydata.number)
 
t=threading.Thread(target=foo)
t.start()
t.join()
 
print log
print mydata.number   #仍为42

3. Queue

    尽管在Python中可以使用各种锁定和同步原语的组合编写非常传统的多线程程序,但有一种更优的编程方式——即将多线程程序组织为多个独立任务的集合,这些线程之间通过消息队列进行通讯。Queue模块可以用于线程间通讯,让各个线程共享数据。

    构造方法:

    Queue():创建一个FIFO队列
    LifoQueue():创建一个LIFO栈

    实例方法: 
    q.put(item):将item放入队列
    q.get():从队列中删除一项,然后返回这个项目
    q.task_done():队列中数据的使用者用来指示对于项目的处理已结束。从队列中删除的每一项都应该调用一次。
    q.join():阻塞直到队列中的所有项目均被处理为止。

    python核心编程中有关于多线程编程和Queue结合使用的思路:

  • UserThread:负责读取客户的输入,可能是一个I/O通道。程序可以创建多个线程,每个客户一个,输入放置到队列中。
  • ReplyThread:负责把用户的输入取出来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading
import Queue
q=Queue.Queue()
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon=True
    def run(self):
        while True:
            item=q.get()
            print threading.current_thread().name,‘get‘,item
            q.task_done()
for i in range(4):
    t=MyThread()
    t.start()
for i in range(100):
    q.put(i)
q.join()

4. 线程终止与挂起

    下面选自《Python参考手册》

    线程没有任何方法可用于强制终止或挂起。由于在设计上,如果某个线程获取了锁定,在它释放之前强制终止线程,将导致整个应用程序出现死锁。

    可以自己构建终止功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self._terminate=False  #设置终止标志
        self.lock=threading.Lock()
    def terminal(self):
        self._terminal=True
    def acquire(self):
        self.lock.acquire()
    def release(self):
        self.lock.release()
    def run(self):
        while True:
            if self._terminal:   #标志为True,则终止线程
                break
            self.lock.acquire()
            statements
            self.lock.release()
            statements

  也可以利用Queue传递终止信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading
import Queue
 
class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.queue=Queue.Queue()
    def send(self,item):
        self.queue.put(item)
    def close(self):
        self.queue.put(None)
        self.queue.join()
    def run(self):
        while True:
            item=self.queue.get()
            if item is None:
                break
            print item
            self.queue.task_done()
        self.queue.task_done()
t=MyThread()
t.start()
t.send(‘hello‘)
t.send(‘world‘)
t.close()

  

Python多线程编程

上一篇:Fluentd (tD-agent) 日志处理


下一篇:控制器相关