我有一个包含股票行情自动收录器的向量,例如代码= [‘AAPL’,’XOM’,’GOOG’],在我的“传统” python程序中,我将遍历该股票行情向量,选择一个股票行情字符串,例如AAPL,导入一个csv包含AAPL股票收益的文件,使用收益作为通用函数的输入,最后生成一个csv文件作为输出.我有4000多个股票行情器,要应用到每个股票行情器的功能需要花费一些时间来处理.我可以使用mpi4py软件包访问计算机群集,每个作业可以访问约100个处理器.我很了解(并能够实现)此mpi example在python中:
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
data = [i for i in range(8)]
# dividing data into chunks
chunks = [[] for _ in range(size)]
for i, chunk in enumerate(data):
chunks[i % size].append(chunk)
else:
data = None
chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)
[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]
因此,在此示例中,我们有一个大小为8的数据向量,并将相等数量的数据元素分配给每个处理器(总共3个).如何使用上面类似的示例,为每个处理器分配一个股票行情记录器,并应用需要为每个股票行情器运行的功能?我如何告诉python,一旦处理器有空,就返回代号矢量并处理尚未处理的代号?
解决方法:
还有另一种思考方式.您有100个处理器处理4000个数据块.您可以看到的一种方式是,每个处理器都会获得一个要操作的数据块.平均分配后,每个处理器将获得40个标记.处理器1将获得0-39,处理器2将获得40-79,依此类推.
这样思考,您不必担心处理器完成任务时会发生什么.循环一下:
block_size = len(tickers) / size # this will be 40 in your example
for i in range(block_size):
ticker = tickers[rank * block_size + i]
process(ticker)
def process(ticker):
# load data
# process data
# output data
这有意义吗?
[编辑]
如果您想了解更多信息,这实际上只是row-major order索引的一种变体,它是一种访问存储在单个内存中的多维数据的通用方法.