Writing to Elasticsearch from Flink

Introduction

This post mainly covers how to set up Flink to write to ES in a real project.

Flink version: 1.13.2

GitHub repo: https://github.com/dahai1996/mdw-flink-quickstart


Writing to ES

Add the dependency

<!-- es -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </exclusion>
        <!--
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
        -->
    </exclusions>
</dependency>

Note: the log4j-to-slf4j artifact is excluded to avoid a logging conflict that would otherwise prevent logs from being printed.

Basic usage

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(host, port, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ESSink()
);
/* The bulk-flush parameters must be set */
// maximum number of buffered actions before flushing
esSinkBuilder.setBulkFlushMaxActions(10);
// maximum size of the buffered data before flushing, in MB
esSinkBuilder.setBulkFlushMaxSizeMb(5);
// interval at which to flush regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(5000L);

// add the sink to the data stream
dataStream.addSink(esSinkBuilder.build());

Note: the ESSink class instantiated above is the concrete implementation of how each record is written to ES. Its declaration looks roughly like this:

public static class ESSink implements ElasticsearchSinkFunction<String> 
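
For reference, a minimal sketch of a complete implementation might look like the one below. The index name "my-index", the "_doc" type, and the assumption that each incoming String is already a JSON document are illustrative only; the actual ESSink in the repo may differ.

// imports assumed by this sketch (at the top of the enclosing file)
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

public static class ESSink implements ElasticsearchSinkFunction<String> {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // Turn the incoming record into an index request; the connector batches
        // these requests according to the bulk-flush settings configured above.
        IndexRequest request = Requests.indexRequest()
                .index("my-index")                   // hypothetical index name
                .type("_doc")                        // ES 6.x still requires a document type
                .source(element, XContentType.JSON); // the record is assumed to be a JSON string
        indexer.add(request);
    }
}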

To make this quicker to set up in later jobs, wrap it in a small helper class:

public class SinkEs<T> {
    public List<HttpHost> httpHosts = new ArrayList<>(1);
    public ElasticsearchSink.Builder<T> esSinkBuilder;

    /**
     * Builds an ES sink.
     *
     * @param runEnv enum holding the addresses of the target runtime environment
     * @param elasticsearchSinkFunction logic that converts a single record into ES requests
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(1);
        esSinkBuilder.setBulkFlushMaxSizeMb(1);
        esSinkBuilder.setBulkFlushInterval(5000L);
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
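                        // cap the connection keep-alive at 5 minutes so a dead
                        // long-lived connection is not reused forever (see Note 2 below)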
                        httpAsyncClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                            @Override
                            public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
                                return Duration.ofMinutes(5).toMillis();
                            }
                        });
                        return httpAsyncClientBuilder;
                    }
                });
            }
        });
    }

    /**
     * Builds an ES sink with explicit bulk-flush settings.
     *
     * @param runEnv enum holding the addresses of the target runtime environment
     * @param elasticsearchSinkFunction logic that converts a single record into ES requests
     * @param bulkFlushMaxActions maximum number of buffered actions before flushing
     * @param bulkFlushMaxSizeMb maximum size of the buffered data before flushing, in MB
     * @param bulkFlushInterval interval at which to flush regardless of the number or size of buffered actions
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
                  int bulkFlushMaxActions, int bulkFlushMaxSizeMb, Long bulkFlushInterval) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);
    }


    public ElasticsearchSink<T> getSink() {
        return esSinkBuilder.build();
    }
}

After that, ES sinks can be created quickly:

SinkFunction<String> sinkEs = new SinkEs<>(
        uat,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // build the ES request(s) for record s here and add them via requestIndexer
            }
        },
        1,
        5,
        5000L)
        .getSink();

Note: the uat variable here is an instance of the RunEnv enum, which holds the addresses of each environment; see the GitHub repo for the full code.
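
For illustration only, such an enum might look roughly like the sketch below; the environment names, hostnames, and port are made up, and the real RunEnv in the repo may carry additional fields.

public enum RunEnv {
    // hypothetical environments and addresses
    uat("es-uat.example.com", 9200),
    prod("es-prod.example.com", 9200);

    private final String esHost;
    private final int esPort;

    RunEnv(String esHost, int esPort) {
        this.esHost = esHost;
        this.esPort = esPort;
    }

    // accessors used by SinkEs above
    public String getEsHost() {
        return esHost;
    }

    public int getEsPort() {
        return esPort;
    }
}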

Note 2: the code passed to setRestClientFactory() above does the following:

The ES client opens a persistent connection with an unlimited keep-alive time and reuses it for subsequent requests to the server. If that connection dies, the client still tries to send requests over it, which results in errors. The factory above therefore limits how long the connection is kept alive (5 minutes here). I no longer remember which blog post this fix came from.
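
Incidentally, RestClientFactory, HttpClientConfigCallback, and ConnectionKeepAliveStrategy are all single-method interfaces, so the same keep-alive setting can be written more compactly with lambdas; the snippet below is just a condensed form of the anonymous classes in SinkEs.

esSinkBuilder.setRestClientFactory(restClientBuilder ->
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
                // keep a pooled connection alive for at most 5 minutes instead of forever
                httpClientBuilder.setKeepAliveStrategy(
                        (response, context) -> Duration.ofMinutes(5).toMillis())));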
