MPI python-Open-MPI

我有20,000 * 515 numpy矩阵,代表生物数据.我需要找到生物数据的相关系数,这意味着我将得到一个具有相关值的20,000 * 20,000矩阵.然后,如果每个相关系数大于阈值,则用1和0填充numpy数组.

我使用numpy.corrcoef来找到相关系数,它在我的机器上运行良好.

然后我想放入群集(有10台计算机,节点从2到8不等).当我尝试将它放入集群时,每个节点生成(40)随机数并从生物数据中得到那些40个随机列,产生20,000 * 40矩阵,我遇到内存问题说.

mpirun注意到,在节点名称上的PID#进程等级#退出信号9(被杀死).

然后我尝试重写程序,比如让每一行找到相关系数,如果值大于阈值,则将1存储在矩阵中,否则为0而不是创建相关矩阵.
但是运行这个程序需要1.30小时.我需要运行它100次.

任何人都可以建议一个更好的方法来做到这一点,比如如何通过在每个核心完成它的工作后分配作业来解决内存问题.我是MPI的新手.以下是我的代码.

如果您需要更多信息,请告诉我.
谢谢

    import numpy
    from mpi4py import MPI
    import time


    Size=MPI.COMM_WORLD.Get_size();
    Rank=MPI.COMM_WORLD.Get_rank();
    Name=MPI.Get_processor_name();

    RandomNumbers={};





    rndm_indx=numpy.random.choice(range(515),40,replace=False)
    rndm_indx=numpy.sort(rndm_indx)


    Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);




    RandomNumbers[Rank]=rndm_indx;
    CORR_CR=numpy.zeros((Data.shape[0],Data.shape[0]));


    start=time.time();
    for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-np.mean(Data[i]);
        alpha1=1./np.linalg.norm(Data[i]);
        for j in range(i,Data.shape[0]):
           if(i==j):
               CORR_CR[i][j]=1;
           else:
               Data[j]=Data[j]-np.mean(Data[j]);
               alpha2=1./np.linalg.norm(Data[j]);
               corr=np.inner(Data[i],Data[j])*(alpha1*alpha2);
               corr=int(np.absolute(corrcoef)>=0.9)
               CORR_CR[i][j]=CORR_CR[j][i]=corr
    end=time.time();           

    CORR_CR=CORR_CR-numpy.eye(CORR_CR.shape[0]);  


    elapsed=(end-start)
    print('Total Time',elapsed)

最佳答案:

您发布的程序的执行时间在我的计算机上大约是96秒.在探索并行计算之前,让我们优化一些事情.

>让我们存储向量的范数,以避免每次需要时都计算它.得到alpha1 = 1. / numpy.linalg.norm(Data [i]);离开第二个循环是一个很好的起点.由于向量在计算期间不会改变,因此可以提前计算它们的范数:

alpha=numpy.zeros(Data.shape[0])
for i in range(0,Data.shape[0]):
  Data[i]=Data[i]-numpy.mean(Data[i])
  alpha[i]=1./numpy.linalg.norm(Data[i])

for i in range(0,Data.shape[0]):
  for j in range(i,Data.shape[0]):
    if(i==j):
       CORR_CR[i][j]=1;
    else:
       corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
       corr=int(numpy.absolute(corr)>=0.9)
       CORR_CR[i][j]=CORR_CR[j][i]=corr

计算时间已经减少到17秒.

>假设矢量不是高度相关的,大多数相关系数将四舍五入为零.因此,矩阵可能是sparce(充满零).让我们使用scipy.sparse.coo_matrix sparce矩阵格式,它非常容易填充:非空项目及其坐标i,j将存储在列表中.

data=[]
ii=[]
jj=[]
...
  if(corr!=0):
           data.append(corr)
           ii.append(i)
           jj.append(j)
           data.append(corr)
           ii.append(j)
           jj.append(i)
...
CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))

计算时间降至13秒(可忽略的改进?),并且内存占用大大减少.如果要考虑更大的数据集,这将是一项重大改进.

