Flink 写入 ClickHouse 之分布式表写入
简介
之前基于clickhouse的官方jdbc包编写了sink,用于写入单表,见:https://www.cnblogs.com/sqhhh/p/15897275.html
clickhouse分布式表的写入,目前有2种方法:
- 1.对着逻辑表写入:此方法可以当作是单表,利用单表写入的sink写入数据
- 2.对着各个节点的物理表(本地表)写入:这是网络上普遍推荐的方式。原因是写入分布式表时,接收请求的节点需要先把数据落盘暂存,再异步转发到各个分片,这会加重该节点的负担并带来额外的写放大;直接写各节点的本地表则省去了这一层转发。
实现方式2的思路是:基于单表写入的方式,为每个物理表分别建立一个写入 sink,再把它们统筹成一个 sink,用轮询或者 hash 等方式把数据分流到各个单表 sink 中。
实现思路
观察原jdbc实现逻辑
1.原jdbc底层的使用是
// Arguments omitted for brevity; this factory builds a GenericJdbcSinkFunction (shown below).
JdbcSink.sink();
这是一个静态工厂方法,本质上是构造并返回一个 GenericJdbcSinkFunction 实例。GenericJdbcSinkFunction 的代码如下:
/**
 * Generic JDBC sink: hands every record to a single {@code AbstractJdbcOutputFormat} and
 * flushes that format whenever a checkpoint snapshot is taken.
 *
 * @param <T> record type
 */
@Internal
public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction {

    /** The output format that performs the actual JDBC writes. */
    private final AbstractJdbcOutputFormat<T> delegate;

    /**
     * @param outputFormat output format to delegate all writes to; must not be null
     */
    public GenericJdbcSinkFunction(@Nonnull AbstractJdbcOutputFormat<T> outputFormat) {
        delegate = Preconditions.checkNotNull(outputFormat);
    }

    /** Hands the runtime context to the output format and opens it for this subtask. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        final RuntimeContext runtimeContext = getRuntimeContext();
        delegate.setRuntimeContext(runtimeContext);
        final int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
        final int parallelism = runtimeContext.getNumberOfParallelSubtasks();
        delegate.open(subtaskIndex, parallelism);
    }

    /** Passes the record straight to the output format. */
    @Override
    public void invoke(T value, Context context) throws IOException {
        delegate.writeRecord(value);
    }

    /** Flushes the output format as part of taking the checkpoint snapshot. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        delegate.flush();
    }

    /** No-op: snapshotState stores no state, so there is nothing to restore. */
    @Override
    public void initializeState(FunctionInitializationContext context) {}

    /** Closes the underlying output format. */
    @Override
    public void close() {
        delegate.close();
    }
}
注:进行 checkpoint 时会调用 snapshotState(),把缓冲中的数据 flush 出去,这正是之前提到的利用 checkpoint 来进行提交的实现。
我们对源码进行如下改造:
/**
 * Sink that writes to a ClickHouse cluster by routing every record to one of several JDBC
 * output formats (one per cluster node / local table).
 *
 * <p>The target is {@code outputFormatList.get(|shunt(value) % n|)}, where {@code n} is the
 * number of output formats, so equal shunt values always reach the same output format. Every
 * output format is flushed on each checkpoint and closed when this sink is closed.
 *
 * @param <T> record type
 */
