Zero Data Loss with Spark Streaming in Production: An Experiment
The official Spark Streaming + Kafka integration guide describes three ways to maintain offsets:
- Checkpoints
- Kafka itself
- Your own data store
The first approach (checkpoints) is not recommended, so this post only covers the latter two.
Kafka itself
Code walkthrough
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object MyKafkaConsumer {
  def main(args: Array[String]): Unit = {
    val ssc = StreamingContestUtils.getStreamingContext(this.getClass.getSimpleName)
    val TOPIC = "spark_test2"
    val groupId = "spark_test2_group"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka01:9092,kafka02:9092,kafka03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",                  // on first start, consume from the earliest offset
      "enable.auto.commit" -> (false: java.lang.Boolean)  // disable auto commit; offsets are committed manually
    )
    val topics = Array(TOPIC)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // distribute partitions evenly across executors
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD({ rdd =>
      println(s"-----rdd.partitions.size------------------ ${rdd.partitions.size}")
      // offset ranges of the current batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }

      // TODO ... business logic

      // commit the offsets back to Kafka itself
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
```
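The `StreamingContestUtils.getStreamingContext` helper is not shown in the post. A minimal sketch of what such a utility might look like, assuming local mode and a 10-second batch interval (both are my assumptions, not from the original):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper assumed by the examples in this post; the original does not show it.
object StreamingContestUtils {
  def getStreamingContext(appName: String): StreamingContext = {
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster("local[2]")                // assumption: local mode for the experiment
    new StreamingContext(conf, Seconds(10)) // assumption: 10-second batch interval
  }
}
```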
First run of the program:
-----rdd.partitions.size------------------ 1
spark_test2 0 0 44
-----rdd.partitions.size------------------ 1
spark_test2 0 44 44
-----rdd.partitions.size------------------ 1
spark_test2 0 44 44
[Spark UI screenshot]
You can see that the first batch contains all of the existing data in the Kafka topic (offsets 0 through 44).
After stopping and restarting the program:
-----rdd.partitions.size------------------ 1
spark_test2 0 44 44
-----rdd.partitions.size------------------ 1
spark_test2 0 44 44
-----rdd.partitions.size------------------ 1
spark_test2 0 44 44
After the restart, consumption does not start over from the beginning; it resumes from the offsets committed to Kafka.
Your own data store: MySQL
Shared offset-management methods backed by MySQL:
```scala
/**
 * DDL:
 *
 * create table kafka_offset(
 *   topic        varchar(30),
 *   group_id     varchar(30),
 *   partition_id int(5),
 *   fromOffset   bigint(18),
 *   untilOffset  bigint(18),
 *   primary key(topic, group_id, partition_id)
 * );
 */
object MysqlOffsetManager extends OffsetManager {

  /**
   * Store the offset range of one partition.
   */
  override def storeOffsets(topic: String, groupId: String, partitionId: Int,
                            fromoffset: Long, untilOffset: Long): Unit = {
    DBs.setupAll()
    DB.autoCommit(implicit session => {
      // REPLACE INTO upserts on the (topic, group_id, partition_id) primary key
      SQL("replace into kafka_offset values(?,?,?,?,?)")
        .bind(topic, groupId, partitionId, fromoffset, untilOffset)
        .update()
        .apply()
    })
  }

  /**
   * Read back the stored offsets.
   */
  override def obtainOffsets(topic: String, groupId: String): Map[TopicPartition, Long] = {
    DBs.setupAll()
    val offsetList = DB.readOnly(implicit session => {
      SQL("select topic,partition_id,untilOffset from kafka_offset where topic = ? and group_id = ?")
        .bind(topic, groupId)
        .map(x => (x.string("topic"), x.int("partition_id"), x.long("untilOffset")))
        .toList()
        .apply()
    })
    offsetList.map(x => new TopicPartition(topic, x._2) -> x._3).toMap
  }
}
```
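`MysqlOffsetManager` extends an `OffsetManager` trait that the post does not show; its signatures follow directly from the overrides above. A minimal sketch (the ScalaDoc wording is mine):

```scala
import org.apache.kafka.common.TopicPartition

// Trait implied by MysqlOffsetManager / RedisOffsetManager; not shown in the original post.
trait OffsetManager {
  /** Persist the offset range of one partition for a topic / consumer group. */
  def storeOffsets(topic: String, groupId: String, partitionId: Int,
                   fromoffset: Long, untilOffset: Long): Unit

  /** Read back the stored offsets for a topic / consumer group. */
  def obtainOffsets(topic: String, groupId: String): Map[TopicPartition, Long]
}
```

Note that `DBs.setupAll()` (scalikejdbc-config) reads the JDBC connection settings from `application.conf`, which the post also does not show.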
Business-logic code:
```scala
def main(args: Array[String]): Unit = {
  val ssc = StreamingContestUtils.getStreamingContext(this.getClass.getSimpleName)
  val TOPIC = "spark_test2"
  val groupId = "spark_test3_group"
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "kafka01:9092,kafka02:9092,kafka03:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val topics = Array(TOPIC)

  // read the previously stored offsets from MySQL
  val fromOffset = MysqlOffsetManager.obtainOffsets(TOPIC, groupId)
  fromOffset.foreach(println)

  val stream = if (fromOffset.isEmpty) {
    println("Consuming from the beginning...")
    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent, // distribute partitions evenly across executors
      Subscribe[String, String](topics, kafkaParams)
    )
  } else {
    println("Consuming from previously stored offsets...")
    KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, fromOffset)
    )
  }

  var offsetRanges: Array[OffsetRange] = Array.empty
  stream.transform(rdd => {
    // capture the offset ranges of the current batch
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition { iter =>
      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
      // println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
    rdd
  }).map(x => (x.key(), x.value())
  ).foreachRDD(rdd => {
    // walk the offset ranges of each partition and upsert them into MySQL
    offsetRanges.foreach(x => {
      println(s"${x.topic},${groupId},${x.partition},${x.fromOffset},${x.untilOffset}")
      MysqlOffsetManager.storeOffsets(x.topic, groupId, x.partition, x.fromOffset, x.untilOffset)
    })
  })

  ssc.start()
  ssc.awaitTermination()
}
```
Output:
(spark_test2-0,55)
Consuming from previously stored offsets...
spark_test2,spark_test3_group,0,55,66
spark_test2,spark_test3_group,0,66,66
spark_test2,spark_test3_group,0,66,66
The offsets are maintained successfully.
Your own data store: Redis
```scala
override def storeOffsets(topic: String, groupId: String, partition: Int,
                          fromoffset: Long, untilOffset: Long): Unit = {
  val jedis = RedisUtils.getConnection()
  val key = topic + "_" + groupId
  // one hash per topic/group: field = partition id, value = untilOffset
  jedis.hset(key, partition.toString, untilOffset.toString)
  jedis.close()
}

override def obtainOffsets(topic: String, groupId: String): Map[TopicPartition, Long] = {
  val jedis = RedisUtils.getConnection()
  val key = topic + "_" + groupId
  val offsets = jedis.hgetAll(key)
  jedis.close()
  import scala.collection.JavaConversions._
  offsets.toMap.map(x => new TopicPartition(topic, x._1.toInt) -> x._2.toLong)
}
```
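`RedisUtils.getConnection()` is another helper not shown in the post. A possible sketch backed by a `JedisPool` (the host and port here are placeholders, not from the original):

```scala
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical connection helper assumed by RedisOffsetManager; not shown in the original post.
object RedisUtils {
  // assumption: a single Redis instance at redis01:6379
  private val pool = new JedisPool(new JedisPoolConfig, "redis01", 6379)

  def getConnection(): Jedis = pool.getResource
}
```

With a pooled connection, `jedis.close()` returns the resource to the pool rather than closing the socket.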
Business-logic code:
```scala
// read the previously stored offsets from Redis
val fromOffset = RedisOffsetManager.obtainOffsets(TOPIC, groupId)
fromOffset.foreach(println)

val stream = if (fromOffset.isEmpty) {
  println("Consuming from the beginning...")
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent, // distribute partitions evenly across executors
    Subscribe[String, String](topics, kafkaParams)
  )
} else {
  println("Consuming from previously stored offsets...")
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams, fromOffset)
  )
}

var offsetRanges: Array[OffsetRange] = Array.empty
stream.transform(rdd => {
  // capture the offset ranges of the current batch
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    // println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  rdd
}).map(x => (x.key(), x.value())
).foreachRDD(rdd => {
  // walk the offset ranges of each partition and upsert them into Redis
  offsetRanges.foreach(x => {
    println(s"${x.topic},${groupId},${x.partition},${x.fromOffset},${x.untilOffset}")
    RedisOffsetManager.storeOffsets(x.topic, groupId, x.partition, x.fromOffset, x.untilOffset)
    // MysqlOffsetManager.storeOffsets(x.topic, groupId, x.partition, x.fromOffset, x.untilOffset)
  })
})
```
First run of the program:
Consuming from the beginning...
spark_test2,spark_test4_group,0,0,66
spark_test2,spark_test4_group,0,66,66
spark_test2,spark_test4_group,0,66,66
Second run of the program:
Consuming from previously stored offsets...
spark_test2,spark_test4_group,0,66,66
spark_test2,spark_test4_group,0,66,66
spark_test2,spark_test4_group,0,66,66
spark_test2,spark_test4_group,0,66,66
The offsets are maintained successfully.
The Spark Streaming zero-data-loss experiment for production succeeded.
However, because the business-data write and the offset update are two separate, non-transactional steps, it can still happen that the business write fails while the offset update succeeds, and data is lost anyway.
To achieve exactly-once semantics, a different approach is needed:
for example, commit the offsets and the business data together in one transaction, so that they either both succeed or both fail.
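As a rough illustration of that last point, here is a minimal sketch, not taken from the original post, that plugs into the MySQL example above (same `stream`, `groupId`, and scalikejdbc setup) and writes the processed records and the offsets in a single transaction via `DB.localTx`, so they commit or roll back together. The `result_table` schema and the collect-to-driver step are simplifying assumptions:

```scala
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Simplifying assumption: the batch is small enough to collect onto the driver.
  val results = rdd.map(r => (r.key(), r.value())).collect()

  // One local transaction: either both the results and the offsets are persisted, or neither is.
  DB.localTx { implicit session =>
    results.foreach { case (k, v) =>
      SQL("insert into result_table(k, v) values(?, ?)") // hypothetical results table
        .bind(k, v).update().apply()
    }
    offsetRanges.foreach { o =>
      SQL("replace into kafka_offset values(?,?,?,?,?)")
        .bind(o.topic, groupId, o.partition, o.fromOffset, o.untilOffset)
        .update().apply()
    }
  }
}
```

On restart the job resumes from the offsets stored by the last successful transaction, so a batch is never half-applied to MySQL.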
Reposted from: https://blog.csdn.net/kzw11/article/details/102978350