SparkStreaming消费Kafka后往Kafka后写数据案列(2)

案列一:

package com.lg.bigdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.Column
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Milliseconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import scala.collection.mutable
import java.lang.Double
import java.util.UUID
import org.apache.spark.sql.Dataset
import org.apache.spark.rdd.RDD
import com.google.gson.JsonObject
import scala.util.parsing.json.JSONArray
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.TaskContext
import com.lg.bigdata.utils.JZWUtil

/** 
 *  	弃用
 * 	一:模块功能介绍
 * 			(1) 功能介绍:轨迹推算
 */
object KafkaAndJsonGJTS_back {
	def main(args:Array[String]):Unit={
			    val groupId = "jwz_GJ"
			    //val groupId = "jwz_test"
			    
					//1.创建SparkConf并初始化SSC,.setMaster("local[*]")
					val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS_back")
					val ssc = new StreamingContext(sparkConf, Milliseconds(500))
					ssc.sparkContext.setLogLevel("WARN")
					
				  val   spark= SparkSession.builder().config(sparkConf).getOrCreate()
					val   sc=spark.sparkContext
					/*2.定义kafka参数将kafka参数映射为map
					 * earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
					 * 						如果offect不存在,自动重置偏移量为最小偏移量
					 * latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
					 * 					如果offect不存在,自动重置偏移量为最大偏移量
					 * none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
					 */

					val kafkaParams = Map[String, Object](
							"bootstrap.servers" -> "hadoop104:9092", //kafka链接地址
							"key.deserializer" -> classOf[StringDeserializer], //序列化
							"value.deserializer" -> classOf[StringDeserializer], //反序列化
							"group.id" -> groupId, //主题
							"auto.offset.reset" -> "earliest", //earliest latest
							"enable.auto.commit" -> (true: java.lang.Boolean) //是否让消费者自己提交偏移量(默认true)
							)

					val topics = Array("car")

					//3.通过KafkaUtil创建kafkaDSteam
					//官方推荐的直连方式,使用kafka底层的API,效率更高
					val kafkaDSteam = KafkaUtils.createDirectStream(
							ssc,
							LocationStrategies.PreferConsistent,
							ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

					//数据类型
					val schema = StructType(List(
							StructField("cameraId", StringType),
							StructField("time", StringType),
							StructField("lane_position", StringType),
							StructField("carType", StringType),
							StructField("speed", StringType),
							StructField("space", StringType)))

					//初始化轨迹的位置
					var mapLeft: mutable.Map[String, String] = mutable.Map()
					mapLeft("L")="in/left_lane/ZL.json"
					mapLeft("M")="in/left_lane/ZM.json"
					mapLeft("R")="in/left_lane/ZR.json"

					var mapReght: mutable.Map[String, String] = mutable.Map()
					mapReght("L")="in/reght_lane/YL.json"
					mapReght("M")="in/reght_lane/YM.json"
					mapReght("R")="in/reght_lane/YR.json"
					
					//变量往外抽
					val init:Int=43200
					var df:DataFrame=null
					var dfV158:DataFrame=null
					var dfV005:DataFrame=null
					var seV158:Array[Row]=null
					var seV005:Array[Row]=null
					var json8:DataFrame=null
					var json5:DataFrame=null
					var newJson8:String=null
					var newJson5:String=null
					var rdd8:RDD[String]=null
					var rdd5:RDD[String]=null
					
					//2.利用广播变量的形式,将kafkaProducer广播到每一个executor
					//广播kafkasink
				  val kafkaProducer:Broadcast[KafkaSink[String,String]]={
							val kafkaProducerConfig = {
									val p = new Properties()
											p.setProperty("bootstrap.servers", "hadoop104:9092") //kafka地址
											p.setProperty("key.serializer", classOf[StringSerializer].getName) //key序列化
											p.setProperty("value.serializer", classOf[StringSerializer].getName) //value序列化
											p
							} 
							sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
					}

					/**
					 * 将reduceB
					 * 处理JSON字符串为Row 生成RDD[Row] 然后通过schema创建DataFrame
					 * 左线 :V158
					 * 右线 :V005
					 */
					import org.apache.spark.sql.functions._
					kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
						if (!rdd.isEmpty()) { //数据不为空
						    df= spark.createDataFrame(rdd, schema)
								//主线左
								dfV158=df.filter("cameraId =='V158'").toDF()
								/*
								 * 第一步:拿到车辆的参数
								 * 	筛选列
								 *  	位置: lane_position:L,M,R
								 *	     车型: carType      :car→1,bus→2
								 *	      速度: speed        
								 * 		摄像头编号: cameraId
								 */
								if(dfV158.count()>0){
									    seV158=dfV158.select("lane_position","carType","cameraId","speed").collect()
											//第二步:拿到车辆的参数根据车辆信息读取JSON
											seV158.foreach(x⇒{
    												//读取对应车道的轨迹,缓存
    											  json8=spark.read.json(mapLeft.get(x.get(0).toString()).get).cache()
    											
														//(1)车型赋值
														var rowV158:DataFrame=null
														if(x.get(1).toString().equals("car")){
															rowV158=json8.withColumn("type",concat(json8.col("type"),lit("1")))
														}else{
															rowV158=json8.withColumn("type",concat(json8.col("type"),lit("2")))
														}

											    	//(2)车辆编号,唯一即可
														rowV158=rowV158.withColumn("fz_car_id",concat(json8.col("fz_car_id"),lit(uuid)))

														//(3)毫秒/12米
														val time=Math.abs(scala.math.round(init/Double.valueOf(x.get(3).toString())))
														rowV158=rowV158.withColumn("time",(rowV158("id")-1)*time)//取DataFrame中的id((id-1)*(12米的毫秒数))
														rowV158=rowV158.withColumn("is_show",concat(json8.col("is_show"),lit(time)))

														if(rowV158.count()>0){
														  //把spark的json格式数据转java可用的json 
													    newJson8=rowV158.toJSON.collectAsList().toString()
													    //结果写入Kakfa
													    kafkaProducer.value.send("GJTS_topic",newJson8)
														}
														
											})
								}

									//主线右
									dfV005=df.filter(" cameraId =='V005'").toDF()
											//筛选两个列
									if(dfV005.count()>0){
											    seV005=dfV005.select("lane_position","carType","cameraId","speed").collect()
  											  seV005.foreach(x⇒{

      													//读取对应车道的轨迹,缓存
      													json5=spark.read.json(mapReght.get(x.get(0).toString()).get).cache()

  															//(1)车型赋值
  															var rowV005:DataFrame=null
  															if(x.get(1).toString().equals("car")){
  																rowV005=json5.withColumn("type",concat(json5.col("type"),lit("1")))
  															}else{
  																rowV005=json5.withColumn("type",concat(json5.col("type"),lit("2")))
  															}

      													//(2)车辆编号,唯一即可
      															rowV005=rowV005.withColumn("fz_car_id",concat(json5.col("fz_car_id"),lit(uuid)))
    
      													//(3)毫秒/12米
      													val time=Math.abs(scala.math.round(init/Double.valueOf(x.get(3).toString())))
      													    rowV005=rowV005.withColumn("time",(rowV005("id")-1)*time)//取DataFrame中的id((id-1)*(12米的毫秒数))
      													    rowV005=rowV005.withColumn("is_show",concat(json5.col("is_show"),lit(time)))
    
      													 if(rowV005.count()>0){
      													      //把spark的json格式数据转java可用的json 
      													      newJson5=rowV005.toJSON.collectAsList().toString() 
            													//结果写入Kakfa
            													 kafkaProducer.value.send("GJTS_topic",newJson5)
      													}
    											  })
											}
						}
					})

