flink写入clickhouse之分布式表写入

flink写入clickhouse之分布式表写入.md

简介

之前基于clickhouse的官方jdbc包编写了sink,用于写入单表,见:https://www.cnblogs.com/sqhhh/p/15897275.html

clickhouse分布式表的写入,目前有2种方法:

  • 1.对着逻辑表写入:此方法可以当作是单表,利用单表写入的sink写入数据
  • 2.对着各个节点的物理表(本地表)写入:网络上普遍推荐这种写入方式,ClickHouse官方文档也指出直接写入各分片更灵活、效率更高(详见官方文档中Distributed表引擎一节)。

方法2的实现方案是:基于单表写入的方式,为每个物理表建立一个单表sink,再把它们统筹起来封装成一个sink,并用轮询或者hash等方式把数据分流到各个单表sink中。

实现思路

观察原jdbc实现逻辑

1.原jdbc sink底层使用的是:

JdbcSink.sink();

这是一个静态工厂方法(并非构造方法),本质上是构造并返回一个GenericJdbcSinkFunction实例,该类的代码如下:

/**
 * A generic SinkFunction for JDBC.
 *
 * <p>Every lifecycle callback is forwarded to a single {@link AbstractJdbcOutputFormat}:
 * {@code open} opens it, {@code invoke} writes one record, {@code snapshotState} flushes it and
 * {@code close} closes it.
 */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {
    // The only collaborator; all writes, flushes and the close go through it.
    private final AbstractJdbcOutputFormat<T> outputFormat;

    /**
     * @param outputFormat the output format to delegate to; rejected by
     *     {@code Preconditions.checkNotNull} when null
     */
    public GenericJdbcSinkFunction(@Nonnull AbstractJdbcOutputFormat<T> outputFormat) {
        this.outputFormat = Preconditions.checkNotNull(outputFormat);
    }

    /**
     * Hands the runtime context to the output format and opens it with this subtask's index and
     * the total number of parallel subtasks.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }

    /** Passes the incoming record to the output format. */
    @Override
    public void invoke(T value, Context context) throws IOException {
        outputFormat.writeRecord(value);
    }

    /** Intentionally empty: this function registers no state of its own. */
    @Override
    public void initializeState(FunctionInitializationContext context) {}

    /** Flushes the output format whenever a state snapshot is taken. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }

    /** Closes the output format. */
    @Override
    public void close() {
        outputFormat.close();
    }
}

注:其中进行 checkpoint 时会调用 snapshotState(),这正是之前提到的利用checkpoint来进行提交的实现。

我们对源码进行如下改造:

/**
 * Sink function that spreads records over several JDBC output formats, one per physical table.
 *
 * <p>For each record the {@link ShuntValue} yields an int; the record is written to the output
 * format at index {@code Math.abs(thatInt % size)}. All output formats are opened together,
 * flushed together on every state snapshot, and closed together.
 *
 * @param <T> type of the records written by this sink
 */
public class ClickHouseSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    private final List<AbstractJdbcOutputFormat<T>> outputFormatList;
    private final ShuntValue<T> shuntValueImp;
    // Cached number of output formats; guaranteed > 0 by the constructor.
    private final int size;

    /**
     * @param outputFormatList one output format per target table; must contain at least one entry
     * @param shuntValueImp extracts the routing number from a record
     * @throws NullPointerException if either argument is null
     * @throws IllegalArgumentException if {@code outputFormatList} is empty
     */
    public ClickHouseSinkFunction(List<AbstractJdbcOutputFormat<T>> outputFormatList, ShuntValue<T> shuntValueImp) {
        if (outputFormatList == null) {
            throw new NullPointerException("outputFormatList must not be null");
        }
        if (shuntValueImp == null) {
            throw new NullPointerException("shuntValueImp must not be null");
        }
        // Fail here rather than with "/ by zero" on the first record in invoke().
        if (outputFormatList.isEmpty()) {
            throw new IllegalArgumentException("outputFormatList must contain at least one output format");
        }
        this.outputFormatList = outputFormatList;
        this.shuntValueImp = shuntValueImp;
        this.size = outputFormatList.size();
    }

    /**
     * Gives every output format the runtime context and opens it with this subtask's index and
     * the total number of parallel subtasks.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            outputFormat.setRuntimeContext(ctx);
            outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
    }

    /** Routes the record to exactly one output format and writes it there. */
    @Override
    public void invoke(T value, Context context) throws Exception {
        // The remainder lies strictly between -size and size, so Math.abs cannot overflow,
        // even when shunt() returns Integer.MIN_VALUE.
        int x = Math.abs(shuntValueImp.shunt(value) % size);
        outputFormatList.get(x).writeRecord(value);
    }

    /** Flushes every output format whenever a state snapshot is taken. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            outputFormat.flush();
        }
    }

    /** Intentionally empty: this function registers no state of its own. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    /**
     * Closes every output format so none of them is left open when the sink shuts down.
     *
     * <p>A failure while closing one format does not prevent the remaining ones from being
     * closed; the first failure is rethrown afterwards with later ones attached as suppressed.
     * NOTE(review): presumably the batching output format also writes its pending batch on
     * close, which would make this call necessary for the final records as well; confirm
     * against the connector version in use.
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            try {
                outputFormat.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}

将单个AbstractJdbcOutputFormat拓展成了一个list,并利用int x = Math.abs(shuntValueImp.shunt(value) % size) 来进行分流。
其中,shuntValueImp的类型ShuntValue是个接口,定义如下:

/**
 * Strategy that maps a record to an int used for routing.
 *
 * <p>Extends {@link Serializable} so that implementations, including lambdas and method
 * references, can be serialized together with the object that holds them.
 *
 * @param <T> type of the records being routed
 */
