Flink-Sink(四)

Redis-Sink
使用 Apache Bahir 提供的 Flink 连接器:RedisSink

<dependency> 
    <groupId>org.apache.bahir</groupId> 
    <artifactId>flink-connector-redis_2.11</artifactId> 
    <version>1.0</version> 
</dependency>


package com.text.sink

import java.net.InetSocketAddress
import java.util

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Word-count demo that writes its running totals into a Redis hash.
 *
 * Lines are read from the socket ke01:8899 and split on single spaces; a
 * running count is kept per word. Every update is printed and also sent to
 * Redis with HSET on the hash "wc" (field = word, value = count).
 */
object RedisSinkDemo {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build the running (word, count) stream.
    val lines = env.socketTextStream("ke01", 8899)
    val words = lines.flatMap(line => line.split(" "))
    val result = words
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    result.print().setParallelism(1)

    // Connection settings for a single (non-clustered) Redis instance.
    val config = new FlinkJedisPoolConfig.Builder()
      .setDatabase(3)
      .setHost("192.168.75.91")
      .setPort(6390)
      .setPassword("aa123456")
      .build()

    // Describes how one (word, count) pair is turned into a Redis command.
    val wordCountMapper = new RedisMapper[(String, Int)] {
      // Store every pair with HSET in the hash named "wc".
      override def getCommandDescription: RedisCommandDescription =
        new RedisCommandDescription(RedisCommand.HSET, "wc")

      // The word is the hash field; it is echoed to stdout before being returned.
      override def getKeyFromData(t: (String, Int)): String = {
        val word = t._1
        println(word)
        word
      }

      // The count is the hash value; it is echoed to stdout before being returned.
      override def getValueFromData(t: (String, Int)): String = {
        val count = t._2
        println(count)
        count.toString
      }
    }

    result.addSink(new RedisSink[(String, Int)](config, wordCountMapper))
    env.execute()
  }

}

 

Kafka-Sink

使用Flink内嵌:FlinkKafkaProducer

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka_2.11</artifactId>         
    <version>1.9.2</version> 
</dependency>


package com.text.sink

import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

/**
 * Word-count demo that publishes its running totals to the Kafka topic "wc".
 *
 * Lines are read from the socket ke01:8890 and split on single spaces; each
 * updated (word, count) pair is written as a Kafka record whose key is the
 * word and whose value is the count rendered as text, both UTF-8 encoded.
 */
object KafkaSinkDemo {

  def main(args: Array[String]): Unit = {
    val topic = "wc"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // With Semantic.EXACTLY_ONCE the producer writes inside Kafka transactions
    // that are committed only when a checkpoint completes. Without
    // checkpointing nothing is ever committed and read_committed consumers
    // see no data.
    env.enableCheckpointing(5000)

    val stream = env.socketTextStream("ke01", 8890)
    val result = stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "ke01:9092,ke02:9092,ke03:9092")
    // FlinkKafkaProducer defaults transaction.timeout.ms to 1 hour, which is
    // above the broker's default transaction.max.timeout.ms (15 minutes) and
    // makes the producer fail on start-up. Stay within the broker default.
    props.setProperty("transaction.timeout.ms", (15 * 60 * 1000).toString)

    // No key/value serializer properties are set here: the serialization
    // schema below already produces the raw bytes.
    val schema = new KafkaSerializationSchema[(String, Int)] {
      override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        val keyBytes = element._1.getBytes("UTF-8")
        val valueBytes = element._2.toString.getBytes("UTF-8")
        new ProducerRecord[Array[Byte], Array[Byte]](topic, keyBytes, valueBytes)
      }
    }

    result.addSink(new FlinkKafkaProducer[(String, Int)](topic, schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    env.execute()
  }
}

 

 

Mysql-Sink

1. Flink 没有内置的 MySQL Sink,所以通过继承 RichSinkFunction 自定义实现

2. 写入 MySQL 需要保证幂等性:该 key 还没有记录时正常插入,已有记录时用最新的聚合结果覆盖更新,这样重复写入也不会导致数据错误

<dependency> 
    <groupId>mysql</groupId> 
    <artifactId>mysql-connector-java</artifactId>     
    <version>5.1.44</version> 
</dependency>

package com.text.sink

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Record describing one car observation.
 *
 * NOTE(review): this class is not referenced anywhere in the visible code;
 * the field meanings (monitor id, car id, event time, speed) are inferred
 * from their names only — confirm against the producer of the data.
 */
