前言
接上一篇文章,上篇文章说到hudi适配hbase 2.2.6,这篇文章在spark-shell中操作hudi,并使用hbase作为索引。要完成以下实验,请先确保你已经按照文章步骤对hudi进行适配。并且得到了hudi-spark3-bundle_2.12-0.9.0.jar
当然,如果你想先做一个实验,那么可以从这里以下链接下载我已经编译好的jar包。
- hudi-spark3-bundle_2.12-0.9.0.jar
- hbase-shaded-netty-2.2.1.jar
- hbase-shaded-miscellaneous-2.2.1.jar
- hbase-protocol-shaded-2.2.6.jar
组件版本以及前提要求:
组件版本:
hudi 0.9.0
hbase 2.2.6
spark 3.0.1
hadoop 3.2.0
hive 3.1.2
zookeeper:3.5.9
前提要求:
要完成以下实验,当然首先你需要有一个可以用的hadoop 3.2.0集群、hbase 2.2.6集群、主机环境中已经下载spark 3.0.1二进制包。
环境说明:
本实验环境使用的相关配置如下:
- hdfs:hdfs://host117:8020
- zookeeper:host117:2181
- hbase对应zk_node_path:/hbase-secure
- 在hbase上建一个名为hudi_hbase_index_test、列族为_s的表用于存放索引信息。命令为
create 'hudi_hbase_index_test', '_s'
拷贝hbase相关包到spark的jars目录下
我们在spark中使用hbase作为hudi的索引时,需要hbase相关jar包,所以我们需要将hbase目录下的以下jar包拷贝到spark的jars目录下:
- hbase-protocol-shaded-2.2.6.jar
- hbase-shaded-netty-2.2.1.jar
- hbase-shaded-miscellaneous-2.2.1.jar
拷贝hudi-spark3-bundle_2.12-0.9.0.jar到spark的jars目录下
cp hudi-spark3-bundle_2.12-0.9.0.jar spark/jars
启动spark-shell执行hudi相关操作
启动spark-shell
./bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
使用DataGenerator类生成随机数据并写入hudi
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieHBaseIndexConfig._
import org.apache.hudi.config.HoodieIndexConfig._
val tableName = "spark_hudi_hbase_index_test"
val basePath = "hdfs://host117:8020/tmp/spark_hudi_hbase_index_test"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
option(INDEX_TYPE.key(), "HBASE").
option(ZKPORT.key(), "2181").
option(QPS_FRACTION.key(), 0.5).
option(TABLENAME.key(), "hudi_hbase_index_test").
option(ZK_NODE_PATH.key(), "/hbase-secure").
option(ZKQUORUM.key(), "host117").
option(MAX_QPS_FRACTION.key(), 10000).
option(MIN_QPS_FRACTION.key(), 1000).
option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
option(GET_BATCH_SIZE.key(), 100).
option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
option(UPDATE_PARTITION_PATH_ENABLE.key(), "false").
option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "false").
option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
注意事项:在使用hbase作为索引时,官网上关于hbase index 的配置说,某些配置项是可选的,但是实际在操作过程中发现其实那些配置项是必选的,比如QPS_ALLOCATOR_CLASS_NAME.key()
,所以如果你在实际操作过程中,如果发现存在空指针错误的报错,那么可以按照报错信息查看是不是某些配置没有配导致的。
查看hbase上hudi表的索引信息
在完成上述数据写入之后,我们查看hbase中关于该表的索引信息:
查看hudi表中的数据
执行如下命令
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath + "/*/*/*/*")
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
查询结果
更新hudi表中数据
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator").
option(INDEX_TYPE.key(), "HBASE").
option(ZKPORT.key(), "2181").
option(TABLENAME.key(), "hudi_hbase_index_test").
option(ZK_NODE_PATH.key(), "/hbase-secure").
option(ZKQUORUM.key(), "host117").
option(MAX_QPS_FRACTION.key(), 10000).
option(MIN_QPS_FRACTION.key(), 1000).
option(QPS_FRACTION.key(), 0.5).
option(SLEEP_MS_FOR_PUT_BATCH.key(), 100).
option(SLEEP_MS_FOR_GET_BATCH.key(), 100).
option(GET_BATCH_SIZE.key(), 100).
option(QPS_ALLOCATOR_CLASS_NAME.key(), "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator").
option(UPDATE_PARTITION_PATH_ENABLE.key(), "true").
option(PUT_BATCH_SIZE_AUTO_COMPUTE.key(), "true").
option(MAX_QPS_PER_REGION_SERVER.key(), 1000).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
增量查询hudi表中数据
spark.
read.
format("hudi").
load(basePath + "/*/*/*/*").
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in
// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
相关结果如下所示: