import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
//Running this a second time reads from the beginning again, because the offsets are only read, never committed
object DemoOffset01 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g00003",
"auto.offset.reset" -> "earliest" ,
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics: Array[String] =Array("test02")
val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
kafkaDstream.foreachRDD(rdd=>{
//KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offsets
val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (range <- offsetRange) {
println(s"topic:{range.topic},partition:{range.partition},fromoffset:{range.fromoffset},utiloffset:{range.utiloffset}")
}
val res: RDD[String] = rdd.map(_.value())
res.foreach(println)
})
ssc.start()
ssc.awaitTermination()
//Programming Spark Streaming is essentially programming against RDDs
}
}
Upgraded version 2.0:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
//Upgraded version: the offsets are committed back to Kafka after each batch, so a restart resumes from the last committed offsets instead of re-reading from the beginning
object DemoOffset01 {
def main(args: Array[String]): Unit = {
//val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g00003",
"auto.offset.reset" -> "earliest" ,
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics: Array[String] =Array("helloword")
val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//The function passed to foreachRDD runs periodically on the driver
kafkaDstream.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
//KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offsets
val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (range <- offsetRange) {
println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
}
//Process the data
val res: RDD[String] = rdd.map(_.value())
res.foreach(println)
//Commit the offsets to Kafka's special __consumer_offsets topic
kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange)
}
})
ssc.start()
ssc.awaitTermination()
//Programming Spark Streaming is essentially programming against RDDs
}
}
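A note on commitAsync: the commit is asynchronous and is only enqueued here, so it actually reaches Kafka on a later batch boundary. If you want confirmation, the kafka-0-10 integration also offers an overload of commitAsync that takes Kafka's OffsetCommitCallback. A minimal sketch that could replace the commitAsync call inside the foreachRDD above (only the logging is illustrative):
import java.util
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.CanCommitOffsets
//Commit this batch's ranges and report whether the commit succeeded
kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange, new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
if (exception != null) {
println(s"offset commit failed: ${exception.getMessage}")
} else {
println(s"offset commit succeeded: $offsets")
}
}
})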
Supplement:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
object CommitOffsetDemo02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g00004",
"auto.offset.reset" -> "earliest" ,
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics: Array[String] =Array("helloword")
val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//A transformation on kafkaDstream yields a MapPartitionsRDD, so the offsets can only be obtained from the original DirectKafkaInputDStream; the cast to HasOffsetRanges below therefore fails at runtime with a ClassCastException
val lines: DStream[String] = kafkaDstream.map(_.value())
lines.foreachRDD(rdd=>{
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (range <- offsetRanges) {
println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
}
rdd.foreach(x=>println(x))
})
ssc.start()
ssc.awaitTermination()
}
}
Supplement:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
object CommitOffsetDemo02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("AggregateOperator").setMaster("local").set("spark.testing.memory", "512000000")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g00004",
"auto.offset.reset" -> "earliest" ,
"enable.auto.commit" -> (false: java.lang.Boolean)//在executor端提交
)
val topics: Array[String] =Array("helloword")
val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//Calling a transformation on a DStream produces a new DStream; calling an action does not
//A transformation on kafkaDstream yields a MapPartitionsRDD, so the offsets can only be obtained from the original DirectKafkaInputDStream (the cast below would fail on the mapped RDD)
//The Kafka client on the driver fetches the offsets for each topic (which determines how much data a batch reads); the generated tasks are serialized to the thread pools on the executors, so the data itself is read from the Kafka cluster on the executors, and the offsets are committed to the __consumer_offsets topic
val lines: DStream[String] = kafkaDstream.map(_.value())
lines.foreachRDD(rdd=>{
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (range <- offsetRanges) {
println(s"topic:${range.topic},partition:${range.partition},fromoffset:${range.fromOffset},utiloffset:${range.untilOffset}")
}
rdd.foreach(x=>println(x))
})
ssc.start()
ssc.awaitTermination()
}
}
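Since only the first RDD handed out by createDirectStream is a KafkaRDD, the usual way to both transform the stream and keep the offsets is to capture the ranges in a transform step on the original kafkaDstream and commit them after processing. A minimal sketch under the same setup as above (the driver-side offsetRanges variable is illustrative):
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
//Captured on the driver before any transformation replaces the KafkaRDD
var offsetRanges: Array[OffsetRange] = Array.empty
val lines: DStream[String] = kafkaDstream.transform { rdd =>
//transform runs on the driver for every batch; the incoming rdd is still the KafkaRDD
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(_.value())
lines.foreachRDD { rdd =>
rdd.foreach(println)
//Commit the ranges captured for this batch back to __consumer_offsets
kafkaDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}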
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
//Read data from Kafka, perform an aggregation, and write the offsets and the aggregated results to MySQL together; MySQL supports transactions, which guarantees that the aggregated results and the offsets are either written together or not at all
object CommitOffsetDemo03 {
def main(args: Array[String]): Unit = {
val AppName: String =args(0)
val groupid: String =args(1)
//val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
val conf: SparkConf = new SparkConf().setAppName(AppName).setMaster("local[*]").set("spark.testing.memory", "512000000")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest" ,
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics: Array[String] =Array("helloword")
val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
//The function passed to foreachRDD runs periodically on the driver
kafkaDstream.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
//KafkaRDD implements the HasOffsetRanges trait; only the KafkaRDD carries the offsets
val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//Process the data; RDD transformations and actions are invoked on the driver, but the functions passed to them run on the executors
val res: RDD[(String, Int)] = rdd.map(_.value()).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//Collect the aggregated results to the driver
val result: Array[(String, Int)] = res.collect()
var connection:Connection = null
var pstm1:PreparedStatement = null
var pstm2:PreparedStatement= null
try {
//Create a JDBC connection (requires the MySQL JDBC driver dependency)
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mq01?characterEncoding=utf8", "root", "123456")
//Start the transaction (turn off auto-commit)
connection.setAutoCommit(false)
//Write the offsets (already on the driver) and the collected results to MySQL; both live in the same process, so a single connection and a single transaction can write them together
//Create the PreparedStatements, assigning to the outer vars so the finally block can close them
pstm1 = connection.prepareStatement("INSERT INTO t_wordcount(word,count) VALUES (?,?) ON DUPLICATE KEY UPDATE COUNT=COUNT +?;")
for (tp <- result) {
pstm1.setString(1,tp._1)
pstm1.setInt(2,tp._2)
pstm1.setInt(3,tp._2)
//Data that has not been committed yet is dirty data
pstm1.executeUpdate()
}
pstm2 = connection.prepareStatement("INSERT INTO t_kafuka_offset VALUES(?,?,?) ON DUPLICATE KEY UPDATE OFFSET=?;")
for (range <- offsetRange) {
val topic: String = range.topic
val partition: Int = range.partition
//Only untilOffset is needed; fromOffset does not have to be stored
val offset: Long = range.untilOffset
pstm2.setString(1,AppName+"_"+groupid)
pstm2.setString(2,topic+"_"+partition)
pstm2.setLong(3,offset)
pstm2.setLong(4,offset)
pstm2.executeUpdate()
}
//Commit the transaction
connection.commit()
} catch {
case e:Exception => {
//If the write fails, roll back the transaction, stop the application, and rethrow
if (connection != null) {
connection.rollback()
}
ssc.stop(true)
throw e
}
}
finally {
if(pstm1!=null){
pstm1.close()
}
if(pstm2!=null){
pstm2.close()
}
if(connection!=null){
connection.close()
}
}
}
})
ssc.start()
ssc.awaitTermination()
//Programming Spark Streaming is essentially programming against RDDs
}
}
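For a restart to actually resume from the offsets saved in MySQL, they have to be read back before the stream is created and passed to ConsumerStrategies.Subscribe; the code above only writes them. A minimal sketch, assuming t_kafuka_offset has columns named app_gid, topic_partition and offset matching the three placeholders in the INSERT above (these names are an assumption, adjust them to the real schema):
import java.sql.{Connection, DriverManager}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
//Load the offsets written by the previous run; the column names here are assumed
def loadOffsets(appName: String, groupid: String): Map[TopicPartition, Long] = {
val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mq01?characterEncoding=utf8", "root", "123456")
try {
val pstm = connection.prepareStatement("SELECT topic_partition, offset FROM t_kafuka_offset WHERE app_gid = ?")
pstm.setString(1, appName + "_" + groupid)
val rs = pstm.executeQuery()
var offsets = Map[TopicPartition, Long]()
while (rs.next()) {
//topic_partition was stored as "<topic>_<partition>", so split on the last underscore
val tp = rs.getString(1)
val idx = tp.lastIndexOf("_")
offsets += (new TopicPartition(tp.substring(0, idx), tp.substring(idx + 1).toInt) -> rs.getLong(2))
}
offsets
} finally {
connection.close()
}
}
//Pass the stored offsets when subscribing, so the first batch starts where the last run committed
val fromOffsets: Map[TopicPartition, Long] = loadOffsets(AppName, groupid)
val kafkaDstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)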