case class CarInfo(monitorId: String, carId: String, eventTime: String, speed: Long)

/**
 * Counts Kafka records per monitor id and writes the running totals to MySQL.
 *
 * Records are consumed from the topic "flink-kafka". The text before the
 * first tab of each record value is used as the monitor id, a running count
 * is kept per id, and every update is handed to [[MySQLCustomSink]].
 */
object MysqlSinkDemo {

  def main(args: Array[String]): Unit = {

    // The sink has to be idempotent: writing the same result more than once
    // must still leave correct data in the table.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092")
    props.setProperty("group.id", "group_id")
    // key.deserializer / value.deserializer are intentionally not set. The
    // original pointed them at a *Serializer* class, and FlinkKafkaConsumer
    // always reads raw bytes and decodes them with the schema below.

    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      // The stream is unbounded: no element ever marks its end.
      override def isEndOfStream(nextElement: (String, String)): Boolean = false

      // Decodes the raw key/value bytes as UTF-8 text.
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val payload = consumerRecord.value()
        if (payload == null) {
          // A record without a value carries nothing to count. Returning null
          // makes the Flink Kafka consumer skip it instead of failing the job.
          null
        } else {
          // Kafka keys are optional; a missing key becomes the empty string.
          val key = Option(consumerRecord.key()).map(bytes => new String(bytes, "UTF-8")).getOrElse("")
          (key, new String(payload, "UTF-8"))
        }
      }

      // Tells Flink the type of the elements produced by deserialize.
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))

    stream.map(data => {
      // Limit 2 keeps a leading empty field, so index 0 always exists, even
      // for a payload made only of tabs.
      val monitorId = data._2.split("\t", 2)(0)
      (monitorId, 1)
    }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
      // value1: result aggregated so far, value2: the current element.
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value1._2 + value2._2)
      }
    }).addSink(new MySQLCustomSink)
    env.execute()
  }

}

/**
 * Sink that stores (monitorId, count) pairs in the MySQL table `car_flow`.
 *
 * Each element first tries to overwrite the count of an existing row; if no
 * row was updated, a new row is inserted. Writing the same pair again leaves
 * the table unchanged.
 */
class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
  var conn: Connection = _
  var insertPst: PreparedStatement = _
  var updatePst: PreparedStatement = _

  /** Opens the JDBC connection and prepares both statements once per sink instance. */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://ke01:3306/test", "root", "aa123456")
    insertPst = conn.prepareStatement("insert into car_flow(monitorId, count) values(?, ?)")
    updatePst = conn.prepareStatement("update car_flow set count = ? where monitorId = ?")
  }

  /**
   * Releases the statements and the connection.
   *
   * Every resource is null-checked because `open` may have failed part-way,
   * and the nested try/finally makes sure a failing close does not leave the
   * remaining resources open.
   */
  override def close(): Unit = {
    try {
      if (insertPst != null) insertPst.close()
    } finally {
      try {
        if (updatePst != null) updatePst.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }

  /**
   * Writes one (monitorId, count) pair: update first, insert when no row was updated.
   *
   * @param value   the monitor id and its latest running count
   * @param context sink context supplied by Flink (unused)
   */
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    println(value)
    updatePst.setInt(1, value._2)
    updatePst.setString(2, value._1)
    // executeUpdate returns the row count directly, so no separate
    // getUpdateCount call is needed.
    // NOTE(review): the insert fallback relies on the driver reporting matched
    // rows rather than changed rows — confirm the connection does not set
    // useAffectedRows=true.
    val updatedRows = updatePst.executeUpdate()
    println(updatedRows)
    if (updatedRows == 0) {
      println("insert")
      insertPst.setString(1, value._1)
      insertPst.setInt(2, value._2)
      insertPst.executeUpdate()
    }

  }
}

 

 

Socket-Sink

自定义

package com.text.sink

import java.io.PrintStream
import java.net.{InetAddress, Socket}
import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Counts Kafka records per monitor id and streams the running totals to a socket.
 *
 * Records are consumed from the topic "flink-kafka". The text before the
 * first tab of each record value is used as the monitor id, a running count
 * is kept per id, and every update is handed to [[SocketCustomSink]]
 * connected to ke01:8891.
 */
