1 import math 2 import datetime 3 import multiprocessing as mp 4 5 # 调用 Python 自带的多进程库 Multiprocessing, 就可以进行 多核并行 计算 6 # Manager 是一个 Multiprocessing 库里的类,用来创建 可以进行多进程共享的 数据容器,容器种类包括了几乎所有 Python 自带的数据类 7 # 既然出现了可以共享的数据类,就要再次通过锁 (Lock) 来避免资源竞争,所以同时通过 Manager 创建了锁 Lock 类,并用 With 语境来锁住共享的数据类 8 9 # 定义目标函数 10 def train_on_parameter(name, param, result_dict, result_lock): 11 result = 0 12 for num in param: 13 result += math.sqrt(num * math.tanh(num) / math.log2(num) / math.log10(num)) 14 15 with result_lock:# 用 With 语境来锁住共享的数据类 16 result_dict[name] = result 17 18 return 19 20 if __name__ == '__main__': 21 22 start_t = datetime.datetime.now() 23 # 核心数量: cpu_count() 函数可以获得计算机的核心数量。 24 num_cores = int(mp.cpu_count()) 25 print("本计算机总共有: " + str(num_cores) + " 核心") 26 27 # 进程池: Pool() 函数创建了一个进程池类,用来管理多进程的生命周期和资源分配。 28 # 这里进程池传入的参数是核心数量,意思是最多有多少个进程可以进行并行运算。 29 pool = mp.Pool(num_cores) 30 31 param_dict = {'task1': list(range(10, 30000000)), 32 'task2': list(range(30000000, 60000000)), 33 'task3': list(range(60000000, 90000000)), 34 'task4': list(range(90000000, 120000000)), 35 'task5': list(range(120000000, 150000000)), 36 'task6': list(range(150000000, 180000000)), 37 'task7': list(range(180000000, 210000000)), 38 'task8': list(range(210000000, 240000000))} 39 40 # 创建一个 manager 进行进程共享 41 manager = mp.Manager() 42 # 创建一个可以进行进程共享的字典类(数据类), 计算函数把计算好的结果保存在字典managed_dict里,而不是直接返回 43 managed_dict = manager.dict()# 创建一个可以进行进程共享的字典managed_dict,随后作为第三个参数传入计算函数中。计算函数把计算好的结果保存在字典managed_dict里,而不是直接返回 44 # 通过 Manager 创建了锁 Lock 类, 并用 With 语境来锁住共享的字典类 45 managed_locker = manager.Lock()# 既然出现了可以共享的数据类managed_dict,就要再次通过锁 (Lock) 来避免资源竞争,所以同时通过 Manager 创建了锁 Lock 类,以第四个参数传入计算函数,并用 With 语境来锁住共享的字典类。 46 47 # 异步调度: apply_async() 是进程池的一个调度函数。第一个参数是计算函数.第二个参数是需要传入计算函数的参数,这里传入了计算函数名字和计算调参。 48 # 异步的意义是在调度之后,虽然计算函数开始运行并且可能没有结束,异步调度都会返回一个临时结果,并且通过列表生成器临时保存在一个列表-results里。 49 results = [pool.apply_async(train_on_parameter, args=(name, param, managed_dict, managed_locker)) for name, param in param_dict.items()] 50 results = [p.get() for p in results] 51 52 # 在并行运算结束之后,通过 print() 来查看字典里的结果 53 print(managed_dict) 54 55 end_t = datetime.datetime.now() 56 elapsed_sec = (end_t - start_t).total_seconds() 57 print("多进程计算 共消耗: " + "{:.2f}".format(elapsed_sec) + " 秒")