Python process (进程)

进程 (process)

进程是对各种资源管理的集合,包含对各种资源的调用、内存的管理、网络接口的调用

进程要操作 CPU 必须先启动一个线程,启动一个进程的时候会自动创建一个线程,进程里的第一个线程就是主线程

程序执行的实例

有唯一的进程标识符(pid)

multiprossing 模块

启动进程

示例:

import multiprocessing
import time def process_run(n):
time.sleep(1)
print('process', n) for i in range(10):
p = multiprocessing.Process(target=process_run, args=(i, ))
p.start()

所有进程都是由父进程启动的

示例:

import multiprocessing
import os def show_info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id', os.getpid())
print('\n\n') def f(name):
show_info('function f')
print(name) if __name__ == '__main__':
show_info('main process line')
p = multiprocessing.Process(target=f, args=('children process', ))
p.start()

进程间通信

线程间共享内存空间,进程间只能通过其他方法进行通信

Queue

注意这个 Queue 不同于 queue.Queue

Queue type using a pipe, buffer and thread

两个进程的 Queue 并不是同一个,而是将数据 pickle 后传给另一个进程的 Queue

用于父进程与子进程之间的通信或同一父进程的子进程之间通信


示例:

from multiprocessing import Process, Queue

def p_put(*args):
q.put(args)
print('Has put %s' % args) def p_get(*args):
print('%s wait to get...' % args)
print(q.get())
print('%s got it' % args) q = Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()

输出结果:

Has put p1

p2 wait to get...

('p1',)

p2 got it


换成 queue 示例:

from multiprocessing import Process
import queue def p_put(*args):
q.put(args)
print('Has put %s' % args) def p_get(*args):
print('%s wait to get...' % args)
print(q.get())
print('%s got it' % args) q = queue.Queue()
p1 = Process(target=p_put, args=('p1', ))
p2 = Process(target=p_get, args=('p2', ))
p1.start()
p2.start()

输出结果:

Has put p1

p2 wait to get...


由于父进程启动子进程时是复制一份,所以每个子进程里也有一个空的队列,但是这些队列数据独立,所以 get 时会阻塞

Pipe

Pipe(管道) 是通过 socket 进行进程间通信的

所以步骤与建立 socket 连接相似:

建立连接、发送/接收数据(一端发送另一端不接受就会阻塞)、关闭连接

示例:

from multiprocessing import Pipe, Process

def f(conn):
conn.send('send by child')
print('child recv:', conn.recv())
conn.close() parent_conn, child_conn = Pipe() # 获得 Pipe 连接的两端
p = Process(target=f, args=(child_conn, ))
p.start()
print('parent recv:', parent_conn.recv())
parent_conn.send('send by parent')
p.join()

输出结果:

parent recv: send by child

child recv: send by parent

进程间数据共享

Manager

Manager 实现的是进程间共享数据

支持的可共享数据类型:

list
dict
Value
Array
Namespace
Queue queue.Queue
JoinableQueue queue.Queue
Event threading.Event
Lock threading.Lock
RLock threading.RLock
Semaphore threading.Semaphore
BoundedSemaphore threading.BoundedSemaphore
Condition threading.Condition
Barrier threading.Barrier
Pool pool.Pool

示例:

from multiprocessing import Manager, Process
import os def func():
m_dict['key'] = 'value'
m_list.append(os.getpid()) manager = Manager()
m_dict = manager.dict()
m_list = manager.list()
p_list = []
for i in range(10):
p = Process(target=func)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print(m_list)
print(m_dict)

进程锁

打印时可能会出错,加锁可以避免

示例:

from multiprocessing import Lock, Process

def foo(n, l):
l.acquire()
print('hello world', n)
l.release() lock = Lock()
for i in range(100):
process = Process(target=foo, args=(i, lock))
process.start()

进程池 (pool)

同一时间最多有几个进程在 CPU 上运行

示例:

from multiprocessing import Pool
import time
import os def foo(n):
time.sleep(1)
print('In process', n, os.getpid())
return n def bar(*args):
print('>>done: ', args, os.getpid()) pool = Pool(processes=3)
print('主进程: ', os.getpid())
for i in range(10):
# pool.apply(func=foo, args=(i, ))
pool.apply_async(func=foo, args=(i, ), callback=bar)
print('end')
pool.close()
pool.join()

从程序运行过程中可以看出:同一时间最多只有3个进程在运行,类似于线程中的信号量

主进程在执行 callback 函数

注意

1.

pool.apply(func=foo, args=(i, )) 是串行执行

pool.apply_async(func=foo, args=(i, ), callback=bar) 是并行执行

2.

callback 函数会以 target 函数返回结果为参数,在 target 函数执行结束之后执行

callback 函数是主进程调用的

3.

如果不执行 join,程序会在主进程执行完成之后直接结束,不会等待子进程执行完成

Pool.join() 必须在 Pool.close() 之后执行,否则会报错:ValueError: Pool is still running

上一篇:How to convert `ctime` to `datetime` in Python? - Stack Overflow


下一篇:NodeJs中process.cwd()与__dirname的区别