我眼中的Hudi----数据库之Hudi

数据湖
数据湖或者Hudi是由大数据厂商提出来的。
数据量越大,越需要不同种类的存储,但是并不是所有企业的数据都是适合存储在廉价的HDFS集群之上的。

Apache Hudi让用户可以在Hadoop兼容的基础上存储大量数据,同时它还提供了两种原语操作,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Updae/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时它还提供了写操作的事务保证,查询会处理最后一个提交的快照,并基于此给出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定时间点获取表中已Updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势。

原语:计算机进程的控制通常是由原语完成的。所谓原语,一般是指由若干条指令组成的程序段,用来实现某个特定功能,在执行过程中不可中断。
表设计
在较高的层次上,用于写Hudi表的组件使用了一种受支持的方式嵌入到Apache Spark作业中,它会在支持DFS的存储上,生成代表Hudi表的一组文件。然后,用户可以使用诸如Apache Spark、Presto、Apache Hive之类的查询引擎,查询该表。Hudi表的三个主要组件:
1,有序的时间轴元数据,类似数据库事务日志
2,分层布局的数据文件,实际写入表中的数据
3,多种实现方式的索引,映射包含指定记录的数据集

时间轴
Hudi维护了一条包含在不同的即时时间(instant time)对数据集做的所有instant操作的timeline,从而提供表的即时视图,同时还支持按到达顺序进行数据检索。时间轴包含以下组件:

  • Instant action:在表上的操作类型
  • Instant time:操作开始的一个时间戳,该时间戳会按照开始时间顺序单调递增
  • state:即时状态,任意操作都可以处于以下三种状态
    1,Requested:表示已经安排操作行为,但是尚未开始
    2,Inflight:表示正在执行当前操作
    3,Completed:表示已经完成操作
    时间轴包含的操作类型:
  • commits:原子的写入一张表的操作
  • cleans:后台消除了表中旧版本数据,即表中不在需要的数据
  • delta_commit:增量提交,将一批数据原子写入到MergeOnRead表中,并且只记录到增量日志中
  • compaction:后台协调Hudi中的差异数据
  • rollback:回滚,删除在写入过程中的数据
  • savepoint:将某些文件标记"已经保存",以便清除数据时不会删除它们,一般用于表的还原,可以将数据还原到某个时间点

数据文件
Hudi将表组织成DFS上基本路径下的文件夹结构中,如果表是分区的,则在基本路径下还会有其他的分区,这些分区是包含在该分区数据的文件夹,与Hive表非常类似。
在每个分区内,文件被组织成文件组,由文件ID唯一表示。其中,每个切片包含基本列文件(.parquet)和一组日志文件(.log.*)。Hudi采用了MVCC设计,压缩操作会将日志和基本文件合并以产生新的文件片,而清除操作则将未使用/较旧的文件片删除,以回收DFS上的空间。

索引
Hudi通过索引机制提供高效的upsert操作,该机制会将一个记录键+分区路径组合一致性的映射到一个文件ID。
Hudi当前提供了三种索引实现,来映射一个记录键到包含该记录的文件ID。这将使得我们无需扫描表中的每条记录,就可以显著提高Upsert速度。

表类型
1,Copy On Write表
COW表写数据,直接写入到basefile(*.parquet),而不写入到log文件。所以,COW表的文件片只包含basefile(一个parquet构成一个文件片)
对于Update:该文件ID的最新版本都将被重写一次,并对所有已经更改的记录使用新值
对于insert:记录首先打包到每个分区路径中的最小文件中,直到达到配置的最大大小
仅使用列式存储,例如parquet,仅更新版本号,通过写入过程中执行同步合并来重写文件。
2,Merge On Read表
MOR表写数据时,记录首先会被快速写进日志文件,稍后会使用时间轴上的压缩操作将其与基本文件合并。根据查询是读取日志中的合并快照流,还是变更流,还是仅读取未合并的基础文件,MOR支持多种查询类型。
基于列式存储(parquet)和行式存储(avro)结合的文件进行存储,更新记录到增量文件
基于列式存储(parquet)和行式存储(avro)结合的文件进行存储,更新记录到增量文件,压缩同步和异步生成新版本的文件。
压缩
压缩是一个instant操作,它将一组文件片作为输入,将每个文件切片中的所有日志文件与其basefile文件(parquet文件)合并,以生成新的压缩文件片,并写为时间轴上的一个commit。压缩操作仅适合用于读取合并MOR表类型。

清理
清理是一项基本的instant操作,其执行的目的是删除旧的文件片,并限制表占用的存储空间。清理会在每次写操作之后自动执行。

DFS访问优化
Hudi对表中存储的数据执行了几种密钥管理功能,它能管理在DFS上存储数据的文件大小,计数,以及回收存储空间等。

查询

  • 快照查询:查询操作将查询最新快照的表数据。如果是Merge On Read类型的表,它将动态合并最新文件版本的基本数据和增量数据用于查询。如果是Copy On Write类型的表,它直接查询parquet表,同时提供upsert/delete操作。
  • 增量查询,查询只能看到写入表的新数据
  • 优化读查询,查询将查看给定提交/压缩操作的最新快照

Hudi的特点
1,近实时摄取
将外部数据(例如事件日志,数据库,外部源)如何摄取到Hadoop Data Lake是一个众所周知的问题。在大多数Hadoop部署中,经常会以零碎的方式,使用多种摄取工具解决,这些数据对整个组织是最具有价值的。
对于RDBMS关系型的摄入,Hudi提供了更快的Upsert操作。例如,可以通过MySQL binLog的形式或者Sqoop导入到hdfs上对应的Hudi表中,这样操作比Sqoop批量合并job和复杂合并工作流更加快速高效。
对于NoSQL的数据库,这种可以存储十亿行的数据库,完全采用批量加载是不可行的,并且如果摄取数据要跟上通常较高的更新量,则需要更有效的方法。
即时对于像Kafka这样不可变数据库源,Hudi也会在HDFS上强制执行最小文件大小,从而通过整体解决Hadoop领域中小文件过多的问题,改善NameNode的运行状况。对于事件流尤为重要,因为事件流通常较大,并且如果管理不善,可能严重损害Hadoop集群。
在所有来源中,Hudi都增加了急需的功能,即通过提交概念将新数据原子推送给消费者,避免摄入数据失败。
2,近实时分析
Hadoop上交互式的SQL解决方案有Presto和Spark SQL。将数据的更新事件缩短至几分钟,Hudi可以提供多种高效的替代方案,并可以对存储在DFS中的多个大小表进行实时分析。
3,增量处理管道
4,DFS的数据分散
Hudi可以像Kafka一样,用于数据分散,将每个管道的数据输出到Hudi表中,然后将其递增尾部以获取新数据并写入到服务存储中。

HUDI增删改查部分
将集群配置文件复制到,项目resource源码包下,使得本地环境可以访问hadoop集群
如:core-site.xml,hdfs-site.xml,hive-site.xml,yarn-site.xml

Hudi写入数据到HDFS

  // 不带分区写入
  def insert(): Unit = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val insertData = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
    insertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // table name 设置
      .option(HoodieWriteConfig.TABLE_NAME, "test")
      .mode(SaveMode.Overwrite)
      // 写入路径设置
      .save("/tmp/hudi")
  }
    //带分区写入
  def insertWithPartition(): Unit = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder().appName("hudi.insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val frame: DataFrame = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
      .withColumn("uuid", col("uid"))
      .withColumn("hudipartition", concat_ws("/", col("uid"), col("province")))
    frame.write.format("org.apache.hudi")
      //设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      //设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      //设置表名
      .option("hoodie.table.name", "testTable")
      //设置分区
      .option("hoodie.datasource.write.partition.field", "hudipartition")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudiOne")
  }

查询Hudi

  //查询hudi
  def query() = {
    val basepath = "/tmp/hudi2"
    val spark = SparkSession.builder().appName("query insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]").getOrCreate()
    val tripsSnapshotDF = spark
      .read.format("org.apache.hudi")
      .load(basepath + "/*/*")
    tripsSnapshotDF.show()
  }
    //增量查询
  def incrementalQuery() = {
    val beginTime = 20201212130000l
    val spark = SparkSession.builder().appName("query insert")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[*]").getOrCreate()
    val frame: DataFrame = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      //设置开始查询的时间戳 不需要设置结束时间戳
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .load("/tmp/hudiOne")
    frame.show()
    println(frame.count())
  }

修改Hudi上的数据

 //修改Hdfs上的Hudi数据,根据uid生成一份最新的修改数据
  def updateData() = {
    import org.apache.spark.sql.functions._
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder().appName("hudi.insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val frame: DataFrame = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
      .withColumn("uuid", col("uid"))
      .withColumn("hudipartition", concat_ws("/", col("uid"), col("province")))
    frame.write.format("org.apache.hudi")
      //设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      //设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      //设置表名
      .option("hoodie.table.name", "testTable")
      //设置分区
      .option("hoodie.datasource.write.partition.field", "hudipartition")
      .mode(SaveMode.Append)
      .save("/tmp/hudiOne")
  }

Hudi集成hive
将编译好的hudi-adoop-mr包复制到hive lib下

  def hiveSync() = {
    val commitTime = System.currentTimeMillis().toString
    val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val upsertData = spark.read.parquet("/tmp/one.parquet")
      .withColumn("ts", lit(commitTime))
    upsertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uid")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      // 分区列设置
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "age")
      // 设置要同步的hive库名
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "one")
      // 设置要同步的hive表名
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
      // 设置数据集注册并同步到hive
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      // 设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      // 设置要同步的分区列名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "age") //hive 表同步的分区列
      // 设置jdbc 连接同步
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://192.168.10.111:10000")
      // hudi表名称设置
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
      // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save("/tmp/hudi");
  }

上一篇:数据湖风暴来袭,EMR重磅发布Apache Hudi


下一篇:Hudi编译