Python——threading模块(线程)

一、threading模块的对象

Thread:表示一个执行线程的对象

Lock:

Rlock:可重入锁对象

Condition:条件变量对象,使得一个线程等待另一个线程满足特定的“条件”

Event:条件变量的通用版本,任意数量的线程等待某个时间的发生,在该事件发生后所有线程将被激活

Semphore:为线程间共享有限资源提供一个“计数器”

BoundedSemaphore:与Semphore相同,不过它不允许超过初始值

Timer:与Thread相同,不过它要在运行前等待一段时间

Barrier:创建一个“障碍”,必须达到指定数量的线程后才可以继续

二、守护进程

  守护进程(daemon)是一类在后台运行的特殊进程,用于执行特定的系统任务。很多守护进程在系统引导的时候启动,并且一直运行直到系统关闭。另一些只在需要的时候才启动,完成任务后就自动结束

import time
from threading import Thread
def func1():
while True:
print('*'*10)
time.sleep(1)
def func2():
print('in func2')
time.sleep(5)
t = Thread(target=func1,)
t.daemon = True
t.start()
t2 = Thread(target=func2,)
t2.start()
t2.join()
print('主线程')

三、Threading模块提供的常用方法

  threading.currentThread(): 返回当前的线程变量。 
  threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 
  threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

四、代码示例

1、创建线程的两种方法

# coding:utf-8
import threading
import time
#方法一:将要执行的方法作为参数传给Thread的构造方法
def action(arg):
time.sleep(1)
print 'the arg is:%s\r' %arg
for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
t.start()
print 'main thread end!'
#方法二:从Thread继承,并重写run()
class MyThread(threading.Thread):
def __init__(self,arg):
super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。
self.arg=arg
def run(self):#定义每个线程要运行的函数
time.sleep(1)
print 'the arg is:%s\r' % self.arg
for i in xrange(4):
t =MyThread(i)
t.start()
print 'main thread end!'

2、join的使用方法

五、Lock、Rlock类

  简言之:Lock属于全局,Rlock属于线程

1、使用Lock

# coding:utf-8
import threading
import time
gl_num = 0
lock = threading.RLock()
# 调用acquire([timeout])时,线程将一直阻塞,
# 直到获得锁定或者直到timeout秒后(timeout参数可选)。
# 返回是否获得锁。
def Func():
lock.acquire()
global gl_num
gl_num += 1
time.sleep(1)
print gl_num
lock.release()
for i in range(10):
t = threading.Thread(target=Func)
t.start()

2、使用Rlock

from threading import RLock   # 递归锁
fork_lock = noodle_lock = RLock() # 一个钥匙串上的两把钥匙
def eat1(name):
noodle_lock.acquire() # 一把钥匙
print('%s拿到面条啦'%name)
fork_lock.acquire()
print('%s拿到叉子了'%name)
print('%s吃面'%name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print('%s拿到叉子了'%name)
time.sleep(1)
noodle_lock.acquire()
print('%s拿到面条啦'%name)
print('%s吃面'%name)
noodle_lock.release()
fork_lock.release()
Thread(target=eat1,args=('alex',)).start()
Thread(target=eat2,args=('Egon',)).start()
Thread(target=eat1,args=('bossjin',)).start()
Thread(target=eat2,args=('nezha',)).start()

六、Condition类

  Condition(条件变量)通常与一个锁关联。需要在多个Contidion*享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

  可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

1、构造方法:

   Condition([lock/rlock])

2、实例方法:   

  acquire([timeout])/release(): 调用关联的锁的相应方法。 
  wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 
  notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
  notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

3、代码示例(生产消费者模型)

# encoding: UTF-8
import threading
import time
# 商品
product = None
# 条件变量
con = threading.Condition()
# 生产者方法
def produce():
global product
if con.acquire():
while True:
if product is None:
print 'produce...'
product = 'anything'
# 通知消费者,商品已经生产
con.notify()
# 等待通知
con.wait()
time.sleep(2)
# 消费者方法
def consume():
global product
if con.acquire():
while True:
if product is not None:
print 'consume...'
product = None
# 通知生产者,商品已经没了
con.notify()
# 等待通知
con.wait()
time.sleep(2)
t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()
'''
结果
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
Process finished with exit code -1
程序不断循环运行下去。重复生产消费过程。
'''

七、Event类

  Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

1、构造方法: 

Event()

2、实例方法:  

  isSet(): 当内置标志为True时返回True。 
  set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。 
  clear(): 将标志设为False。 
  wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

3、代码示例

import time
import random
from threading import Thread,Event
def connect_db(e):
count = 0
while count < 3:
e.wait(0.5) # 状态为False的时候,我只等待1s就结束
if e.is_set() == True:
print('连接数据库')
break
else:
count += 1
print('第%s次连接失败'%count)
else:
raise TimeoutError('数据库连接超时')
def check_web(e):
time.sleep(random.randint(0,3))
e.set()
e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()

八、Timer类  

  Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

1、构造方法:   

  Timer(interval, function, args=[], kwargs={}) 
  interval: 指定的时间 
  function: 要执行的方法 
  args/kwargs: 方法的参数

2、代码示例

# encoding: UTF-8
import threading
def func():
print 'hello timer!'
timer = threading.Timer(5, func)
timer.start()
上一篇:总结Flink状态管理和容错机制


下一篇:AngularJS+requireJS项目的目录结构设想