public class ClickHouseSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    /** One output format per target node; the routing index selects from this list. */
    private final List<AbstractJdbcOutputFormat<T>> outputFormatList;
    /** Extracts the routing key from each record. */
    private final ShuntValue<T> shuntValueImp;
    /** Number of output formats (the modulus used for routing). */
    private final int size;

    /**
     * @param outputFormatList non-empty list of output formats, one per target node
     * @param shuntValueImp    function extracting the routing key from a record
     * @throws NullPointerException     if either argument is null
     * @throws IllegalArgumentException if {@code outputFormatList} is empty (routing would
     *                                  otherwise fail with a division by zero on the first record)
     */
    public ClickHouseSinkFunction(List<AbstractJdbcOutputFormat<T>> outputFormatList, ShuntValue<T> shuntValueImp) {
        if (outputFormatList == null) {
            throw new NullPointerException("outputFormatList must not be null");
        }
        if (shuntValueImp == null) {
            throw new NullPointerException("shuntValueImp must not be null");
        }
        if (outputFormatList.isEmpty()) {
            throw new IllegalArgumentException("outputFormatList must contain at least one output format");
        }
        this.outputFormatList = outputFormatList;
        this.shuntValueImp = shuntValueImp;
        this.size = outputFormatList.size();
    }

    /** Hands the runtime context to every output format and opens each for this subtask. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            outputFormat.setRuntimeContext(ctx);
            outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
    }

    /** Routes the record to the output format selected by its shunt value. */
    @Override
    public void invoke(T value, Context context) throws Exception {
        // shunt % size lies in (-size, size), so abs() always yields a valid index,
        // even for negative keys (including Integer.MIN_VALUE).
        int x = Math.abs(shuntValueImp.shunt(value) % size);
        outputFormatList.get(x).writeRecord(value);
    }

    /** Flushes every output format as part of the checkpoint snapshot. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            outputFormat.flush();
        }
    }

    /** No-op: snapshotState stores no state, so there is nothing to restore. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Closes every output format, releasing its connection. Without this the formats would never
     * be closed when the task ends.
     *
     * <p>All formats are closed even if some fail. The first failure is rethrown after the loop,
     * with any later failures attached as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        for (AbstractJdbcOutputFormat<T> outputFormat : outputFormatList) {
            try {
                outputFormat.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        super.close();
        if (failure != null) {
            throw failure;
        }
    }
}
将单个 AbstractJdbcOutputFormat 扩展成了一个 list(每个节点对应一个元素),并利用 int x = Math.abs(shuntValueImp.shunt(value) % size) 计算出目标下标来进行分流。
其中,shuntValueImp 的类型 ShuntValue 是一个接口,定义如下:
/**
 * Strategy that maps a record to an integer routing key.
 *
 * <p>The key is used to pick the ClickHouse node (i.e. the per-node sink) the record is
 * written to. It extends {@link Serializable} so that implementations, including lambdas and
 * method references, can be serialized together with the sink function that holds them.
 *
 * @param <T> record type
 */
@FunctionalInterface
public interface ShuntValue<T> extends Serializable {
    /**
     * Computes the routing key for one record.
     *
     * @param value a single record
     * @return an int that is reduced modulo the number of cluster nodes to choose the target
     *     node. Any value, including a negative one, is acceptable, and equal values always
     *     select the same node.
     */
    int shunt(T value);
}
这个接口的作用是如何从单条数据中提取一个int数值,用于做分流用。
用 Builder 包装常用设置
/**
 * Builds a {@link ClickHouseSinkFunction} that writes directly to every node of a ClickHouse
 * cluster.
 *
 * <p>It creates one batching JDBC output format (and connection) per host and routes records
 * between them with a {@link ShuntValue}.
 *
 * @param <T> record type
 */
public class ClickHouseSinkBuilder<T> {
    /** JDBC driver class used for every host connection. */
    private static final String DRIVER_CLASS_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    private final String sql;
    private final JdbcStatementBuilder<T> statementBuilder;
    private JdbcExecutionOptions executionOptions = JdbcExecutionOptions.defaults();
    private List<String> clickHouseHosts;
    // ClickHouse's default HTTP port; each URL is jdbc:clickhouse://host:port/database.
    private String clickHousePort = "8123";
    private String clickHouseUser = "default";
    // NOTE(review): a stock ClickHouse install gives the "default" user an empty password; confirm this default.
    private String clickHousePassword = "default";
    private String clickHouseDatabase = "default";
    // Default routing: the record's own hashCode().
    private ShuntValue<T> shuntValue = Object::hashCode;