					//启动采集器
					ssc.start()
					//Driver等待采集器的执行,采集器终止,Driver也会终止
					ssc.awaitTermination()
	}

	//车辆编号生成
	def uuid():String={
	 UUID.randomUUID().toString().replaceAll("-", "").toString()
	}
	class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
		/* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
		    lazy val producer = createProducer()
				def send(topic: String, key: K, value: V): Future[RecordMetadata] =
				producer.send(new ProducerRecord[K, V](topic, key, value))
				def send(topic: String, value: V): Future[RecordMetadata] =
				producer.send(new ProducerRecord[K, V](topic, value))
	}
	object KafkaSink {
		import scala.collection.JavaConversions._
		def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
				val createProducerFunc = () => {
					val producer = new KafkaProducer[K, V](config)
							sys.addShutdownHook {
						// Ensure that, on executor JVM shutdown, the Kafka producer sends
						// any buffered messages to Kafka before shutting down.
						producer.close()
					}
					producer
				}
				new KafkaSink(createProducerFunc)
		}
		def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
	}
}

  

 

案列二:

package com.lg.bigdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.Column
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Milliseconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import scala.collection.mutable
import java.lang.Double
import java.util.UUID
import org.apache.spark.sql.Dataset
import org.apache.spark.rdd.RDD
import com.google.gson.JsonObject
import scala.util.parsing.json.JSONArray
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.TaskContext
import com.lg.bigdata.utils.JZWUtil
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.spark.streaming.kafka010.CanCommitOffsets

/**
 * 	一:模块功能介绍
 * 			(1) 功能介绍:轨迹推算
 */
