Spark Streaming整合Kafka及示例

Spark Streaming整合Kafka及示例

Spark和kafka整合有2中方式 : Receiver 和 Dirct
主要学习Dirct方式
一、Receiver

Spark Streaming整合Kafka及示例

二、Direct

Spark Streaming整合Kafka及示例

三、代码演示

完整pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jiang</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.12.10</scala.version>
        <hadoop.version>3.2.0</hadoop.version>
        <spark.version>3.0.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.10</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>

         <!--https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

        <!--mysql数据库访问-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>

    </dependencies>

    <build>
        <!--资源文件夹-->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
        <!--声明并引入构建的插件-->
            <!--用于编译Scala代码到class-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <!--    -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
1、自动提交偏移量

scala代码:

package com.jiang.streaming_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka_Demo01 {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次
    // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    // state 存在checkpoint
    ssc.checkpoint("./ckp")

    //TODO 1.加载数据从Kafka

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],  // key的反序列化规则
      "value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
      "group.id" -> "sparkdemo",  // 消费者组名称
      // earliest:表示有offset记录从offset记录开始消费,没有就从最早消息消费
      // latest:表示有offset记录从offset记录开始消费,没有就从最新的消息开始消费
      // none :表示有offset记录从offset记录开始消费,没有就报错
      "auto.offset.reset" -> "latest",
      "enable.auto.interval.ms" -> "1000",  // 自动提交时间
      "enable.auto.commit" -> (true: java.lang.Boolean) // 是否自动提交
    )

    val topics = Array("spark-demo")  // 要订阅的主题
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // 位置策略,使用源码中推荐的
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // 消费策略,使用源码中推荐的
    )

    val infoDS: DStream[String] = kafkaDS.map(record => {
      val topic: String = record.topic()
      val partition: Int = record.partition()
      val offset: Long = record.offset()
      val key: String = record.key()
      val value: String = record.value()
      val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
      info
    })

    //TODO 3.输出结果
    infoDS.print()

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop()
  }

}
2、手动提交偏移量

scala代码:

package com.jiang.streaming_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka_Demo02 {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次
    // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    // state 存在checkpoint
    ssc.checkpoint("./ckp")

    //TODO 1.加载数据从Kafka

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],  // key的反序列化规则
      "value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
      "group.id" -> "sparkdemo",  // 消费者组名称
      // earliest:表示有offset记录从offset记录开始消费,没有就从最早消息消费
      // latest:表示有offset记录从offset记录开始消费,没有就从最新的消息开始消费
      // none :表示有offset记录从offset记录开始消费,没有就报错
      "auto.offset.reset" -> "latest",
      //"enable.auto.interval.ms" -> "1000",  // 自动提交时间
      "enable.auto.commit" -> (false: java.lang.Boolean) // 是否自动提交
    )

    val topics = Array("spark-demo")  // 要订阅的主题
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // 位置策略,使用源码中推荐的
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // 消费策略,使用源码中推荐的
    )

    //TODO 2.处理消息
    // 注意提交时间:应该是消费完一小批就该提交一次offset,而DStream一小批的体现是RDD
    kafkaDS.foreachRDD(rdd => {
      if(!rdd.isEmpty()){
        // 消费
        rdd.foreach(record =>{
          val topic: String = record.topic()
          val partition: Int = record.partition()
          val offset: Long = record.offset()
          val key: String = record.key()
          val value: String = record.value()
          val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""

          println("消费到的消息详细信息:" + info)
        })
        // 提交
        // 获取rdd中的offset相关信息
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // 提交
        kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        println("当前批次的数据已消费并手动提交")
      }
    })


    //TODO 3.输出结果
//    infoDS.print()

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop()
  }

}
3.手动提交偏移量到Mysql

scala代码:

package com.jiang.streaming_kafka

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable


object Kafka_Demo03 {

  def main(args: Array[String]): Unit = {

    //TODO 0.准备环境
    val conf:SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc:SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc,Seconds(5)) // 每隔5秒分一个批次
    // The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
    // state 存在checkpoint
    ssc.checkpoint("./ckp")