@FunctionalInterface
public interface ShuntValue<T> extends Serializable {
    /**
     * Decides how a record is partitioned.
     *
     * @param value a single record
     * @return an int derived from the record; it is taken modulo the number of cluster nodes to
     *     pick the node the record is sent to
     */
    int shunt(T value);
}

这个接口的作用是定义如何从单条数据中提取一个int数值,用于分流。

包装下常用设置

/**
 * Fluent builder for a {@link ClickHouseSinkFunction} that writes to several ClickHouse hosts.
 *
 * <p>One JDBC output format is created per host, all sharing the same insert statement,
 * statement builder, execution options, port, credentials and database. Defaults: port
 * {@code "8123"}, user/password/database {@code "default"},
 * {@code JdbcExecutionOptions.defaults()}, and routing by the record's {@code hashCode()}.
 *
 * @param <T> type of the records written by the sink
 */
public class ClickHouseSinkBuilder<T> {
    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    private final String sql;
    private final JdbcStatementBuilder<T> statementBuilder;
    private JdbcExecutionOptions executionOptions = JdbcExecutionOptions.defaults();
    private List<String> clickHouseHosts;
    private String clickHousePort = "8123";
    private String clickHouseUser = "default";
    private String clickHousePassword = "default";
    private String clickHouseDatabase = "default";
    private ShuntValue<T> shuntValue = Object::hashCode;

    /**
     * @param sql parameterized insert statement executed against every host
     * @param statementBuilder binds one record to the prepared statement
     * @param clickHouseHosts host names, one per target node; may still be replaced through
     *     {@link #setClickHouseHosts(List)} before {@link #build()}
     */
    public ClickHouseSinkBuilder(String sql, JdbcStatementBuilder<T> statementBuilder, List<String> clickHouseHosts) {
        this.sql = sql;
        this.statementBuilder = statementBuilder;
        this.clickHouseHosts = clickHouseHosts;
    }

    /** Static factory equivalent to calling the public constructor. */
    public static <T> ClickHouseSinkBuilder<T> builder(String sql, JdbcStatementBuilder<T> statementBuilder, List<String> clickHouseHosts) {
        return new ClickHouseSinkBuilder<T>(sql, statementBuilder, clickHouseHosts);
    }

    /** Replaces the JDBC execution options shared by every host. */
    public ClickHouseSinkBuilder<T> setExecutionOptions(JdbcExecutionOptions executionOptions) {
        this.executionOptions = executionOptions;
        return this;
    }

    /** Replaces the list of target hosts. */
    public ClickHouseSinkBuilder<T> setClickHouseHosts(List<String> clickHouseHosts) {
        this.clickHouseHosts = clickHouseHosts;
        return this;
    }

    /** Sets the port placed in every JDBC URL. */
    public ClickHouseSinkBuilder<T> setClickHousePort(String clickHousePort) {
        this.clickHousePort = clickHousePort;
        return this;
    }

    /** Sets the user name used for every connection. */
    public ClickHouseSinkBuilder<T> setClickHouseUser(String clickHouseUser) {
        this.clickHouseUser = clickHouseUser;
        return this;
    }

    /** Sets the password used for every connection. */
    public ClickHouseSinkBuilder<T> setClickHousePassword(String clickHousePassword) {
        this.clickHousePassword = clickHousePassword;
        return this;
    }

