package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object Demo1Sink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    // Use the custom sink
    studentDS.addSink(new MySinkFunction)

    env.execute()
  }
}

/**
  * RichSinkFunction: additionally provides open and close methods
  * SinkFunction: only provides invoke
  */
class MySinkFunction extends RichSinkFunction[String] {

  var con: Connection = _

  /**
    * open runs before invoke, once per parallel instance (partition)
    *
    * @param parameters Flink configuration object
    */
  override def open(parameters: Configuration): Unit = {
    println("opening database connection")
    // 1. Load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. Establish the connection
    con = DriverManager.getConnection(
      "jdbc:mysql://master:3306/tour?useUnicode=true&characterEncoding=utf-8",
      "root", "123456")
  }

  // Runs after invoke, once per parallel instance (partition)
  override def close(): Unit = {
    // Close the connection
    con.close()
  }

  /**
    * invoke sends the data out; it is called once per record
    *
    * @param value   one line of the DataStream
    * @param context context object
    */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    // Save the record to MySQL
    val stat: PreparedStatement = con.prepareStatement(
      "insert into student(id,name,age,gender,clazz) values(?,?,?,?,?)")

    val split: Array[String] = value.split(",")
    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))

    // Execute the insert and release the statement
    stat.execute()
    stat.close()
  }
}
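In Demo1Sink the PreparedStatement is created for every record. A minimal alternative sketch (not part of the original code) that prepares the statement once per parallel instance and writes rows with JDBC batching; the class name MyBatchSinkFunction and the 1000-row flush threshold are assumptions, the table and connection URL are taken from the example above.

package com.shujia.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MyBatchSinkFunction extends RichSinkFunction[String] {

  var con: Connection = _
  var stat: PreparedStatement = _
  var pending: Int = 0

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    con = DriverManager.getConnection(
      "jdbc:mysql://master:3306/tour?useUnicode=true&characterEncoding=utf-8",
      "root", "123456")
    // Prepare the statement once per parallel instance instead of once per record
    stat = con.prepareStatement(
      "insert into student(id,name,age,gender,clazz) values(?,?,?,?,?)")
  }

  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    val split: Array[String] = value.split(",")
    stat.setString(1, split(0))
    stat.setString(2, split(1))
    stat.setInt(3, split(2).toInt)
    stat.setString(4, split(3))
    stat.setString(5, split(4))
    stat.addBatch()
    pending += 1
    // Flush every 1000 rows (the threshold is an arbitrary assumption)
    if (pending >= 1000) {
      stat.executeBatch()
      pending = 0
    }
  }

  override def close(): Unit = {
    // Flush any remaining rows, then release resources
    if (stat != null) {
      if (pending > 0) stat.executeBatch()
      stat.close()
    }
    if (con != null) con.close()
  }
}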
package com.shujia.flink.sink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object Demo2SinkKafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val studentDS: DataStream[String] = env.readTextFile("data/students.txt")

    /**
      * Create the Kafka producer
      */
    val flinkKafkaProducer = new FlinkKafkaProducer[String](
      "master:9092",             // broker list
      "student",                 // target topic
      new SimpleStringSchema())  // serialization schema

    // Sink the data to Kafka
    studentDS.addSink(flinkKafkaProducer)

    env.execute()
  }
}
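The three-argument constructor above takes a plain broker-list string. The same producer can also be built from a java.util.Properties object, which leaves room for additional Kafka producer settings. A minimal sketch, reusing the topic, broker address and studentDS stream from Demo2SinkKafka; the variable names producerProps and propsProducer are placeholders.

// Producer configured via Properties instead of a bare broker-list string
val producerProps = new java.util.Properties()
producerProps.setProperty("bootstrap.servers", "master:9092")

val propsProducer = new FlinkKafkaProducer[String](
  "student",                 // target topic
  new SimpleStringSchema(),  // serialization schema
  producerProps)             // producer configuration

studentDS.addSink(propsProducer)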
package com.shujia.flink.sink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object Demo3Onkafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * Read data from Kafka
      */
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "asdasdasd")

    /**
      * kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 4 --topic lines
      *
      * kafka-console-producer.sh --broker-list master:9092 --topic lines
      */

    // Create the consumer
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](
      "lines",
      new SimpleStringSchema(),
      properties)

    flinkKafkaConsumer.setStartFromEarliest() // start from the earliest record where possible

    val linesDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    val countDS: DataStream[String] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .map(kv => kv._1 + "\t" + kv._2)

    countDS.print()

    /**
      * Create the Kafka producer
      */
    val flinkKafkaProducer = new FlinkKafkaProducer[String](
      "master:9092",             // broker list
      "count",                   // target topic
      new SimpleStringSchema())  // serialization schema

    countDS.addSink(flinkKafkaProducer)

    env.execute()

    /**
      * kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic count
      *
      * To submit the job to the cluster, upload flink-sql-connector-kafka_2.11-1.11.0 to Flink's lib directory.
      */
  }
}
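Demo3Onkafka pins the consumer to the earliest offsets and does not enable checkpointing, so no offsets are committed for the consumer group. A hedged sketch of the other common start positions and of turning on checkpointing, reusing env and flinkKafkaConsumer from the file above; the 5000 ms interval is an assumed value.

// Alternative start positions for the FlinkKafkaConsumer (choose one before addSource)
flinkKafkaConsumer.setStartFromGroupOffsets() // resume from offsets committed for group.id (the default)
flinkKafkaConsumer.setStartFromLatest()       // ignore history, read only new records

// With checkpointing enabled, the consumer commits offsets on each completed checkpoint
// and the job can recover from failures
env.enableCheckpointing(5000) // checkpoint interval in milliseconds (assumed value)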