    //TODO 1.加载数据从Kafka

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],  // key的反序列化规则
      "value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
      "group.id" -> "sparkdemo",  // 消费者组名称
      // earliest:表示有offset记录从offset记录开始消费,没有就从最早消息消费
      // latest:表示有offset记录从offset记录开始消费,没有就从最新的消息开始消费
      // none :表示有offset记录从offset记录开始消费,没有就报错
      "auto.offset.reset" -> "latest",
      //"enable.auto.interval.ms" -> "1000",  // 自动提交时间
      "enable.auto.commit" -> (false: java.lang.Boolean) // 是否自动提交
    )

    val topics = Array("spark-demo")  // 要订阅的主题

    val offsetsMap: mutable.Map[TopicPartition,Long] = OffsetUtil.getOffsetMap("sparkdemo","spark-demo")
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if(offsetsMap.size > 0){
      println("Mysql中存储了该消费者组 消费该主题的偏移量记录,接下来从记录处开始消费")

      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // 位置策略,使用源码中推荐的
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,offsetsMap) // 消费策略,使用源码中推荐的
      )
    }else{
      println("Mysql中没有存储该消费者组 消费该主题的偏移量记录,接下来从latest开始消费")
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // 位置策略,使用源码中推荐的
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // 消费策略,使用源码中推荐的
      )
    }

    //TODO 2.处理消息
    // 注意提交时间:应该是消费完一小批就该提交一次offset,而DStream一小批的体现是RDD
    kafkaDS.foreachRDD(rdd => {
      if(!rdd.isEmpty()){
        // 消费
        rdd.foreach(record =>{
          val topic: String = record.topic()
          val partition: Int = record.partition()
          val offset: Long = record.offset()
          val key: String = record.key()
          val value: String = record.value()
          val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
          println("消费到的消息详细信息:" + info)
        })
        // 提交
        // 获取rdd中的offset相关信息
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // 提交
//        kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        OffsetUtil.saveOffsetRanges("sparkdemo",offsetRanges)
        println("当前批次的数据已消费并手动提交到Mysql")
      }
    })


    //TODO 3.输出结果
//    infoDS.print()

    //TODO 4.启动并等待结果
    ssc.start()
    ssc.awaitTermination() // 流式应用启动之后需要一直运行等待手动停止/等待数据到来

    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)

  }
  object OffsetUtil{

    val url = "jdbc:mysql://localhost:3306/bigdata_test"
    var username = "root"
    val passwd = "123456"


    // 将偏移量保存到数据库
    def saveOffsetRanges(groupid:String,offsetRange:Array[OffsetRange]) = {
      val conn: Connection = DriverManager.getConnection(url, username, passwd)
      // replace into 表示之前有就替换,没有就插入
      val ps: PreparedStatement = conn.prepareStatement("replace into offset (`topic`,`partition`,`groupid`,`offset`) values (?,?,?,?)")
      for(o <- offsetRange){
        ps.setString(1,o.topic)
        ps.setInt(2,o.partition)
        ps.setString(3,groupid)
        ps.setLong(4,o.untilOffset)
        ps.executeUpdate()
      }
      ps.close()
      conn.close()
    }

    // 从数据库读取偏移量Map(主题分区,offset)
    def getOffsetMap(groupid:String,topic:String):mutable.Map[TopicPartition,Long] ={
      val conn: Connection = DriverManager.getConnection(url, username, passwd)
      val ps: PreparedStatement = conn.prepareStatement("select * from offset where groupid=? and topic=?")
      ps.setString(1,groupid)
      ps.setString(2,topic)
      val rs: ResultSet = ps.executeQuery()
      // Map(主题分区,offset)
      val offsetMap = mutable.Map[TopicPartition,Long]()
      // new TopicPartition(rs.getString("topic"),rs.getInt("partition"),rs.getLong("offset"))
      while(rs.next()){
        offsetMap += new TopicPartition(rs.getString("topic"),rs.getInt("partition")) -> rs.getLong("offset")
      }

      rs.close()
      ps.close()
      conn.close()
      offsetMap
    }
  }
}
上一篇:Flink 中的窗口


下一篇:spark-streaming