> python中的for循环非常低效.例如,见for loop in python is 10x slower than matlab.但是有很多方法可以解决,例如使用vectorized operations或优化的迭代器,例如numpy.nditer提供的迭代器.for循环效率低下的原因之一是python是一种解释语言:在此过程中不进行编译.因此,为了克服这个问题,可以使用像Cython这样的工具编译代码中最棘手的部分.

>代码的关键部分是用Cython编写的,专用文件为correlator.pyx.
> Cython将此文件转换为correlator.c文件
>此文件由您最喜欢的c编译器gcc编译,以构建共享库correlator.so
>导入相关器后,可在程序中使用优化函数.

correlator.pyx的内容从Numpy vs Cython speed开始,如下所示:

import numpy

cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):

    cdef unsigned int rows = array.shape[0]
    cdef  int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef  int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
               data.append(0)
               ii.append(row)
               jj.append(row2)
            else:
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii,jj,data

其中scipy.linalg.cython_blas.ddot()将执行内部产品.

要进行cythonize并编译.pyx文件,可以使用以下makefile(我希望您使用的是Linux …)

all: correlator correlatorb


correlator: correlator.pyx
    cython -a correlator.pyx

correlatorb: correlator.c
    gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c

新的相关函数在主python文件中通过以下方式调用:

import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])

通过使用编译循环,时间降至5.4秒!它已经快了十倍.而且,这个计算是在一个进程上执行的!

我们来介绍并行计算.
请注意,函数进程中添加了两个参数:imin和imax.这些参数表示计算出的CORR_CR行的范围.执行它以便预期并行计算的使用.实际上,并行化程序的一种简单方法是将外部for循环(索引i)拆分为不同的进程.

每个进程将对索引i的特定范围执行外部for循环,其被计算以便平衡进程之间的工作负载.

该计划必须完成以下操作:

>进程0(“根进程”)读取文件中的向量数据.
>使用MPI函数bcast()将数据广播到所有进程.
>计算每个过程要考虑的索引i的范围.
>相关系数由相应索引的每个过程计算.矩阵的非空项存储在每个进程的列表数据ii,jj中.
>通过调用MPI函数gather()在根进程上收集这些列表.它生成三个Size列表列表,这些列表被连接起来以获得创建sparce邻接矩阵所需的3个列表.

这里是代码:

import numpy
from mpi4py import MPI
import time
import scipy.sparse

import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here. 
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#braodcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)

RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-numpy.mean(Data[i])
        if numpy.linalg.norm(Data[i])==0:
            print "process "+str(Rank)+" is facing a big problem"
        else:
            alpha[i]=1./numpy.linalg.norm(Data[i])


#balancing the computational load between the processes. 
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows. 
#Of course, the last rank must care about more rows than the first one.

ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)          

#the "local" job has been cythonized in file main2.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
   #concatenate the lists
   data2=sum(data,[])
   ii2=sum(ii,[])
   jj2=sum(jj,[])
   #create the adjency matrix
   CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))

   print CORR_CR

end=time.time();           

elapsed=(end-start)
print('Total Time',elapsed)

通过运行mpirun -np 4 main.py,计算时间为3.4秒.这不是快4倍……这可能是由于计算的瓶颈是标量产品的计算,这需要大的内存带宽.而我的个人电脑在内存带宽方面确实有限……

最后评论:有很多改进的可能性.
– Data中的向量被复制到每个进程……这会影响程序所需的内存.以不同的方式调度计算并将内存与通信进行交易可以克服这个问题……
– 每个进程仍然计算所有向量的规范……可以通过让每个进程计算某些向量的规范并在alpha上使用MPI函数allreduce()来改进它…

该邻接矩阵怎么办?

我想你已经知道了这个问题的答案,但你可以将这个邻接矩阵提供给sparse.csgraph routines,例如connected_components()laplacian().确实,你离spectral clustering不远!

例如,如果集群很明显,使用connected_components()就足够了:

if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)

    print "number of different famillies "+str(n_components)

    numpy.set_printoptions(threshold=numpy.nan)
    print labels
上一篇:1.1 海思3518 H264编码


下一篇:linux – 如何在使用mpirun时使分析器(valgrind,perf,pprof)使用调试符号来获取/使用本地版本的库?