I have a set of data frames, each with 4 columns and 1,000,000 rows.
For each row I want to run a hypergeometric test that takes the four values in those columns as input and returns a p-value (using the cumulative distribution function of the hypergeometric distribution).
I have tried two SciPy-based implementations (below), but both scale badly.
Is there another approach that would make this more efficient?
I have a working solution written in R (at the bottom), but unfortunately the code has to be in Python,
because it will be used in an Airflow task that loads the data from a Postgres DB, and there is currently no Postgres hook for R.
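For reference, the p-value computed per row is the lower tail of the hypergeometric distribution: with a population of M items of which n are "good", and N items drawn without replacement, it is

P(X \le k) = \sum_{i=0}^{k} \binom{n}{i} \binom{M-n}{N-i} / \binom{M}{N}

which is exactly what scipy's hypergeom.cdf(k, M, n, N) evaluates.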
Slow SciPy implementations
The sample data are created like this (using 10,000 rows rather than the full 52 * 1,000,000 rows):
import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from timeit import default_timer as timer
n_rows = 10000
n_total = 1000
max_good = 400
max_sample = 200
s = 100
df = pd.DataFrame({
    'ngood': np.random.hypergeometric(ngood=max_good, nbad=n_total - max_good,
                                      nsample=s, size=n_rows),
    'nsamp': np.random.hypergeometric(ngood=max_sample, nbad=n_total - max_sample,
                                      nsample=s, size=n_rows)
})
# Draw one kgood per row; omitting size returns a scalar, keeping the array 1-D.
df = df.assign(kgood=np.array([
    np.random.hypergeometric(ngood=ngood, nbad=n_total - ngood,
                             nsample=nsamp)
    for ngood, nsamp
    in zip(df.ngood, df.nsamp)
]))
Slow implementation based on a list comprehension:
start = timer()
res = [
    hypergeom.cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))
[0.44247900002512713, 0.71587318053768023, 0.97215178135616498]
Elapsed time: 2.405838s
Slow implementation based on np.vectorize:
vectorized_test = np.vectorize(hypergeom.cdf, otypes=[float], excluded='M')
start = timer()
res = vectorized_test(k=df.kgood.values, M=n_total,
                      n=df.ngood.values, N=df.nsamp.values)
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))
[ 0.442479 0.71587318 0.97215178]
Elapsed time: 2.518952s
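Note that scipy.stats distributions broadcast over array arguments natively, so the CDF can also be applied to whole columns without np.vectorize. A minimal sketch; as far as I can tell this removes only the Python-loop overhead, not the per-element cost, so the timing should be in the same ballpark:

start = timer()
# hypergeom.cdf accepts arrays for k, n and N and broadcasts the scalar M,
# so the whole column can be passed in a single call.
res = hypergeom.cdf(k=df.kgood.values, M=n_total,
                    n=df.ngood.values, N=df.nsamp.values)
end = timer()
print("Elapsed time: %fs" % (end - start))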
Fast R implementation
This shows that the computation above can be done in a few milliseconds.
The trick is that phyper is vectorized at the C level,
as opposed to np.vectorize, which, as far as I know, is essentially a Python loop.
library(microbenchmark)
n_rows <- 10000
n_total <- 1000
max_good <- 400
max_sample <- 200
s <- 100
df <- data.frame(
  ngood = rhyper(nn=n_rows, m=max_good, n=n_total - max_good, k=s),
  nsamp = rhyper(nn=n_rows, m=max_sample, n=n_total - max_sample, k=s)
)
df$kgood <- rhyper(nn=n_rows, m=df$ngood, n=n_total - df$ngood, k=df$nsamp)
microbenchmark(
  res <- phyper(q = df$k, m = df$ngood, n = n_total - df$ngood, k = df$nsamp)
)
Unit: milliseconds
                                                                 expr
phyper(q = df$k, m = df$ngood, n = n_total - df$ngood, k = df$nsamp)
     min      lq     mean   median       uq      max neval
2.984852 3.00838 3.350509 3.134745 3.439138 5.462694   100
Solution:
A small improvement can be obtained by caching the results of hypergeom.cdf. Since n_total is fixed and the remaining inputs are small bounded integers, many rows share the same (k, n, N) triple, so memoization pays off:
from functools import lru_cache

#@lru_cache(maxsize=16*1024)
#def fn(k, n, N):
#    return hypergeom.cdf(k=k, M=n_total, n=n, N=N)

data = {}
def fn(k, n, N):
    # Memoize on the varying arguments; n_total is a constant.
    key = (k, n, N)
    if key not in data:
        val = hypergeom.cdf(k=k, M=n_total, n=n, N=N)
        data[key] = val
    else:
        val = data[key]
    return val
start = timer()
res = [
    fn(ngood_found, ngood, nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))
On my machine this gives: Elapsed time: 0.279891s (0.315840s with lru_cache).
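The caching idea can also be pushed into vectorized form: because n_total is fixed and the inputs are small integers, np.unique can collapse the rows to their distinct (k, n, N) triples, so the CDF is evaluated only once per triple. A minimal sketch along those lines (numpy >= 1.13 for the axis argument):

# Stack the three varying inputs into one (n_rows, 3) integer array.
triples = np.stack([df.kgood.values, df.ngood.values, df.nsamp.values], axis=1)
# Distinct triples, plus the index of each row's triple in the distinct set.
uniq, inv = np.unique(triples, axis=0, return_inverse=True)
# Evaluate the CDF once per distinct triple...
uniq_cdf = hypergeom.cdf(k=uniq[:, 0], M=n_total, n=uniq[:, 1], N=uniq[:, 2])
# ...and scatter the results back to the original row order.
res = uniq_cdf[inv]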
EDIT:
In fact, the bottleneck seems to be the computation of the hypergeometric CDF itself, rather than the overhead of the for loop.
To test this, I created a SWIG interface file _cdf.i for the function gsl_cdf_hypergeometric_P from the GSL library:
%module cdf
%{
#include "gsl/gsl_cdf.h"
%}
double gsl_cdf_hypergeometric_P(int, int, int, int);
This file is then compiled into a Python package:
swig -c++ -python _cdf.i
g++ -fPIC -c _cdf_wrap.cxx -I${HOME}/venvs/p3/include/python3.5m
g++ -shared _cdf_wrap.o -o _cdf.so -lgsl
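As an aside, if SWIG is inconvenient, the same GSL function can be reached with plain ctypes. This is only a sketch, assuming a Linux system where libgslcblas.so and libgsl.so can be found by the loader:

import ctypes

# GSL's CBLAS symbols must be visible before libgsl itself is loaded.
ctypes.CDLL("libgslcblas.so", mode=ctypes.RTLD_GLOBAL)
gsl = ctypes.CDLL("libgsl.so")

# double gsl_cdf_hypergeometric_P(unsigned int k, unsigned int n1,
#                                 unsigned int n2, unsigned int t)
gsl.gsl_cdf_hypergeometric_P.argtypes = [ctypes.c_uint] * 4
gsl.gsl_cdf_hypergeometric_P.restype = ctypes.c_double

def cdf_ctypes(k, M, n, N):
    # Same argument mapping as the SWIG-based wrapper used below:
    # GSL wants (k, ngood, nbad, ndraws) instead of scipy's (k, M, n, N).
    return gsl.gsl_cdf_hypergeometric_P(int(k), int(n), int(M - n), int(N))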
The SWIG module can then be used directly in the original example:
import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from timeit import default_timer as timer
from cdf import gsl_cdf_hypergeometric_P
n_rows = 10000
n_total = 1000
max_good = 400
max_sample = 200
s = 100
df = pd.DataFrame({
    'ngood': np.random.hypergeometric(ngood=max_good, nbad=n_total - max_good,
                                      nsample=s, size=n_rows),
    'nsamp': np.random.hypergeometric(ngood=max_sample, nbad=n_total - max_sample,
                                      nsample=s, size=n_rows)
})
# Draw one kgood per row; omitting size returns a scalar, keeping the array 1-D.
df = df.assign(kgood=np.array([
    np.random.hypergeometric(ngood=ngood, nbad=n_total - ngood,
                             nsample=nsamp)
    for ngood, nsamp
    in zip(df.ngood, df.nsamp)
]))
start = timer()
res = [
    hypergeom.cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))
def cdf(k, M, n, N):
    # GSL parameterizes the hypergeometric CDF as (k, ngood, nbad, ndraws),
    # whereas scipy uses (k, M, n, N) with M being the total population size.
    return gsl_cdf_hypergeometric_P(int(k), int(n), int(M - n), int(N))
start = timer()
res = [
    cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))
This produces:
[0.58605423287644209, 0.38055520197355552, 0.70597920363472055, 0.99728041338849138, 0.79797439957395955, 0.42245057292366844, 0.58627644982763727, 0.74819471224742817, 0.75121042470714849, 0.48561471798885397]
Elapsed time: 2.069916s
[0.5860542328771666, 0.38055520197384757, 0.7059792036350717, 0.997280413389543, 0.7979743995750694, 0.4224505729249291, 0.5862764498272103, 0.7481947122472634, 0.7512104247082603, 0.4856147179890127]
Elapsed time: 0.018253s
So even with a plain Python for loop, this is very fast.
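If an array-in/array-out interface is preferred, the scalar GSL call can be wrapped with np.frompyfunc, reusing the cdf helper defined above. This still loops in Python per element, so it is a convenience sketch rather than a further speedup:

# frompyfunc builds a ufunc-like wrapper that broadcasts its arguments;
# it yields an object array, hence the final astype(float).
cdf_ufunc = np.frompyfunc(cdf, 4, 1)
res = cdf_ufunc(df.kgood.values, n_total,
                df.ngood.values, df.nsamp.values).astype(float)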