object KafkaAndJsonGJTS {
      	val groupId = "jwz_test"
					//val groupId = "jwz_GJ"
			def main(args: Array[String]): Unit = {
				

							//1.创建SparkConf并初始化SSC,.setMaster("local[*]")
							val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS")
							//设置序列化器为KryoSerializer
							//sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

							val ssc = new StreamingContext(sparkConf,Milliseconds(500)) //500毫秒:
							ssc.sparkContext.setLogLevel("WARN")

							val spark= SparkSession.builder().config(sparkConf).getOrCreate()
							val sc=spark.sparkContext

							/*2.定义kafka参数将kafka参数映射为map
							 * earliest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
							 * 						如果offect不存在,自动重置偏移量为最小偏移量
							 * latest  当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
							 * 					如果offect不存在,自动重置偏移量为最大偏移量
							 * none  topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
							 */

							val kafkaParams = Map[String, Object](
									"bootstrap.servers" -> "hadoop104:9092", //kafka链接地址
									"key.deserializer" -> classOf[StringDeserializer], //序列化
									"value.deserializer" -> classOf[StringDeserializer], //反序列化
									"group.id" -> groupId, //主题
									"auto.offset.reset" -> "latest", //earliest latest
									"enable.auto.commit" -> (true: java.lang.Boolean), //是否让消费者自己提交偏移量(默认true)
									"auto.commit.interval.ms" -> "500" //自动提交的时间
									)

							val topics = Array("car")

							//3.通过KafkaUtil创建kafkaDSteam
							//官方推荐的直连方式,使用kafka底层的API,效率更高
							val kafkaDSteam = KafkaUtils.createDirectStream(
									ssc,
									LocationStrategies.PreferConsistent,
									ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

							//数据类型
							val schema = StructType(List(
									StructField("cameraId", StringType),
									StructField("time", StringType),
									StructField("lane_position", StringType),
									StructField("carType", StringType),
									StructField("speed", StringType),
									StructField("space", StringType)))


							//变量往外抽
							val init: Int = 43200
							var carjson:Dataset[Row]=null
							var singleCarTrack: DataFrame=null
							var datacar: Array[Row] = null
							var rddString:RDD[String]= null
							var newJson:String =null
							var singleCarTrack_1:DataFrame=null

							//2.利用广播变量的形式,将kafkaProducer广播到每一个executor
							//广播kafkasink
							val kafkaProducer:Broadcast[KafkaSink[String,String]]={
							val kafkaProducerConfig = {
									val p = new Properties()
											p.setProperty("bootstrap.servers", "hadoop104:9092") //kafka地址
											p.setProperty("key.serializer", classOf[StringSerializer].getName) //key序列化
											p.setProperty("value.serializer", classOf[StringSerializer].getName) //value序列化
											p
							} 
							sc.broadcast(KafkaSink[String,String](kafkaProducerConfig))
					}
							/**
							 *	 每个摄像头单独推
							 * 	将reduceB
							 * 	处理JSON字符串为Row 生成RDD[Row] 然后通过schema创建DataFrame
							 *
							 */
							import org.apache.spark.sql.functions._
							kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
							  //获取偏移量信息
						
								if (!rdd.isEmpty()) { //数据不为空
    									//第一步:得到原始数据	
    									datacar= spark.createDataFrame(rdd, schema).select("lane_position","carType","cameraId","speed").collect()
											/*	拿到车辆的参数:
											 * 	筛选列
											 *  	位置: lane_position:L,M,R
											 *	     车型: carType      :car→1,bus→2
											 *	      速度: speed
											 * 		摄像头编号: cameraId
											 * */

											if(datacar.size>0){
												//第三步:原始数据与json的比对,得到对应的轨迹点
												datacar.foreach(x⇒{
    													//第二步:拿到车辆的JSON信息
    													carjson=spark.read.json(getPath.get(x.apply(2)+""+x.apply(0)).get)
											

															//单个车辆单路摄像头的json轨迹
															singleCarTrack=carjson.filter("cam_ID=='"+x.apply(2)+"' and lane=='"+x.apply(0)+"'").toDF()
															
															//(1)数据处理1:车型赋值
															if(x.get(1).toString().equals("car")){
																singleCarTrack_1=singleCarTrack.withColumn("type",concat(singleCarTrack.col("type"),lit("1")))
															}else{
																singleCarTrack_1=singleCarTrack.withColumn("type",concat(singleCarTrack.col("type"),lit("2")))
															}

    													//(2)车辆编号,唯一即可
    													singleCarTrack_1=singleCarTrack_1.withColumn("fz_car_id",concat(singleCarTrack_1.col("fz_car_id"),lit(uuid)))

															//(3) 给当前的json设置递增的新id
															singleCarTrack_1=singleCarTrack_1.withColumn("newid",row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1).cast(LongType))))

															//(4) 数据处理2:根据当前速度算出每12米的毫秒数(毫秒/12米)
															val time=Math.abs(scala.math.round(init/Double.valueOf(x.get(3).toString())))
															singleCarTrack_1=singleCarTrack_1.withColumn("time",(singleCarTrack_1("newid")-1)*time)//取DataFrame中的id((id-1)*(12米的毫秒数))
															
															//把spark的json格式数据转java可用的json,追加 [  ]
															if(singleCarTrack_1.count()>0){
															  	newJson=singleCarTrack_1.toJSON.collectAsList().toString()
															  	val rddrow=sc.makeRDD(Seq(newJson))
															  	rddrow.foreach(record⇒{
															  	  kafkaProducer.value.send("GJTS_topic",record)
															  	})
														      
															}
														
												})
											}
								}
							})

							//启动采集器
							ssc.start()
							//Driver等待采集器的执行,采集器终止,Driver也会终止
							ssc.awaitTermination()
	}

	def getPath():mutable.Map[String, String]={
			var mapPath: mutable.Map[String, String] = mutable.Map()
					mapPath("V140L")="in/left_lane/ZL.json"
					mapPath("V153L")="in/left_lane/ZL.json"
					mapPath("V108L")="in/left_lane/ZL.json"
					mapPath("V158L")="in/left_lane/ZL.json"
					mapPath("V122L")="in/left_lane/ZL.json"

					mapPath("V098L")="in/left_lane/ZL.json"
					mapPath("V150L")="in/left_lane/ZL.json"
					mapPath("V134L")="in/left_lane/ZL.json"
					mapPath("V085L")="in/left_lane/ZL.json"
					mapPath("V114L")="in/left_lane/ZL.json"

					mapPath("V146L")="in/left_lane/ZL.json"
					mapPath("V125L")="in/left_lane/ZL.json"
					mapPath("V143L")="in/left_lane/ZL.json"
					mapPath("V131L")="in/left_lane/ZL.json"
					mapPath("V102L")="in/left_lane/ZL.json"

					mapPath("V137L")="in/left_lane/ZL.json"
					mapPath("V089L")="in/left_lane/ZL.json"
					mapPath("V128L")="in/left_lane/ZL.json"
					mapPath("V093L")="in/left_lane/ZL.json"
					mapPath("V118L")="in/left_lane/ZL.json"
					//	------------------------------------------
					mapPath("V140M")="in/left_lane/ZM.json"
					mapPath("V153M")="in/left_lane/ZM.json"
					mapPath("V108M")="in/left_lane/ZM.json"
					mapPath("V158M")="in/left_lane/ZM.json"
					mapPath("V122M")="in/left_lane/ZM.json"

					mapPath("V098M")="in/left_lane/ZM.json"
					mapPath("V150M")="in/left_lane/ZM.json"
					mapPath("V134M")="in/left_lane/ZM.json"
					mapPath("V085M")="in/left_lane/ZM.json"
					mapPath("V114M")="in/left_lane/ZM.json"

					mapPath("V146M")="in/left_lane/ZM.json"
					mapPath("V125M")="in/left_lane/ZM.json"
					mapPath("V143M")="in/left_lane/ZM.json"
					mapPath("V131M")="in/left_lane/ZM.json"
					mapPath("V102M")="in/left_lane/ZM.json"

					mapPath("V137M")="in/left_lane/ZM.json"
					mapPath("V089M")="in/left_lane/ZM.json"
					mapPath("V128M")="in/left_lane/ZM.json"
					mapPath("V093M")="in/left_lane/ZM.json"
					mapPath("V118M")="in/left_lane/ZM.json"
					//	------------------------------------------
					mapPath("V140R")="in/left_lane/ZR.json"
					mapPath("V153R")="in/left_lane/ZR.json"
					mapPath("V108R")="in/left_lane/ZR.json"
					mapPath("V158R")="in/left_lane/ZR.json"
					mapPath("V122R")="in/left_lane/ZR.json"

					mapPath("V098R")="in/left_lane/ZR.json"
					mapPath("V150R")="in/left_lane/ZR.json"
					mapPath("V134R")="in/left_lane/ZR.json"
					mapPath("V085R")="in/left_lane/ZR.json"
					mapPath("V114R")="in/left_lane/ZR.json"

					mapPath("V146R")="in/left_lane/ZR.json"
					mapPath("V125R")="in/left_lane/ZR.json"
					mapPath("V143R")="in/left_lane/ZR.json"
					mapPath("V131R")="in/left_lane/ZR.json"
					mapPath("V102R")="in/left_lane/ZR.json"

					mapPath("V137R")="in/left_lane/ZR.json"
					mapPath("V089R")="in/left_lane/ZR.json"
					mapPath("V128R")="in/left_lane/ZR.json"
					mapPath("V093R")="in/left_lane/ZR.json"
					mapPath("V118R")="in/left_lane/ZR.json"

					//======================================
					mapPath("V032L")="in/reght_lane/YL.json"
					mapPath("V072L")="in/reght_lane/YL.json"
					mapPath("V029L")="in/reght_lane/YL.json"
					mapPath("V005L")="in/reght_lane/YL.json"
					mapPath("V051L")="in/reght_lane/YL.json"
					//--------------------------------------
					mapPath("V009L")="in/reght_lane/YL.json"
					mapPath("V027L")="in/reght_lane/YL.json"
					mapPath("V062L")="in/reght_lane/YL.json"
					mapPath("V039L")="in/reght_lane/YL.json"
					mapPath("V067L")="in/reght_lane/YL.json"
					//--------------------------------------
					mapPath("V035L")="in/reght_lane/YL.json"
					mapPath("V058L")="in/reght_lane/YL.json"
					mapPath("V018L")="in/reght_lane/YL.json"
					mapPath("V045L")="in/reght_lane/YL.json"
					mapPath("V042L")="in/reght_lane/YL.json"
					//--------------------------------------
					mapPath("V048L")="in/reght_lane/YL.json"
					mapPath("V014L")="in/reght_lane/YL.json"
					mapPath("V024L")="in/reght_lane/YL.json"
					mapPath("V076L")="in/reght_lane/YL.json"
					mapPath("V054L")="in/reght_lane/YL.json"
					//======================================
					mapPath("V032M")="in/reght_lane/YM.json"
					mapPath("V072M")="in/reght_lane/YM.json"
					mapPath("V029M")="in/reght_lane/YM.json"
					mapPath("V005M")="in/reght_lane/YM.json"
					mapPath("V051M")="in/reght_lane/YM.json"
					//--------------------------------------
					mapPath("V009M")="in/reght_lane/YM.json"
					mapPath("V027M")="in/reght_lane/YM.json"
					mapPath("V062M")="in/reght_lane/YM.json"
					mapPath("V039M")="in/reght_lane/YM.json"
					mapPath("V067M")="in/reght_lane/YM.json"
					//--------------------------------------
					mapPath("V035M")="in/reght_lane/YM.json"
					mapPath("V058M")="in/reght_lane/YM.json"
					mapPath("V018M")="in/reght_lane/YM.json"
					mapPath("V045M")="in/reght_lane/YM.json"
					mapPath("V042M")="in/reght_lane/YM.json"
					//--------------------------------------
					mapPath("V048M")="in/reght_lane/YM.json"
					mapPath("V014M")="in/reght_lane/YM.json"
					mapPath("V024M")="in/reght_lane/YM.json"
					mapPath("V076M")="in/reght_lane/YM.json"
					mapPath("V054M")="in/reght_lane/YM.json"

					//======================================
					mapPath("V032R")="in/reght_lane/YR.json"
					mapPath("V072R")="in/reght_lane/YR.json"
					mapPath("V029R")="in/reght_lane/YR.json"
					mapPath("V005R")="in/reght_lane/YR.json"
					mapPath("V051R")="in/reght_lane/YR.json"
					//--------------------------------------
					mapPath("V009R")="in/reght_lane/YR.json"
					mapPath("V027R")="in/reght_lane/YR.json"
					mapPath("V062R")="in/reght_lane/YR.json"
					mapPath("V039R")="in/reght_lane/YR.json"
					mapPath("V067R")="in/reght_lane/YR.json"
					//--------------------------------------
					mapPath("V035R")="in/reght_lane/YR.json"
					mapPath("V058R")="in/reght_lane/YR.json"
					mapPath("V018R")="in/reght_lane/YR.json"
					mapPath("V045R")="in/reght_lane/YR.json"
					mapPath("V042R")="in/reght_lane/YR.json"
					//--------------------------------------
					mapPath("V048R")="in/reght_lane/YR.json"
					mapPath("V014R")="in/reght_lane/YR.json"
					mapPath("V024R")="in/reght_lane/YR.json"
					mapPath("V076R")="in/reght_lane/YR.json"
					mapPath("V054R")="in/reght_lane/YR.json"

					mapPath
	}

	//车辆编号生成
	def uuid(): String = {
			UUID.randomUUID().toString().replaceAll("-", "").toString()
	}

	//1.首先需要将KafkaProducer利用lazy val的方式进行包装
	class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
		//这是一个关键的想法,使我们能够绕过运行到NotSerializableExceptions。
		lazy val producer = createProducer()
				def send(topic: String, key: K, value: V): Future[RecordMetadata] =
				producer.send(new ProducerRecord[K, V](topic, key, value))

				def send(topic: String, value: V): Future[RecordMetadata] =
				producer.send(new ProducerRecord[K, V](topic, value))
	}
	object KafkaSink {
		import scala.collection.JavaConversions._
		def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
				val createProducerFunc = () => {
					val producer = new KafkaProducer[K, V](config)
						sys.addShutdownHook {
						//确保在executor JVM关闭时,Kafka生产者发送
						//关闭前任何缓冲的消息。
						producer.close()
					}
					producer
				}
				new KafkaSink(createProducerFunc)
		}
		def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
	}
}

  

上一篇:算法与数据结构笔记


下一篇:this的指向