    /** Sets the database placed in every JDBC URL. */
    public ClickHouseSinkBuilder<T> setClickHouseDatabase(String clickHouseDatabase) {
        this.clickHouseDatabase = clickHouseDatabase;
        return this;
    }

    /** Sets the function that extracts the routing number from a record. */
    public ClickHouseSinkBuilder<T> setShuntValue(ShuntValue<T> shuntValue) {
        this.shuntValue = shuntValue;
        return this;
    }

    /**
     * Creates the sink from the current settings.
     *
     * @return a sink holding one output format per configured host, in host-list order
     * @throws NullPointerException if the sql, statement builder, execution options or shunt
     *     function is null
     * @throws IllegalArgumentException if no host is configured or a host is null or empty
     */
    public SinkFunction<T> build() {
        Preconditions.checkNotNull(sql, "sql must not be null");
        Preconditions.checkNotNull(statementBuilder, "statementBuilder must not be null");
        Preconditions.checkNotNull(executionOptions, "executionOptions must not be null");
        Preconditions.checkNotNull(shuntValue, "shuntValue must not be null");
        Preconditions.checkArgument(
                clickHouseHosts != null && !clickHouseHosts.isEmpty(),
                "at least one ClickHouse host is required");

        List<AbstractJdbcOutputFormat<T>> outputFormatList = new ArrayList<>(clickHouseHosts.size());
        for (String clickHouseHost : clickHouseHosts) {
            Preconditions.checkArgument(
                    clickHouseHost != null && !clickHouseHost.isEmpty(),
                    "ClickHouse host must not be null or empty");
            outputFormatList.add(
                    createOutputFormat(connectionOptionsFor(clickHouseHost), executionOptions, sql, statementBuilder));
        }

        return new ClickHouseSinkFunction<T>(outputFormatList, shuntValue);
    }

    /** Builds the JDBC connection options (URL, driver, credentials) for a single host. */
    private JdbcConnectionOptions connectionOptionsFor(String clickHouseHost) {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://" + clickHouseHost + ":" + clickHousePort + "/" + clickHouseDatabase)
                .withDriverName(DRIVER_NAME)
                .withUsername(clickHouseUser)
                .withPassword(clickHousePassword)
                .build();
    }

    /**
     * Creates the batching output format for one connection.
     *
     * <p>Kept static on purpose: the lambda below must capture only its parameters. Referencing
     * the builder's fields would capture the builder itself, which is not serializable.
     */
    private static <T> AbstractJdbcOutputFormat<T> createOutputFormat(
            JdbcConnectionOptions connectionOptions,
            JdbcExecutionOptions executionOptions,
            String sql,
            JdbcStatementBuilder<T> statementBuilder) {
        return new JdbcBatchingOutputFormat<>(
                new SimpleJdbcConnectionProvider(connectionOptions),
                executionOptions,
                context -> {
                    Preconditions.checkState(
                            !context.getExecutionConfig().isObjectReuseEnabled(),
                            "objects can not be reused with JDBC sink function");
                    return JdbcBatchStatementExecutor.simple(
                            sql, statementBuilder, Function.identity());
                },
                JdbcBatchingOutputFormat.RecordExtractor.identity());
    }

}

使用

// Usage example: build a sink that routes each order to one of two hosts by tenant code.
ArrayList<String> hosts = new ArrayList<>(3);
hosts.add("host1");
hosts.add("host2");
SinkFunction<DwdOrderBean> sinkClusterClickHouse = ClickHouseSinkBuilder.builder(
                "insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                        } catch (IllegalAccessException e) {
                            // Surface the failure instead of printing it: a statement whose
                            // parameters were not all bound must not be added to the batch.
                            throw new SQLException(
                                    "failed to bind DwdOrderBean fields to the insert statement", e);
                        }
                    }
                },
                hosts)
                // Records with the same tenant code always go to the same host.
                .setShuntValue(value -> value.getTenant_code().hashCode())
                // The hosts are already passed to builder(...) above, so
                // setClickHouseHosts(hosts) is not repeated here.
                .setClickHousePassword("pas")
                .setClickHousePort("222")
                .setClickHouseUser("user")
                .setClickHouseDatabase("default")
                .setExecutionOptions(new JdbcExecutionOptions.Builder()
                        .withBatchIntervalMs(5000L)
                        .withBatchSize(50000)
                        .withMaxRetries(3)
                        .build())
                .build();
上一篇:python之bytes和string(转)


下一篇:Java 数组阻塞队列 ArrayBlockingQueue