1. pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
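The job below also reads from Kafka via FlinkKafkaConsumer011, which the Elasticsearch connector above does not provide. A minimal sketch of the extra dependency, assuming the same Scala 2.11 build and a ${flink.version} that still ships the 0.11 Kafka connector:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>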
2. Detailed code
import java.net.SocketTimeoutException
import java.util
import java.util.Properties

import com.google.gson.Gson
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkBase, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import org.elasticsearch.client.{Requests, RestClientBuilder}
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

object FLink_Kafka_ES {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Critical: checkpointing must be enabled!!
    env.enableCheckpointing(1000)

    // Kafka topic
    val topic: String = "test"

    // Kafka consumer properties
    val props: Properties = new Properties
    props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")
    props.setProperty("group.id", "test01")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    // Import implicit conversions
    import org.apache.flink.streaming.connectors.kafka._
    import org.apache.flink.api.scala._
    import scala.collection.JavaConverters._

    val consumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), props)
    // Start consuming from the latest offsets
    consumer.setStartFromLatest()

    // Build the Kafka source
    val kafkaSource: DataStream[String] = env.addSource(consumer)

    // Transformation: parse each JSON record into a Scala map
    val mapDS: DataStream[Map[String, AnyRef]] = kafkaSource.map(x => {
      // Use Gson to convert the JSON string into a map
      (new Gson).fromJson(x, classOf[util.Map[String, AnyRef]]).asScala.toMap
    })

    // Configure the Elasticsearch nodes
    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("10.11.159.106", 9204, "http"))

    // Build the Elasticsearch sink
    val esSinkBuilder = new ElasticsearchSink.Builder[Map[String, AnyRef]](
      httpHosts,
      new ElasticsearchSinkFunction[Map[String, AnyRef]] {
        override def process(t: Map[String, AnyRef], runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val map: util.Map[String, AnyRef] = t.asJava
          val indexRequest: IndexRequest = Requests
            .indexRequest()
            .index("flink_kafka")
            //.`type`("kafka_data") // optional: ES 7.x no longer requires a document type
            //.id(user_id)          // use a field of the record as the document id
            //.create(false)        // auto-create the index; not recommended -- it is better to define the mapping in ES beforehand, unless ES can infer your time fields automatically
            // Because of ES field-naming restrictions, dotted names cannot be used directly.
            // If you need the x.x naming format, consider nested maps or JSON.
            // When using nested maps, every map must be converted to java.util.Map, otherwise a type exception is thrown.
            .source(map)
          // Send the request and write the data
          requestIndexer.add(indexRequest)
          // Log once the request has been queued
          println("data saved successfully")
        }
      })

    // Configure the REST client of the ES sink (authentication)
    esSinkBuilder.setRestClientFactory(
      new RestClientFactory {
        override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
          restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback {
            override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
              val provider: BasicCredentialsProvider = new BasicCredentialsProvider()
              // Set the username and password
              val credentials: UsernamePasswordCredentials = new UsernamePasswordCredentials("elastic", "123456")
              // Adjust the username/password to your environment; if no authentication is needed, use empty strings ""
              provider.setCredentials(AuthScope.ANY, credentials)
              httpClientBuilder.setDefaultCredentialsProvider(provider)
            }
          })
        }
      })

    // Bulk request configuration: flush after every single element, otherwise requests are buffered
    esSinkBuilder.setBulkFlushMaxActions(1)

    // Delay between retries; for the exponential type this is the initial base delay
    //esSinkBuilder.setBulkFlushBackoffDelay(1)

    // Number of retries on failure
    esSinkBuilder.setBulkFlushBackoffRetries(3)

    // Backoff strategy, one of two types:
    // a. exponential: the interval between retries grows exponentially, e.g. 2 -> 4 -> 8 ...
    // b. constant: the interval between retries is a fixed constant, e.g. 2 -> 2 -> 2 ...
    esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)

    // Interval between bulk flushes
    //esSinkBuilder.setBulkFlushInterval(100)

    // Maximum size of a bulk request, in MB
    //esSinkBuilder.setBulkFlushMaxSizeMb(16)

    // Elasticsearch failure handling
    esSinkBuilder.setFailureHandler(
      new ActionRequestFailureHandler {
        override def onFailure(actionRequest: ActionRequest, throwable: Throwable, i: Int, requestIndexer: RequestIndexer): Unit = {
          if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent) {
            // The ES bulk queue is full: put the request back into the queue
            requestIndexer.add(actionRequest)
          } else if (ExceptionUtils.findThrowable(throwable, classOf[SocketTimeoutException]).isPresent) {
            // ES timed out: put the request back into the queue
            requestIndexer.add(actionRequest)
          } else {
            // Any other exception: log it and rethrow
            println("Sink to es exception, exceptionData: " + actionRequest.toString +
              " exceptionStackTrace: " + org.apache.commons.lang.exception.ExceptionUtils.getFullStackTrace(throwable))
            throw throwable
          }
        }
      }
    )

    // Set the maximum parallelism
    mapDS.setMaxParallelism(1)
    // Sink the data to Elasticsearch
    mapDS.addSink(esSinkBuilder.build())

    env.execute("Kafka_Flink")

    // Command to produce test data:
    // $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic test

    // Sample test record entered in Kafka:
    // {"id":1,"completed":false,"title":"delectus aut autem","userId":1}

    // List the indices:
    // GET _cat/indices

    // Query the index contents:
    // GET flink_kafka/_search
  }
}

Reference (original article): https://blog.csdn.net/hongchenshijie/article/details/109704636
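The map step above hands Gson the raw JSON string and converts the resulting java.util.Map into a Scala Map before it reaches the sink. As a quick way to see what the documents sent to flink_kafka actually look like, here is a minimal standalone sketch (not part of the original job; the object name GsonMapSketch is illustrative, and it only needs Gson and the Scala library on the classpath). Note that Gson represents JSON numbers as java.lang.Double by default, so "id":1 arrives as 1.0 unless the ES mapping or downstream code accounts for that.

import java.util
import com.google.gson.Gson
import scala.collection.JavaConverters._

object GsonMapSketch {
  def main(args: Array[String]): Unit = {
    // Sample record from the Kafka test data above
    val json = """{"id":1,"completed":false,"title":"delectus aut autem","userId":1}"""
    // Same conversion as in the Flink map function: JSON -> java.util.Map -> immutable Scala Map
    val map: Map[String, AnyRef] =
      (new Gson).fromJson(json, classOf[util.Map[String, AnyRef]]).asScala.toMap
    // Prints e.g. Map(id -> 1.0, completed -> false, title -> delectus aut autem, userId -> 1.0)
    println(map)
  }
}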