sparkShell操作hudi

使用sparkShell连接hudi

# Launch spark-shell, pulling in the Hudi Spark bundle (0.5.3), spark-avro (2.4.4)
# and Avro (1.8.2) via --packages, and set Kryo as the Spark serializer.
[root@ha1 bin]#spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4,org.apache.avro:avro:1.8.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

sparkShell操作hudi

创建表

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

// Name of the Hudi table.
val tableName: String = "hudi_trips_cow"
// Base path under which the table is stored (a file:// URI).
val basePath: String = "file:///tmp/hudi_trips_cow"
// Generator used by the snippets below to produce sample records.
val dataGen: DataGenerator = new DataGenerator()

插入数据

// Generate 10 sample trip records and convert them to a list of strings.
val inserts = convertToStringList(dataGen.generateInserts(10))
// Parse the generated records as JSON into a DataFrame (parallelized into 2 slices).
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// Write the DataFrame to the Hudi table at basePath.
// Overwrite mode: if the table already exists it is overwritten and re-created.
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  options(Map(
    PRECOMBINE_FIELD_OPT_KEY -> "ts",
    RECORDKEY_FIELD_OPT_KEY -> "uuid",
    PARTITIONPATH_FIELD_OPT_KEY -> "partitionpath",
    TABLE_NAME -> tableName)).
  mode(Overwrite).
  save(basePath)

查询数据

// Load the table's data files into a DataFrame; the glob matches four path levels below basePath.
val tripsSnapshotDF = spark.read.format("hudi").load(s"$basePath/*/*/*/*")

// Register the DataFrame as the temporary view hudi_trips_snapshot so it can be queried with SQL.
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// Show the trips whose fare is greater than 20.0.
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
+------------------+-------------------+-------------------+---+
|              fare|          begin_lon|          begin_lat| ts|
+------------------+-------------------+-------------------+---+
|34.158284716382845|0.46157858450465483| 0.4726905879569653|0.0|
|  43.4923811219014| 0.8779402295427752| 0.6100070562136587|0.0|
| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|0.0|
| 93.56018115236618|0.14285051259466197|0.21624150367601136|0.0|
| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|0.0|
| 66.62084366450246|0.03844104444445928| 0.0750588760043035|0.0|
| 41.06290929046368| 0.8192868687714224|  0.651058505660742|0.0|
| 27.79478688582596| 0.6273212202489661|0.11488393157088261|0.0|
+------------------+-------------------+-------------------+---+

// Hudi adds metadata columns to the table: _hoodie_commit_time is a timestamp field holding
// the ingestion (commit) time, and _hoodie_partition_path holds the record's partition path.
scala> spark.sql("select _hoodie_commit_time,_hoodie_partition_path from  hudi_trips_snapshot").show()
+-------------------+----------------------+
|_hoodie_commit_time|_hoodie_partition_path|
+-------------------+----------------------+
|     20210427222000|  americas/united_s...|
|     20210427222000|  americas/united_s...|
|     20210427222000|  americas/united_s...|
|     20210427222000|  americas/united_s...|
|     20210427222000|  americas/united_s...|
|     20210427222000|  americas/brazil/s...|
|     20210427222000|  americas/brazil/s...|
|     20210427222000|  americas/brazil/s...|
|     20210427222000|    asia/india/chennai|
|     20210427222000|    asia/india/chennai|
+-------------------+----------------------+

sparkShell操作hudi

更新数据

// Generate 10 update records and convert them to a list of strings.
val updates = convertToStringList(dataGen.generateUpdates(10))
// Parse the updates as JSON into a DataFrame (parallelized into 2 slices).
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
// Write the updates to the existing table; Append mode adds to the table instead of re-creating it.
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  options(Map(
    PRECOMBINE_FIELD_OPT_KEY -> "ts",
    RECORDKEY_FIELD_OPT_KEY -> "uuid",
    PARTITIONPATH_FIELD_OPT_KEY -> "partitionpath",
    TABLE_NAME -> tableName)).
  mode(Append).
  save(basePath)

增量查询

// Reload the table and refresh the hudi_trips_snapshot temporary view.
spark.read.format("hudi").
  load(s"$basePath/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

// Collect up to 50 distinct commit times, in ascending order.
val commits = spark.
  sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").
  map(_.getString(0)).
  take(50)
// Use the second-to-last commit time as the start of the incremental range.
val beginTime = commits(commits.length - 2)

// Incremental query: read with the incremental query type, starting from beginTime.
val tripsIncrementalDF = spark.read.format("hudi").
  options(Map(
    QUERY_TYPE_OPT_KEY -> QUERY_TYPE_INCREMENTAL_OPT_VAL,
    BEGIN_INSTANTTIME_OPT_KEY -> beginTime)).
  load(basePath)
// Register the result as the temporary view hudi_trips_incremental.
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

时间点查询

// "000" stands for the earliest possible commit time.
val beginTime = "000"
// End the range at the second-to-last commit: this is the point in time being queried.
val endTime = commits(commits.length - 2)

// Point-in-time query: an incremental read bounded by both a begin and an end instant.
val tripsPointInTimeDF = spark.read.format("hudi").
  options(Map(
    QUERY_TYPE_OPT_KEY -> QUERY_TYPE_INCREMENTAL_OPT_VAL,
    BEGIN_INSTANTTIME_OPT_KEY -> beginTime,
    END_INSTANTTIME_OPT_KEY -> endTime)).
  load(basePath)
// Register the result as the temporary view hudi_trips_point_in_time and query it.
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

删除数据

// Count the records currently visible through the hudi_trips_snapshot view.
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// Pick two records (record key and partition path) to delete.
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

// Build the delete records for the two selected rows and load them as a DataFrame.
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

// Issue the delete: same write options as the insert/update, plus the "delete" operation.
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// Reload the table after the delete.
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*")

// Use createOrReplaceTempView (registerTempTable is deprecated since Spark 2.0).
roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
// The count should now be two less than the count taken before the delete.
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
上一篇:浅谈如何做好EPC项目设计的概算控制和管理


下一篇:稳定性测试,需要对EPC或DIC任务进行翻倍操作,供后续使用