Writing from Flink to ClickHouse: Single-Table Writes

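Introduction

Flink ships a standard JDBC sink that supports committing in batches and on a timer.

See the Flink documentation: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/

In addition, if checkpointing is enabled, the sink flushes once each time a checkpoint is taken.

Building on this, we can set the JDBC sink's flush interval and batch size very high (that is, to thresholds that cannot be reached within one checkpoint interval) and rely on the flush performed at each checkpoint to get an exactly-once effect. Note that the Flink documentation describes the JDBC sink's guarantee as at-least-once: after a failure, records written since the last completed checkpoint are replayed, so strictly exactly-once results also require the writes to be idempotent.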
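The effect of unreachable thresholds can be pictured with a toy model, sketched here in plain Java with no Flink dependency. `CheckpointFlushedBuffer` and all of its methods are hypothetical names made up for this illustration; they are not Flink classes, and the real JDBC sink does its buffering internally. The sketch assumes a size threshold that traffic never reaches between two checkpoints, so every flush coincides with a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Flink code) of a sink buffer that flushes on size or on checkpoint. */
public class CheckpointFlushedBuffer {
    private final int batchSize;                                     // size-based flush threshold
    private final List<String> pending = new ArrayList<>();          // records not yet written
    private final List<List<String>> committed = new ArrayList<>();  // stands in for the database

    public CheckpointFlushedBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Buffers one record; flushes only if the size threshold is reached. */
    public void write(String record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Called when a checkpoint is taken: whatever is buffered gets committed. */
    public void onCheckpoint() {
        flush();
    }

    private void flush() {
        if (!pending.isEmpty()) {
            committed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    public int committedBatches() {
        return committed.size();
    }

    public int pendingRecords() {
        return pending.size();
    }

    public static void main(String[] args) {
        // A threshold this large is never reached, so only checkpoints trigger a flush.
        CheckpointFlushedBuffer buffer = new CheckpointFlushedBuffer(Integer.MAX_VALUE);
        buffer.write("a");
        buffer.write("b");
        System.out.println(buffer.committedBatches()); // prints 0: nothing written before the checkpoint
        buffer.onCheckpoint();
        System.out.println(buffer.committedBatches()); // prints 1: one batch, aligned with the checkpoint
    }
}
```

In this model a crash before `onCheckpoint()` loses only buffered records, which a restarted job would replay from the last checkpoint.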

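To write to ClickHouse we use the official JDBC package, which talks to the server over HTTP(S) and is well suited to batch inserts.

ClickHouse tables are either single (local) tables or distributed tables. We start with single-table writes, that is, writing to one node.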

Writing to a single ClickHouse table

Add the dependencies

    <!-- jdbc sink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- jdbc clickhouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.3.1</version>
    </dependency>

Typical use of the JDBC sink

JdbcSink.sink(
                "insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
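                        } catch (IllegalAccessException e) {
                            // Fail the record instead of silently sending a half-filled statement
                            throw new SQLException("Failed to read bean field via reflection", e);
                        }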
                    }
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );

    /**
     * Generic helper that fills a ClickHouse PreparedStatement from a bean.
     *
     * @param ps     the PreparedStatement instance
     * @param fields obtained via "bean.getClass().getDeclaredFields()"
     * @param bean   the bean instance
     * @throws IllegalAccessException thrown by field.get
     * @throws SQLException           thrown by the ps.set* calls
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        for (int i = 1; i <= fields.length; i++) {
            Field field = fields[i - 1];
            field.setAccessible(true);
            Object o = field.get(bean);
            if (o == null) {
                ps.setNull(i, 0);
                continue;
            }
            String fieldValue = o.toString();
            if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {
                ps.setObject(i, fieldValue);
            } else {
                ps.setNull(i, 0);
            }
        }
    }

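Note 1: The first argument is the SQL statement. It must follow the format in the example exactly: list the column names, then use ? placeholders for the values, because JdbcStatementBuilder is later called for each record to fill in the statement's parameters.
Note 2: In the JdbcStatementBuilder implementation, DwdOrderBean is a bean class modeled on the ClickHouse table structure. Its fields are looped over and set on the PreparedStatement one by one. If there are only a few columns, they can be set by hand instead.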
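For the "few columns" case in Note 2, hand-written binding might look like the sketch below. `ManualBinding` and its nested `Order` bean are hypothetical stand-ins (the article's DwdOrderBean is not shown), and the two columns are assumed to be a numeric id and a nullable string name. Only `java.sql` is used, so the block compiles without Flink.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;

/** Hand-written binding for a two-column table; Order is a hypothetical stand-in bean. */
public class ManualBinding {

    /** Hypothetical bean with the same two columns as the example SQL (id, name). */
    public static class Order {
        final long id;
        final String name; // may be null

        public Order(long id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Same statement shape as in the example above: column list, then placeholders. */
    static final String SQL = "insert into tableName (id,name) values (?,?)";

    /** Binds each field to its placeholder; JDBC parameter indexes start at 1. */
    public static void bind(PreparedStatement ps, Order order) throws SQLException {
        ps.setLong(1, order.id);
        if (order.name == null) {
            ps.setNull(2, Types.VARCHAR);
        } else {
            ps.setString(2, order.name);
        }
    }
}
```

With Flink on the classpath, a lambda such as `(ps, order) -> ManualBinding.bind(ps, order)` could be passed where a `JdbcStatementBuilder<Order>` is expected, since its `accept` method takes the same two arguments and may throw `SQLException`. Unlike the reflective helper, this keeps typed setters and does not depend on field order.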

Wrapping the common settings

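import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// RunEnv is the project's own environment-config class (host, port, credentials).
public class SinkSingleClickHouse<T> {
    // Values whose string form is "null" are written as SQL NULL (see setPs)
    private static final String NA = "null";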
    private final SinkFunction<T> sink;

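    /**
     * Creates the ClickHouse SinkFunction with default execution options.
     *
     * @param sql                  insert statement; must have the form: insert into table (a,b) values (?,?)
     * @param jdbcStatementBuilder how to fill the statement from a single record
     * @param runEnv               runtime environment
     * @param database             database that contains the table
     */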
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    }

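    /**
     * Creates the ClickHouse SinkFunction with explicit execution options.
     *
     * @param sql                  insert statement; must have the form: insert into table (a,b) values (?,?)
     * @param jdbcStatementBuilder how to fill the statement from a single record
     * @param runEnv               runtime environment
     * @param database             database that contains the table
     * @param batchIntervalMs      flush condition: time interval in milliseconds
     * @param batchSize            flush condition: number of buffered records
     * @param maxRetries           number of retries for a failed flush
     */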
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database,
                                int batchIntervalMs, int batchSize, int maxRetries) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(batchIntervalMs)
                        .withBatchSize(batchSize)
                        .withMaxRetries(maxRetries)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    }

    public SinkFunction<T> getSink() {
        return sink;
    }

    /**
     * Generic helper that fills a ClickHouse PreparedStatement from a bean.
     *
     * @param ps     the PreparedStatement instance
     * @param fields obtained via "bean.getClass().getDeclaredFields()"
     * @param bean   the bean instance
     * @throws IllegalAccessException thrown by field.get
     * @throws SQLException           thrown by the ps.set* calls
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        for (int i = 1; i <= fields.length; i++) {
            Field field = fields[i - 1];
            field.setAccessible(true);
            Object o = field.get(bean);
            if (o == null) {
                ps.setNull(i, 0);
                continue;
            }
            String fieldValue = o.toString();
            if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {
                ps.setObject(i, fieldValue);
            } else {
                ps.setNull(i, 0);
            }
        }
    }
}
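Because setPs binds parameters by position, the column list in the SQL has to follow the same order as the Field[] it walks, and the Javadoc for `getDeclaredFields()` states that the returned array is not in any particular order. One way to keep the two in step is to derive the statement from the very same array, as in the sketch below. `InsertSqlFromFields`, `insertSql` and the nested `Order` bean are hypothetical names, and the sketch assumes each field name equals its ClickHouse column name.

```java
import java.lang.reflect.Field;
import java.util.StringJoiner;

/** Sketch: derive the INSERT statement from the same Field[] that a positional binder walks. */
public class InsertSqlFromFields {

    /** Hypothetical bean; field names are assumed to equal the ClickHouse column names. */
    public static class Order {
        private Long id;
        private String name;
    }

    /** Builds "insert into <table> (c1,c2,...) values (?,?,...)" in the array's order. */
    public static String insertSql(String table, Field[] fields) {
        StringJoiner columns = new StringJoiner(",", "(", ")");
        StringJoiner marks = new StringJoiner(",", "(", ")");
        for (Field field : fields) {
            columns.add(field.getName());
            marks.add("?");
        }
        return "insert into " + table + " " + columns + " values " + marks;
    }

    public static void main(String[] args) {
        // Fetch the array once and reuse it for both the SQL and the binding step.
        Field[] fields = Order.class.getDeclaredFields();
        System.out.println(insertSql("tableName", fields));
    }
}
```

The array should be fetched once and passed to both the SQL builder and the binding helper. Like setPs, this sketch does not filter out static fields, so a bean with constants would need them excluded in both places.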

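Note 1: JdbcExecutionOptions is where the batch size and flush interval are configured. If it is not passed, defaults apply (in Flink 1.13: a batch size of 5000, no time-based flush, and 3 retries).

Final usage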

SinkFunction<DwdOrderBean> sinkClickhouse = new SinkSingleClickHouse<>("insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
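                        } catch (IllegalAccessException e) {
                            // Fail the record instead of silently sending a half-filled statement
                            throw new SQLException("Failed to read bean field via reflection", e);
                        }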
                    }
                },
                uat,
                "dwd_cdp")
                .getSink();

With that, we have a sink that writes to a single table.
