Writing to Elasticsearch from Flink
Introduction
This post covers how a Flink job writes to Elasticsearch in practice, and how to configure the sink.
Flink version: 1.13.2
GitHub repo: https://github.com/dahai1996/mdw-flink-quickstart
Writing to ES
Add the dependency
<!--es-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
<!-- <exclusion>-->
<!-- <groupId>com.fasterxml.jackson.core</groupId>-->
<!-- <artifactId>jackson-core</artifactId>-->
<!-- </exclusion>-->
</exclusions>
</dependency>
Note: the log4j binding is excluded to avoid a classpath conflict that would suppress log output.
Basic usage
List<HttpHost> httpHosts = new ArrayList<>();
// note: the second argument is the port, not the host
httpHosts.add(new HttpHost(ip, port, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ESSink()
);
/* the flush parameters must be set */
// maximum number of buffered actions before flushing
esSinkBuilder.setBulkFlushMaxActions(10);
// maximum size of buffered data before flushing, in MB
esSinkBuilder.setBulkFlushMaxSizeMb(5);
// flush interval, triggered regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(5000L);
// attach the sink to the data stream
dataStream.addSink(esSinkBuilder.build());
Note: ESSink here is the class that holds the actual write logic. Its skeleton is:
public static class ESSink implements ElasticsearchSinkFunction<String>
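The body of the class is not shown in this post. A minimal sketch of what such a function might look like, using the flink-connector-elasticsearch6 API (the index name "my-index" is a placeholder, not from the original code):

```java
public static class ESSink implements ElasticsearchSinkFunction<String> {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // wrap the incoming record as a single-field document;
        // "my-index" is a placeholder index name
        Map<String, String> json = new HashMap<>(1);
        json.put("data", element);
        IndexRequest request = Requests.indexRequest()
                .index("my-index")
                .type("_doc")   // ES 6.x still requires a document type
                .source(json);
        // hand the request to Flink's bulk processor
        indexer.add(request);
    }
}
```

This relies on `org.elasticsearch.client.Requests` plus the Flink connector's `RequestIndexer`; in a real job you would build the document from the parsed record rather than wrapping the raw string.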
To create sinks quickly later on, wrap the builder in a small helper class:
public class SinkEs<T> {
public List<HttpHost> httpHosts = new ArrayList<>(1);
public ElasticsearchSink.Builder<T> esSinkBuilder;
/**
 * Builds an ES sink with default flush settings.
 * @param runEnv enum holding the service addresses for each runtime environment
 * @param elasticsearchSinkFunction logic that converts a single record into ES requests
 */
public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
esSinkBuilder = new ElasticsearchSink.Builder<T>(
httpHosts,
elasticsearchSinkFunction
);
esSinkBuilder.setBulkFlushMaxActions(1);
esSinkBuilder.setBulkFlushMaxSizeMb(1);
esSinkBuilder.setBulkFlushInterval(5000L);
esSinkBuilder.setRestClientFactory(new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
return Duration.ofMinutes(5).toMillis();
}
});
return httpAsyncClientBuilder;
}
});
}
});
}
/**
 * Builds an ES sink with explicit flush settings.
 * @param runEnv enum holding the service addresses for each runtime environment
 * @param elasticsearchSinkFunction logic that converts a single record into ES requests
 * @param bulkFlushMaxActions maximum number of buffered actions before flushing
 * @param bulkFlushMaxSizeMb maximum size of buffered data before flushing, in MB
 * @param bulkFlushInterval flush interval, triggered regardless of the number or size of buffered actions
 */
public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
int bulkFlushMaxActions, int bulkFlushMaxSizeMb, Long bulkFlushInterval) {
httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
esSinkBuilder = new ElasticsearchSink.Builder<T>(
httpHosts,
elasticsearchSinkFunction
);
esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);
}
public ElasticsearchSink<T> getSink() {
return esSinkBuilder.build();
}
}
After that, an ES sink can be created quickly:
SinkFunction<String> sinkEs = new SinkEs<>(
        uat,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // build index requests from s and add them via requestIndexer here
            }
        },
        1,
        5,
        5000L)
        .getSink();
Note: the uat variable here is a value of the enum that holds the addresses of each environment; see the github repo for details.
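The RunEnv enum itself lives in the linked repo. A hypothetical minimal shape, just so the snippets above compile in isolation (the constant names and addresses below are made up, not the real ones):

```java
// Hypothetical stand-in for the RunEnv enum referenced above:
// one constant per environment, each carrying the ES address.
public enum RunEnv {
    uat("es-uat.example.com", 9200),
    prod("es-prod.example.com", 9200);

    private final String esHost;
    private final int esPort;

    RunEnv(String esHost, int esPort) {
        this.esHost = esHost;
        this.esPort = esPort;
    }

    public String getEsHost() { return esHost; }

    public int getEsPort() { return esPort; }
}
```

Keeping the addresses in one enum means a job switches environments by changing a single constructor argument.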
Note 2: the setRestClientFactory() call in the first constructor exists for the following reason:
the ES client opens a persistent connection with an unlimited keep-alive time and reuses it for subsequent requests.
If that connection has died on the server side, the client will still try to reuse it, and the request fails.
The code above therefore caps the keep-alive time of the connection (5 minutes here).
(I no longer remember which blog post this came from.)