flink sink

package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object Demo1Sink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")


    // use the custom sink function
    studentDS.addSink(new MySinkFunction)

    env.execute()
  }

}

/**
  * RichSinkFunction: adds the open and close lifecycle methods
  * SinkFunction: only provides the invoke method
  *
  */

class MySinkFunction extends RichSinkFunction[String] {

  /**
    * open is executed before invoke, once per parallel subtask
    *
    * @param parameters the Flink configuration object
    */
  var con: Connection = _

  override def open(parameters: Configuration): Unit = {
    println("Opening database connection")
    // 1. load the JDBC driver

    Class.forName("com.mysql.jdbc.Driver")

    // 2. establish the connection
    con = DriverManager.getConnection("jdbc:mysql://master:3306/tour?useUnicode=true&characterEncoding=utf-8", "root", "123456")


  }

  // close is executed after invoke, once per parallel subtask
  override def close(): Unit = {

    // close the connection
    con.close()
  }


  /**
    * invoke sends the data to the external system; it is called once per record
    *
    * @param value   one line of the DataStream
    * @param context the context object
    */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {

    // save the record to MySQL
    val stat: PreparedStatement = con.prepareStatement("insert into student(id,name,age,gender,clazz) values(?,?,?,?,?)")

    val split: Array[String] = value.split(",")

    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))

    // execute the insert
    stat.execute()

    // release the statement (a new one is prepared for every record in this demo)
    stat.close()
  }
}
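
As the doc comment above notes, RichSinkFunction only adds the open/close lifecycle on top of SinkFunction. For contrast, here is a minimal sketch of a sink written against the plain SinkFunction interface; the class name MyPrintSinkFunction is illustrative and not part of the original demo. With no open/close there is nowhere to set up a shared resource such as a JDBC connection, so it simply prints each record.

import org.apache.flink.streaming.api.functions.sink.SinkFunction

class MyPrintSinkFunction extends SinkFunction[String] {

  // invoke is called once per record; without open/close there is no place
  // to build or release shared resources such as a database connection
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    println(value)
  }
}
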
package com.shujia.flink.sink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
object Demo2SinkKafka {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    /**
      * Create the Kafka producer
      */

    val flinkKafkaProducer = new FlinkKafkaProducer[String](
      "master:9092", // broker list
      "student", // target topic
      new SimpleStringSchema) // serialization schema


    // sink the data to Kafka
    studentDS.addSink(flinkKafkaProducer)


    env.execute()
  }

}
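
The producer above uses the three-argument constructor that takes the broker list as a plain string. Below is a minimal sketch of the same job configured through a java.util.Properties object instead; the Properties-based constructor overload is assumed to be available in the universal Kafka connector used here, and the object name Demo2SinkKafkaWithProps is illustrative.

package com.shujia.flink.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object Demo2SinkKafkaWithProps {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    // producer settings go into a Properties object instead of a broker-list string
    val producerProps = new Properties()
    producerProps.setProperty("bootstrap.servers", "master:9092")

    // assumed overload: FlinkKafkaProducer(topic, serializationSchema, producerConfig)
    val flinkKafkaProducer = new FlinkKafkaProducer[String](
      "student", // target topic
      new SimpleStringSchema(), // serialization schema
      producerProps) // producer configuration

    studentDS.addSink(flinkKafkaProducer)

    env.execute()
  }
}
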
package com.shujia.flink.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
object Demo3Onkafka {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    /**
      * Read data from Kafka
      *
      */

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "asdasdasd")


    /**
      * kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 4 --topic lines
      *
      * kafka-console-producer.sh --broker-list master:9092 --topic lines
      */
    // create the Kafka consumer
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](
      "lines",
      new SimpleStringSchema(),
      properties)

    flinkKafkaConsumer.setStartFromEarliest() // start from the earliest available record


    val linesDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    val countDS: DataStream[String] = linesDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .map(kv => kv._1 + "\t" + kv._2)


    countDS.print()


    /**
      * Create the Kafka producer
      *
      */

    val flinkKafkaProducer = new FlinkKafkaProducer[String](
      "master:9092", // broker list
      "count", // target topic
      new SimpleStringSchema) // serialization schema

    countDS.addSink(flinkKafkaProducer)


    env.execute()

    /**
      *
      * kafka-console-consumer.sh --bootstrap-server  master:9092  --from-beginning --topic count
      *
      * To submit the job to the cluster, upload flink-sql-connector-kafka_2.11-1.11.0 to Flink's lib directory
      *
      */

  }

}
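
The consumer above is pinned to the earliest offsets with setStartFromEarliest. As a short usage sketch, these are the other start positions the FlinkKafkaConsumer exposes; they would replace the setStartFromEarliest call in the demo, and exactly one should be chosen per job (the timestamp value is a placeholder).

// alternative start positions on the same flinkKafkaConsumer instance
flinkKafkaConsumer.setStartFromLatest() // only read records produced after the job starts
flinkKafkaConsumer.setStartFromGroupOffsets() // resume from offsets committed for group.id (the default behaviour)
// flinkKafkaConsumer.setStartFromTimestamp(1600000000000L) // start from a given epoch-millisecond timestamp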

 
