Flink实例(十二): connectors(十一)elasticsearch 写 入

1 工程目录

Flink实例(十二): connectors(十一)elasticsearch 写 入

 pom.xml

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

2 flink 写入 hbase

package com.atguigu.flink.app

import java.util

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.source.HbaseSource
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object ESSinkApp {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //调用addSource以此来作为数据输入端
    val stream: scala.DataStream[SensorReading] = env.addSource(new HbaseSource)

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading]{
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // 构造数据格式
          val hashMap = new util.HashMap[String, String]()
          hashMap.put("data", t.toString)

          // 创建请求
          val indexRequest = Requests
            .indexRequest()
            .index("sensor") // 索引是sensor,相当于数据库
            .`type`("readingData") // es6必须写这一行代码
            .source(hashMap)// 数据源

          // 提交数据
          requestIndexer.add(indexRequest)
        }

      }
    )

    // 设置每一批写入es多少数据
    esSinkBuilder.setBulkFlushMaxActions(1)

    stream.addSink(esSinkBuilder.build())

    // 打印流
    stream.print()

    // 执行主程序
    env.execute()

  }

}

 

上一篇:Flink实战(八) - Streaming Connectors 编程


下一篇:kafka connector