spark-shell操作hudi并使用hbase作为索引

前言

接上一篇文章,上篇文章说到hudi适配hbase 2.2.6,这篇文章在spark-shell中操作hudi,并使用hbase作为索引。要完成以下实验,请先确保你已经按照文章步骤对hudi进行适配。并且得到了hudi-spark3-bundle_2.12-0.9.0.jar

当然,如果你想先做一个实验,那么可以从这里以下链接下载我已经编译好的jar包。

组件版本以及前提要求:

组件版本:

hudi 0.9.0

hbase 2.2.6

spark 3.0.1

hadoop 3.2.0

hive 3.1.2

zookeeper:3.5.9

前提要求:

要完成以下实验,当然首先你需要有一个可以用的hadoop 3.2.0集群、hbase 2.2.6集群、主机环境中已经下载spark 3.0.1二进制包。

环境说明:

本实验环境使用的相关配置如下:

  • hdfs:hdfs://host117:8020
  • zookeeper:host117:2181
  • hbase对应zk_node_path:/hbase-secure
  • 在hbase上建一个名为hudi_hbase_index_test、列族为_s的表用于存放索引信息。命令为
create 'hudi_hbase_index_test', '_s'

拷贝hbase相关包到spark的jars目录下

我们在spark中使用hbase作为hudi的索引时,需要hbase相关jar包,所以我们需要将hbase目录下的以下jar包拷贝到spark的jars目录下:

  • hbase-protocol-shaded-2.2.6.jar
  • hbase-shaded-netty-2.2.1.jar
  • hbase-shaded-miscellaneous-2.2.1.jar

拷贝hudi-spark3-bundle_2.12-0.9.0.jar到spark的jars目录下

cp hudi-spark3-bundle_2.12-0.9.0.jar spark/jars

启动spark-shell执行hudi相关操作

启动spark-shell

./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

spark-shell操作hudi并使用hbase作为索引

使用DataGenerator类生成随机数据并写入hudi

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieHBaseIndexConfig._
import org.apache.hudi.config.HoodieIndexConfig._

val tableName = "spark_hudi_hbase_index_test"
val basePath =  "hdfs://host117:8020/tmp/spark_hudi_hbase_index_test"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(QPS_FRACTION.key(), 0.5).
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "false").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "false").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

注意事项:在使用hbase作为索引时,官网上关于hbase index 的配置说,某些配置项是可选的,但是实际在操作过程中发现其实那些配置项是必选的,比如QPS_ALLOCATOR_CLASS_NAME.key(),所以如果你在实际操作过程中,如果发现存在空指针错误的报错,那么可以按照报错信息查看是不是某些配置没有配导致的。

查看hbase上hudi表的索引信息

在完成上述数据写入之后,我们查看hbase中关于该表的索引信息:

spark-shell操作hudi并使用hbase作为索引

查看hudi表中的数据

执行如下命令

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

查询结果

spark-shell操作hudi并使用hbase作为索引

更新hudi表中数据

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
  option(INDEX_TYPE.key(), "HBASE").
  option(ZKPORT.key(), "2181").
  option(TABLENAME.key(), "hudi_hbase_index_test").
  option(ZK_NODE_PATH.key(), "/hbase-secure").
  option(ZKQUORUM.key(), "host117").
  option(MAX_QPS_FRACTION.key(), 10000).
  option(MIN_QPS_FRACTION.key(), 1000).
  option(QPS_FRACTION.key(), 0.5).
  option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
  option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
  option(GET_BATCH_SIZE.key(), 100).
  option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
  option(UPDATE_PARTITION_PATH_ENABLE.key(), "true").
  option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "true").
  option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

增量查询hudi表中数据

spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

相关结果如下所示:

spark-shell操作hudi并使用hbase作为索引

上一篇:hive metastore配置kerberos认证


下一篇:用了CSDN浏览器插件之后,我的工作效率上来了,业余生活也丰富了