Hudi Java Client Test

Hudi 0.7.0

Add the hudi-java-client dependency to the project:

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-java-client</artifactId>
    <version>0.7.0</version>
</dependency>

Put the hudi-examples-0.7.0.jar built from the Hudi 0.7 source into the project's lib directory; it provides the HoodieExampleDataGenerator used below.
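One way to reference that local jar from the pom is a system-scoped dependency (a sketch; the lib/ path and system scope are assumptions, and installing the jar into the local Maven repository works just as well):

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-examples</artifactId>
    <version>0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/lib/hudi-examples-0.7.0.jar</systemPath>
</dependency>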

Code

package com.hjl.hudi;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.*;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.index.HoodieIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @Description Hudi Java Client test
 * @Author jiale.he
 * @Date 2021-02-01 11:03 Mon
 */
public class HudiClientTest {

    private static final Logger logger = LoggerFactory.getLogger(HudiClientTest.class);

    public static void main(String[] args) throws Exception {

        // String tablePath = "hdfs://localhost:8020/spark_hudi/huditable";
        String tablePath = "/Users/jiale.he/IdeaProjects/hudi-learn/src/main/resources/huditable";
        String tableName = "huditable";

        // Test data generator
        HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

        Configuration hadoopConf = new Configuration();

        // Initialize the table if the base path does not exist yet:
        // create the table path and write the initial metadata to .hoodie
        Path path = new Path(tablePath);
        FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
        if (!fs.exists(path)) {
            HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.COPY_ON_WRITE,
                    tableName, HoodieAvroPayload.class.getName());
        }

        // Build the write client configuration
        HoodieWriteConfig hudiWriteConf = HoodieWriteConfig.newBuilder()
                // record schema
                .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA)
                // insert/upsert parallelism
                .withParallelism(2, 2)
                // delete parallelism
                .withDeleteParallelism(2)
                // index type: in-memory, sufficient for this single-process test
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
                // timeline archival: keep between 20 and 30 commits
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build())
                .withPath(tablePath)
                .forTable(tableName)
                .build();

        // Create the Hudi write client
        HoodieJavaWriteClient<HoodieAvroPayload> client = new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), hudiWriteConf);

        // insert
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = insert(dataGen, client);
        // upsert
        upsert(dataGen, client, hoodieRecords);
        // delete
        delete(dataGen, client, hoodieRecords);

        client.close();
    }

    /**
     * Delete half of the previously written records.
     *
     * @param dataGen       data generator
     * @param client        write client
     * @param hoodieRecords records
     */
    public static void delete(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                              HoodieJavaWriteClient<HoodieAvroPayload> client,
                              List<HoodieRecord<HoodieAvroPayload>> hoodieRecords) {
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: " + newCommitTime);
        int deleteNum = hoodieRecords.size() / 2;
        List<HoodieKey> deleteRecords = hoodieRecords
                .stream()
                .map(HoodieRecord::getKey)
                .limit(deleteNum)
                .collect(Collectors.toList());
        List<WriteStatus> deleteStatus = client.delete(deleteRecords, newCommitTime);
        client.commit(newCommitTime, deleteStatus);
    }

    /**
     * Upsert: generate a batch of updates and write them together with the existing records.
     *
     * @param dataGen       data generator
     * @param client        write client
     * @param hoodieRecords records
     * @return records
     */
    public static List<HoodieRecord<HoodieAvroPayload>> upsert(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                                                               HoodieJavaWriteClient<HoodieAvroPayload> client,
                                                               List<HoodieRecord<HoodieAvroPayload>> hoodieRecords) {
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: " + newCommitTime);
        List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 4);
        hoodieRecords.addAll(toBeUpdated);
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = hoodieRecords
                .stream()
                .map(record -> new HoodieRecord<HoodieAvroPayload>(record))
                .collect(Collectors.toList());
        List<WriteStatus> upsert = client.upsert(writeRecords, newCommitTime);
        client.commit(newCommitTime, upsert);
        return hoodieRecords;
    }

    /**
     * Insert: generate records and write them via upsert.
     *
     * @param dataGen data generator
     * @param client  write client
     * @return the inserted records
     */
    public static List<HoodieRecord<HoodieAvroPayload>> insert(HoodieExampleDataGenerator<HoodieAvroPayload> dataGen,
                                                               HoodieJavaWriteClient<HoodieAvroPayload> client) {
        // start a new commit
        String newCommitTime = client.startCommit();
        logger.info("Starting Commit: " + newCommitTime);

        // generate 10 insert records
        List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
        List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>(records);
        List<HoodieRecord<HoodieAvroPayload>> writeRecords = hoodieRecords
                .stream()
                .map(record -> new HoodieRecord<HoodieAvroPayload>(record))
                .collect(Collectors.toList());
        // write the records via upsert
        List<WriteStatus> upsertStatus = client.upsert(writeRecords, newCommitTime);
        // complete the commit, writing the commit file to the timeline
        client.commit(newCommitTime, upsertStatus);

        return hoodieRecords;
    }


}


Read the Hudi table with Spark:
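A minimal Java read sketch (assumptions: spark-sql and a matching hudi-spark-bundle are on the classpath, and the class name is illustrative; the three-level glob matches the yyyy/MM/dd partition paths the data generator produces):

package com.hjl.hudi;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HudiSparkRead {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("hudi-read")
                .master("local[*]")
                .getOrCreate();

        String tablePath = "/Users/jiale.he/IdeaProjects/hudi-learn/src/main/resources/huditable";

        // the generator writes partitions like 2020/01/02, hence the three-level glob
        Dataset<Row> df = spark.read()
                .format("hudi")
                .load(tablePath + "/*/*/*");

        df.show(false);
    }
}

Output: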

+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key                  |_hoodie_partition_path|_hoodie_file_name                                                  |ts |uuid                                |rider               |driver               |begin_lat          |begin_lon         |end_lat            |end_lon            |fare              |
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
|20210201150109     |20210201150109_0_20 |d8d2eda9-47c6-4da7-84ad-fd012364ddb1|2020/01/03            |0eec6b6e-cff0-4eec-af70-47702394c031-0_0-0-0_20210201150109.parquet|0  |d8d2eda9-47c6-4da7-84ad-fd012364ddb1|rider-20210201150107|driver-20210201150107|0.33922164839486424|0.909372837469859 |0.9017656600243008 |0.8236411667430927 |2.0856583634078385|
|20210201150109     |20210201150109_0_12 |149c2df6-32b9-4114-aeef-c51802428e8b|2020/01/02            |8ef23f07-0ac9-4a9e-b9a8-57ab11e122e7-0_0-0-0_20210201150109.parquet|0  |149c2df6-32b9-4114-aeef-c51802428e8b|rider-20210201150107|driver-20210201150107|0.6662084366450246 |0.9065078444936647|0.7124299678100179 |0.05336723040266267|38.63372961020515 |
|20210201150109     |20210201150109_0_13 |746ec9d7-001d-434c-8b0f-538dd85efb42|2020/01/02            |8ef23f07-0ac9-4a9e-b9a8-57ab11e122e7-0_0-0-0_20210201150109.parquet|0  |746ec9d7-001d-434c-8b0f-538dd85efb42|rider-20210201150107|driver-20210201150107|0.4106290929046368 |0.964603455586492 |0.13957566957654388|0.45400191464227213|81.37564420028626 |
+-------------------+--------------------+------------------------------------+----------------------+-------------------------------------------------------------------+---+------------------------------------+--------------------+---------------------+-------------------+------------------+-------------------+-------------------+------------------+
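Three rows remain, which matches the write sequence: insert() wrote 10 records, upsert() appended 4 updates to the in-memory list (14 entries), and delete() removed the keys of the first 14 / 2 = 7 entries, all of them distinct original inserts, leaving 10 - 7 = 3 rows.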