Spark Streaming: consuming Kafka data and manually maintaining offsets

1. Introduction

This article shows how to manually maintain the offsets of a Kafka consumer, implemented once with Redis and once with MySQL.

2. Code implementation
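
Both RedisOffsetManager and MysqlOffsetManager below extend an OffsetManager trait that the original post does not include. A minimal sketch of that trait, assuming it only declares the two methods implemented here, might look like this:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

/**
  * Hypothetical base trait for the two offset managers below (not shown in the original post).
  */
trait OffsetManager {

  /** Read the last committed offsets for the given topics and consumer group. */
  def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long]

  /** Persist the offsets of the batch that has just been processed. */
  def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit
}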

2.1 Redis-based implementation

import java.util

import com.bigdata.analysis.travel.RedisUtil
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

import scala.collection.mutable

/**
  * @ description: Manually maintain offsets in Redis
  * @ author: spencer
  * @ date: 2021/1/8 14:10
  */
object RedisOffsetManager extends OffsetManager {
  /**
    * Read the offsets stored in Redis
    * @param topics
    * @param groupId
    * @return
    */
  override def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val jedis: Jedis = RedisUtil.getJedis()
    val offsetMap = mutable.Map[TopicPartition, Long]()
    for (topic <- topics) {
      val map: util.Map[String, String] = jedis.hgetAll(topic)

      import scala.collection.JavaConverters._
      for ((groupidAnd, offset) <- map.asScala){
        val group: String = groupidAnd.split("\\|")(0)
        val partition: Int = groupidAnd.split("\\|")(1).toInt
        if (group == groupId){
          offsetMap += new TopicPartition(topic, partition) -> offset.toLong
        }
      }

    }
    jedis.close() // return the connection to the pool
    offsetMap
  }

  /**
    * Write the updated offsets to Redis
    * @param groupId
    * @param offsetRanges
    */
  override def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]) = {
    val jedis: Jedis = RedisUtil.getJedis()
    for (offset <- offsetRanges) {
      val topic: String = offset.topic
      val partition: Int = offset.partition
//      val fromOffset: Long = offset.fromOffset
      val untilOffset: Long = offset.untilOffset

      val partitionId: String = groupId + "|" + partition
      jedis.hset(topic, partitionId, untilOffset.toString)
    }
    jedis.close() // return the connection to the pool
  }
}
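
RedisUtil is not included in the original post either. A minimal sketch, assuming a plain JedisPool against a local Redis (host and port are placeholders), could be:

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {

  // Hypothetical connection settings; adjust host/port to your environment.
  private val pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379)

  /** Borrow a connection from the pool; callers should close() it when done. */
  def getJedis(): Jedis = pool.getResource
}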

The driver main class:

import java.lang

import com.bigdata.analysis.travel.offset.RedisOffsetManager
import com.bigdata.conf.ConfigurationManager
import com.bigdata.constant.MyConstant
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * @ description: Read data from Kafka and manually maintain the offsets in Redis
  * @ author: spencer
  * @ date: 2021/1/8 15:52
  */
object TravelRealtimeAnalysis {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TravelRealtimeAnalysis")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Checkpoint to HDFS
    ssc.checkpoint("hdfs://flink101:9000/travel/checkpoint")

    // Kafka consumer configuration
    val brokers: String = ConfigurationManager.config.getString(MyConstant.KAFKA_BROKERS)
    val topics: Array[String] = Array("travel_ods_orders")
    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "travel_consumer_id",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean) // 不自動提交偏移量,手动维护偏移量
    )

    // TODO: processing steps
    /**
      * 1. Read the offsets saved in Redis/MySQL
      * 2. Build the Kafka source starting from those offsets
      * 3. Extract and update the offsets from the original InputDStream
      * 4. Run the business logic
      * 5. Start the streaming context
      */
    val offsetRanges: mutable.Map[TopicPartition, Long] = RedisOffsetManager.getOffset(topics, "travel_consumer_id")
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = if (offsetRanges.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    } else {
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetRanges)
      )
    }

    // The offsets must be read from the original InputDStream, because only its RDDs are KafkaRDDs.
    // Reading them after other transformations throws:
    // java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
    kafkaDStream.foreachRDD(rdd => {
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Business logic on the batch goes here (e.g. rdd.foreachPartition over the records).

      // Commit the offsets once per batch, on the driver, after the batch has been processed.
      RedisOffsetManager.saveOffset("travel_consumer_id", offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }

}
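
Since RedisOffsetManager stores one hash per topic (field = "groupId|partition", value = the last untilOffset), the committed offsets can be read back for a quick check. A small sketch, assuming the RedisUtil above:

import scala.collection.JavaConverters._

object OffsetInspector {
  def main(args: Array[String]): Unit = {
    val jedis = RedisUtil.getJedis()
    // One field per "groupId|partition", value = last committed untilOffset
    jedis.hgetAll("travel_ods_orders").asScala.foreach { case (field, offset) =>
      println(s"$field -> $offset")
    }
    jedis.close()
  }
}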

2.2 MySQL-based implementation


import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

// JDBCHelper and OffsetManager come from the project's own packages (imports omitted here)

object MysqlOffsetManager extends OffsetManager {
  /**
    * Save the offsets to MySQL
    *
    * @param groupId
    * @param offsetRanges
    */
  override def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val helper: JDBCHelper = JDBCHelper.getInstance()
    val conn: Connection = helper.getConnection
    val sql =
      """
        |replace into kafka_offset(group_id, topic, partition_id, fromOffset, untilOffset) values(?, ?, ?, ?, ?)
      """.stripMargin
    val pstmt: PreparedStatement = conn.prepareStatement(sql)

    for (offset <- offsetRanges) {
      pstmt.setString(1, groupId)
      pstmt.setString(2, offset.topic)
      pstmt.setInt(3, offset.partition)
      pstmt.setLong(4, offset.fromOffset)
      pstmt.setLong(5, offset.untilOffset)
      pstmt.executeUpdate()
    }

    pstmt.close()
    conn.close()

  }

  /**
    * Read the saved offsets from MySQL
    *
    * @param topics
    * @param groupId
    * @return
    */
  override def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val sql = "select * from kafka_offset where group_id = ? and topic = ?"
    val helper: JDBCHelper = JDBCHelper.getInstance()
    val conn: Connection = helper.getConnection
    val pstmt: PreparedStatement = conn.prepareStatement(sql)
    var resultSet: ResultSet = null

    val offsetMap = mutable.Map[TopicPartition, Long]()
    for (topic <- topics) {
      pstmt.setString(1, groupId)
      pstmt.setString(2, topic)
      resultSet = pstmt.executeQuery()

      while (resultSet.next()) {
        offsetMap += new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partition_id")) -> resultSet.getLong("untilOffset")
      }
    }

    if (resultSet != null) resultSet.close()
    pstmt.close()
    conn.close()

    offsetMap
  }

}
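
Neither the kafka_offset table nor JDBCHelper is shown in the original post. Note that REPLACE INTO only behaves as an upsert when the table has a primary or unique key covering (group_id, topic, partition_id). A one-off setup sketch, with assumed JDBC URL, credentials and column types:

import java.sql.DriverManager

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    // Hypothetical connection details; adjust to your environment.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/travel", "root", "root")
    val stmt = conn.createStatement()
    // Composite primary key so that REPLACE INTO keeps one row per (group, topic, partition).
    stmt.execute(
      """
        |create table if not exists kafka_offset(
        |  group_id     varchar(128) not null,
        |  topic        varchar(128) not null,
        |  partition_id int          not null,
        |  fromOffset   bigint       not null,
        |  untilOffset  bigint       not null,
        |  primary key (group_id, topic, partition_id)
        |)
      """.stripMargin)
    stmt.close()
    conn.close()
  }
}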