    /**
     * Validates the settings and assembles the sink: one output format per host, all wrapped
     * in a single {@link ClickHouseSinkFunction}.
     *
     * @throws NullPointerException     if sql, statement builder, execution options, port,
     *                                  database, shunt function or any host is null
     * @throws IllegalArgumentException if no hosts were configured
     */
    private SinkFunction<T> buildAll(String sql,
                                     JdbcStatementBuilder<T> statementBuilder,
                                     JdbcExecutionOptions executionOptions,
                                     List<String> clickHouseHosts,
                                     String clickHousePort,
                                     String clickHouseUser,
                                     String clickHousePassword,
                                     String clickHouseDatabase,
                                     ShuntValue<T> shuntValue) {
        Preconditions.checkNotNull(sql, "sql must not be null");
        Preconditions.checkNotNull(statementBuilder, "statementBuilder must not be null");
        Preconditions.checkNotNull(executionOptions, "executionOptions must not be null");
        Preconditions.checkNotNull(clickHousePort, "clickHousePort must not be null");
        Preconditions.checkNotNull(clickHouseDatabase, "clickHouseDatabase must not be null");
        Preconditions.checkNotNull(shuntValue, "shuntValue must not be null");
        // Fail fast here instead of with a division by zero when the first record is routed.
        Preconditions.checkArgument(clickHouseHosts != null && !clickHouseHosts.isEmpty(),
                "at least one ClickHouse host is required");

        List<AbstractJdbcOutputFormat<T>> outputFormatList = new ArrayList<>(clickHouseHosts.size());
        for (String clickHouseHost : clickHouseHosts) {
            Preconditions.checkNotNull(clickHouseHost, "ClickHouse host must not be null");
            JdbcConnectionOptions connectionOptions = createConnectionOptions(
                    clickHouseHost, clickHousePort, clickHouseUser, clickHousePassword, clickHouseDatabase);
            outputFormatList.add(createOutputFormat(sql, statementBuilder, executionOptions, connectionOptions));
        }
        return new ClickHouseSinkFunction<T>(outputFormatList, shuntValue);
    }

    /** Builds the JDBC connection options for a single ClickHouse host. */
    private static JdbcConnectionOptions createConnectionOptions(String host,
                                                                 String port,
                                                                 String user,
                                                                 String password,
                                                                 String database) {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://" + host + ":" + port + "/" + database)
                .withDriverName(DRIVER_CLASS_NAME)
                .withUsername(user)
                .withPassword(password)
                .build();
    }

    /**
     * Creates a batching JDBC output format that executes {@code sql} against one host.
     *
     * <p>This method is static so the executor-factory lambda can only capture {@code sql} and
     * {@code statementBuilder}, never a reference to this builder, which is not Serializable.
     */
    private static <R> AbstractJdbcOutputFormat<R> createOutputFormat(String sql,
                                                                      JdbcStatementBuilder<R> statementBuilder,
                                                                      JdbcExecutionOptions executionOptions,
                                                                      JdbcConnectionOptions connectionOptions) {
        return new JdbcBatchingOutputFormat<>(
                new SimpleJdbcConnectionProvider(connectionOptions),
                executionOptions,
                context -> {
                    Preconditions.checkState(
                            !context.getExecutionConfig().isObjectReuseEnabled(),
                            "objects can not be reused with JDBC sink function");
                    return JdbcBatchStatementExecutor.simple(
                            sql, statementBuilder, Function.identity());
                },
                JdbcBatchingOutputFormat.RecordExtractor.identity());
    }

    /**
     * Creates a builder.
     *
     * @param sql              parameterized INSERT statement executed on every host
     * @param statementBuilder binds one record to the statement's parameters
     * @param clickHouseHosts  host names of all cluster nodes to write to
     */
    public static <T> ClickHouseSinkBuilder<T> builder(String sql, JdbcStatementBuilder<T> statementBuilder, List<String> clickHouseHosts) {
        return new ClickHouseSinkBuilder<T>(sql, statementBuilder, clickHouseHosts);
    }

