Python自定义进程池(生产者/消费者模型)

代码说明一切:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定义进程池遍历目录下文件
 
from multiprocessing import Process, Queue, Lock
import time, os
 
#消费者
class Consumer(Process):
    def __init__(self, queue, ioLock):
        super(Consumer, self).__init__()
        self.queue = queue
        self.ioLock = ioLock
         
    def run(self):
        while True:
            task = self.queue.get()   #队列中无任务时,会阻塞进程
            if isinstance(task, strand task == 'quit':
                break;
            time.sleep(1)   #假定任务处理需要1秒钟
            self.ioLock.acquire()
            printstr(os.getpid()) + '  ' + task)
            self.ioLock.release()
        self.ioLock.acquire()
        print 'Bye-bye'
        self.ioLock.release()
 
#生产者       
def Producer():
    queue = Queue()    #这个队列是进程/线程安全的
    ioLock = Lock()
    subNum = 4    #子进程数量
    workers = build_worker_pool(queue, ioLock, subNum)
    start_time = time.time()
     
    for parent, dirnames, filenames in os.walk(r'D:\test'):
        for filename in filenames:
            queue.put(filename)
            ioLock.acquire()   
            print('qsize:' + str(queue.qsize()))
            ioLock.release()
            while queue.qsize() > subNum * 10:  #控制队列中任务数量
                time.sleep(1)
             
    for worker in workers:
        queue.put('quit')
         
    for worker in workers:
        worker.join()
     
    ioLock.acquire()   
    print('Done! Time taken: {}'.format(time.time() - start_time))
    ioLock.release()
 
#创建进程池
def build_worker_pool(queue, ioLock, size):
    workers = []
    for in range(size):
        worker = Consumer(queue, ioLock)
        worker.start()
        workers.append(worker)
    return workers
     
if __name__ == '__main__':
    Producer()

ps:

1
2
3
self.ioLock.acquire()  
...
self.ioLock.release()

可用

1
2
with self.ioLock:
    ...

替代。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,一个子进程消费
  
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager
 
#生产
def Produce(q):
    print('Produce %d ...' % os.getpid())
    for in range(120):
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
         
    q.put(0)        #用0通知结束
 
#消费
def Consume(q):
    print('Consume %d ...' % os.getpid())
    while True:
        num = q.get()
        if 0 == num: #收到结束信号
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))
  
if __name__ == '__main__'
    = Queue(10)             #可用
    = Manager().Queue(10)       #可用
     
    print(os.getpid())
     
    producerProcess = Process(target=Produce, args=(q,))     #生产进程
    consumerProcess = Process(target=Consume, args=(q,))     #消费进程
     
    producerProcess.start()
    consumerProcess.start()
     
    producerProcess.join()
    consumerProcess.join()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一个子进程生产,进程池消费
  
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager
 
#生产
def Produce(q, poolSize):
    print('Produce ...')
    for in range(1100):
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
         
    for in range(0, poolSize):
        q.put(0)        #用0通知结束
 
#消费
def Consume(q):
    print('Consume ...')
    while True:
        num = q.get()
        if 0 == num: #收到结束信号
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))
  
if __name__ == '__main__'
    #q = Queue(10)                #不可用
    = Manager().Queue(10)       #可用
     
    poolSize = 4
    producerProcess = Process(target=Produce, args=(q, poolSize))       #生产进程
    consumerPool = Pool(processes=poolSize)   #消费进程池,默认子进程个数为os.cpu_count()
    for in range(0, poolSize):
        consumerPool.apply_async(func=Consume, args=(q,))
     
    producerProcess.start()
    consumerPool.close()
     
    producerProcess.join()
    consumerPool.join()
    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主进程生产,进程池消费
  
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager
 
#消费
def Consume(q):
    print('Consume ...')
    num = q.get()
    print('Consume %d ...' % num)
    time.sleep(2)
    print('Consumer %d over' % num)
      
if __name__ == '__main__':    
    #q = Queue(10)                #不可用
    = Manager().Queue(10)       #可用
     
    pool = Pool(processes = 4)
    for in range(1100):     #生产
        while q.full():
            print('sleep %d ...' % q.qsize())
            time.sleep(1)
        q.put(i)
        print(i)
        pool.apply_async(Consume, (q,))
          
    pool.close()
    pool.join()

*** Updated 2016-01-06 ***

一个好玩的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一个多进程的好玩例子
 
import os, sys, time
from multiprocessing import Pool
 
cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))
 
g_List = ['a']
 
#修改全局变量g_List
def ModifyDict_1():
    global g_List
    g_List.append('b')
 
#修改全局变量g_List    
def ModifyDict_2():
    global g_List
    g_List.append('c')
  
#处理一个
def ProcOne(num):
    print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))
 
#处理所有
def ProcAll():    
    pool = Pool(processes = 4)
    for in range(120):
        #ProcOne(i)
        #pool.apply(ProcOne, (i,))
        pool.apply_async(ProcOne, (i,))
         
    pool.close()
    pool.join()  
     
ModifyDict_1()  #修改全局变量g_List
     
if __name__ == '__main__':
    ModifyDict_2()  #修改全局变量g_List
     
    print('In main g_List :' + repr(g_List))
     
    ProcAll()

Windows7 下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下运行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

  可以看见Windows7下第二次修改没有成功,而Ubuntu下修改成功了。据uliweb作者limodou讲,原因是Windows下是充重启实现的子进程;Linux下是fork实现的。


相关阅读:

0、官方多进程文档

1、Python 并行任务技巧

2、python中的多进程处理

3、python的threading和multiprocessing模块

4、python下使用ctypes获取threading线程id 


*** walker * 2014-05-21 ***

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/1414703如需转载请自行联系原作者


RQSLT

上一篇:vite概念和设计思想


下一篇:Vue3 + Vite 动态引入图片