Processing continuous values
0. Binarizer / binarization
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer

spark = SparkSession\
    .builder\
    .appName("BinarizerExample")\
    .getOrCreate()

continuousDataFrame = spark.createDataFrame([
    (0, 1.1),
    (1, 8.5),
    (2, 5.2)
], ["id", "feature"])

# Values strictly greater than the threshold map to 1.0, all others to 0.0
binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

spark.stop()
Result:
Binarizer output with Threshold = 5.100000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    1.1|              0.0|
|  1|    8.5|              1.0|
|  2|    5.2|              1.0|
+---+-------+-----------------+
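Binarizer is a stateless Transformer, so the same instance can be reused on any DataFrame that has the input column, and its threshold can be changed through the standard param setter. A minimal sketch, assuming the SparkSession is still active (i.e. run before spark.stop()) and using a made-up newData frame:

# A minimal sketch: reuse the same Binarizer on new data with a different threshold.
newData = spark.createDataFrame([(3, 4.9), (4, 5.1)], ["id", "feature"])
binarizer.setThreshold(5.0)            # now values > 5.0 map to 1.0, others to 0.0
binarizer.transform(newData).show()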
1. Bucketizer / discretization with given boundaries
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

spark = SparkSession\
    .builder\
    .appName("BucketizerExample")\
    .getOrCreate()

# -float("inf") / float("inf") are negative / positive infinity, so the
# boundaries cover the whole real line
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

# Bucketize the values according to the given boundaries
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()

spark.stop()
Result:
Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+
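Like Binarizer, Bucketizer is a Transformer with no fit step, so it can be applied to any DataFrame with the input column. A hedged sketch, assuming Spark 2.3+ (where Bucketizer gained a handleInvalid parameter) and that the session has not been stopped yet: with handleInvalid="keep", NaN values land in one extra bucket instead of raising an error, while values outside finite splits still error, which is why the splits above include ±infinity.

# A minimal sketch, assuming Spark >= 2.3 (handleInvalid on Bucketizer).
# NaN rows go into one extra bucket when handleInvalid="keep" instead of
# failing the job.
nanData = spark.createDataFrame([(0.3,), (float("nan"),)], ["features"])
nanBucketizer = Bucketizer(splits=splits, inputCol="features",
                           outputCol="bucketedFeatures", handleInvalid="keep")
nanBucketizer.transform(nanData).show()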
2. QuantileDiscretizer / discretization by quantiles
from __future__ import print_function
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("QuantileDiscretizerExample")\
    .getOrCreate()

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
# With such a small dataset, use a single partition so the approximate
# quantiles come out cleanly; larger datasets can keep multiple partitions.
df = df.repartition(1)

# Discretize into 3 buckets based on quantiles
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()

spark.stop()
Result:
+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   0.0|
|  4| 2.2|   0.0|
|  5| 9.2|   1.0|
|  6|14.4|   2.0|
+---+----+------+
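Unlike Bucketizer, QuantileDiscretizer is an Estimator: fit() computes approximate quantile boundaries from the data and returns a Bucketizer model that can be inspected and reused. A minimal sketch (run before spark.stop(); relativeError is the accuracy knob for the approximate quantile computation, and 0.01 here is just an illustrative value):

# A minimal sketch: keep the fitted model instead of chaining fit().transform(),
# so the learned boundaries can be inspected and reused on a test set.
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour",
                                  outputCol="result", relativeError=0.01)
model = discretizer.fit(df)      # the fitted model is a Bucketizer
print(model.getSplits())         # learned bucket boundaries
model.transform(df).show()       # the same model can transform new data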
3. MaxAbsScaler / scaling to [-1, 1] by the maximum absolute value
from __future__ import print_function
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("MaxAbsScalerExample")\
    .getOrCreate()

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),  # Vectors.dense builds a dense vector
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute the maximum absolute value of each feature, which is used for scaling.
# fit and transform are kept separate because the fitted model must also be
# applied to the test set.
scalerModel = scaler.fit(dataFrame)

# Rescale each feature to the range [-1, 1]
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()

spark.stop()
Result:
+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+
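Keeping fit and transform separate matters because the per-column maxima learned on the training data must also scale the test data unchanged. A minimal sketch, assuming the session is still active; testFrame is a made-up hold-out set, and maxAbs is the model attribute holding the learned maxima:

# A minimal sketch: scale a hypothetical test set with the training-set maxima.
print(scalerModel.maxAbs)  # per-feature max absolute values learned from training data
testFrame = spark.createDataFrame([
    (3, Vectors.dense([8.0, 5.0, 2.0]),),
], ["id", "features"])
scalerModel.transform(testFrame).select("features", "scaledFeatures").show()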