Redis-Sink
使用Flink内嵌 RedisSink
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency> package com.text.sink import java.net.InetSocketAddress import java.util import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.redis.RedisSink import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig} import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} object RedisSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("ke01", 8899) val result = stream.flatMap(_.split(" ")) .map((_, 1)).keyBy(0).sum(1) result.print().setParallelism(1); // redis是单机 val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("192.168.75.91").setPort(6390).setPassword("aa123456").build() result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] { override def getCommandDescription: RedisCommandDescription = { // 选择redis存储方式 new RedisCommandDescription(RedisCommand.HSET, "wc") } override def getKeyFromData(t: (String, Int)): String = { println(t._1) t._1 } override def getValueFromData(t: (String, Int)): String = { println(t._2) t._2.toString } })) env.execute() } }
Kafka-Sink
使用Fink内嵌:FlinkKafkaProducer
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.9.2</version> </dependency> package com.text.sink import java.lang import java.util.Properties import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema} import org.apache.kafka.clients.producer.ProducerRecord object KafkaSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("ke01", 8890) val result = stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1) val props = new Properties() props.setProperty("bootstrap.servers", "ke01:9092,ke02:9092,ke03:9092") // 因为下面序列化了,所以这里没必要序列化 result.addSink(new FlinkKafkaProducer[(String, Int)]("wc", new KafkaSerializationSchema[(String, Int)] { override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { new ProducerRecord("wc", element._1.getBytes(), (element._2.toString).getBytes()) } }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)) env.execute() } }
Mysql-Sink
1.Fink不支持mysql内嵌,所以自定义:RichSinkFunction
2. MySQL需要幂等性,就是没有数据的时候可正常插入,有数据的时候正常累加
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency> package com.text.sink import java.sql.{Connection, DriverManager, PreparedStatement} import java.util.Properties import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringSerializer case class CarInfo(monitorId: String, carId: String, eventTime: String, speed: Long) object MysqlSinkDemo { def main(args: Array[String]): Unit = { //需要保证幂等性,幂等性:就是多次写入,如何保证数据准确 val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092") props.setProperty("group.id", "group_id") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] { //什么时候停止,停止条件是什么 override def isEndOfStream(nextElement: (String, String)): Boolean = false //要进行序列化的字节流 override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { val key = new String(consumerRecord.key(), "UTF-8") val value = new String(consumerRecord.value(), "UTF-8") (key, value) } override def getProducedType: TypeInformation[(String, String)] = { createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) } }, props)) stream.map(data => { val value = data._2 val splits = value.split("\t") val monitorId = splits(0) (monitorId, 1) }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] { // /value1:上次聚合完的结果 value2:当前的数据 override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { (value1._1, value1._2 + value2._2) } }).addSink(new MySQLCustomSink) env.execute() } } class MySQLCustomSink extends RichSinkFunction[(String, Int)] { var conn: Connection = _ var insertPst: PreparedStatement = _ var updatePst: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://ke01:3306/test", "root", "aa123456") insertPst = conn.prepareStatement("insert into car_flow(monitorId, count) values(?, ?)") updatePst = conn.prepareStatement("update car_flow set count = ? where monitorId = ?") } override def close(): Unit = { insertPst.close() updatePst.close() conn.close() } override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = { println(value) updatePst.setInt(1, value._2) updatePst.setString(2, value._1) updatePst.execute() println(updatePst.getUpdateCount) if (updatePst.getUpdateCount == 0) { println("insert") insertPst.setString(1, value._1) insertPst.setInt(2, value._2) insertPst.execute() } } }
Socket-Sink
自定义
package com.text.sink import java.io.PrintStream import java.net.{InetAddress, Socket} import java.util.Properties import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation} import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringSerializer object SocketSink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092") props.setProperty("group.id", "flink-kafka-002") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] { // 什么时候停止,停止条件是什么 override def isEndOfStream(nextElement: (String, String)): Boolean = false // 要进行序列化的字节流 override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { val key = new String(consumerRecord.key(), "UTF-8") val value = new String(consumerRecord.value(), "UTF-8") (key, value) } // 指定一下返回的数据类型 flink提供的类型 override def getProducedType: TypeInformation[(String, String)] = { createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) } }, props)) stream.map(data => { val value = data._2 val splits = value.split("\t") val monitorId = splits(0) (monitorId, 1) }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] { override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { (value1._1, value1._2 + value2._2) } }).addSink(new SocketCustomSink("ke01", 8891)) env.execute() } } class SocketCustomSink(host: String, port: Int) extends RichSinkFunction[(String, Int)] { var socket: Socket = _ var writer: PrintStream = _ override def open(parameters: Configuration): Unit = { socket = new Socket(InetAddress.getByName(host), port) writer = new PrintStream(socket.getOutputStream) } override def close(): Unit = { socket.close() writer.close() } override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = { writer.println(value._1 + "\t" + value._2) writer.flush() } }
File-Sink
支持分桶写入,每一个桶就是一个目录,默认每隔一个小时会产生一个分桶,每个桶下面会存储每一个 Thread的处理结果,可以设置一些文件滚动的策略(文件打开、文件大小等),防止出现大量的小文件 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <version>1.9.2</version> </dependency>
package com.text.sink import java.util.Properties import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringSerializer object FileSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092") props.setProperty("group.id", "group_id_03") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] { override def isEndOfStream(nextElement: (String, String)): Boolean = false override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { val key = new String(consumerRecord.key(), "UTF-8") val value = new String(consumerRecord.value(), "UTF-8") (key, value) } override def getProducedType: TypeInformation[(String, String)] = { createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) } }, props)) val restStream = stream.map(data => { val value = data._2 val splits = value.split("\t") val monitorId = splits(0) (monitorId, 1) }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] { override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = { (value1._1, value2._2 + value1._2) } }).map(x => x._1 + "\t" + x._2) val rolling:DefaultRollingPolicy[String,String] = DefaultRollingPolicy.create() //当文件大小超过256 则滚动产生一个小文件 .withMaxPartSize(256*1024*1024) //当文件超过2s没有写入新数据,则滚动产生一个小文件 .withInactivityInterval(2000) //文件打开时间超过2s 则滚动产生一个小文件 每隔2s产生一个小文件 .withRolloverInterval(2000) .build() val sink = StreamingFileSink.forRowFormat(new Path("D:\\code\\scala\\test\\test07\\data\\tmp"), new SimpleStringEncoder[String]("UTF-8")) .withBucketCheckInterval(1000) .withRollingPolicy(rolling) .build() restStream.addSink(sink) env.execute() } }
HBase-Sink
计算结果写入sink 两种实现方式: 1. map算子写入 频繁创建hbase连接 2. process写入 适合批量写入hbase <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.3</version> </dependency>package com.text.sink
import java.util.{Date, Properties} import akka.remote.serialization.StringSerializer import com.text.utils.DataUtils import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{HTable, Put} import org.apache.hadoop.hbase.util.Bytes object HBaseSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092") props.setProperty("group.id", "flink-kafka-003") props.setProperty("key.deserializer", classOf[StringSerializer].getName) props.setProperty("value.deserializer", classOf[StringSerializer].getName) val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props)) stream.map(row => { val arr = row.split("\t") (arr(0), 1) }).keyBy(_._1) .reduce((v1: (String, Int), v2: (String, Int)) => { (v1._1, v1._2 + v2._2) }).process(new ProcessFunction[(String, Int), (String, Int)] { var htab: HTable = _ override def open(parameters: Configuration): Unit = { val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "ke02:2181,ke03:2181,ke04:2181") val hbaseName = "car_flow" htab = new HTable(conf, hbaseName) } override def close(): Unit = { htab.close() } override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = { val min = DataUtils.getDateStrByDate(new Date()) val put = new Put(Bytes.toBytes(value._1)) put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2)) htab.put(put) } }) env.execute() } }