object SocketSink {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    props.setProperty("group.id", "flink-kafka-002")
    // key.deserializer / value.deserializer are intentionally not set. The
    // original pointed them at a *Serializer* class, and FlinkKafkaConsumer
    // always reads raw bytes and decodes them with the schema below.

    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      // The stream is unbounded: no element ever marks its end.
      override def isEndOfStream(nextElement: (String, String)): Boolean = false

      // Decodes the raw key/value bytes as UTF-8 text.
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val payload = consumerRecord.value()
        if (payload == null) {
          // A record without a value carries nothing to count. Returning null
          // makes the Flink Kafka consumer skip it instead of failing the job.
          null
        } else {
          // Kafka keys are optional; a missing key becomes the empty string.
          val key = Option(consumerRecord.key()).map(bytes => new String(bytes, "UTF-8")).getOrElse("")
          (key, new String(payload, "UTF-8"))
        }
      }

      // Tells Flink the type of the elements produced by deserialize.
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))

    stream.map(data => {
      // Limit 2 keeps a leading empty field, so index 0 always exists, even
      // for a payload made only of tabs.
      val monitorId = data._2.split("\t", 2)(0)
      (monitorId, 1)
    }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
      // value1: result aggregated so far, value2: the current element.
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value1._2 + value2._2)
      }
    }).addSink(new SocketCustomSink("ke01", 8891))
    env.execute()
  }
}

/**
 * Sink that writes each (key, count) pair as one tab-separated text line to a TCP socket.
 *
 * @param host host name or address of the receiving server
 * @param port TCP port of the receiving server
 */
class SocketCustomSink(host: String, port: Int) extends RichSinkFunction[(String, Int)] {
  var socket: Socket = _
  var writer: PrintStream = _

  /** Connects to the target and wraps the socket's output stream in a UTF-8 text writer. */
  override def open(parameters: Configuration): Unit = {
    socket = new Socket(InetAddress.getByName(host), port)
    // Fix the encoding explicitly instead of depending on the platform default charset.
    writer = new PrintStream(socket.getOutputStream, false, "UTF-8")
  }

  /**
   * Closes the writer and then the socket.
   *
   * The writer goes first so that its final flush still reaches an open
   * socket. Both are null-checked because `open` may have failed before
   * assigning them.
   */
  override def close(): Unit = {
    try {
      if (writer != null) writer.close()
    } finally {
      if (socket != null) socket.close()
    }
  }

  /**
   * Sends one element as "key<TAB>count" followed by a line break.
   *
   * @param value   the key and its latest running count
   * @param context sink context supplied by Flink (unused)
   * @throws java.io.IOException if the underlying stream reported a write error
   */
  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    writer.println(value._1 + "\t" + value._2)
    // PrintStream swallows IOExceptions. checkError() flushes the stream and
    // reports whether any write failed, so a broken connection fails the job
    // instead of silently dropping results.
    if (writer.checkError()) {
      throw new java.io.IOException("Failed to write to socket " + host + ":" + port)
    }
  }
}

 

 

File-Sink

支持分桶写入,每一个桶就是一个目录,默认每隔一个小时会产生一个分桶,每个桶下面会存储每一个并行子任务(Thread)的处理结果。可以设置文件滚动策略(文件打开时长、文件大小、不活跃时长等),防止出现大量的小文件。注意:使用 StreamingFileSink 时必须开启 checkpoint,否则文件会一直停留在 in-progress 状态。

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>1.9.2</version>
</dependency>


package com.text.sink

import java.util.Properties
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Counts Kafka records per monitor id and writes the running totals to rolling text files.
 *
 * Records are consumed from the topic "flink-kafka". The text before the
 * first tab of each record value is used as the monitor id, a running count
 * is kept per id, and every update is written as a "monitorId<TAB>count"
 * line through a row-format StreamingFileSink.
 */
object FileSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // StreamingFileSink finalizes part files only when a checkpoint completes.
    // Without checkpointing they stay in the in-progress state forever.
    env.enableCheckpointing(5000)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "ke04:9092,ke02:9092,ke03:9092")
    props.setProperty("group.id", "group_id_03")
    // key.deserializer / value.deserializer are intentionally not set. The
    // original pointed them at a *Serializer* class, and FlinkKafkaConsumer
    // always reads raw bytes and decodes them with the schema below.

    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      // The stream is unbounded: no element ever marks its end.
      override def isEndOfStream(nextElement: (String, String)): Boolean = false

      // Decodes the raw key/value bytes as UTF-8 text.
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val payload = consumerRecord.value()
        if (payload == null) {
          // A record without a value carries nothing to count. Returning null
          // makes the Flink Kafka consumer skip it instead of failing the job.
          null
        } else {
          // Kafka keys are optional; a missing key becomes the empty string.
          val key = Option(consumerRecord.key()).map(bytes => new String(bytes, "UTF-8")).getOrElse("")
          (key, new String(payload, "UTF-8"))
        }
      }

      // Tells Flink the type of the elements produced by deserialize.
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))

    val restStream = stream.map(data => {
      // Limit 2 keeps a leading empty field, so index 0 always exists, even
      // for a payload made only of tabs.
      val monitorId = data._2.split("\t", 2)(0)
      (monitorId, 1)
    }).keyBy(_._1).reduce(new ReduceFunction[(String, Int)] {
      // value1: result aggregated so far, value2: the current element.
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value2._2 + value1._2)
      }
    }).map(x => x._1 + "\t" + x._2)

    val rolling: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.create()
      // Roll to a new part file once the current one exceeds 256 MB.
      .withMaxPartSize(256 * 1024 * 1024)
      // Roll when no new data has been written for 2 seconds.
      .withInactivityInterval(2000)
      // Roll when the part file has been open for more than 2 seconds.
      .withRolloverInterval(2000)
      .build()

    val sink = StreamingFileSink.forRowFormat(new Path("D:\\code\\scala\\test\\test07\\data\\tmp"), new SimpleStringEncoder[String]("UTF-8"))
      .withBucketCheckInterval(1000)
      .withRollingPolicy(rolling)
      .build()

    restStream.addSink(sink)
    env.execute()
  }
}

 

 

HBase-Sink

计算结果写入 HBase 有两种实现方式:1. 在 map 算子中写入,缺点是会频繁创建 HBase 连接;2. 在 process 算子中写入,连接在 open 中只创建一次,适合批量写入 HBase。

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>1.3.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>1.3.3</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>1.3.3</version>
</dependency>

package com.text.sink
import java.util.{Date, Properties}
// NOTE(review): this resolves to Akka's StringSerializer, not a Kafka class.
// It is no longer referenced by the code below.
import akka.remote.serialization.StringSerializer
import com.text.utils.DataUtils
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

/**
 * Counts Kafka records per monitor id and writes the running totals to HBase.
 *
 * Records are consumed as strings from the topic "flink-kafka". The text
 * before the first tab of each row is used as the monitor id and a running
 * count is kept per id. Every update is written to the HBase table
 * "car_flow" from inside a ProcessFunction: row key = monitor id, column
 * family "count", qualifier = the string returned by
 * DataUtils.getDateStrByDate for the current date.
 */
object HBaseSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "ke02:9092,ke03:9092,ke04:9092")
    props.setProperty("group.id", "flink-kafka-003")
    // key.deserializer / value.deserializer are intentionally not set. The
    // original pointed them at a serializer class from Akka, and
    // FlinkKafkaConsumer always reads raw bytes and decodes them with the
    // schema passed to it.

    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))

    stream.map(row => {
      // Limit 2 keeps a leading empty field, so index 0 always exists, even
      // for a row made only of tabs.
      val arr = row.split("\t", 2)
      (arr(0), 1)
    }).keyBy(_._1)
      .reduce((v1: (String, Int), v2: (String, Int)) => {
        (v1._1, v1._2 + v2._2)
      }).process(new ProcessFunction[(String, Int), (String, Int)] {
        var htab: HTable = _

        // Creates the table handle once per operator instance.
        override def open(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set("hbase.zookeeper.quorum", "ke02:2181,ke03:2181,ke04:2181")
          val hbaseName = "car_flow"
          htab = new HTable(conf, hbaseName)
        }

        // The handle is still null when open() failed, so guard the close.
        override def close(): Unit = {
          if (htab != null) htab.close()
        }

        // Writes the latest count for the key; nothing is emitted downstream.
        override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
          val min = DataUtils.getDateStrByDate(new Date())
          val put = new Put(Bytes.toBytes(value._1))
          put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
          htab.put(put)
        }
      })

    env.execute()
  }
}

 

上一篇:[NOI2012] 美食节


下一篇:Flink Sink到File(文件)