Example 1:
package com.lg.bigdata.streaming

import java.lang.Double
import java.util.{Properties, UUID}
import java.util.concurrent.Future

import scala.collection.mutable
import scala.util.parsing.json.JSONArray

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import com.google.gson.JsonObject
import com.lg.bigdata.utils.JZWUtil

/**
 * Deprecated.
 * 1. Module overview
 *    (1) Function: trajectory estimation
 */
object KafkaAndJsonGJTS_back {

  def main(args: Array[String]): Unit = {
    val groupId = "jwz_GJ"
    //val groupId = "jwz_test"

    // 1. Create the SparkConf and initialize the StreamingContext; .setMaster("local[*]") runs locally
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS_back")
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    ssc.sparkContext.setLogLevel("WARN")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext

    /* 2. Kafka parameters, expressed as a Map.
     *    earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
     *              (if the offset no longer exists, reset to the smallest offset)
     *    latest:   if a partition has a committed offset, consume from it; otherwise consume only newly produced data
     *              (if the offset no longer exists, reset to the largest offset)
     *    none:     if every partition has a committed offset, consume after it; if any partition has none, throw an exception
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092",             // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer],   // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> groupId,                               // consumer group
      "auto.offset.reset" -> "earliest",                   // earliest / latest
      "enable.auto.commit" -> (true: java.lang.Boolean)    // let the consumer commit offsets itself (default true)
    )

    val topics = Array("car")

    // 3. Create the Kafka DStream via KafkaUtils.
    //    This is the direct approach recommended by the docs; it uses Kafka's low-level API and is more efficient.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Record schema
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // Trajectory file for each lane position
    var mapLeft: mutable.Map[String, String] = mutable.Map()
    mapLeft("L") = "in/left_lane/ZL.json"
    mapLeft("M") = "in/left_lane/ZM.json"
    mapLeft("R") = "in/left_lane/ZR.json"

    var mapReght: mutable.Map[String, String] = mutable.Map()
    mapReght("L") = "in/reght_lane/YL.json"
    mapReght("M") = "in/reght_lane/YM.json"
    mapReght("R") = "in/reght_lane/YR.json"

    // Hoist reusable variables out of the loop
    val init: Int = 43200
    var df: DataFrame = null
    var dfV158: DataFrame = null
    var dfV005: DataFrame = null
    var seV158: Array[Row] = null
    var seV005: Array[Row] = null
    var json8: DataFrame = null
    var json5: DataFrame = null
    var newJson8: String = null
    var newJson5: String = null
    var rdd8: RDD[String] = null
    var rdd5: RDD[String] = null

    // Broadcast the Kafka producer (wrapped in KafkaSink) to every executor
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "hadoop104:9092")                 // Kafka address
        p.setProperty("key.serializer", classOf[StringSerializer].getName)   // key serializer
        p.setProperty("value.serializer", classOf[StringSerializer].getName) // value serializer
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    /**
     * Parse each JSON string into a Row, build an RDD[Row], then create a DataFrame from the schema.
     *   left main line : V158
     *   right main line: V005
     */
    import org.apache.spark.sql.functions._
    kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
      if (!rdd.isEmpty()) { // batch is not empty
        df = spark.createDataFrame(rdd, schema)

        // left main line
        dfV158 = df.filter("cameraId == 'V158'").toDF()
        /*
         * Step 1: extract the vehicle attributes.
         *   lane position: lane_position (L, M, R)
         *   vehicle type : carType (car -> 1, bus -> 2)
         *   speed        : speed
         *   camera id    : cameraId
         */
        if (dfV158.count() > 0) {
          seV158 = dfV158.select("lane_position", "carType", "cameraId", "speed").collect()
          // Step 2: use the vehicle attributes to load the matching trajectory JSON
          seV158.foreach(x => {
            // read the trajectory of the corresponding lane and cache it
            json8 = spark.read.json(mapLeft.get(x.get(0).toString()).get).cache()

            // (1) assign the vehicle type
            var rowV158: DataFrame = null
            if (x.get(1).toString().equals("car")) {
              rowV158 = json8.withColumn("type", concat(json8.col("type"), lit("1")))
            } else {
              rowV158 = json8.withColumn("type", concat(json8.col("type"), lit("2")))
            }

            // (2) vehicle id, only needs to be unique
            rowV158 = rowV158.withColumn("fz_car_id", concat(json8.col("fz_car_id"), lit(uuid)))

            // (3) milliseconds per 12 meters
            val time = Math.abs(scala.math.round(init / Double.valueOf(x.get(3).toString())))
            rowV158 = rowV158.withColumn("time", (rowV158("id") - 1) * time) // (id - 1) * (ms per 12 m)
            rowV158 = rowV158.withColumn("is_show", concat(json8.col("is_show"), lit(time)))

            if (rowV158.count() > 0) {
              // convert the Spark JSON rows into a JSON string usable on the Java side
              newJson8 = rowV158.toJSON.collectAsList().toString()
              // write the result to Kafka
              kafkaProducer.value.send("GJTS_topic", newJson8)
            }
          })
        }

        // right main line
        dfV005 = df.filter("cameraId == 'V005'").toDF()
        if (dfV005.count() > 0) {
          seV005 = dfV005.select("lane_position", "carType", "cameraId", "speed").collect()
          seV005.foreach(x => {
            // read the trajectory of the corresponding lane and cache it
            json5 = spark.read.json(mapReght.get(x.get(0).toString()).get).cache()

            // (1) assign the vehicle type
            var rowV005: DataFrame = null
            if (x.get(1).toString().equals("car")) {
              rowV005 = json5.withColumn("type", concat(json5.col("type"), lit("1")))
            } else {
              rowV005 = json5.withColumn("type", concat(json5.col("type"), lit("2")))
            }

            // (2) vehicle id, only needs to be unique
            rowV005 = rowV005.withColumn("fz_car_id", concat(json5.col("fz_car_id"), lit(uuid)))

            // (3) milliseconds per 12 meters
            val time = Math.abs(scala.math.round(init / Double.valueOf(x.get(3).toString())))
            rowV005 = rowV005.withColumn("time", (rowV005("id") - 1) * time) // (id - 1) * (ms per 12 m)
            rowV005 = rowV005.withColumn("is_show", concat(json5.col("is_show"), lit(time)))

            if (rowV005.count() > 0) {
              // convert the Spark JSON rows into a JSON string usable on the Java side
              newJson5 = rowV005.toJSON.collectAsList().toString()
              // write the result to Kafka
              kafkaProducer.value.send("GJTS_topic", newJson5)
            }
          })
        }
      }
    })

    // start the streaming context
    ssc.start()
    // the driver blocks here; when the streaming context terminates, the driver terminates too
    ssc.awaitTermination()
  }

  // generate a vehicle id
  def uuid(): String = {
    UUID.randomUUID().toString().replaceAll("-", "")
  }

  class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    /* This is the key idea that allows us to work around running
       into NotSerializableExceptions. */
    lazy val producer = createProducer()

    def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

    def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))
  }

  object KafkaSink {
    import scala.collection.JavaConversions._

    def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
      val createProducerFunc = () => {
        val producer = new KafkaProducer[K, V](config)
        sys.addShutdownHook {
          // Ensure that, on executor JVM shutdown, the Kafka producer sends
          // any buffered messages to Kafka before shutting down.
          producer.close()
        }
        producer
      }
      new KafkaSink(createProducerFunc)
    }

    def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
  }
}
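Both examples rely on JZWUtil.handlerMessage2Row, which is not shown in this post. For context, here is a minimal sketch of what such a helper might look like, assuming the Kafka value is a flat JSON object whose field names match the schema above; the Gson-based parsing and the object name JZWUtilSketch are illustrative assumptions, not the project's actual implementation.

// Hypothetical sketch (assumed): turn one Kafka JSON payload into a Row whose
// field order matches the StructType (cameraId, time, lane_position, carType, speed, space).
import com.google.gson.JsonParser
import org.apache.spark.sql.Row

object JZWUtilSketch {
  def handlerMessage2Row(message: String): Row = {
    val obj = new JsonParser().parse(message).getAsJsonObject // assumes a flat JSON object
    Row(
      obj.get("cameraId").getAsString,
      obj.get("time").getAsString,
      obj.get("lane_position").getAsString,
      obj.get("carType").getAsString,
      obj.get("speed").getAsString,
      obj.get("space").getAsString)
  }
}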
Example 2:
package com.lg.bigdata.streaming

import java.lang.Double
import java.util.{Properties, UUID}
import java.util.concurrent.Future

import scala.collection.mutable
import scala.util.parsing.json.JSONArray

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.{SparkConf, SparkContext, SparkException, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

import com.google.gson.JsonObject
import com.lg.bigdata.utils.JZWUtil

/**
 * 1. Module overview
 *    (1) Function: trajectory estimation
 */
object KafkaAndJsonGJTS {

  val groupId = "jwz_test"
  //val groupId = "jwz_GJ"

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf and initialize the StreamingContext; .setMaster("local[*]") runs locally
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS")
    // optionally switch the serializer to Kryo
    //sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Milliseconds(500)) // 500 ms batch interval
    ssc.sparkContext.setLogLevel("WARN")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext

    /* 2. Kafka parameters, expressed as a Map.
     *    earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
     *              (if the offset no longer exists, reset to the smallest offset)
     *    latest:   if a partition has a committed offset, consume from it; otherwise consume only newly produced data
     *              (if the offset no longer exists, reset to the largest offset)
     *    none:     if every partition has a committed offset, consume after it; if any partition has none, throw an exception
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092",             // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer],   // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> groupId,                               // consumer group
      "auto.offset.reset" -> "latest",                     // earliest / latest
      "enable.auto.commit" -> (true: java.lang.Boolean),   // let the consumer commit offsets itself (default true)
      "auto.commit.interval.ms" -> "500"                   // auto-commit interval
    )

    val topics = Array("car")

    // 3. Create the Kafka DStream via KafkaUtils.
    //    This is the direct approach recommended by the docs; it uses Kafka's low-level API and is more efficient.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Record schema
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // Hoist reusable variables out of the loop
    val init: Int = 43200
    var carjson: Dataset[Row] = null
    var singleCarTrack: DataFrame = null
    var datacar: Array[Row] = null
    var rddString: RDD[String] = null
    var newJson: String = null
    var singleCarTrack_1: DataFrame = null

    // Broadcast the Kafka producer (wrapped in KafkaSink) to every executor
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "hadoop104:9092")                 // Kafka address
        p.setProperty("key.serializer", classOf[StringSerializer].getName)   // key serializer
        p.setProperty("value.serializer", classOf[StringSerializer].getName) // value serializer
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    /**
     * Push one trajectory per camera.
     * Parse each JSON string into a Row, build an RDD[Row], then create a DataFrame from the schema.
     */
    import org.apache.spark.sql.functions._
    kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
      // offset information could be obtained here
      if (!rdd.isEmpty()) { // batch is not empty
        // Step 1: get the raw records
        datacar = spark.createDataFrame(rdd, schema)
          .select("lane_position", "carType", "cameraId", "speed").collect()
        /*
         * Vehicle attributes:
         *   lane position: lane_position (L, M, R)
         *   vehicle type : carType (car -> 1, bus -> 2)
         *   speed        : speed
         *   camera id    : cameraId
         */
        if (datacar.size > 0) {
          // Step 3: match each raw record against the JSON to get the corresponding trajectory points
          datacar.foreach(x => {
            // Step 2: load the JSON trajectory for this vehicle
            carjson = spark.read.json(getPath.get(x.apply(2) + "" + x.apply(0)).get)
            // JSON trajectory of a single vehicle under a single camera
            singleCarTrack = carjson.filter("cam_ID=='" + x.apply(2) + "' and lane=='" + x.apply(0) + "'").toDF()

            // (1) processing step 1: assign the vehicle type
            if (x.get(1).toString().equals("car")) {
              singleCarTrack_1 = singleCarTrack.withColumn("type", concat(singleCarTrack.col("type"), lit("1")))
            } else {
              singleCarTrack_1 = singleCarTrack.withColumn("type", concat(singleCarTrack.col("type"), lit("2")))
            }

            // (2) vehicle id, only needs to be unique
            singleCarTrack_1 = singleCarTrack_1.withColumn("fz_car_id", concat(singleCarTrack_1.col("fz_car_id"), lit(uuid)))

            // (3) give the current JSON a new incrementing id
            singleCarTrack_1 = singleCarTrack_1.withColumn("newid",
              row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1).cast(LongType))))

            // (4) processing step 2: from the current speed, compute the milliseconds per 12 meters
            val time = Math.abs(scala.math.round(init / Double.valueOf(x.get(3).toString())))
            singleCarTrack_1 = singleCarTrack_1.withColumn("time", (singleCarTrack_1("newid") - 1) * time) // (newid - 1) * (ms per 12 m)

            // convert the Spark JSON rows into a JSON string usable on the Java side (wrapped in [ ])
            if (singleCarTrack_1.count() > 0) {
              newJson = singleCarTrack_1.toJSON.collectAsList().toString()
              val rddrow = sc.makeRDD(Seq(newJson))
              rddrow.foreach(record => {
                kafkaProducer.value.send("GJTS_topic", record)
              })
            }
          })
        }
      }
    })

    // start the streaming context
    ssc.start()
    // the driver blocks here; when the streaming context terminates, the driver terminates too
    ssc.awaitTermination()
  }

  // camera + lane -> trajectory file; every left-line camera shares ZL/ZM/ZR, every right-line camera shares YL/YM/YR
  def getPath(): mutable.Map[String, String] = {
    val mapPath: mutable.Map[String, String] = mutable.Map()

    // left main line cameras
    val leftCameras = Seq(
      "V140", "V153", "V108", "V158", "V122", "V098", "V150", "V134", "V085", "V114",
      "V146", "V125", "V143", "V131", "V102", "V137", "V089", "V128", "V093", "V118")
    // right main line cameras
    val rightCameras = Seq(
      "V032", "V072", "V029", "V005", "V051", "V009", "V027", "V062", "V039", "V067",
      "V035", "V058", "V018", "V045", "V042", "V048", "V014", "V024", "V076", "V054")

    leftCameras.foreach { cam =>
      mapPath(cam + "L") = "in/left_lane/ZL.json"
      mapPath(cam + "M") = "in/left_lane/ZM.json"
      mapPath(cam + "R") = "in/left_lane/ZR.json"
    }
    rightCameras.foreach { cam =>
      mapPath(cam + "L") = "in/reght_lane/YL.json"
      mapPath(cam + "M") = "in/reght_lane/YM.json"
      mapPath(cam + "R") = "in/reght_lane/YR.json"
    }
    mapPath
  }

  // generate a vehicle id
  def uuid(): String = {
    UUID.randomUUID().toString().replaceAll("-", "")
  }

  // 1. Wrap the KafkaProducer in a lazy val so it is created on the executor rather than serialized from the driver
  class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    // This is the key idea that lets us work around NotSerializableExceptions.
    lazy val producer = createProducer()

    def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

    def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))
  }

  object KafkaSink {
    import scala.collection.JavaConversions._

    def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
      val createProducerFunc = () => {
        val producer = new KafkaProducer[K, V](config)
        sys.addShutdownHook {
          // ensure that, on executor JVM shutdown, the producer flushes any buffered
          // messages to Kafka before closing
          producer.close()
        }
        producer
      }
      new KafkaSink(createProducerFunc)
    }

    def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
  }
}