Hudi 0.7.0
Hudi Java Client Test
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-java-client</artifactId>
    <version>0.7.0</version>
</dependency>
Place the hudi-example-0.7.0.jar built from Hudi 0.7 into the project's lib directory.
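If you would rather reference that jar as a Maven coordinate than as a loose jar under lib, one option is a system-scoped dependency pointing at it. This is only a sketch: the groupId/artifactId and the systemPath below are assumptions, adjust them to where your built jar actually lives.

<dependency>
    <!-- coordinates are assumptions; use whatever matches your local build -->
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-examples</artifactId>
    <version>0.7.0</version>
    <scope>system</scope>
    <!-- path is an assumption; point this at the jar you copied into lib -->
    <systemPath>${project.basedir}/lib/hudi-example-0.7.0.jar</systemPath>
</dependency>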
Code
package com.hjl.hudi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.*;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.index.HoodieIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @Description Hudi Java Client test
 * @Author jiale.he
 * @Date 2021-02-01 11:03 Monday
 */
public class HudiClientTest {

    private static final Logger logger = LoggerFactory.getLogger(HudiClientTest.class);

    public static void main(String[] args) throws Exception {
        // String tablePath = "hdfs://localhost:8020/spark_hudi/huditable";
        String tablePath = "/Users/jiale.he/IdeaProjects/hudi-learn/src/main/resources/huditable";
        String tableName = "huditable";

        // test data generator
        HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
        Configuration hadoopConf = new Configuration();

        // initialize the table if the path does not exist yet:
        // create the table path and write the initial table metadata
        Path path = new Path(tablePath);
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
        if (!fs.exists(path)) {
            HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.COPY_ON_WRITE,
                    tableName, HoodieAvroPayload.class.getName());
        }

        // build the write client config
        HoodieWriteConfig hudiWriteConf = HoodieWriteConfig.newBuilder()
                // record schema
                .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA)
                // insert/upsert parallelism
                .withParallelism(2, 2)
                // delete parallelism
                .withDeleteParallelism(2)
                // index type: in-memory
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
                // compaction / commit archival settings
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build())
                .withPath(tablePath)
                .forTable(tableName)
                .build();

        // create the hudi write client
        HoodieJavaWriteClient<HoodieAvroPayload> client =
                new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), hudiWriteConf);

        // insert
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = insert(dataGen, client);
        // upsert
        upsert(dataGen, client, hoodieRecords);
        // delete
        delete(dataGen, client, hoodieRecords);

        client.close();
    }

    /**
     * Delete
     *
     * @param dataGen       data generator
     * @param client        client
     * @param hoodieRecords records
     */
    public static void delete(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                              HoodieJavaWriteClient<HoodieAvroPayload> client,
                              List<HoodieRecord<HoodieAvroPayload>> hoodieRecords) {
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: " + newCommitTime);
        // delete the keys of the first half of the records
        int deleteNum = hoodieRecords.size() / 2;
        List<HoodieKey> deleteRecords = hoodieRecords
                .stream()
                .map(HoodieRecord::getKey)
                .limit(deleteNum)
                .collect(Collectors.toList());
        List<WriteStatus> deleteStatus = client.delete(deleteRecords, newCommitTime);
        client.commit(newCommitTime, deleteStatus);
    }

    /**
     * Upsert
     *
     * @param dataGen       data generator
     * @param client        client
     * @param hoodieRecords records
     * @return records
     */
    public static List<HoodieRecord<HoodieAvroPayload>> upsert(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                                                               HoodieJavaWriteClient<HoodieAvroPayload> client,
                                                               List<HoodieRecord<HoodieAvroPayload>> hoodieRecords) {
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: " + newCommitTime);
        // generate updates against some of the existing records
        List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 4);
        hoodieRecords.addAll(toBeUpdated);
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = hoodieRecords
                .stream()
                .map(record -> new HoodieRecord<HoodieAvroPayload>(record))
                .collect(Collectors.toList());
        List<WriteStatus> upsertStatus = client.upsert(writeRecords, newCommitTime);
        client.commit(newCommitTime, upsertStatus);
        return hoodieRecords;
    }

    /**
     * Insert
     *
     * @param dataGen data generator
     * @param client  client
     * @return records
     */
    public static List<HoodieRecord<HoodieAvroPayload>> insert(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                                                               HoodieJavaWriteClient<HoodieAvroPayload> client) {
        // start a new commit
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: " + newCommitTime);
        // generate test data
        List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>(records);
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = hoodieRecords
                .stream()
                .map(record -> new HoodieRecord<HoodieAvroPayload>(record))
                .collect(Collectors.toList());
        // upsert the records and write the commit file to the timeline
        List<WriteStatus> upsertStatus = client.upsert(writeRecords, newCommitTime);
        client.commit(newCommitTime, upsertStatus);
        return hoodieRecords;
    }
}
Reading the Hudi table with Spark
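To verify the writes, the table can be read back through the Spark datasource. The snippet below is only a minimal sketch: the class name is made up for illustration, and it assumes spark-sql, the hudi-spark-bundle, and spark-avro are on the classpath. Running it prints output like the table that follows.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// hypothetical reader class, for illustration only
public class HudiSparkReadTest {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-read-test")
                .master("local[*]")
                // Hudi's Spark datasource expects Kryo serialization
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .getOrCreate();

        String tablePath = "/Users/jiale.he/IdeaProjects/hudi-learn/src/main/resources/huditable";

        // snapshot-read the COPY_ON_WRITE table; "/*/*/*" globs the yyyy/MM/dd partition directories
        Dataset<Row> df = spark.read()
                .format("org.apache.hudi")
                .load(tablePath + "/*/*/*");

        df.show(false);
        spark.stop();
    }
}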
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key |_hoodie_partition_path|_hoodie_file_name |ts |uuid |rider |driver |begin_lat |begin_lon |end_lat |end_lon |fare |
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
|20210201150109 |20210201150109_0_20 |d8d2eda9-47c6-4da7-84ad-fd012364ddb1|2020/01/03 |0eec6b6e-cff0-4eec-af70-47702394c031-0_0-0-0_20210201150109.parquet|0 |d8d2eda9-47c6-4da7-84ad-fd012364ddb1|rider-20210201150107|driver-20210201150107|0.33922164839486424|0.909372837469859 |0.9017656600243008 |0.8236411667430927 |2.0856583634078385|
|20210201150109 |20210201150109_0_12 |149c2df6-32b9-4114-aeef-c51802428e8b|2020/01/02 |8ef23f07-0ac9-4a9e-b9a8-57ab11e122e7-0_0-0-0_20210201150109.parquet|0 |149c2df6-32b9-4114-aeef-c51802428e8b|rider-20210201150107|driver-20210201150107|0.6662084366450246 |0.9065078444936647|0.7124299678100179 |0.05336723040266267|38.63372961020515 |
|20210201150109 |20210201150109_0_13 |746ec9d7-001d-434c-8b0f-538dd85efb42|2020/01/02 |8ef23f07-0ac9-4a9e-b9a8-57ab11e122e7-0_0-0-0_20210201150109.parquet|0 |746ec9d7-001d-434c-8b0f-538dd85efb42|rider-20210201150107|driver-20210201150107|0.4106290929046368 |0.964603455586492 |0.13957566957654388|0.45400191464227213|81.37564420028626 |
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+