数据湖
数据湖或者Hudi是由大数据厂商提出来的。
数据量越大,越需要不同种类的存储,但是并不是所有企业的数据都是适合存储在廉价的HDFS集群之上的。
Apache Hudi让用户可以在Hadoop兼容的基础上存储大量数据,同时它还提供了两种原语操作,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:
- Updae/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时它还提供了写操作的事务保证,查询会处理最后一个提交的快照,并基于此给出结果。
- 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定时间点获取表中已Updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势。
原语:计算机进程的控制通常是由原语完成的。所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可中断。
表设计
在较高的层次上,用于写Hudi表的组件使用了一种受支持的方式嵌入到Apache Spark作业中,它会在支持DFS的存储上,生成代表Hudi表的一组文件。然后,用户可以使用诸如Apache Spark、Presto、Apache Hive之类的查询引擎,查询该表。Hudi表的三个主要组件:
1,有序的时间轴元数据,类似数据库事务日志
2,分层布局的数据文件,实际写入表中的数据
3,多种实现方式的索引,映射包含指定记录的数据集
时间轴
Hudi维护了一条包含在不同的即时时间(instant time)对数据集做的所有instant操作的timeline,从而提供表的即时视图,同时还支持按到达顺序进行数据检索。时间轴包含以下组件:
- Instant action:在表上的操作类型
- Instant time:操作开始的一个时间戳,该时间戳会按照开始时间顺序单调递增
- state:即时状态,任意操作都可以处于以下三种状态
1,Requested:表示已经安排操作行为,但是尚未开始
2,Inflight:表示正在执行当前操作
3,Completed:表示已经完成操作
时间轴包含的操作类型: - commits:原子的写入一张表的操作
- cleans:后台消除了表中旧版本数据,即表中不在需要的数据
- delta_commit:增量提交,将一批数据原子写入到MergeOnRead表中,并且只记录到增量日志中
- compaction:后台协调Hudi中的差异数据
- rollback:回滚,删除在写入过程中的数据
- savepoint:将某些文件标记"已经保存",以便清除数据时不会删除它们,一般用于表的还原,可以将数据还原到某个时间点
数据文件
Hudi将表组织成DFS上基本路径下的文件夹结构中,如果表是分区的,则在基本路径下还会有其他的分区,这些分区是包含在该分区数据的文件夹,与Hive表非常类似。
在每个分区内,文件被组织成文件组,由文件ID唯一表示。其中,每个切片包含基本列文件(.parquet)和一组日志文件(.log.*)。Hudi采用了MVCC设计,压缩操作会将日志和基本文件合并以产生新的文件片,而清除操作则将未使用/较旧的文件片删除,以回收DFS上的空间。
索引
Hudi通过索引机制提供高效的upsert操作,该机制会将一个记录键+分区路径组合一致性的映射到一个文件ID。
Hudi当前提供了三种索引实现,来映射一个记录键到包含该记录的文件ID。这将使得我们无需扫描表中的每条记录,就可以显著提高Upsert速度。
表类型
1,Copy On Write表
COW表写数据,直接写入到basefile(*.parquet),而不写入到log文件。所以,COW表的文件片只包含basefile(一个parquet构成一个文件片)
对于Update:该文件ID的最新版本都将被重写一次,并对所有已经更改的记录使用新值
对于insert:记录首先打包到每个分区路径中的最小文件中,直到达到配置的最大大小
仅使用列式存储,例如parquet,仅更新版本号,通过写入过程中执行同步合并来重写文件。
2,Merge On Read表
MOR表写数据时,记录首先会被快速写进日志文件,稍后会使用时间轴上的压缩操作将其与基本文件合并。根据查询是读取日志中的合并快照流,还是变更流,还是仅读取未合并的基础文件,MOR支持多种查询类型。
基于列式存储(parquet)和行式存储(avro)结合的文件进行存储,更新记录到增量文件
基于列式存储(parquet)和行式存储(avro)结合的文件进行存储,更新记录到增量文件,压缩同步和异步生成新版本的文件。
压缩
压缩是一个instant操作,它将一组文件片作为输入,将每个文件切片中的所有日志文件与其basefile文件(parquet文件)合并,以生成新的压缩文件片,并写为时间轴上的一个commit。压缩操作仅适合用于读取合并MOR表类型。
清理
清理是一项基本的instant操作,其执行的目的是删除旧的文件片,并限制表占用的存储空间。清理会在每次写操作之后自动执行。
DFS访问优化
Hudi对表中存储的数据执行了几种密钥管理功能,它能管理在DFS上存储数据的文件大小,计数,以及回收存储空间等。
查询
- 快照查询:查询操作将查询最新快照的表数据。如果是Merge On Read类型的表,它将动态合并最新文件版本的基本数据和增量数据用于查询。如果是Copy On Write类型的表,它直接查询parquet表,同时提供upsert/delete操作。
- 增量查询,查询只能看到写入表的新数据
- 优化读查询,查询将查看给定提交/压缩操作的最新快照
Hudi的特点
1,近实时摄取
将外部数据(例如事件日志,数据库,外部源)如何摄取到Hadoop Data Lake是一个众所周知的问题。在大多数Hadoop部署中,经常会以零碎的方式,使用多种摄取工具解决,这些数据对整个组织是最具有价值的。
对于RDBMS关系型的摄入,Hudi提供了更快的Upsert操作。例如,可以通过MySQL binLog的形式或者Sqoop导入到hdfs上对应的Hudi表中,这样操作比Sqoop批量合并job和复杂合并工作流更加快速高效。
对于NoSQL的数据库,这种可以存储十亿行的数据库,完全采用批量加载是不可行的,并且如果摄取数据要跟上通常较高的更新量,则需要更有效的方法。
即时对于像Kafka这样不可变数据库源,Hudi也会在HDFS上强制执行最小文件大小,从而通过整体解决Hadoop领域中小文件过多的问题,改善NameNode的运行状况。对于事件流尤为重要,因为事件流通常较大,并且如果管理不善,可能严重损害Hadoop集群。
在所有来源中,Hudi都增加了急需的功能,即通过提交概念将新数据原子推送给消费者,避免摄入数据失败。
2,近实时分析
Hadoop上交互式的SQL解决方案有Presto和Spark SQL。将数据的更新事件缩短至几分钟,Hudi可以提供多种高效的替代方案,并可以对存储在DFS中的多个大小表进行实时分析。
3,增量处理管道
4,DFS的数据分散
Hudi可以像Kafka一样,用于数据分散,将每个管道的数据输出到Hudi表中,然后将其递增尾部以获取新数据并写入到服务存储中。
HUDI增删改查部分
将集群配置文件复制到,项目resource源码包下,使得本地环境可以访问hadoop集群
如:core-site.xml,hdfs-site.xml,hive-site.xml,yarn-site.xml
Hudi写入数据到HDFS
// 不带分区写入
def insert(): Unit = {
import org.apache.spark.sql.functions._
val commitTime = System.currentTimeMillis().toString
val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
val insertData = spark.read.parquet("/tmp/one.parquet")
.withColumn("ts", lit(commitTime))
insertData.write.format("org.apache.hudi")
// 设置主键列名
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
// 设置数据更新时间的列名
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
// 并行度参数设置
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// table name 设置
.option(HoodieWriteConfig.TABLE_NAME, "test")
.mode(SaveMode.Overwrite)
// 写入路径设置
.save("/tmp/hudi")
}
//带分区写入
def insertWithPartition(): Unit = {
import org.apache.spark.sql.functions._
val commitTime = System.currentTimeMillis().toString
val spark = SparkSession.builder().appName("hudi.insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
val frame: DataFrame = spark.read.parquet("/tmp/one.parquet")
.withColumn("ts", lit(commitTime))
.withColumn("uuid", col("uid"))
.withColumn("hudipartition", concat_ws("/", col("uid"), col("province")))
frame.write.format("org.apache.hudi")
//设置主键列名
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
//设置数据更新时间的列名
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
//设置表名
.option("hoodie.table.name", "testTable")
//设置分区
.option("hoodie.datasource.write.partition.field", "hudipartition")
.mode(SaveMode.Overwrite)
.save("/tmp/hudiOne")
}
查询Hudi
//查询hudi
def query() = {
val basepath = "/tmp/hudi2"
val spark = SparkSession.builder().appName("query insert")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master("local[*]").getOrCreate()
val tripsSnapshotDF = spark
.read.format("org.apache.hudi")
.load(basepath + "/*/*")
tripsSnapshotDF.show()
}
//增量查询
def incrementalQuery() = {
val beginTime = 20201212130000l
val spark = SparkSession.builder().appName("query insert")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master("local[*]").getOrCreate()
val frame: DataFrame = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
//设置开始查询的时间戳 不需要设置结束时间戳
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
.load("/tmp/hudiOne")
frame.show()
println(frame.count())
}
修改Hudi上的数据
//修改Hdfs上的Hudi数据,根据uid生成一份最新的修改数据
def updateData() = {
import org.apache.spark.sql.functions._
val commitTime = System.currentTimeMillis().toString
val spark = SparkSession.builder().appName("hudi.insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
val frame: DataFrame = spark.read.parquet("/tmp/one.parquet")
.withColumn("ts", lit(commitTime))
.withColumn("uuid", col("uid"))
.withColumn("hudipartition", concat_ws("/", col("uid"), col("province")))
frame.write.format("org.apache.hudi")
//设置主键列名
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
//设置数据更新时间的列名
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
//设置表名
.option("hoodie.table.name", "testTable")
//设置分区
.option("hoodie.datasource.write.partition.field", "hudipartition")
.mode(SaveMode.Append)
.save("/tmp/hudiOne")
}
Hudi集成hive
将编译好的hudi-adoop-mr包复制到hive lib下
def hiveSync() = {
val commitTime = System.currentTimeMillis().toString
val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
val upsertData = spark.read.parquet("/tmp/one.parquet")
.withColumn("ts", lit(commitTime))
upsertData.write.format("org.apache.hudi")
// 设置主键列名
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
// 分区列设置
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "age")
// 设置要同步的hive库名
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "one")
// 设置要同步的hive表名
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
// 设置数据集注册并同步到hive
.option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
// 设置当分区变更时,当前数据的分区目录是否变更
.option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
// 设置要同步的分区列名
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "age") //hive 表同步的分区列
// 设置jdbc 连接同步
.option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://192.168.10.111:10000")
// hudi表名称设置
.option(HoodieWriteConfig.TABLE_NAME, "test_partition")
// 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
// 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
.option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
// 并行度参数设置
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
.mode(SaveMode.Append)
.save("/tmp/hudi");
}