
package com.bawei.foryk

import com.bawei.util.DateTools


object TrafficUtil {

  // Compute the distance between the photo's coordinates and the * reference coordinates,
  // and use it to decide which ring road the vehicle is inside.
  def circle(x: Int, y: Int): Int = {
    val distance: Long = Math.round(Math.sqrt(Math.pow(x - 39, 2) + Math.pow(y - 116, 2)))
    if (distance > 0 && distance <= 15) 2
    else if (distance > 15 && distance <= 30) 3
    else if (distance > 30 && distance <= 40) 4
    else if (distance > 40 && distance <= 60) 5
    else 6
  }

  // Given the vehicle type and the distance from the * distance formula,
  // classify the traffic violation according to the rules below.
  def isRule(actiontime: String, carno: String, cartype: String, x: Int, y: Int): String = {
    if (cartype == "A" && circle(x, y) == 2) "摩托车A进入2环"
    else if (cartype == "B" && circle(x, y) == 4) "摩托车B进入4环"
    else if (cartype == "C" && circle(x, y) == 6 && !carno.startsWith("京")) "外地牌照不能进入5环"
    else if (cartype == "C" && carno.startsWith("京")) {
      val weishu: String = carno.substring(carno.length - 1, carno.length) // last digit of the plate
      val week: String = DateTools.dateToWeek(actiontime)                  // day of week as a numeric string
      if ((weishu.toInt + week.toInt) % 2 == 0) "符合单双号限行规则"
      else "本地牌照不符合单双号限行规则"
    } else {
      "不违规"
    }
  }

  // Scala method: given a license plate number and a timestamp,
  // check whether the vehicle complies with the odd/even plate restriction.
  /*def isAllow(carno: String, actiontime: String): String = {
    if (carno.startsWith("京")) {
      val weishu: String = carno.substring(carno.length - 1, carno.length)
      println(s"尾数是:${weishu}")
      val week: String = DateTools.dateToWeek(actiontime)
      if ((weishu.toInt + week.toInt) % 2 == 0) "符合单双号限行规则"
      else "不符合单双号限行规则"
    } else {
      "非本地牌照"
    }
  }*/

  def main(args: Array[String]): Unit = {
    //println(distance(66,142))
    //println(isAllow("京H17453","2020-05-13 15:22:06"))
  }
}
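
TrafficUtil depends on com.bawei.util.DateTools, which is not shown in the post. A minimal sketch of what dateToWeek might look like, assuming it takes a timestamp such as "2020-05-13 15:22:06" and returns the day of the week as a numeric string (so that the .toInt call in isRule works):

package com.bawei.util

import java.text.SimpleDateFormat
import java.util.Calendar

object DateTools {

  // Parse a timestamp like "2020-05-13 15:22:06" and return the day of week
  // as a numeric string ("1" = Sunday ... "7" = Saturday, java.util.Calendar convention).
  // This is an assumed implementation; the original class is not included in the post.
  def dateToWeek(actiontime: String): String = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val cal = Calendar.getInstance()
    cal.setTime(sdf.parse(actiontime))
    cal.get(Calendar.DAY_OF_WEEK).toString
  }
}
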
package com.bawei.foryk

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}


//case class Car(actiontime: String, carno: String, cartype: String, x: Int, y: Int)
object SparkSqlTraffic02 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlTraffic02")
      .master("local")
      .getOrCreate()

    // Read the input file and create an RDD; each line is: actiontime,carno,cartype,x,y
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/traffic.txt")

    // Append the computed ring road and the violation description to each record
    val tuple7RDD: RDD[(String, String, String, String, String, Long, String)] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      (strings(0), strings(1), strings(2), strings(3), strings(4),
        TrafficUtil.circle(strings(3).toInt, strings(4).toInt),
        TrafficUtil.isRule(strings(0), strings(1), strings(2), strings(3).toInt, strings(4).toInt)
      )
    })

    /*var carSchema = StructType(
      List(DataTypes.createStructField("actiontime", DataTypes.StringType, true),
        DataTypes.createStructField("carno", DataTypes.StringType, true),
        DataTypes.createStructField("cartype", DataTypes.StringType, true),
        DataTypes.createStructField("x", DataTypes.IntegerType, true),
        DataTypes.createStructField("y", DataTypes.IntegerType, true),
        DataTypes.createStructField("circle", DataTypes.IntegerType, true),
        DataTypes.createStructField("info", DataTypes.StringType, true))
    )*/
    import spark.implicits._
    val tupleDF: DataFrame = tuple7RDD.toDF()
    tupleDF.createOrReplaceTempView("car")


    //spark.sql("select * from car").show()
    // Count vehicles by ring road and vehicle type
    //spark.sql("select _6,_3,count(*) from car group by _6,_3 order by _6,_3").show()
    // Count vehicles by vehicle type and violation type
    //spark.sql("select _3,_7,count(*) from car where _7!='不违规' group by _3,_7 order by _3,_7 ").show()
    // Count vehicles per day, per ring road, per vehicle type
    val resultRDD: DataFrame = spark.sql("select substring(_1,0,10),_6,_3,count(*) from car group by substring(_1,0,10),_6,_3 order by substring(_1,0,10),_6,_3")
    resultRDD.coalesce(1).rdd.saveAsTextFile("./carresult")


    spark.stop()
  }
}
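
The commented-out carSchema above is the explicit-schema alternative to tuple7RDD.toDF(). A sketch, not from the original code, of how it could be wired in via an RDD[Row], assuming carSchema is uncommented and reusing lineRDD and spark from the listing; the benefit is that the temp view gets named columns instead of _1 ... _7:

import org.apache.spark.sql.Row

// Build Row objects matching carSchema's column order and types,
// then create the DataFrame from the explicit schema.
val rowRDD: RDD[Row] = lineRDD.map(line => {
  val s = line.split(",")
  Row(s(0), s(1), s(2), s(3).toInt, s(4).toInt,
    TrafficUtil.circle(s(3).toInt, s(4).toInt),
    TrafficUtil.isRule(s(0), s(1), s(2), s(3).toInt, s(4).toInt))
})
val carDF: DataFrame = spark.createDataFrame(rowRDD, carSchema)
carDF.createOrReplaceTempView("car")
// Queries can then use the column names, e.g.:
// spark.sql("select circle, cartype, count(*) from car group by circle, cartype order by circle, cartype").show()
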
package com.bawei.foryk

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


case class Student(name: String, sex: String, age: Int)

object SparkSqlReview01 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("SparkSqlReview01")
      .master("local")
      .getOrCreate()

    // Read the input file and create an RDD; each line is: name,sex,age
    val lineRDD: RDD[String] = spark.sparkContext.textFile("./traffic/data.txt")

    val studentRDD: RDD[Student] = lineRDD.map(line => {
      val strings: Array[String] = line.split(",")
      Student(strings(0), strings(1), strings(2).toInt)
    })

    import spark.implicits._
    val studentDF: DataFrame = studentRDD.toDF()

    studentDF.createOrReplaceTempView("student")

    val resultDF: DataFrame = spark.sql("select * from student where age < 20")

    // JDBC connection properties for MySQL
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "")

    // Write the result DataFrame to the "student" table in the test database
    resultDF.write.jdbc("jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8", "student", prop)

    spark.stop()
  }
}
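
resultDF.write.jdbc fails if the student table already exists, because the default save mode is ErrorIfExists. A sketch, reusing resultDF, prop, and spark from the listing above and assuming the MySQL Connector/J 5.x driver (com.mysql.jdbc.Driver) is on the classpath, of setting a save mode and driver explicitly and then reading the table back to verify the load:

import org.apache.spark.sql.SaveMode

val url = "jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&characterEncoding=UTF-8"

// Append to the existing table instead of failing, and name the JDBC driver class explicitly
resultDF.write
  .mode(SaveMode.Append)
  .option("driver", "com.mysql.jdbc.Driver")
  .jdbc(url, "student", prop)

// Read the table back and print it to confirm the rows arrived
spark.read.jdbc(url, "student", prop).show()
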
package com.bawei.foryk

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object SparkStreamReview01 {

  def main(args: Array[String]): Unit = {
    val checkpointdir = "./checkdir2"
    // Recover the StreamingContext from the checkpoint directory if it exists,
    // otherwise build a new one with createFunc; start it here so that a context
    // recovered from the checkpoint is also started.
    val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointdir, () => createFunc(checkpointdir))
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointdir: String): StreamingContext = {

    val conf: SparkConf = new SparkConf().setAppName("SparkStreamReview01").setMaster("local[2]")
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointdir)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.182.147:9092,192.168.182.148:9092,192.168.182.149:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )
    // Topics to subscribe to: a set, so multiple topics can be consumed
    val topics = Set("test")
    // Build the direct DStream with KafkaUtils.createDirectStream
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    // Take only the message value from each Kafka record
    val socketline: DStream[String] = kafkaTopicDS.map(x => x.value())

    // Split each line into words and pair each word with a count of 1
    val mapRDD: DStream[(String, Int)] = socketline.flatMap(_.split(" ")).map((_, 1))

    //mapRDD.reduceByKey(_+_).print()
    //mapRDD.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(5)).print()
    //mapRDD.countByValueAndWindow(Seconds(10),Seconds(5)).print()

    // Maintain a running total per word across batches
    val result: DStream[(String, Int)] = mapRDD.updateStateByKey((list: Seq[Int], option: Option[Int]) => {
      var before = option.getOrElse(0) // total accumulated so far for this key
      for (value <- list) {            // add the counts from the current batch
        before += value
      }
      Option(before)
    })
    result.print()

    ssc
  }

}
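
The update function passed to updateStateByKey adds the current batch's counts for a key to the previously checkpointed total. A sketch of the same logic written more compactly (identical behavior, just using Seq#sum):

// new total for a key = previous total (0 if the key is new) + sum of this batch's counts
val updateFunc = (newValues: Seq[Int], state: Option[Int]) => Some(state.getOrElse(0) + newValues.sum)

// Used the same way as the inline function above:
// val result = mapRDD.updateStateByKey(updateFunc)
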

 
