flink写入clickhouse之单表写入
简介
flink有一个标准的jdbc sink,提供批量,定时的提交方法。
参考flink文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/
同时,如果设置了checkpoint,在做checkpoint时候会进行一次提交。
基于这点,我们可以将jdbc sink的提交间隔和批量大小设置得很大(即在一次checkpoint间隔内达不到的阈值),这样数据实际上只会在checkpoint时提交,从而尽量接近精确一次的效果。注意:JdbcSink.sink本身提供的是至少一次(at-least-once)保证,故障恢复后数据可能被重复写入;如需严格的精确一次,还需要保证写入操作是幂等的。
关于写clickhouse,我们采用官方的jdbc包,它是基于HTTP协议的,适用于批量提交。
clickhouse的表有单表和分布式表之分,我们先进行单表的写入,即对着一个节点写入。
写入clickhouse单表
引入依赖
<!-- jdbc sink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- jdbc clickhouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1</version>
</dependency>
jdbc sink一般使用方式
// Builds a JDBC sink that writes to a single ClickHouse node.
// The SQL must list the columns explicitly and use one '?' placeholder per column;
// the statement builder below fills those placeholders for every record.
JdbcSink.sink(
        "insert into tableName (id,name) values (?,?)",
        new JdbcStatementBuilder<DwdOrderBean>() {
            @Override
            public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                // Fields are bound to placeholders in the order returned by getDeclaredFields().
                Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                try {
                    SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                } catch (IllegalAccessException e) {
                    // Do not swallow: a partially bound statement would otherwise be
                    // added to the batch. Surface the failure with its cause instead.
                    throw new SQLException(
                            "Failed to read fields of " + dwdOrderBean.getClass().getName() + " via reflection", e);
                }
            }
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                .withUsername(runEnv.getClickHouseUser())
                .withPassword(runEnv.getClickHousePassword())
                .build()
);
/**
 * Generic helper that binds the fields of a bean to the placeholders of a
 * ClickHouse {@link PreparedStatement}.
 *
 * <p>Each instance field is bound, in array order, to the next 1-based parameter
 * index. Static and synthetic fields (for example {@code serialVersionUID} or
 * fields injected by instrumentation) are skipped so they cannot shift the
 * column mapping. Values are bound through their {@code toString()} form; a
 * {@code null} value, an empty string, or the literal text held in {@code NA}
 * is bound as SQL NULL.
 *
 * @param ps     the PreparedStatement to populate
 * @param fields obtained via {@code bean.getClass().getDeclaredFields()}
 * @param bean   the instance whose field values are read
 * @throws IllegalAccessException if a field value cannot be read via {@code Field.get}
 * @throws SQLException           if setting a statement parameter fails
 */
public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
    int paramIndex = 0;
    for (Field field : fields) {
        // Only per-instance, source-declared fields correspond to table columns.
        if (field.isSynthetic() || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
            continue;
        }
        paramIndex++;
        field.setAccessible(true);
        Object value = field.get(bean);
        String text = (value == null) ? null : value.toString();
        if (text == null || text.isEmpty() || NA.equals(text)) {
            // java.sql.Types.NULL has the value 0, the literal used previously.
            ps.setNull(paramIndex, java.sql.Types.NULL);
        } else {
            ps.setObject(paramIndex, text);
        }
    }
}
注1:其中第一个参数是sql语句,格式必须严格按照例子中的格式,列举出列名,后面以 ? 填充。因为后面会调用 JdbcStatementBuilder 读取每条数据来对该sql进行补全。
注2:其中JdbcStatementBuilder的实现,DwdOrderBean是对应clickhouse表结构创建的实例对象,后续通过对实例对象的属性循环传递,来设置到PreparedStatement中,如果列数很少,可以手动填写。
包装下常用设置
/**
 * Convenience wrapper that builds a Flink JDBC {@link SinkFunction} writing to a
 * single ClickHouse node.
 *
 * @param <T> type of the records handed to the sink
 */
public class SinkSingleClickHouse<T> {
    /** Textual marker that is written to ClickHouse as SQL NULL. */
    private static final String NA = "null";

