package com.njbdqn
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
object Exam {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("country").getOrCreate()
val rdd = sparkSession.sparkContext.textFile("hdfs://192.168.126.200:9000/app/data/exam/countrydata.csv").map(a=>{
val arr = a.split(",")
(arr(0),arr(1),arr(2),arr(3),arr(4),arr(5),arr(6))
})
//统计每个国家在数据截止统计时的累计确诊人数。
// val confirmedCount = rdd.map(a=>{
// (a._2,a._4,a._5)
// }).groupBy(_._3).flatMap(x=>{
// x._2.toArray.sortBy(x => x._2).reverse.take(1)
// })
//统计全世界在数据截止统计时的总感染人数
// val sumPeople = confirmedCount.map(_._1.toInt).sum().toInt
// print(sumPeople)
// 统计每个大洲中每日累计确诊人数最多的国家及确诊人数,
// 并输出 20200607 这一天各大洲当日累计确诊人数最多的国家及确诊人数
// val dayPeople = rdd.map(a=>{
// (a._3.toInt,a._7+a._4,a._5,a._4)
// }).groupBy(_._2).flatMap(x=>{
// x._2.toArray.sortBy(x => x._1).reverse.take(1)
// })
// dayPeople.filter(_._4.equals("20200408")).map(x=>(x._1,x._3)).foreach(println)
//统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各
//大洲当日累计确诊人数最多的国家及确诊人数。
// val daySumPeople = rdd.map(a=>{
// (a._2.toInt,a._7+a._4,a._5,a._4)
// }).groupBy(_._2).flatMap(x=>{
// x._2.toArray.sortBy(x => x._1).reverse.take(1)
// })
// daySumPeople.filter(_._4.equals("20200607")).map(x=>(x._1,x._3)).foreach(println)
//统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。
val mouthSumPeople = rdd.map(a=>{
(a._2.toInt,a._7+a._4.substring(0,6),a._7,a._4.substring(0,6))
}).groupBy(_._2).map(x=>{
(x._2.toArray.map(t=>t._1.toInt).max,
x._2.toArray.map(t=>t._4).distinct.mkString(""),
x._2.toArray.map(t=>t._3).distinct.mkString(""))
})
mouthSumPeople.filter(_._2.equals("202006")).map(x=>(x._1,x._3)).foreach(println)
sparkSession.stop()
}
}
1.数据准备(共 10 分)
请在 HDFS 中创建目录/app/data/exam,并将 countrydata.csv 传到该目录。
hdfs dfs -mkdir -p /app/data/exam
hdfs dfs -put /root/countrydata.csv /app/data/exam
2.在 Spark-Shell 中,加载 HDFS 文件系统 countrydata.csv 文件,并使用 RDD 完成以下
统计计算。(共 45 分)
①统计每个国家在数据截止统计时的累计确诊人数。(9 分)
package com.njbdqn
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
object Exam {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[*]").appName("country").getOrCreate()
val rdd = sparkSession.sparkContext.textFile("hdfs://192.168.126.200:9000/app/data/exam/countrydata.csv").map(a=>{
val arr = a.split(",")
(arr(0),arr(1),arr(2),arr(3),arr(4),arr(5),arr(6))
})
//统计每个国家在数据截止统计时的累计确诊人数。
val confirmedCount = rdd.map(a=>{
(a._2,a._4,a._5)
}).groupBy(_._3).flatMap(x=>{
x._2.toArray.sortBy(x => x._2).reverse.take(1)
}).foreach(println)
sparkSession.stop()
}
}
②统计全世界在数据截止统计时的总感染人数。(9 分)
val sumPeople = confirmedCount.map(_._1.toInt).sum().toInt
print(sumPeople)
③统计每个大洲中每日新增确诊人数最多的国家及确诊人数,并输出 20200408 这一天各
大洲当日新增确诊人数最多的国家及确诊人数。(9 分)
val dayPeople = rdd.map(a=>{
(a._3.toInt,a._7+a._4,a._5,a._4)
}).groupBy(_._2).flatMap(x=>{
x._2.toArray.sortBy(x => x._1).reverse.take(1)
})
dayPeople.filter(_._4.equals("20200408")).map(x=>(x._1,x._3)).foreach(println)
④统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各
大洲当日累计确诊人数最多的国家及确诊人数。(9 分)
val daySumPeople = rdd.map(a=>{
(a._2.toInt,a._7+a._4,a._5,a._4)
}).groupBy(_._2).flatMap(x=>{
x._2.toArray.sortBy(x => x._1).reverse.take(1)
})
daySumPeople.filter(_._4.equals("20200607")).map(x=>(x._1,x._3)).foreach(println)
⑤统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。(9分)
3.创建 HBase 数据表(共 5 分)
在 HBase 中创建命名空间(namespace)exam,在该命名空间下创建 covid19_world 表,
使用大洲和统计日期的组合作为 RowKey(如“亚洲 20200520”),该表下有 1 个列族 record。record 列族用于统计疫情数据(每个大洲当日新增确诊人数最多的国家 record:maxIncreaseCountry 及其新增确诊人数 record:maxIncreaseCount)。
create 'exam:covid19_world','record'
4.请在 Hive 中创建数据库 exam,在该数据库中创建外部表 ex_exam_record 指向
/app/data/exam 下的疫情数据 ;创建外部表 ex_exam_covid19_record 映射至 HBase 中的
exam:covid19_world 表的 record 列族(共 15 分)
create database exam;
create external table ex_exam_record(
id string,
confirmedCount int,
confirmedIncr int,
recordDate string,
countryName string,
countryShortCode string,
continent string
)
row format delimited fields terminated by ','
location '/app/data/exam';
create external table ex_exam_covid19_record(
key string,
maxIncreaseCountry string,
maxIncreaseCount int
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
With serdeproperties ("hbase.columns.mapping"=":key,
record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties("hbase.table.name"="exam:covid19_world");
5. 使用 ex_exam_record 表中的数据(共 25 分)
①统计每个大洲中每日新增确诊人数最多的国家,将 continent 和 recordDate 合并成
rowkey,并保存到 ex_exam_covid19_record 表中。(20 分)
insert into table ex_exam_covid19_record
select rowkey,countryName,confirmedIncr from (
select concat(continent,recordDate) as rowkey,
rank()over(partition by continent,recordDate order by confirmedIncr desc) as rank,countryName,confirmedIncr
from ex_exam_record)a
where rank=1;
②完成统计后,在 HBase Shell 中遍历 exam:covid19_world 表中的前 20 条数据。(5 分)
scan 'exam:covid19_world',{LIMIT=>20}