Flink 三种Sink redis,es和jdbc

一、redis sink

对应jar包

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

将文件内容写入到hash中

代码:

/**
 * Demo job: read sensor records from a CSV file and write them into a
 * Redis hash via the Bahir Redis connector.
 */
object RedisSinkTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: one CSV line per sensor reading, e.g. "sensor_1,1547718199,35.8"
    val sensorLines = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    // Parse each line into a SensorReading(id, timestamp, temperature)
    val readings: DataStream[SensorReading] = sensorLines.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).toDouble)
    }

    // Sink: write every reading into Redis through a pooled Jedis connection
    val jedisConfig: FlinkJedisPoolConfig =
      new FlinkJedisPoolConfig.Builder()
        .setHost("hadoop102")
        .setPort(6379)
        .build()
    readings.addSink(new RedisSink(jedisConfig, new MyRedisMapper))

    env.execute("redis sink test")
  }

}

/**
 * Maps a [[SensorReading]] onto a Redis HSET command so that all readings
 * land in a single hash named "sensor_temperature":
 * field = sensor id, value = latest temperature.
 */
class MyRedisMapper extends RedisMapper[SensorReading] {

  // Command: HSET sensor_temperature <field> <value>
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

  // Hash field: the sensor's id
  override def getKeyFromData(t: SensorReading): String = t.id

  // Hash value: the temperature, rendered as a string
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

redis查看结果

127.0.0.1:6379> hgetall sensor_temperature
1) "sensor_1"
2) "35.80018327300259"
3) "sensor_6"
4) "15.402984393403084"
5) "sensor_10"
6) "38.101067604893444"
7) "sensor_7"
8) "6.720945201171228"

  

二、es sink

对应jar包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

将文件内容解析后写入到 Elasticsearch 的 sensor 索引中

代码:

/**
 * Demo job: read sensor records from a CSV file and index them into
 * Elasticsearch (index "sensor", type "redingdata") via the Flink ES6 connector.
 */
object EsSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    // Parse each CSV line into a SensorReading(id, timestamp, temperature)
    val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })

    // es sink: target cluster node(s)
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("hadoop101", 9200))

    // Builder for the ES sink; the sink function converts each reading
    // into an IndexRequest and hands it to the RequestIndexer for bulk writing.
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("保存数据:" + t)
        // Wrap the reading's fields as a String->String map (the ES source document)
        val map = new util.HashMap[String, String]()
        map.put("sensor_id", t.id)
        map.put("temperature", t.temperature.toString)
        map.put("ts", t.timestamp.toString)

        // Build the index request targeting index "sensor" / type "redingdata"
        val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map)

        // Queue the request; the connector flushes it to ES (possibly batched)
        requestIndexer.add(indexRequest)

        println("保存成功")
      }
    })
    // NOTE: the original code had a dangling `esSinkBuilder` expression
    // statement here (a no-op); it has been removed.

    dataStream.addSink(esSinkBuilder.build())

    // Fixed copy-paste job name: this is the ES sink job, not the Redis one
    env.execute("es sink test")
  }

}

es中查看结果

{
  "took" : 148,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 6,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "QXpZnnEBUwLRQchmepbT",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_6",
          "temperature" : "15.402984393403084",
          "ts" : "1547718201"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RnpZnnEBUwLRQchme5ZS",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_7",
          "temperature" : "6.720945201171228",
          "ts" : "1547718202"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "Q3pZnnEBUwLRQchmepbr",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "35.80018327300259",
          "ts" : "1547718199"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "QnpZnnEBUwLRQchmepbo",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "30.8",
          "ts" : "1547718200"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RHpZnnEBUwLRQchmepbs",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "40.8",
          "ts" : "1547718201"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RXpZnnEBUwLRQchmepbu",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_10",
          "temperature" : "38.101067604893444",
          "ts" : "1547718205"
        }
      }
    ]
  }
}

 

Flink 三种Sink redis,es和jdbc

上一篇:[SQL]1211


下一篇:136-如何访问redis数据库