实验是最能定义数据科学家日常生活的词。为了为给定的问题构建一个合适的机器学习模型,数据科学家需要训练多个模型。此过程包括诸如寻找模型的最佳超参数、使用 K 折交叉验证模型,有时甚至训练具有多个输出的模型等任务。前面提到的所有这些任务都很耗时,但对于模型开发的成功来说却极为重要。在这篇博文中,我们将展示如何应用 PySpark Pandas UDF(一个用于在 Spark 集群上分发 Python 函数的框架)来提高数据科学家的日常工作效率。
PySpark 如何实现 Pandas UDF(用户定义函数)?
顾名思义,PySpark Pandas UDF 是一种使用 Pandas DataFrame 在 PySpark 中实现用户定义函数 (UDF) 的方法。PySpark API 文档给出的定义如下:
“Pandas UDF 是用户定义的函数,由 Spark 执行,使用 Arrow 传输数据,Pandas 执行数据,允许向量化操作。Pandas UDF 是使用
pandas_udf
作为装饰器或包装函数来定义的,不需要额外的配置。Pandas UDF 通常表现为常规的 PySpark 函数 API。”
在这篇文章中,我们将探索PandasUDFType.GROUPED_MAP
,或者在 PySpark 的最新版本中,也称为pyspark.sql.GroupedData.applyInPandas
. 主要思想很简单,Pandas UDF 分组数据允许在数据集的每一组中进行操作。由于 spark 中的分组操作是跨集群节点计算的,因此我们可以以允许在不同节点计算不同模型的方式操作我们的数据集。是的,我的兄弟们……永远不要低估一个groupBy
.
配置
在进入应用 Pandas UDF 的细节之前,让我们用一些模块、全局变量和常用函数设置环境。
第一步是导入将在这个小实验中使用的所有模块。
import pandas as pd
from catboost import CatBoostClassifier
from itertools import product
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import (
DoubleType, FloatType, IntegerType, StringType, StructField, StructType
)
from sklearn.datasets import make_multilabel_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
并设置一些将要多次使用的全局变量。
N_FEATURES = 20
N_CLASSES = 10
本文探索的每项任务的一个常见步骤是机器学习模型的训练和评估。此步骤封装在以下函数中,该函数根据 CatBoost 模型的准确度得分来训练和评估该模型。
def train_and_evaluate_model(X_train, y_train, X_test, y_test, kwargs={}):
# split data
X_train, X_eval, y_train, y_eval = train_test_split(X_train, y_train, test_size=0.2, random_state=42)
# create model
model = CatBoostClassifier(
nan_mode='Min',
random_seed=42,
boosting_type='Plain',
bootstrap_type='Bernoulli',
rsm=0.1,
loss_function='Logloss',
use_best_model=True,
early_stopping_rounds=100,
**kwargs
)
# fit model
model.fit(X_train.values, y_train.values, eval_set=(X_eval, y_eval))
# evaluate model
accuracy = accuracy_score(model.predict(X_test), y_test)
return accuracy
为了训练和测试我们的 CatBoost 模型,我们还需要一些数据。因此,让我们使用 scikit-learn 的make_multilabel_classification
函数创建我们的数据集,并从中构建我们的 PySpark DataFrame。
X, y = make_multilabel_classification(
n_samples=10000,
n_features=N_FEATURES,
n_classes=N_CLASSES,
random_state=42
)
pdf = pd.DataFrame(X)
for i in range(N_CLASSES):
pdf[f'y_{i}'] = y[:, i]
df = spark.createDataFrame(pdf)
print(f'number of rows in the dataset: {df.count()}')
number of rows in the dataset: 10000
df.limit(5).toPandas()
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | y_0 | y_1 | y_2 | y_3 | y_4 | y_5 | y_6 | y_7 | y_8 | y_9 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2.0 | 2.0 | 0.0 | 1.0 | 3.0 | 5.0 | 0.0 | 3.0 | 4.0 | 1.0 | ... | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
1 | 4.0 | 3.0 | 2.0 | 2.0 | 0.0 | 4.0 | 1.0 | 2.0 | 0.0 | 3.0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 | 1 |
2 | 2.0 | 2.0 | 3.0 | 0.0 | 0.0 | 0.0 | 0.0 | 6.0 | 0.0 | 3.0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |
3 | 0.0 | 1.0 | 4.0 | 4.0 | 2.0 | 0.0 | 2.0 | 1.0 | 3.0 | 2.0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
4 | 0.0 | 0.0 | 7.0 | 2.0 | 1.0 | 0.0 | 1.0 | 2.0 | 1.0 | 2.0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 |
5 行 × 30 列
最后,为了更高效的 Spark 计算,我们将启用基于 arrow 的列式数据传输。
spark.conf.set('spark.sql.execution.arrow.enabled', 'true')
分布式网格搜索
在机器学习中,超参数是其值用于控制模型架构及其学习过程的参数。通常在训练模型时,您需要优化这些超参数,但是,尽管 ML 能够找到最佳内部参数和决策阈值,但超参数是手动设置的。
如果搜索空间包含太多可能性,您将需要花费大量时间进行测试以找到超参数的最佳组合。加速此任务的一种方法是将搜索过程分布在 Spark 集群的节点上。
这种方法产生的一个问题是:“好吧,但我使用的算法尚未在 Spark 上实现,我如何在这些限制下分配这个过程?” 别担心!这是我们在这里要回答的问题!
首先,我们必须定义超参数搜索空间。为此,我们将创建一个辅助 PySpark DataFrame,其中每一行都是一组唯一的超参数。
values_range = list(
product(
[200, 210, 220, 230, 240, 250, 260, 270, 280, 290],
[3, 4, 5, 6, 7],
[0.02, 0.07, 0.1, 0.15, 0.2],
['MinEntropy', 'Uniform', 'UniformAndQuantiles', 'GreedyLogSum'],
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
[0.5, 0.6, 0.7, 0.8],
)
)
schema = StructType(
[
StructField('iterations', IntegerType(), True),
StructField('depth', IntegerType(), True),
StructField('learning_rate', DoubleType(), True),
StructField('feature_border_type', StringType(), True),
StructField('l2_leaf_reg', FloatType(), True),
StructField('subsample', FloatType(), True)
]
)
df_grid = spark.createDataFrame(data=values_range, schema=schema)
df_grid = df_grid.withColumn('replication_id', sf.monotonically_increasing_id())
df_grid.limit(5).toPandas()
iterations | depth | learning_rate | feature_border_type | l2_leaf_reg | subsample | replication_ID | |
---|---|---|---|---|---|---|---|
0 | 200 | 4 | 0.1 | Uniform | 2.0 | 0.5 | 171798691840 |
1 | 200 | 4 | 0.1 | Uniform | 2.0 | 0.6 | 171798691841 |
2 | 200 | 4 | 0.1 | Uniform | 2.0 | 0.7 | 171798691842 |
3 | 200 | 4 | 0.1 | Uniform | 2.0 | 0.8 | 171798691843 |
4 | 200 | 4 | 0.1 | Uniform | 3.0 | 0.5 | 171798691844 |
print(f'number of different hyperparameter combinations: {df_grid.count()}')
number of different hyperparameter combinations: 24000
对于每个超参数行,我们想要复制我们的数据,以便我们以后可以单独处理每个超参数集。
df_replicated = df.crossJoin(df_grid)
print(f'number of rows in the replicated dataset: {df_replicated.count()}')
number of rows in the replicated dataset: 240000000
最后一步是指定每个 Spark 节点将如何处理数据。为此,我们定义了run_model
函数。它从输入 Spark DataFrame 中提取超参数和数据,然后训练和评估模型,返回其结果。
# declare the schema for the output of our function
schema = StructType(
[
StructField('replication_id', IntegerType(),True),
StructField('accuracy', FloatType(),True),
StructField("iterations", IntegerType(), True),
StructField("depth", IntegerType(), True),
StructField("learning_rate", DoubleType(), True),
StructField("feature_border_type", StringType(), True),
StructField("l2_leaf_reg", FloatType(), True),
StructField("subsample", FloatType(), True)
]
)
# decorate our function with pandas_udf decorator
@pandas_udf(schema, sf.PandasUDFType.GROUPED_MAP)
def hyperparameter_search(pdf):
# get hyperparameter values
kwargs = {
'iterations': pdf.iterations.values[0],
'depth': pdf.depth.values[0],
'learning_rate': pdf.learning_rate.values[0],
'feature_border_type': pdf.feature_border_type.values[0],
'l2_leaf_reg': pdf.l2_leaf_reg.values[0],
'subsample': pdf.subsample.values[0]
}
# get data and label
X = pdf[[str(i) for i in range(N_FEATURES)]]
y = pdf['y_0']
# split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# get accuracy
accuracy = train_and_evaluate_model(X_train, y_train, X_test, y_test, kwargs)
# return results as pandas DF
kwargs.update({
'replication_id': pdf.replication_id.values[0],
'accuracy': accuracy
})
results = pd.DataFrame([kwargs])
return results
我们现在可以按 对 Spark 数据帧进行分组replication_id
并应用该run_model
函数。这样,每个超参数组合都将用于在分布式系统中训练不同的模型。
results = df_replicated.groupby('replication_id').apply(hyperparameter_search)
%%time
results.sort('accuracy', ascending=False).limit(5).toPandas()
CPU times: user 11.6 s, sys: 13.5 s, total: 25.1 s
Wall time: 29min 10s
replication_id | accuracy | iterations | depth | learning_rate | feature_border_type | l2_leaf_reg | subsample | |
---|---|---|---|---|---|---|---|---|
0 | 24 | 0.9145 | 210 | 7 | 0.20 | Uniform | 6.0 | 0.6 |
1 | 22 | 0.9125 | 250 | 3 | 0.20 | Uniform | 2.0 | 0.5 |
2 | 13 | 0.9125 | 230 | 6 | 0.15 | MinEntropy | 3.0 | 0.7 |
3 | 11 | 0.9125 | 290 | 3 | 0.20 | Uniform | 5.0 | 0.7 |
4 | 7 | 0.9125 | 220 | 3 | 0.10 | MinEntropy | 6.0 | 0.5 |
通过这种分布式方法,我们能够在 29 分钟内运行 24000 个超参数组合。
分布式 K Fold 交叉验证
有了最优的超参数集,另一个重要的任务是对模型进行 K-Fold 交叉验证,以防止(或最小化)过拟合的不良影响。在这个实验中添加的折叠越多,你的模型就越健壮。然而,你将不得不花更多的时间来训练每个折叠的模型。同样,避免时间陷阱的一种方法是使用 Spark 并在 Spark 集群的单个节点上计算每个折叠。
我们以与分布网格搜索的方式非常相似的方式执行此操作,不同之处在于我们根据折叠数复制我们的数据集。所以如果我们的交叉验证使用 8 折,我们的数据集将被复制 8 次。
在这里,我们的第一步是定义我们想要交叉验证模型的折叠次数。
N_FOLDS = 8
在此之后,我们定义了一些代码来根据上面定义的折叠数随机拆分我们的数据集。
proportion = 1 / N_FOLDS
splits = df.randomSplit([proportion] * N_FOLDS, 42)
df_folds = splits[0].withColumn('fold', sf.lit(0))
for i in range(1, N_FOLDS):
df_folds = df_folds.union(
splits[i].withColumn('fold', sf.lit(i))
)
拆分后,我们将数据集复制 K 次。
df_numbers = spark.createDataFrame(
pd.DataFrame(list(range(N_FOLDS)),columns=['replication_id'])
)
df_numbers.toPandas()
replication_id | |
---|---|
0 | 0 |
1 | 1 |
2 | 2 |
3 | 3 |
4 | 4 |
5 | 5 |
6 | 6 |
7 | 7 |
df_replicated = df_folds.crossJoin(df_numbers)
print(f'number of rows in the replicated dataset: {df_replicated.count()}')
number of rows in the replicated dataset: 80000
与网格搜索方法相比,我们还有另一个不同之处。在下面的函数中,我们根据 replication_id
和 fold_id
定义训练和测试数据集。如果replication_id
等于fold_id
,我们将该折叠设置为测试折叠,而其余折叠用作训练集。
# declare the schema for the output of our function
schema = StructType(
[
StructField('replication_id', IntegerType(), True),
StructField('accuracy', FloatType(), True)
]
)
# decorate our function with pandas_udf decorator
@pandas_udf(schema, sf.PandasUDFType.GROUPED_MAP)
def cross_validation(pdf):
# get repliaction id
replication_id = pdf.replication_id.values[0]
# get data and label
columns = [str(i) for i in range(N_FEATURES)]
X_train = pdf[pdf.fold != replication_id][columns]
X_test = pdf[pdf.fold == replication_id][columns]
y_train = pdf[pdf.fold != replication_id]['y_0']
y_test = pdf[pdf.fold == replication_id]['y_0']
# get accuracy
accuracy = train_and_evaluate_model(X_train, y_train, X_test, y_test)
# return results as pandas DF
results = pd.DataFrame([{
'replication_id': replication_id,
'accuracy': accuracy
}])
# save the model (if you want to retrieve it later)
return results
使用此方法可能需要考虑的一件事是如何保存每个经过训练的模型,因为每个模型都在不同的节点中进行训练。为此,根据您的云提供商,您可以使用一些开发的 Python 库将文件从集群节点直接传输到云存储桶(如 Google Cloud Storage 或 Amazon S3)。但是,如果您只对交叉验证模型的性能感兴趣,那么上面的函数就足够了。
results = df_replicated.groupby('replication_id').apply(cross_validation)
%%time
results.sort('accuracy', ascending=False).toPandas()
CPU times: user 1.03 s, sys: 1.24 s, total: 2.27 s
Wall time: 35.9 s
replication_id | accuracy | |
---|---|---|
0 | 4 | 0.900715 |
1 | 5 | 0.895292 |
2 | 3 | 0.893720 |
3 | 2 | 0.893601 |
4 | 1 | 0.891801 |
5 | 7 | 0.890048 |
6 | 0 | 0.883293 |
7 | 6 | 0.882946 |
在这个实验中,我们仅在 35 秒内评估了 8 个折叠(集群的每个节点中一个)。最佳折叠(编号 4)的准确度得分为 0.900。
分布式多输出模型
遵循相同的理念,我们可以利用 PySpark Pandas UDF 来分发多输出模型的训练。对于这个任务,我们有一组特征和一组标签,我们必须用相同的训练数据为每个标签训练一个模型。
一些软件包scikit-learn
已经实现了这种随机森林算法的方法。CatBoost
还可以选择多输出训练。然而,与单输出 API 相比,这些实现具有有限的超参数和损失函数选项。考虑到这一点,Pandas UDF 是一次自动训练多个模型的替代方案,它使用任何其他机器学习库通常为单输出模型训练提供的所有选项。
由于我们的数据集有多个标签列,这次的方法是以一种可以复制每个特定标签的数据的方式来旋转我们的数据。因此,我们创建一列来映射每个标签,并将所有标签附加到一个标签列中,如下所示:
features = [f'{i}' for i in range(N_FEATURES)]
targets = [f'y_{i}' for i in range(N_CLASSES)]
df_multipe_output = df.select(
*features,
sf.lit(targets[0]).alias('y_group'),
sf.col(targets[0]).alias('Y')
)
for target in targets[1:]:
df_multipe_output = df_multipe_output.union(
df.select(
*features,
sf.lit(target).alias('y_group'),
sf.col(target).alias('Y')
)
)
print(f'number of rows in the dataset: {df_multipe_output.count()}')
number of rows in the dataset: 100000
df_multipe_output.limit(5).toPandas()
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | y_group | y | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1.0 | 3.0 | 9.0 | 1.0 | 6.0 | 0.0 | 5.0 | 0.0 | 4.0 | 1.0 | ... | 2.0 | 1.0 | 3.0 | 1.0 | 1.0 | 1.0 | 2.0 | 3.0 | y_0 | 0 |
1 | 1.0 | 4.0 | 2.0 | 1.0 | 4.0 | 2.0 | 1.0 | 2.0 | 0.0 | 1.0 | ... | 3.0 | 2.0 | 5.0 | 2.0 | 2.0 | 3.0 | 3.0 | 3.0 | y_0 | 1 |
2 | 2.0 | 6.0 | 3.0 | 6.0 | 0.0 | 5.0 | 4.0 | 3.0 | 2.0 | 4.0 | ... | 2.0 | 1.0 | 3.0 | 0.0 | 5.0 | 4.0 | 3.0 | 1.0 | y_0 | 0 |
3 | 3.0 | 2.0 | 0.0 | 1.0 | 5.0 | 3.0 | 0.0 | 3.0 | 2.0 | 3.0 | ... | 3.0 | 1.0 | 0.0 | 6.0 | 1.0 | 0.0 | 3.0 | 1.0 | y_0 | 1 |
4 | 6.0 | 3.0 | 0.0 | 0.0 | 3.0 | 6.0 | 0.0 | 2.0 | 3.0 | 0.0 | ... | 4.0 | 3.0 | 6.0 | 7.0 | 0.0 | 5.0 | 6.0 | 3.0 | y_0 | 0 |
5 行 × 22 列
定义了我们的 spark 多输出数据集后,我们准备定义执行模型训练的函数。
# declare the schema for the output of our function
schema = StructType(
[
StructField('y_group', StringType(), True),
StructField('accuracy', FloatType(), True)
]
)
# decorate our function with pandas_udf decorator
@pandas_udf(schema, sf.PandasUDFType.GROUPED_MAP)
def multi_models(pdf):
# get group
y_group = pdf.y_group.values[0]
# get data and label
X = pdf.drop(['Y', 'y_group'], axis=1)
y = pdf['Y']
# split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# get accuracy
accuracy = train_and_evaluate_model(X_train, y_train, X_test, y_test)
# return results as pandas DF
results = pd.DataFrame([{
'y_group': y_group,
'accuracy': accuracy
}])
return results
一切设置好后,就可以在y_group
列上调用 groupBy 方法来分发每个模型的训练。
results = df_multipe_output.groupby('y_group').apply(multi_models).orderBy('accuracy')
%%time
results.sort('accuracy', ascending=False).limit(5).toPandas()
CPU times: user 193 ms, sys: 195 ms, total: 388 ms
Wall time: 9.24 s
y_group | accuracy | |
---|---|---|
0 | y_6 | 0.9740 |
1 | y_4 | 0.9330 |
2 | y_5 | 0.9325 |
3 | y_8 | 0.8990 |
4 | y_0 | 0.8910 |
结论
在这篇文章中,我们展示了一些示例,说明如何使用 PySpark Pandas UDF 来分发涉及机器学习模型训练的流程。展示的一些方法可用于节省时间或进行更大规模的实验,否则会占用过多内存或成本过高。