一、redis sink
对应jar包
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
将文件内容写入到hash中
代码:
/** Streams sensor records from a text file into a Redis hash through the Flink Redis sink. */
object RedisSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: every line of the file is one comma-separated record.
    val inputLines = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    // Turn each line into a SensorReading built from its first three fields.
    val dataStream: DataStream[SensorReading] = inputLines.map { line =>
      val fields = line.split(",")
      val id = fields(0).trim
      val timestamp = fields(1).trim.toLong
      val temperature = fields(2).toDouble
      SensorReading(id, timestamp, temperature)
    }

    // Redis sink: connection settings first, then the sink that uses MyRedisMapper
    // to decide what gets written for each record.
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("hadoop102")
      .setPort(6379)
      .build()
    val redisSink = new RedisSink[SensorReading](config, new MyRedisMapper)
    dataStream.addSink(redisSink)

    env.execute("redis sink test")
  }
}

/** Describes how a SensorReading is written to Redis: HSET into the "sensor_temperature" hash. */
class MyRedisMapper extends RedisMapper[SensorReading] {

  /** The Redis command is HSET and the hash key is "sensor_temperature". */
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

  /** The hash field is the sensor id. */
  override def getKeyFromData(t: SensorReading): String = t.id

  /** The hash value is the temperature, rendered as a string. */
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}
redis查看结果
127.0.0.1:6379> hgetall sensor_temperature 1) "sensor_1" 2) "35.80018327300259" 3) "sensor_6" 4) "15.402984393403084" 5) "sensor_10" 6) "38.101067604893444" 7) "sensor_7" 8) "6.720945201171228"
二、es sink
对应jar包
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.7.2</version> </dependency>
将文件内容写入到es中
代码:
/** Streams sensor records from a text file into Elasticsearch through the Flink Elasticsearch sink. */
object EsSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: every line of the file is one comma-separated record.
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")

    // Turn each line into a SensorReading built from its first three fields.
    val dataStream: DataStream[SensorReading] = streamFromFile.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).toDouble)
    }

    // Elasticsearch sink: the list of HTTP hosts the sink connects to.
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("hadoop101", 9200))

    // Builder for the sink; the sink function converts each reading into an index request.
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading,
                             runtimeContext: RuntimeContext,
                             requestIndexer: RequestIndexer): Unit = {
          println(s"保存数据:$t")

          // Wrap the reading's fields in a map that becomes the document source.
          val source = new util.HashMap[String, String]()
          source.put("sensor_id", t.id)
          source.put("temperature", t.temperature.toString)
          source.put("ts", t.timestamp.toString)

          // Build the index request for index "sensor", type "redingdata".
          val indexRequest: IndexRequest =
            Requests.indexRequest().index("sensor").`type`("redingdata").source(source)

          // Hand the request to the indexer.
          // NOTE(review): the message below only shows that add() returned; it does not
          // by itself confirm that Elasticsearch has acknowledged the write.
          requestIndexer.add(indexRequest)
          println("保存成功")
        }
      }
    )

    dataStream.addSink(esSinkBuilder.build())

    // The job name now identifies this job (it previously reused the Redis example's name).
    env.execute("es sink test")
  }
}
es中查看结果
{ "took" : 148, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 6, "max_score" : 1.0, "hits" : [ { "_index" : "sensor", "_type" : "redingdata", "_id" : "QXpZnnEBUwLRQchmepbT", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_6", "temperature" : "15.402984393403084", "ts" : "1547718201" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RnpZnnEBUwLRQchme5ZS", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_7", "temperature" : "6.720945201171228", "ts" : "1547718202" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "Q3pZnnEBUwLRQchmepbr", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "35.80018327300259", "ts" : "1547718199" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "QnpZnnEBUwLRQchmepbo", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "30.8", "ts" : "1547718200" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RHpZnnEBUwLRQchmepbs", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "40.8", "ts" : "1547718201" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RXpZnnEBUwLRQchmepbu", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_10", "temperature" : "38.101067604893444", "ts" : "1547718205" } } ] } }