spark-streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
//二次运行会从头读,因为只有获取偏移量没有提交偏移量
/**
 * Reads the `test02` topic through the Kafka direct stream and, for every batch, prints the
 * offset range of each topic partition followed by every record value.
 *
 * Offsets are only read here, never committed, and `enable.auto.commit` is false. Combined with
 * `auto.offset.reset = earliest`, a second run of this program reads the topic from the start again.
 */
object DemoOffset01 {
  /**
   * Builds the streaming context, subscribes to Kafka and blocks until the stream terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00003",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("test02")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    kafkaDstream.foreachRDD { rdd =>
      // Only the RDD handed out by the direct stream implements HasOffsetRanges, so the cast
      // has to happen here, before any transformation is applied.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        // Each placeholder needs `$`; without it the s-interpolator prints the braces literally.
        println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
      }
      val values: RDD[String] = rdd.map(_.value())
      values.foreach(println)
    }
    ssc.start()
    ssc.awaitTermination()

    // Programming against Spark Streaming amounts to programming against RDDs.
  }

}

升级版2.0:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
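// Version 2.0: after each batch is processed its offsets are committed back to Kafka with commitAsync, so a second run resumes from the last committed offsets instead of reading from the beginning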
/**
 * Second version of the offset demo: consumes the `helloword` topic, prints the offset ranges and
 * record values of every non-empty batch, then commits that batch's offsets back to Kafka.
 */
object DemoOffset01 {
  /**
   * Sets up the streaming job and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    //val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("AggregateOperator")
      .setMaster("local")
      .set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val consumerConfig: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00003",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val subscribedTopics: Array[String] = Array("helloword")
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](subscribedTopics, consumerConfig)
    )

    // The function passed to foreachRDD is invoked once per batch interval.
    stream.foreachRDD { batch =>
      if (!batch.isEmpty()) {
        // The cast works only on the RDD produced directly by the Kafka stream.
        val ranges: Array[OffsetRange] = batch.asInstanceOf[HasOffsetRanges].offsetRanges
        ranges.foreach { r =>
          println(s"topic:${r.topic},partition:${r.partition},fromoffset:${r.fromOffset},utiloffset:${r.untilOffset}")
        }

        // Process the batch: here, just print every record value.
        val values: RDD[String] = batch.map(record => record.value())
        values.foreach(println)

        // Commit the processed ranges to Kafka (stored in the __consumer_offsets topic).
        stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
      }
    }

    ssc.start()
    ssc.awaitTermination()

    // Programming against Spark Streaming amounts to programming against RDDs.
  }

}

补充:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

/**
 * Prints the offset ranges and record values of every batch read from the `helloword` topic.
 *
 * The offset ranges are taken from the RDD produced directly by the Kafka stream. An RDD obtained
 * after a transformation such as `map` does not implement `HasOffsetRanges`, so casting it
 * fails with a `ClassCastException`.
 */
object CommitOffsetDemo02 {
  /**
   * Sets up the streaming job and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00004",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("helloword")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // Work on the direct stream itself: the offsets must be read before any transformation,
    // and the mapping to record values is done on the RDD afterwards.
    kafkaDstream.foreachRDD { rdd =>
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
      }
      rdd.map(_.value()).foreach(x => println(x))
    }
    // Without start() the registered output operation never runs.
    ssc.start()
    ssc.awaitTermination()
  }

}

补充:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

/**
 * Prints the offset ranges and record values of every batch read from the `helloword` topic.
 *
 * The offset ranges are taken from the RDD produced directly by the Kafka stream. An RDD obtained
 * after a transformation such as `map` does not implement `HasOffsetRanges`, so casting it
 * fails with a `ClassCastException`.
 */
object CommitOffsetDemo02 {
  /**
   * Sets up the streaming job and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g00004",
      "auto.offset.reset" -> "earliest",
      // Auto-commit is disabled; this demo only reads offsets and never commits them.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("helloword")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // Calling a transformation on a DStream yields a new DStream; calling an output operation
    // (action) does not.
    // A transformed Kafka stream produces MapPartitionsRDDs, which carry no offset information,
    // so the offsets can only be obtained from the original direct Kafka input stream.
    // Author's note (translated): the Kafka client on the driver determines the offsets, and
    // thereby how much data one batch reads; the generated tasks are serialized to the executors,
    // so the records themselves are read from the Kafka cluster on the executors.
    kafkaDstream.foreachRDD { rdd =>
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { range =>
        println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
      }
      rdd.map(_.value()).foreach(x => println(x))
    }
    // Without start() the registered output operation never runs.
    ssc.start()
    ssc.awaitTermination()
  }

}
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

//从kafka中读取数据,完成聚合类操作,将偏移量和计算好的聚合类结果同时写入到mysql中,mysql支持事务,保证计算好的聚合结果和偏移量同时写入成功
/**
 * Word count over the `helloword` Kafka topic. For each batch, the aggregated counts and the
 * batch's end offsets are written to MySQL in a single JDBC transaction, so that either both are
 * stored or neither is.
 */
object CommitOffsetDemo03 {
  /**
   * Runs the streaming job until it terminates or a batch fails.
   *
   * @param args `args(0)` is the Spark application name, `args(1)` the Kafka consumer group id
   * @throws IllegalArgumentException if fewer than two arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "Usage: CommitOffsetDemo03 <appName> <groupId>")
    val appName: String = args(0)
    val groupId: String = args(1)
    //val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[*]").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics: Array[String] = Array("helloword")
    val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
    // The function passed to foreachRDD is invoked once per batch interval.
    kafkaDstream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Only the RDD handed out by the direct stream implements HasOffsetRanges.
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Aggregate the batch and collect the (small) result so that it can be written
        // together with the offsets over one JDBC connection.
        val counts: Array[(String, Int)] =
          rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect()

        // NOTE(review): confirm that the JDBC driver accepts `utf_8` as a characterEncoding value.
        val connection: Connection =
          DriverManager.getConnection("jdbc:mysql://localhost:3306/mq01?characterEncoding=utf_8", "root", "123456")
        try {
          // Start the transaction: nothing is visible until commit().
          connection.setAutoCommit(false)

          val wordStmt: PreparedStatement =
            connection.prepareStatement("INSERT INTO t_wordcount(word,count) VALUES (?,?) ON DUPLICATE KEY UPDATE COUNT=COUNT +?;")
          try {
            for ((word, count) <- counts) {
              wordStmt.setString(1, word)
              wordStmt.setInt(2, count)
              wordStmt.setInt(3, count)
              wordStmt.executeUpdate()
            }
          } finally {
            wordStmt.close()
          }

          val offsetStmt: PreparedStatement =
            connection.prepareStatement("INSERT INTO t_kafuka_offset VALUES(?,?,?)  ON DUPLICATE KEY UPDATE OFFSET=?;")
          try {
            for (range <- offsetRanges) {
              // Only the end offset is stored; fromOffset is not needed.
              val offset: Long = range.untilOffset
              offsetStmt.setString(1, s"${appName}_$groupId")
              offsetStmt.setString(2, s"${range.topic}_${range.partition}")
              offsetStmt.setLong(3, offset)
              offsetStmt.setLong(4, offset)
              offsetStmt.executeUpdate()
            }
          } finally {
            offsetStmt.close()
          }

          connection.commit()
        } catch {
          case e: Exception =>
            // Undo the partial writes; keep the original failure as the primary exception.
            try connection.rollback()
            catch { case rollbackError: Exception => e.addSuppressed(rollbackError) }
            // Rethrowing fails the batch; the error then surfaces from awaitTermination() below.
            throw e
        } finally {
          connection.close()
        }
      }
    }
    ssc.start()
    // A failed batch makes awaitTermination() throw; stop the contexts before main exits.
    try ssc.awaitTermination()
    finally ssc.stop(stopSparkContext = true)
    // Programming against Spark Streaming amounts to programming against RDDs.
  }

}
上一篇:Spark Streaming整合Kafka及示例


下一篇:java数组初始化赋值,聪明人已经收藏了!