    private final SinkFunction<T> sink;

    /**
     * Creates a ClickHouse sink that uses the connector's default execution options.
     *
     * @param sql                  insert statement; must have the form
     *                             {@code insert into table (a,b) values (?,?)}
     * @param jdbcStatementBuilder fills the statement placeholders from a single record
     * @param runEnv               runtime environment providing host, port, user and password
     * @param database             database that contains the target table
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database) {
        this.sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                connectionOptions(runEnv, database));
    }

    /**
     * Creates a ClickHouse sink with explicit batching and retry settings.
     *
     * @param sql                  insert statement; must have the form
     *                             {@code insert into table (a,b) values (?,?)}
     * @param jdbcStatementBuilder fills the statement placeholders from a single record
     * @param runEnv               runtime environment providing host, port, user and password
     * @param database             database that contains the target table
     * @param batchIntervalMs      flush condition: interval in milliseconds
     * @param batchSize            flush condition: number of buffered records
     * @param maxRetries           number of retries for a failed flush
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database,
                                int batchIntervalMs, int batchSize, int maxRetries) {
        this.sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(batchIntervalMs)
                        .withBatchSize(batchSize)
                        .withMaxRetries(maxRetries)
                        .build(),
                connectionOptions(runEnv, database));
    }

    /** Builds the JDBC connection options shared by both constructors. */
    private static JdbcConnectionOptions connectionOptions(RunEnv runEnv, String database) {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                .withUsername(runEnv.getClickHouseUser())
                .withPassword(runEnv.getClickHousePassword())
                .build();
    }

    /**
     * Returns the sink function built by the constructor.
     *
     * @return the configured ClickHouse sink
     */
    public SinkFunction<T> getSink() {
        return sink;
    }

    /**
     * Generic helper that binds the fields of a bean to the placeholders of a
     * ClickHouse {@link PreparedStatement}.
     *
     * <p>Each instance field is bound, in array order, to the next 1-based parameter
     * index. Static and synthetic fields (for example {@code serialVersionUID} or
     * fields injected by instrumentation) are skipped so they cannot shift the
     * column mapping. Values are bound through their {@code toString()} form; a
     * {@code null} value, an empty string, or the literal text {@code "null"} is
     * bound as SQL NULL.
     *
     * @param ps     the PreparedStatement to populate
     * @param fields obtained via {@code bean.getClass().getDeclaredFields()}
     * @param bean   the instance whose field values are read
     * @throws IllegalAccessException if a field value cannot be read via {@code Field.get}
     * @throws SQLException           if setting a statement parameter fails
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        int paramIndex = 0;
        for (Field field : fields) {
            // Only per-instance, source-declared fields correspond to table columns.
            if (field.isSynthetic() || java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            paramIndex++;
            field.setAccessible(true);
            Object value = field.get(bean);
            String text = (value == null) ? null : value.toString();
            if (text == null || text.isEmpty() || NA.equals(text)) {
                // java.sql.Types.NULL has the value 0, the literal used previously.
                ps.setNull(paramIndex, java.sql.Types.NULL);
            } else {
                ps.setObject(paramIndex, text);
            }
        }
    }
}
注1:其中JdbcExecutionOptions,即是我们设置批量和定时的地方,如果不传,会有默认值。
最终使用
// Final usage: obtain a sink that writes DwdOrderBean records to a single ClickHouse table.
SinkFunction<DwdOrderBean> sinkClickhouse = new SinkSingleClickHouse<>(
        "insert into tableName (id,name) values (?,?)",
        new JdbcStatementBuilder<DwdOrderBean>() {
            @Override
            public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                // Fields are bound to placeholders in the order returned by getDeclaredFields().
                Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                try {
                    SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                } catch (IllegalAccessException e) {
                    // Do not swallow: a partially bound statement would otherwise be
                    // added to the batch. Surface the failure with its cause instead.
                    throw new SQLException(
                            "Failed to read fields of " + dwdOrderBean.getClass().getName() + " via reflection", e);
                }
            }
        },
        uat,
        "dwd_cdp")
        .getSink();
最终如上,我们即可得到一个对着单表写入的sink