spqrk 读取parquet文件按月、按季、按年统计

按月、按季、按年读取parquet文件

案列
// 启动spark yarn
spark_yarn目录:./bin/spark-shell --master yarn --deploy-mode client  --executor-cores 4 --num-executors 3
//  设置基础目录:parquet文件时按日期分片存储
scala>  val basePath = "hdfs://192.168.88.1:8020/data/userzyk.parquet/"
// 读取一个不同目录的parquet文件,也可以读取多个逗号分隔(url,url...)比如按季统计 
scala>  val parquetFile = spark.read.option("basePath",basePath).parquet(basePath + "date=2021-01-*" +"/group=zyk/project=zyk")
//获取map集合key为“user”的json对象或字符串
scala> val users = parquetFile.select("args.user")
//拿到字符串后,只取userid,根据userid切分获取第2个值,再切分....
scala>  val userids = users.withColumn("tmp_", split($"user", "userid")).select($"tmp_".getItem(1).as("tmp2")).withColumn("tmp3_", split($"tmp2", "'")).select($"tmp3_".getItem(2).as("userid"))
//此时就只有一个字段了,userid去重,统计id数量
scala>  userids.dropDuplicates("userid").count()


//上述步骤的连贯操作,统计季度
scala> spark.read.option("basePath",basePath).parquet(basePath + "date=2020-01-*" +"/group=zyk/project=zyk",basePath + "date=2020-02-*"+"/group=zyk/project=zyk",basePath + "date=2020-03-*"+"/group=zyk/project=zyk").select("args.user").withColumn("tmp_", split($"user", "userid")).select($"tmp_".getItem(1).as("tmp2")).withColumn("tmp3_", split($"tmp2", "'")).select($"tmp3_".getItem(2).as("userid")).dropDuplicates("userid").count()

常用命令:

//取3个值打印
DataFormat.take(3).foreach(println)

//创建视图 相当于表,可以用sql查询
DataFormat.createOrReplaceTempView("t_userid")

// 打印创建的表结构信息
DataFormat.printSchema()
//sql语句
ar sql:String =
  """
    |select distinct * from t_userid
    |""".stripMargin
 //根据sql查询
spark.sql(sql).show()
//具体目录取值
spark.read.format("parquet").load("hdfs://192.168.88.1:8020/data/userzyk.parquet/date=2019-12-22").show()

spark具体使用或快速入门请参考我的博客:https://blog.csdn.net/z1987865446

上一篇:flowable-流程中心设计之中间事件(六)


下一篇:abp vnext swagger 注释