1. Introduction
This post implements manual maintenance of Kafka consumer offsets in two ways: backed by Redis and backed by MySQL.
2. Implementation
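Both offset managers below extend a shared OffsetManager trait that is not shown in this post. A minimal sketch of what it presumably declares, reconstructed from the two overrides below (so treat it as an assumption rather than the project's actual source), looks like this:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

// Hypothetical reconstruction of the shared trait: only the two members
// that RedisOffsetManager and MysqlOffsetManager override are assumed.
trait OffsetManager {

  /** Read the stored offsets for the given topics and consumer group. */
  def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long]

  /** Persist the offsets of the batch that has just been read. */
  def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit
}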
2.1 Based on Redis
import java.util

import com.bigdata.analysis.travel.RedisUtil
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

import scala.collection.mutable

/**
 * @ description: manually maintain offsets in Redis
 * @ author: spencer
 * @ date: 2021/1/8 14:10
 */
object RedisOffsetManager extends OffsetManager {

  /**
   * Read the offsets stored in Redis.
   * Hash layout: key = topic, field = groupId + "|" + partition, value = untilOffset.
   *
   * @param topics
   * @param groupId
   * @return
   */
  override def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val jedis: Jedis = RedisUtil.getJedis()
    val offsetMap = mutable.Map[TopicPartition, Long]()
    for (topic <- topics) {
      val map: util.Map[String, String] = jedis.hgetAll(topic)
      import scala.collection.JavaConverters._
      for ((groupIdAndPartition, offset) <- map.asScala) {
        val group: String = groupIdAndPartition.split("\\|")(0)
        val partition: Int = groupIdAndPartition.split("\\|")(1).toInt
        if (group == groupId) {
          offsetMap += new TopicPartition(topic, partition) -> offset.toLong
        }
      }
    }
    jedis.close() // release the connection back to the pool
    offsetMap
  }

  /**
   * Write the offsets of the current batch back to Redis.
   *
   * @param groupId
   * @param offsetRanges
   */
  override def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis: Jedis = RedisUtil.getJedis()
    for (offset <- offsetRanges) {
      val topic: String = offset.topic
      val partition: Int = offset.partition
      // val fromOffset: Long = offset.fromOffset
      val untilOffset: Long = offset.untilOffset
      val field: String = groupId + "|" + partition
      jedis.hset(topic, field, untilOffset.toString)
    }
    jedis.close() // release the connection back to the pool
  }
}
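For reference, the Redis layout used above is one hash per topic: key = topic, field = groupId + "|" + partition, value = untilOffset. The small standalone sketch below shows the same layout with a bare Jedis client; it assumes a Redis instance on localhost:6379 and bypasses the project's RedisUtil, so treat the connection details and sample offsets as placeholders:

import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._

object RedisOffsetLayoutDemo {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379) // placeholder host/port

    // what saveOffset writes: hash key = topic, field = "<groupId>|<partition>", value = untilOffset
    jedis.hset("travel_ods_orders", "travel_consumer_id|0", "1024")
    jedis.hset("travel_ods_orders", "travel_consumer_id|1", "2048")

    // what getOffset reads back: all fields of the topic hash, filtered by groupId
    jedis.hgetAll("travel_ods_orders").asScala
      .filter { case (field, _) => field.split("\\|")(0) == "travel_consumer_id" }
      .foreach { case (field, offset) =>
        println(s"partition ${field.split("\\|")(1)} -> offset $offset")
      }

    jedis.close()
  }
}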
The driver main class:
import java.lang

import com.bigdata.analysis.travel.offset.RedisOffsetManager
import com.bigdata.conf.ConfigurationManager
import com.bigdata.constant.MyConstant
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 * @ description: read data from Kafka and manually maintain the offsets in Redis
 * @ author: spencer
 * @ date: 2021/1/8 15:52
 */
object TravelRealtimeAnalysis {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("TravelRealtimeAnalysis")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // checkpoint to HDFS
    ssc.checkpoint("hdfs://flink101:9000/travel/checkpoint")

    // Kafka parameters
    val brokers: String = ConfigurationManager.config.getString(MyConstant.KAFKA_BROKERS)
    val topics: Array[String] = Array("travel_ods_orders")
    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "travel_consumer_id",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean) // do not auto-commit; offsets are maintained manually
    )

    // TODO: steps
    /**
     * 1. Read the offsets saved in Redis/MySQL.
     * 2. Create the Kafka source starting from those offsets.
     * 3. Update the offsets from the first-hand InputDStream.
     * 4. Run the business logic.
     * 5. Start the streaming context.
     */
    val offsetRanges: mutable.Map[TopicPartition, Long] = RedisOffsetManager.getOffset(topics, "travel_consumer_id")
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = if (offsetRanges.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )
    } else {
      KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetRanges)
      )
    }

    // The offsets must be read from the first-hand InputDStream; after any transformation the RDDs
    // are no longer KafkaRDDs, and writing the offsets later throws:
    // spark.rdd.MapPartitionsRDD cannot be cast to streaming.kafka010.HasOffsetRanges
    kafkaDStream.foreachRDD(rdd => {
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(_ => {
        RedisOffsetManager.saveOffset("travel_consumer_id", offsetRanges)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
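As written, saveOffset runs inside foreachPartition, so every partition of the batch writes the full offsetRanges array to Redis again. A common alternative (just a sketch, not the original project code) is to process the batch first and then persist the offsets once on the driver, keeping the "process, then commit" order for at-least-once delivery. It would replace the foreachRDD block above:

kafkaDStream.foreachRDD(rdd => {
  // still read from the first-hand KafkaRDD
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // business logic runs on the executors
  rdd.foreachPartition(records => {
    records.foreach(record => println(record.value()))
  })

  // persist the offsets once, on the driver, after the batch has been processed
  RedisOffsetManager.saveOffset("travel_consumer_id", offsetRanges)
})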
2.2 Based on MySQL
import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

// JDBCHelper is the project's own JDBC connection utility (its import path is omitted in the original).
object MysqlOffsetManager extends OffsetManager {

  /**
   * Persist the offsets to MySQL.
   *
   * @param groupId
   * @param offsetRanges
   */
  override def saveOffset(groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val helper: JDBCHelper = JDBCHelper.getInstance()
    val conn: Connection = helper.getConnection
    val sql =
      """
        |replace into kafka_offset(group_id, topic, partition_id, fromOffset, untilOffset) values(?, ?, ?, ?, ?)
      """.stripMargin
    val pstmt: PreparedStatement = conn.prepareStatement(sql)
    for (offset <- offsetRanges) {
      pstmt.setString(1, groupId)
      pstmt.setString(2, offset.topic)
      pstmt.setInt(3, offset.partition)
      pstmt.setLong(4, offset.fromOffset)
      pstmt.setLong(5, offset.untilOffset)
      pstmt.executeUpdate()
    }
    pstmt.close()
    conn.close()
  }

  /**
   * Read the saved offsets from MySQL.
   *
   * @param topics
   * @param groupId
   * @return
   */
  override def getOffset(topics: Array[String], groupId: String): mutable.Map[TopicPartition, Long] = {
    val sql = "select * from kafka_offset where group_id = ? and topic = ?"
    val helper: JDBCHelper = JDBCHelper.getInstance()
    val conn: Connection = helper.getConnection
    val pstmt: PreparedStatement = conn.prepareStatement(sql)
    var resultSet: ResultSet = null
    val offsetMap = mutable.Map[TopicPartition, Long]()
    for (topic <- topics) {
      pstmt.setString(1, groupId)
      pstmt.setString(2, topic)
      resultSet = pstmt.executeQuery()
      while (resultSet.next()) {
        offsetMap += new TopicPartition(resultSet.getString("topic"), resultSet.getInt("partition_id")) -> resultSet.getLong("untilOffset")
      }
    }
    if (resultSet != null) resultSet.close()
    pstmt.close()
    conn.close()
    offsetMap
  }
}
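Note that replace into only overwrites an existing row when the table has a primary or unique key covering (group_id, topic, partition_id); otherwise every batch appends new rows. The kafka_offset table itself is not defined in this post; the sketch below creates an assumed schema that matches the queries above through the same JDBCHelper (column names follow the SQL used above, column types are guesses):

import java.sql.Connection

object KafkaOffsetTableInit {
  def main(args: Array[String]): Unit = {
    // Assumed DDL for the kafka_offset table used by MysqlOffsetManager above;
    // the composite primary key is what makes `replace into` overwrite instead of accumulate.
    val ddl =
      """
        |create table if not exists kafka_offset(
        |  group_id     varchar(128) not null,
        |  topic        varchar(128) not null,
        |  partition_id int          not null,
        |  fromOffset   bigint       not null,
        |  untilOffset  bigint       not null,
        |  primary key(group_id, topic, partition_id)
        |)
      """.stripMargin

    val conn: Connection = JDBCHelper.getInstance().getConnection
    val pstmt = conn.prepareStatement(ddl)
    pstmt.executeUpdate()
    pstmt.close()
    conn.close()
  }
}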