Spark开发_Spark数据变换-透视(Pivot)

数据变换

 长表和宽表的变换,使用Spark进行变换,SQL中使用 case when。涉及维度比较多的时候,采用数据透视的方式进行数据变换
 在Spark SQL 3.0.1 中有相关的子句实现了。
  PIVOT ( { aggregate_expression [ AS aggregate_expression_alias ] } [ , ... ]FOR column_list IN ( expression_list ) )
 目前使用的版本不支持,所以采用了spark API在Dataset中的实现的透视功能,如下所示,开发使用的是Java语言。

附录:

     /**
     *   groupBy 分组字段: 按照不需要转换的字段分组,本例中是年月;pivot 跟在groupby后
     *   pivot 维度转换字段:使用pivot函数进行透视,透视过程中可以提供第二个参数来明确指定使用哪些数据项;
     *   agg 汇总数字字段:本例中是 stu_cnt
     *   na() 空值处理 
     */
 package com.le;

 import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import static org.apache.spark.sql.functions.sum;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;

public class DataTransfer {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("DataSetTransferExample")
               // .enableHiveSupport()
                .getOrCreate();
        List<Row> dimensionData = Arrays.asList(
                RowFactory.create("CD165",   "2",  "Test", 1),
                RowFactory.create("CD165",   "3",  "Action" ,1),
                RowFactory.create("CD165",   "2",  "Learn",1),
                RowFactory.create("CD165",   "1",  "Test",1)
        );
        StructType dimensionSchema = new StructType(new StructField[]{
                new StructField("grade_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("stu_state", DataTypes.StringType, false, Metadata.empty()),
                new StructField("stu_detail", DataTypes.StringType, false, Metadata.empty()),
                new StructField("stu_cnt", DataTypes.IntegerType, false, Metadata.empty())
        });

        Dataset<Row> dimensionDF = spark.createDataFrame(dimensionData, dimensionSchema);
        dimensionDF.show();

        ArrayList arrColName = new ArrayList();
        arrColName.add("Test");
        arrColName.add("Action");
        Dataset<Row> df_pivot = dimensionDF
                .groupBy("grade_id","stu_state")
                .pivot("stu_detail",arrColName)
                .agg(sum("stu_cnt"))
                .na().fill(0)
       ;
        df_pivot.show();
        Dataset<Row> df_pivot_2 = df_pivot
                .groupBy("grade_id")
                .pivot("stu_state")
                .agg(sum("Test"))
                .na().fill(0);
        Dataset<Row> df_pivot_3 = df_pivot
                .groupBy("grade_id")
                .pivot("stu_state")
                .agg(sum("Action"))
                .na().fill(0);
        df_pivot_2.show();
        Dataset<Row>  resultDF = df_pivot_2
                .withColumnRenamed("stu_state", "post_id_acc")
                .join(df_pivot_3,"grade_id");
        resultDF.show();
    }
}

其他

 stack 是内置函数
 SELECT * FROM person PIVOT ( SUM(age) AS a, AVG(class) AS c  FOR name IN ('John' AS john, 'Mike' AS mike) );
  在Spark SQL 中3.0.1版本中有相关的实现
   https://spark.apache.org/docs/3.0.1/sql-ref-syntax-qry-select-pivot.html

参考

Spark实现行列转换pivot和unpivot https://juejin.im/post/6844903619171631117
Spark--透视函数pivot应用(行列转换) https://www.jianshu.com/p/36bdf156cbda
http://spark.apache.org/docs/latest/sql-ref-syntax.html
http://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-pivot.html
https://sparkbyexamples.com/spark/how-to-pivot-table-and-unpivot-a-spark-dataframe/#pivot-performance
上一篇:C语言快速排序算法(递归实现)


下一篇:python实现快排算法,传统快排算法,数据结构