按月、按季、按年读取parquet文件
案列
// 启动spark yarn
spark_yarn目录:./bin/spark-shell --master yarn --deploy-mode client --executor-cores 4 --num-executors 3
// 设置基础目录:parquet文件时按日期分片存储
scala> val basePath = "hdfs://192.168.88.1:8020/data/userzyk.parquet/"
// 读取一个不同目录的parquet文件,也可以读取多个逗号分隔(url,url...)比如按季统计
scala> val parquetFile = spark.read.option("basePath",basePath).parquet(basePath + "date=2021-01-*" +"/group=zyk/project=zyk")
//获取map集合key为“user”的json对象或字符串
scala> val users = parquetFile.select("args.user")
//拿到字符串后,只取userid,根据userid切分获取第2个值,再切分....
scala> val userids = users.withColumn("tmp_", split($"user", "userid")).select($"tmp_".getItem(1).as("tmp2")).withColumn("tmp3_", split($"tmp2", "'")).select($"tmp3_".getItem(2).as("userid"))
//此时就只有一个字段了,userid去重,统计id数量
scala> userids.dropDuplicates("userid").count()
//上述步骤的连贯操作,统计季度
scala> spark.read.option("basePath",basePath).parquet(basePath + "date=2020-01-*" +"/group=zyk/project=zyk",basePath + "date=2020-02-*"+"/group=zyk/project=zyk",basePath + "date=2020-03-*"+"/group=zyk/project=zyk").select("args.user").withColumn("tmp_", split($"user", "userid")).select($"tmp_".getItem(1).as("tmp2")).withColumn("tmp3_", split($"tmp2", "'")).select($"tmp3_".getItem(2).as("userid")).dropDuplicates("userid").count()
常用命令:
//取3个值打印
DataFormat.take(3).foreach(println)
//创建视图 相当于表,可以用sql查询
DataFormat.createOrReplaceTempView("t_userid")
// 打印创建的表结构信息
DataFormat.printSchema()
//sql语句
ar sql:String =
"""
|select distinct * from t_userid
|""".stripMargin
//根据sql查询
spark.sql(sql).show()
//具体目录取值
spark.read.format("parquet").load("hdfs://192.168.88.1:8020/data/userzyk.parquet/date=2019-12-22").show()
spark具体使用或快速入门请参考我的博客:https://blog.csdn.net/z1987865446