package com.bawei.foryk

import com.bawei.util.DateTools

object TrafficUtil {

  // Compute the distance between the photographed coordinates (x, y) and the
  // reference point (39, 116), then map that distance to a ring-road number.
  def circle(x: Int, y: Int): Int = {
    val distance: Long = Math.round(Math.sqrt(Math.pow(x - 39, 2) + Math.pow(y - 116, 2)))
    if (distance > 0 && distance <= 15) 2
    else if (distance > 15 && distance <= 30) 3
    else if (distance > 30 && distance <= 40) 4
    else if (distance > 40 && distance <= 60) 5
    else 6
  }

  // Given the vehicle type and the ring obtained from the distance formula above,
  // classify the traffic-rule violation for the record.
  def isRule(actiontime: String, carno: String, cartype: String, x: Int, y: Int): String = {
    if (cartype == "A" && circle(x, y) == 2) "摩托车A进入2环"                              // type-A motorcycle inside the 2nd ring
    else if (cartype == "B" && circle(x, y) == 4) "摩托车B进入4环"                         // type-B motorcycle inside the 4th ring
    else if (cartype == "C" && circle(x, y) == 6 && !carno.startsWith("京")) "外地牌照不能进入5环"  // non-local plate not allowed past the 5th ring
    else if (cartype == "C" && carno.startsWith("京")) {
      // Odd/even restriction for local ("京") plates: the last digit of the plate
      // plus the day of week decides whether the car may be on the road that day.
      val weishu: String = carno.substring(carno.length - 1)    // last digit of the plate
      val week: String = DateTools.dateToWeek(actiontime)       // day of week for the record's timestamp
      if ((weishu.toInt + week.toInt) % 2 == 0) "符合单双号限行规则"     // complies with the odd/even restriction
      else "本地牌照不符合单双号限行规则"                                  // local plate violates the odd/even restriction
    } else {
      "不违规"                                                          // no violation
    }
  }

  // Kept for reference: given a plate number and a date, decide whether the car
  // complies with the odd/even restriction.
  /*def isAllow(carno: String, actiontime: String): String = {
    if (carno.startsWith("京")) {
      val weishu: String = carno.substring(carno.length - 1)
      println(s"last digit: ${weishu}")
      val week: String = DateTools.dateToWeek(actiontime)
      if ((weishu.toInt + week.toInt) % 2 == 0) "符合单双号限行规则"
      else "不符合单双号限行规则"
    } else {
      "非本地牌照"   // not a local plate
    }
  }*/

  def main(args: Array[String]): Unit = {
    //println(circle(66, 142))
    //println(isAllow("京H17453", "2020-05-13 15:22:06"))
  }
}
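// --------------------------------------------------------------------------
// Illustrative sketch, not part of the original sources: TrafficUtil imports
// com.bawei.util.DateTools, which is not included above. The object below is a
// guess at a minimal dateToWeek that parses a "yyyy-MM-dd HH:mm:ss" timestamp
// (the format used in the commented-out call in TrafficUtil.main) and returns
// the day of week (Monday = 1 .. Sunday = 7) as a String, which is the kind of
// numeric string isRule's week.toInt appears to expect.
// --------------------------------------------------------------------------
package com.bawei.util

import java.text.SimpleDateFormat
import java.util.Calendar

object DateTools {
  def dateToWeek(actiontime: String): String = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val cal = Calendar.getInstance()
    cal.setTime(sdf.parse(actiontime))
    // Calendar counts Sunday = 1 .. Saturday = 7; shift so Monday = 1 .. Sunday = 7
    val dow = cal.get(Calendar.DAY_OF_WEEK)
    val week = if (dow == Calendar.SUNDAY) 7 else dow - 1
    week.toString
  }
}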
package com.bawei.foryk

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

//case class Car(actiontime: String, carno: String, cartype: String, x: Int, y: Int)
object SparkSqlTraffic02 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlTraffic02")
      .master("local")
      .getOrCreate()

    // Read the traffic records and create an RDD
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/traffic.txt")

    // Split each line (actiontime,carno,cartype,x,y) and enrich it with the ring number
    // and the rule-violation label computed by TrafficUtil
    val tuple7RDD: RDD[(String, String, String, String, String, Int, String)] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      (strings(0), strings(1), strings(2), strings(3), strings(4),
        TrafficUtil.circle(strings(3).toInt, strings(4).toInt),
        TrafficUtil.isRule(strings(0), strings(1), strings(2), strings(3).toInt, strings(4).toInt)
      )
    })

    /* Explicit schema kept for reference; the implicit toDF() below is used instead.
    var carSchema = StructType(
      List(DataTypes.createStructField("actiontime", DataTypes.StringType, true),
        DataTypes.createStructField("carno", DataTypes.StringType, true),
        DataTypes.createStructField("cartype", DataTypes.StringType, true),
        DataTypes.createStructField("x", DataTypes.IntegerType, true),
        DataTypes.createStructField("y", DataTypes.IntegerType, true),
        DataTypes.createStructField("circle", DataTypes.IntegerType, true),
        DataTypes.createStructField("info", DataTypes.StringType, true))
    )*/
    import spark.implicits._
    val tupleDF: DataFrame = tuple7RDD.toDF()   // columns are named _1 .. _7
    tupleDF.createOrReplaceTempView("car")

    //spark.sql("select * from car").show()
    // Count vehicles per ring and vehicle type
    //spark.sql("select _6,_3,count(*) from car group by _6,_3 order by _6,_3").show()
    // Count vehicles per vehicle type and violation type
    //spark.sql("select _3,_7,count(*) from car where _7!='不违规' group by _3,_7 order by _3,_7 ").show()
    // Count vehicles per day, per ring and per vehicle type
    val resultDF: DataFrame = spark.sql("select substring(_1,0,10),_6,_3,count(*) from car group by substring(_1,0,10),_6,_3 order by substring(_1,0,10),_6,_3")
    resultDF.coalesce(1).rdd.saveAsTextFile("./carresult")

    spark.stop()
  }
}
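// --------------------------------------------------------------------------
// Illustrative sketch, not part of the original sources: the job above assumes
// ./traffic/traffic.txt holds comma-separated records of the form
// actiontime,carno,cartype,x,y. Below is a quick local check of TrafficUtil
// against one such record; the sample values are taken from the commented-out
// calls in TrafficUtil.main, while the record layout itself is an assumption.
// --------------------------------------------------------------------------
package com.bawei.foryk

object TrafficSampleCheck {
  def main(args: Array[String]): Unit = {
    val sample = "2020-05-13 15:22:06,京H17453,C,66,142"
    val f = sample.split(",")
    // Ring the coordinates fall into, and the rule label for the record
    println(TrafficUtil.circle(f(3).toInt, f(4).toInt))
    println(TrafficUtil.isRule(f(0), f(1), f(2), f(3).toInt, f(4).toInt))
  }
}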
package com.bawei.foryk

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

case class Student(name: String, sex: String, age: Int)

object SparkSqlReview01 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlReview01")
      .master("local")
      .getOrCreate()

    // Read the student records and create an RDD
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/data.txt")

    // Each line is expected to be name,sex,age
    val studentRDD: RDD[Student] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      Student(strings(0), strings(1), strings(2).toInt)
    })

    import spark.implicits._
    val studentDF: DataFrame = studentRDD.toDF()

    studentDF.createOrReplaceTempView("student")

    val resultDF: DataFrame = spark.sql("select * from student where age < 20")

    // JDBC connection properties for the local MySQL instance
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "")

    resultDF.write.jdbc("jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8", "student", prop)

    spark.stop()
  }
}
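// --------------------------------------------------------------------------
// Illustrative note, not part of the original sources: resultDF.write.jdbc(...)
// above uses the default SaveMode.ErrorIfExists, so a second run fails once the
// "student" table exists. If repeated runs should append rows instead (an
// assumption about the intended behaviour), the same write can be expressed as
// below, reusing resultDF and prop from SparkSqlReview01.main:
// --------------------------------------------------------------------------
import org.apache.spark.sql.SaveMode

resultDF.write
  .mode(SaveMode.Append)   // append to the existing table instead of failing
  .jdbc("jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8", "student", prop)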
package com.bawei.foryk

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamReview01 {

  def main(args: Array[String]): Unit = {
    val checkpointdir = "./checkdir2"
    // Recover the context from the checkpoint if one exists, otherwise build a new one,
    // then start it here; start/awaitTermination must not run inside the factory function
    val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointdir, () => createFunc(checkpointdir))
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointdir: String): StreamingContext = {

    val conf: SparkConf = new SparkConf().setAppName("SparkStreamReview01").setMaster("local[2]")
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointdir)

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )
    // Topics to subscribe to; the set may hold several topics
    val topics = Set("test")
    // Build the direct DStream with KafkaUtils.createDirectStream
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    // Extract the message values from the Kafka records
    val socketline: DStream[String] = kafkaTopicDS.map(x => x.value())

    // Split each message into words and pair each word with a count of 1
    val mapRDD: DStream[(String, Int)] = socketline.flatMap(_.split(" ")).map((_, 1))

    //mapRDD.reduceByKey(_+_).print()
    //mapRDD.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(5)).print()
    //mapRDD.countByValueAndWindow(Seconds(10),Seconds(5)).print()

    // Running word count across batches: add this batch's counts to the stored total
    val result: DStream[(String, Int)] = mapRDD.updateStateByKey((list: Seq[Int], option: Option[Int]) => {
      var before = option.getOrElse(0)   // total accumulated in previous batches
      for (value <- list) {
        before += value
      }
      Option(before)
    })
    result.print()

    ssc
  }
}
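// --------------------------------------------------------------------------
// Illustrative note, not part of the original sources: the kafkaParams above
// leave offset handling to the Kafka consumer defaults. Two settings that are
// often made explicit for a direct stream are shown in the variant below;
// whether they are wanted here is an assumption.
// --------------------------------------------------------------------------
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParamsExplicit = Map[String, Object](
  "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group1",
  "auto.offset.reset" -> "latest",                    // where to start when the group has no committed offset
  "enable.auto.commit" -> (false: java.lang.Boolean)  // let the Spark checkpoint, not the consumer, track progress
)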