我想在PySpark MLlib中构建一个简单的自定义Estimator.我有here可以写一个自定义的Transformer,但我不知道如何在Estimator上做.我也不明白@keyword_only是做什么的,为什么我需要这么多的setter和getter. Scikit-learn似乎有适合自定义模型的文档(see here但PySpark没有.
示例模型的伪代码:
class NormalDeviation():
def __init__(self, threshold = 3):
def fit(x, y=None):
self.model = {'mean': x.mean(), 'std': x.std()]
def predict(x):
return ((x-self.model['mean']) > self.threshold * self.model['std'])
def decision_function(x): # does ml-lib support this?
解决方法:
一般来说没有文档,因为对于Spark 1.6 / 2.0,大多数相关的API并不是公开的.它应该在Spark 2.1.0中更改(参见SPARK-7146).
API相对复杂,因为它必须遵循特定的约定才能使给定的Transformer或Estimator与Pipeline API兼容.读取和写入或网格搜索等功能可能需要其中一些方法.其他,比如keyword_only只是一个简单的帮助者,并不是严格要求的.
假设您已为平均参数定义了以下混合:
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp
class HasMean(Params):
mean = Param(Params._dummy(), "mean", "mean",
typeConverter=TypeConverters.toFloat)
def __init__(self):
super(HasMean, self).__init__()
def setMean(self, value):
return self._set(mean=value)
def getMean(self):
return self.getOrDefault(self.mean)
标准差参数:
class HasStandardDeviation(Params):
stddev = Param(Params._dummy(), "stddev", "stddev",
typeConverter=TypeConverters.toFloat)
def __init__(self):
super(HasStandardDeviation, self).__init__()
def setStddev(self, value):
return self._set(stddev=value)
def getStddev(self):
return self.getOrDefault(self.stddev)
和门槛:
class HasCenteredThreshold(Params):
centered_threshold = Param(Params._dummy(),
"centered_threshold", "centered_threshold",
typeConverter=TypeConverters.toFloat)
def __init__(self):
super(HasCenteredThreshold, self).__init__()
def setCenteredThreshold(self, value):
return self._set(centered_threshold=value)
def getCenteredThreshold(self):
return self.getOrDefault(self.centered_threshold)
你可以创建基本的Estimator,如下所示:
class NormalDeviation(Estimator, HasInputCol,
HasPredictionCol, HasCenteredThreshold):
def _fit(self, dataset):
c = self.getInputCol()
mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
return (NormalDeviationModel()
.setInputCol(c)
.setMean(mu)
.setStddev(sigma)
.setCenteredThreshold(self.getCenteredThreshold())
.setPredictionCol(self.getPredictionCol()))
class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
HasMean, HasStandardDeviation, HasCenteredThreshold):
def _transform(self, dataset):
x = self.getInputCol()
y = self.getPredictionCol()
threshold = self.getCenteredThreshold()
mu = self.getMean()
sigma = self.getStddev()
return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)
最后它可以使用如下:
df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])
normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model = Pipeline(stages=[normal_deviation]).fit(df)
model.transform(df).show()
## +---+----+----------+
## | id| x|prediction|
## +---+----+----------+
## | 1| 2.0| false|
## | 2| 3.0| false|
## | 3| 0.0| false|
## | 4|99.0| true|
## +---+----+----------+