    /** See {@link #builder(String, JdbcStatementBuilder, List)}. */
    public ClickHouseSinkBuilder(String sql, JdbcStatementBuilder<T> statementBuilder, List<String> clickHouseHosts) {
        this.sql = sql;
        this.statementBuilder = statementBuilder;
        this.clickHouseHosts = clickHouseHosts;
    }

    /** Sets the batching/retry options shared by every per-host output format. */
    public ClickHouseSinkBuilder<T> setExecutionOptions(JdbcExecutionOptions executionOptions) {
        this.executionOptions = executionOptions;
        return this;
    }

    /** Replaces the list of cluster hosts to write to. */
    public ClickHouseSinkBuilder<T> setClickHouseHosts(List<String> clickHouseHosts) {
        this.clickHouseHosts = clickHouseHosts;
        return this;
    }

    /** Sets the port used for every host (default {@code "8123"}). */
    public ClickHouseSinkBuilder<T> setClickHousePort(String clickHousePort) {
        this.clickHousePort = clickHousePort;
        return this;
    }

    /** Sets the user name used for every host (default {@code "default"}). */
    public ClickHouseSinkBuilder<T> setClickHouseUser(String clickHouseUser) {
        this.clickHouseUser = clickHouseUser;
        return this;
    }

    /** Sets the password used for every host (default {@code "default"}). */
    public ClickHouseSinkBuilder<T> setClickHousePassword(String clickHousePassword) {
        this.clickHousePassword = clickHousePassword;
        return this;
    }

    /** Sets the database placed in every JDBC URL (default {@code "default"}). */
    public ClickHouseSinkBuilder<T> setClickHouseDatabase(String clickHouseDatabase) {
        this.clickHouseDatabase = clickHouseDatabase;
        return this;
    }

    /** Sets the function that picks the target host for each record (default: {@code hashCode()}). */
    public ClickHouseSinkBuilder<T> setShuntValue(ShuntValue<T> shuntValue) {
        this.shuntValue = shuntValue;
        return this;
    }

    /**
     * Builds the sink from the current settings.
     *
     * @throws NullPointerException     if a required setting is null
     * @throws IllegalArgumentException if no hosts were configured
     */
    public SinkFunction<T> build() {
        return buildAll(sql, statementBuilder, executionOptions, clickHouseHosts, clickHousePort, clickHouseUser, clickHousePassword, clickHouseDatabase, shuntValue);
    }
}
使用
// One entry per ClickHouse node; each node's local table gets its own JDBC connection.
List<String> hosts = new ArrayList<>(2);
hosts.add("host1");
hosts.add("host2");
SinkFunction<DwdOrderBean> sinkClusterClickHouse = ClickHouseSinkBuilder.<DwdOrderBean>builder(
                "insert into tableName (id,name) values (?,?)",
                (PreparedStatement ps, DwdOrderBean dwdOrderBean) -> {
                    // NOTE(review): Class#getDeclaredFields() guarantees no particular order; confirm that
                    // SinkSingleClickHouse.setPs binds parameters in the INSERT's column order (id, name)
                    // rather than relying on field declaration order.
                    Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                    try {
                        SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                    } catch (IllegalAccessException e) {
                        // Fail the write instead of swallowing the error and continuing with a
                        // partially bound statement.
                        throw new SQLException("Cannot bind fields of " + dwdOrderBean.getClass().getName(), e);
                    }
                },
                hosts)
        // Records with the same tenant code always go to the same node; a null code hashes to 0.
        .setShuntValue(value -> java.util.Objects.hashCode(value.getTenant_code()))
        .setClickHousePassword("pas")
        .setClickHousePort("222")
        .setClickHouseUser("user")
        .setClickHouseDatabase("default")
        .setExecutionOptions(new JdbcExecutionOptions.Builder()
                .withBatchIntervalMs(5000L)
                .withBatchSize(50000)
                .withMaxRetries(3)
                .build())
        .build();