Official documentation:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
Committing offsets manually ensures that data is not lost, which matters most when the network is badly flaky. Note that if Kafka crashes and is restarted, other problems can still surface, for example the saved offsets may no longer be found; those cases have to be analyzed individually. Code first.
import java.sql.{DriverManager, ResultSet}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Spark-Kafka-0-10 integration with manually committed offsets, maintained in MySQL.
 */
object SparkKafkaTest2 {
def main(args: Array[String]): Unit = {
//1. Create the StreamingContext
val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
//Kafka connection parameters
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "server1:9092,server2:9092,server3:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaTest",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka_test").toSet
//Load any offsets previously saved to MySQL for this group and topic
val offsetMap = getOffsetMap("SparkKafkaTest", "spark_kafka_test")
val recordDStream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.nonEmpty) { //offsets were recorded
println("Offsets found in MySQL, resuming consumption from the recorded offsets")
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent, //location strategy; strongly recommended by the source code, maps Spark executors evenly onto the Kafka brokers
Subscribe[String, String](topics, kafkaParams, offsetMap)) //consumer strategy; strongly recommended by the source code
} else { //no offsets recorded yet
println("No recorded offsets, connecting directly and consuming from latest")
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent, //location strategy; strongly recommended by the source code, maps Spark executors evenly onto the Kafka brokers
Subscribe[String, String](topics, kafkaParams)) //consumer strategy; strongly recommended by the source code
}
recordDStream.foreachRDD {
messages =>
if (messages.count() > 0) { //this batch actually contains data
messages.foreachPartition { messageIter =>
messageIter.foreach { message =>
//process each record here, e.g. println(message.toString())
}
}
val offsetRanges: Array[OffsetRange] = messages.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset}")
}
//manually commit the offsets; commitAsync stores them back in Kafka itself
//recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
//in practice the offsets can instead be persisted to MySQL/Redis
saveOffsetRanges("SparkKafkaTest", offsetRanges)
}
}
ssc.start()
ssc.awaitTermination()
}
/**
 * Read the saved offsets for a consumer group and topic from the database.
 */
def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
Class.forName("com.mysql.jdbc.Driver")
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
val sqlselect = connection.prepareStatement("""
select * from kafka_offset
where groupid=? and topic =?
""")
sqlselect.setString(1, groupid)
sqlselect.setString(2, topic)
val rs: ResultSet = sqlselect.executeQuery()
val offsetMap = mutable.Map[TopicPartition, Long]()
while (rs.next()) {
offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
}
rs.close()
sqlselect.close()
connection.close()
offsetMap
}
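/**
 * Hypothetical one-off setup, shown for completeness: both helpers in this
 * post assume a kafka_offset table already exists. The column names below are
 * taken from the queries in this post; the column types and the composite
 * primary key are assumptions (the key also enables the REPLACE INTO variant
 * sketched after the listing).
 */
def createOffsetTable(): Unit = {
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
val ddl_ps = connection.prepareStatement("""
CREATE TABLE IF NOT EXISTS kafka_offset (
`groupid` VARCHAR(128) NOT NULL,
`topic` VARCHAR(128) NOT NULL,
`partition` INT NOT NULL,
`offset` BIGINT NOT NULL,
PRIMARY KEY (`groupid`, `topic`, `partition`)
)
""")
ddl_ps.executeUpdate()
ddl_ps.close()
connection.close()
}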
/**
 * Persist the offsets to the database.
 */
def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
//check whether a row already exists for this (groupid, topic, partition), then update or insert accordingly
val select_ps = connection.prepareStatement("""
select count(*) as count from kafka_offset
where `groupid`=? and `topic`=? and `partition`=?
""")
val update_ps = connection.prepareStatement("""
update kafka_offset set `offset`=?
where `groupid`=? and `topic`=? and `partition`=?
""")
val insert_ps = connection.prepareStatement("""
INSERT INTO kafka_offset(`groupid`, `topic`, `partition`, `offset`)
VALUES(?,?,?,?)
""")
for (o <- offsetRange) {
select_ps.setString(1, groupid)
select_ps.setString(2, o.topic)
select_ps.setInt(3, o.partition)
val select_result = select_ps.executeQuery()
while (select_result.next()) {
if (select_result.getInt("count") > 0) {
//update
update_ps.setLong(1, o.untilOffset)
update_ps.setString(2, groupid)
update_ps.setString(3, o.topic)
update_ps.setInt(4, o.partition)
update_ps.executeUpdate()
} else {
//insert
insert_ps.setString(1, groupid)
insert_ps.setString(2, o.topic)
insert_ps.setInt(3, o.partition)
insert_ps.setLong(4, o.untilOffset)
insert_ps.executeUpdate()
}
}
}
select_ps.close()
update_ps.close()
insert_ps.close()
connection.close()
}
}
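The select/update/insert sequence in saveOffsetRanges can also be collapsed into a single statement. Below is a minimal alternative sketch using REPLACE INTO (the statement referenced in the comment above): MySQL replaces the row when the key already exists and inserts it otherwise. It relies on the primary key over (groupid, topic, partition) from the table sketch; the method name saveOffsetRangesReplace is made up for illustration.
def saveOffsetRangesReplace(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
val connection = DriverManager.getConnection("jdbc:mysql://172.31.98.108:3306/bj_pfdh?characterEncoding=UTF-8", "root", "iflytek@web")
//REPLACE INTO needs a unique/primary key on (groupid, topic, partition) to detect the duplicate
val replace_ps = connection.prepareStatement("""
REPLACE INTO kafka_offset(`groupid`, `topic`, `partition`, `offset`)
VALUES(?,?,?,?)
""")
for (o <- offsetRange) {
replace_ps.setString(1, groupid)
replace_ps.setString(2, o.topic)
replace_ps.setInt(3, o.partition)
replace_ps.setLong(4, o.untilOffset)
replace_ps.executeUpdate()
}
replace_ps.close()
connection.close()
}
Note that REPLACE INTO deletes and re-inserts the row under the hood, which is perfectly acceptable for a small offsets table like this one.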
If the job reports that it cannot connect to the database, or that resolving the database address fails, check whether the MySQL client JAR has been added to the classpath.
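With sbt, for example, the driver can be declared as below; the 5.1.x version is an assumption, pick one compatible with your MySQL server (the com.mysql.jdbc.Driver class used above belongs to the 5.x connector):
// build.sbt: pulls in the MySQL JDBC driver that provides com.mysql.jdbc.Driver
// (the exact version is an assumption; match it to your server)
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.47"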