代码说明一切:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
# encoding=utf-8
# author: walker
# date: 2014-05-21
# summary: Traverse the files under a directory with a hand-rolled process pool
from multiprocessing import Process, Queue, Lock
import time, os
# Consumer
class Consumer(Process):
    """Worker process that handles tasks from a shared queue until told to quit.

    Tasks are expected to be strings (the producer enqueues file names).
    The string ``'quit'`` is the shutdown sentinel.

    NOTE(review): a real task equal to ``'quit'`` (e.g. a file with that
    name) cannot be told apart from the sentinel and stops this worker early.
    """

    def __init__(self, queue, ioLock):
        """Remember the shared task queue and the lock guarding output.

        Args:
            queue: Queue shared with the producer; tasks are read via ``get()``.
            ioLock: Lock held around every ``print`` call made by this worker.
        """
        super(Consumer, self).__init__()
        self.queue = queue
        self.ioLock = ioLock

    def run(self):
        """Handle tasks one by one; print 'Bye-bye' once 'quit' is received."""
        while True:
            task = self.queue.get()  # blocks while the queue is empty
            if isinstance(task, str) and task == 'quit':
                break
            time.sleep(1)  # pretend each task takes one second
            # `with` releases the lock even if printing raises
            with self.ioLock:
                print(str(os.getpid()) + ' ' + task)
        with self.ioLock:
            print('Bye-bye')
# Producer
def Producer(root=r'D:\test', subNum=4):
    """Walk a directory tree and hand every file name to worker processes.

    Starts ``subNum`` workers through ``build_worker_pool``, enqueues each
    file name found under ``root``, then sends one ``'quit'`` per worker,
    waits for all of them and prints the elapsed time.

    Args:
        root: Directory to traverse. Defaults to the path that was
            hard-coded in the original script.
        subNum: Number of worker processes. Defaults to 4.

    NOTE(review): ``Queue.qsize()`` is documented to raise
    ``NotImplementedError`` on platforms without ``sem_getvalue()`` (macOS).
    """
    queue = Queue()  # multiprocessing queue, safe to share between processes
    ioLock = Lock()
    workers = build_worker_pool(queue, ioLock, subNum)
    start_time = time.time()

    for _parent, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            queue.put(filename)
            with ioLock:
                print('qsize:' + str(queue.qsize()))
            # Throttle: keep at most roughly ten pending tasks per worker
            while queue.qsize() > subNum * 10:
                time.sleep(1)

    # One sentinel per worker so that every worker leaves its loop
    for _ in workers:
        queue.put('quit')
    for worker in workers:
        worker.join()

    with ioLock:
        print('Done! Time taken: {}'.format(time.time() - start_time))
# Create the process pool
def build_worker_pool(queue, ioLock, size):
    """Create and start ``size`` Consumer processes.

    Args:
        queue: Task queue passed to every Consumer.
        ioLock: Output lock passed to every Consumer.
        size: Number of workers to start.

    Returns:
        List of the started Consumer instances, in creation order.
    """
    workers = []
    for _ in range(size):
        worker = Consumer(queue, ioLock)
        worker.start()
        workers.append(worker)
    return workers
# Script entry point: run the producer only when executed directly
if __name__ == '__main__':
    Producer()
|
ps:
1
2
3
|
self.ioLock.acquire()
...
self.ioLock.release()
|
可用
1
2
|
with self.ioLock:
    ...
|
替代。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# encoding=utf-8
# author: walker
# date: 2017-03-14
# summary: One child process produces, one child process consumes
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager
# Produce
def Produce(q):
    """Put the integers 1..19 on ``q``, then a final 0 as the stop signal.

    Args:
        q: Queue-like object providing ``full()``, ``qsize()`` and ``put()``.
    """
    print('Produce %d ...' % os.getpid())
    for i in range(1, 20):
        # Poll while the queue is full so the waiting is visible in the output
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
    q.put(0)  # 0 tells the consumer to stop
# Consume
def Consume(q, delay=2):
    """Read numbers from ``q`` and report each one until a 0 arrives.

    Args:
        q: Queue-like object providing a blocking ``get()``.
        delay: Seconds to sleep per item to simulate work. Defaults to 2,
            the value that was hard-coded originally.
    """
    print('Consume %d ...' % os.getpid())
    while True:
        num = q.get()
        if num == 0:  # stop signal received
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(delay)
        print('Consumer end ' + str(num))
if __name__ == '__main__':
    # Bounded queue with 10 slots. The original author notes that a plain
    # multiprocessing Queue(10) is usable here as well; only one queue is
    # created now instead of creating one and overwriting it right away.
    q = Manager().Queue(10)
    print(os.getpid())
    producerProcess = Process(target=Produce, args=(q,))  # producer process
    consumerProcess = Process(target=Consume, args=(q,))  # consumer process
    producerProcess.start()
    consumerProcess.start()
    producerProcess.join()
    consumerProcess.join()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
# encoding=utf-8
# author: walker
# date: 2017-03-14
# summary: One child process produces, a process pool consumes
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager
# Produce
def Produce(q, poolSize):
    """Put the integers 1..99 on ``q``, then one 0 per pool worker.

    Args:
        q: Queue-like object providing ``full()``, ``qsize()`` and ``put()``.
        poolSize: Number of consumers; that many 0 stop signals are sent.
    """
    print('Produce ...')
    for i in range(1, 100):
        # Poll while the queue is full so the waiting is visible in the output
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
    for _ in range(0, poolSize):
        q.put(0)  # 0 tells one consumer to stop
# Consume
def Consume(q, delay=2):
    """Read numbers from ``q`` and report each one until a 0 arrives.

    Args:
        q: Queue-like object providing a blocking ``get()``.
        delay: Seconds to sleep per item to simulate work. Defaults to 2,
            the value that was hard-coded originally.
    """
    print('Consume ...')
    while True:
        num = q.get()
        if num == 0:  # stop signal received
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(delay)
        print('Consumer end ' + str(num))
if __name__ == '__main__':
    # The original author marks a plain multiprocessing Queue(10) as NOT
    # usable in this setup and a Manager queue as usable.
    # NOTE(review): presumably because a plain Queue cannot be passed as a
    # task argument to Pool workers - confirm against the multiprocessing docs.
    q = Manager().Queue(10)
    poolSize = 4
    producerProcess = Process(target=Produce, args=(q, poolSize))  # producer process
    # Consumer pool; without `processes` the size defaults to os.cpu_count()
    consumerPool = Pool(processes=poolSize)
    for _ in range(0, poolSize):
        consumerPool.apply_async(func=Consume, args=(q,))
    producerProcess.start()
    consumerPool.close()
    producerProcess.join()
    consumerPool.join()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# encoding=utf-8
# author: walker
# date: 2017-03-14
# summary: The main process produces, a process pool consumes
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager
# Consume
def Consume(q, delay=2):
    """Take exactly one number from ``q`` and report handling it.

    Args:
        q: Queue-like object providing a blocking ``get()``.
        delay: Seconds to sleep to simulate work. Defaults to 2, the value
            that was hard-coded originally.
    """
    print('Consume ...')
    num = q.get()
    print('Consume %d ...' % num)
    time.sleep(delay)
    print('Consumer %d over' % num)
if __name__ == '__main__':
    # The original author marks a plain multiprocessing Queue(10) as NOT
    # usable together with Pool and a Manager queue as usable.
    q = Manager().Queue(10)
    pool = Pool(processes=4)
    for i in range(1, 100):  # produce
        # Poll while the queue is full so the waiting is visible in the output
        while q.full():
            print('sleep %d ...' % q.qsize())
            time.sleep(1)
        q.put(i)
        print(i)
        # One pool task per item: each Consume call takes a single number
        pool.apply_async(Consume, (q,))
    pool.close()
    pool.join()
|
*** Updated 2016-01-06 ***
一个好玩的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# encoding=utf-8
# author: walker
# date: 2016-01-06
# summary: A fun multiprocessing example
import os, sys, time
from multiprocessing import Pool
# Absolute path of the directory that contains this script
cur_dir_fullpath = os.path.split(os.path.abspath(__file__))[0]
# Module-level list that the ModifyDict_* functions extend and ProcOne prints
g_List = ['a']
# Modify the global g_List
def ModifyDict_1():
    """Append 'b' to the module-level ``g_List`` in place."""
    # No `global` statement needed: the list is mutated, not rebound
    g_List.append('b')
# Modify the global g_List
def ModifyDict_2():
    """Append 'c' to the module-level ``g_List`` in place."""
    # No `global` statement needed: the list is mutated, not rebound
    g_List.append('c')
# Handle one item
def ProcOne(num):
    """Print ``num`` together with ``g_List`` as seen by the current process.

    Args:
        num: Value to report; it is converted with ``str()``.
    """
    print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
# Handle all items
def ProcAll():
    """Submit ProcOne for 1..19 to a pool of four processes and wait for them."""
    pool = Pool(processes=4)
    for i in range(1, 20):
        # Asynchronous submission. The synchronous alternatives noted by the
        # author are calling ProcOne(i) directly or pool.apply(ProcOne, (i,)).
        pool.apply_async(ProcOne, (i,))
    pool.close()
    pool.join()
# Module-level call: modifies the global g_List whenever this module is loaded.
# NOTE(review): placed outside the guard below because the recorded Windows
# output shows 'b' (but not 'c') in the worker processes - confirm.
ModifyDict_1()

if __name__ == '__main__':
    ModifyDict_2()  # modifies the global g_List, in the main process only
    print('In main g_List :' + repr(g_List))
    ProcAll()
|
Windows7 下运行的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
λ python3 demo.py In main g_List :['a', 'b', 'c'] ProcOne 1, g_List:['a', 'b'] ProcOne 2, g_List:['a', 'b'] ProcOne 3, g_List:['a', 'b'] ProcOne 4, g_List:['a', 'b'] ProcOne 5, g_List:['a', 'b'] ProcOne 6, g_List:['a', 'b'] ProcOne 7, g_List:['a', 'b'] ProcOne 8, g_List:['a', 'b'] ProcOne 9, g_List:['a', 'b'] ProcOne 10, g_List:['a', 'b'] ProcOne 11, g_List:['a', 'b'] ProcOne 12, g_List:['a', 'b'] ProcOne 13, g_List:['a', 'b'] ProcOne 14, g_List:['a', 'b'] ProcOne 15, g_List:['a', 'b'] ProcOne 16, g_List:['a', 'b'] ProcOne 17, g_List:['a', 'b'] ProcOne 18, g_List:['a', 'b'] ProcOne 19, g_List:['a', 'b'] |
Ubuntu 14.04下运行的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
In main g_List :['a', 'b', 'c'] ProcOne 1, g_List:['a', 'b', 'c'] ProcOne 2, g_List:['a', 'b', 'c'] ProcOne 3, g_List:['a', 'b', 'c'] ProcOne 5, g_List:['a', 'b', 'c'] ProcOne 4, g_List:['a', 'b', 'c'] ProcOne 8, g_List:['a', 'b', 'c'] ProcOne 9, g_List:['a', 'b', 'c'] ProcOne 7, g_List:['a', 'b', 'c'] ProcOne 11, g_List:['a', 'b', 'c'] ProcOne 6, g_List:['a', 'b', 'c'] ProcOne 12, g_List:['a', 'b', 'c'] ProcOne 13, g_List:['a', 'b', 'c'] ProcOne 10, g_List:['a', 'b', 'c'] ProcOne 14, g_List:['a', 'b', 'c'] ProcOne 15, g_List:['a', 'b', 'c'] ProcOne 16, g_List:['a', 'b', 'c'] ProcOne 17, g_List:['a', 'b', 'c'] ProcOne 18, g_List:['a', 'b', 'c'] ProcOne 19, g_List:['a', 'b', 'c'] |
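可以看见,Windows7下第二次修改(ModifyDict_2,只在 `if __name__ == '__main__'` 中执行)在子进程里没有生效,而Ubuntu下生效了。据uliweb作者limodou讲,原因是:Windows下的子进程是通过重新启动解释器并重新导入主模块实现的,所以子进程只会执行模块顶层的代码;Linux下是通过fork实现的,子进程直接继承了父进程当时的内存状态。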
相关阅读:
0、官方多进程文档。
3、python的threading和multiprocessing模块
4、python下使用ctypes获取threading线程id
*** walker * 2014-05-21 ***
本文转自walker snapshot的51CTO博客,原文链接:http://blog.51cto.com/walkerqt/1414703 ,如需转载请自行联系原作者。
RQSLT