Spark Machine Learning Basics: Feature Engineering

Processing continuous values

0. Binarizer / binarization

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer

spark = SparkSession\
        .builder\
        .appName("BinarizerExample")\
        .getOrCreate()
        
continuousDataFrame = spark.createDataFrame([
    (0, 1.1),
    (1, 8.5),
    (2, 5.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

spark.stop()

Result:

Binarizer output with Threshold = 5.100000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    1.1|              0.0|
|  1|    8.5|              1.0|
|  2|    5.2|              1.0|
+---+-------+-----------------+
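Binarizer's rule is simple: values strictly greater than the threshold map to 1.0, everything else (including the threshold itself) to 0.0. A minimal plain-Python sketch of that rule, purely illustrative rather than Spark API, which reproduces the table above:

def binarize(value, threshold):
    # Values strictly greater than the threshold map to 1.0, the rest to 0.0
    return 1.0 if value > threshold else 0.0

assert binarize(1.1, 5.1) == 0.0
assert binarize(8.5, 5.1) == 1.0
assert binarize(5.2, 5.1) == 1.0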

1. Bucketizer / discretization at given boundaries

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

spark = SparkSession\
    .builder\
    .appName("BucketizerExample")\
    .getOrCreate()

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]  # -float("inf") is negative infinity

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Bucket the values according to the given boundaries
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()

spark.stop()

Result:

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+
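Bucket i covers the left-closed interval [splits[i], splits[i+1]), which is why -0.5 lands in bucket 1 rather than bucket 0 above. A plain-Python sketch of that mapping, illustrative only rather than Spark API, reproducing the table:

import bisect

splits = [float("-inf"), -0.5, 0.0, 0.5, float("inf")]

def bucket(value):
    # bisect_right puts a value equal to a boundary into the bucket on its
    # right, matching Spark's left-closed intervals [splits[i], splits[i+1])
    return float(bisect.bisect_right(splits, value) - 1)

for v, expected in [(-999.9, 0.0), (-0.5, 1.0), (0.0, 2.0), (0.2, 2.0), (999.9, 3.0)]:
    assert bucket(v) == expected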

2. QuantileDiscretizer / discretization by quantiles

from __future__ import print_function
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("QuantileDiscretizerExample")\
    .getOrCreate()

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
df = df.repartition(1)  # With this small dataset, a single partition keeps the computed quantiles deterministic; larger datasets can stay in multiple partitions

# Discretize into 3 buckets
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()

spark.stop()

Result:

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   0.0|
|  4| 2.2|   0.0|
|  5| 9.2|   1.0|
|  6|14.4|   2.0|
+---+----+------+
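The repartition(1) trick above works because QuantileDiscretizer uses Spark's approximate quantile algorithm, whose output can depend on how the data is partitioned. An alternative sketch, assuming the same df as above, is to request exact quantiles via the relativeError parameter:

# relativeError=0.0 makes the quantile computation exact, so the bucket
# boundaries no longer depend on the partitioning of the data
discretizer = QuantileDiscretizer(numBuckets=3, relativeError=0.0,
                                  inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()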

3. MaxAbsScaler / scaling by maximum absolute value

from __future__ import print_function
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("MaxAbsScalerExample")\
    .getOrCreate()

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),  # Vectors.dense creates a dense vector
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")  # scales each feature by its maximum absolute value

# Fit computes the maximum absolute value of each feature
scalerModel = scaler.fit(dataFrame)  # fit and transform stay separate so the fitted model can also transform a test set

# Rescale each feature into [-1, 1]
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()

spark.stop()

Result:

+--------------+----------------+
|      features|  scaledFeatures|
+--------------+----------------+
|[1.0,0.1,-8.0]|[0.25,0.01,-1.0]|
|[2.0,1.0,-4.0]|  [0.5,0.1,-0.5]|
|[4.0,10.0,8.0]|   [1.0,1.0,1.0]|
+--------------+----------------+
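The fit/transform split commented above matters in practice: fit learns the per-feature maximum absolute values, and the resulting model applies exactly those statistics to any other DataFrame. A minimal sketch of that workflow, where the train/test split is hypothetical:

# Hypothetical split of the same DataFrame into train and test parts
trainDF, testDF = dataFrame.randomSplit([0.7, 0.3], seed=42)

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(trainDF)           # statistics come from train only

scaledTrain = scalerModel.transform(trainDF)
scaledTest = scalerModel.transform(testDF)  # same statistics, no re-fitting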
