Spark Streaming Integration with Kafka, with Examples
There are two ways to integrate Spark with Kafka: the Receiver approach and the Direct approach.
This post focuses mainly on the Direct approach.
I. Receiver
The Receiver approach uses Kafka's high-level consumer API: a receiver pulls data into the executors and tracks offsets in ZooKeeper, usually relying on a write-ahead log to avoid data loss.
II. Direct
The Direct approach, used in the code below, has no receiver: each batch queries Kafka for offset ranges and reads the data directly, mapping Kafka partitions one-to-one onto RDD partitions, which makes offset management explicit.
III. Code Demonstration
Complete pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jiang</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.12.10</scala.version>
        <hadoop.version>3.2.0</hadoop.version>
        <spark.version>3.0.1</spark.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!-- MySQL database access -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.7.8</version>
        </dependency>
    </dependencies>

    <build>
        <!-- Source folder -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- Declare the build plugins -->
            <!-- Compiles Scala code to class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Builds an uber-jar for spark-submit, stripping signature files -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1. Committing offsets automatically
Scala code:
package com.jiang.streaming_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka_Demo01 {
  def main(args: Array[String]): Unit = {
    // TODO 0. Prepare the environment
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5)) // one batch every 5 seconds
    // Without this you get: "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    // State is stored in the checkpoint directory
    ssc.checkpoint("./ckp")

    // TODO 1. Load data from Kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],   // deserializer for keys
      "value.deserializer" -> classOf[StringDeserializer], // deserializer for values
      "group.id" -> "sparkdemo",                           // consumer group name
      // earliest: resume from the committed offset if one exists, otherwise start from the earliest message
      // latest:   resume from the committed offset if one exists, otherwise start from the latest message
      // none:     resume from the committed offset if one exists, otherwise throw an error
      "auto.offset.reset" -> "latest",
      "auto.commit.interval.ms" -> "1000",                 // auto-commit interval
      "enable.auto.commit" -> (true: java.lang.Boolean)    // commit offsets automatically
    )
    val topics = Array("spark-demo") // topics to subscribe to
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // location strategy recommended by the source code
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // consumer strategy recommended by the source code
    )

    // TODO 2. Process the messages
    val infoDS: DStream[String] = kafkaDS.map(record => {
      val topic: String = record.topic()
      val partition: Int = record.partition()
      val offset: Long = record.offset()
      val key: String = record.key()
      val value: String = record.value()
      val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
      info
    })

    // TODO 3. Output the result
    infoDS.print()

    // TODO 4. Start and wait for termination
    ssc.start()
    ssc.awaitTermination() // a streaming app keeps running until it is stopped manually / keeps waiting for data

    // TODO 5. Release resources
    ssc.stop()
  }
}
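For a quick local test you need something producing into the spark-demo topic. The small producer below is an addition to the original post, a minimal sketch assuming the same broker address (192.168.89.15:9092) and topic; the object name TestProducer is only for illustration. It uses the plain kafka-clients producer API, which the spark-streaming-kafka-0-10 dependency already pulls in transitively:

package com.jiang.streaming_kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Illustrative test producer (not part of the original post): pushes a few records
// into the spark-demo topic so the streaming jobs above have something to consume.
object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.89.15:9092")             // same broker as the streaming jobs
    props.put("key.serializer", classOf[StringSerializer].getName)   // serialize keys as strings
    props.put("value.serializer", classOf[StringSerializer].getName) // serialize values as strings

    val producer = new KafkaProducer[String, String](props)
    try {
      for (i <- 1 to 10) {
        // key = message number, value = a simple payload
        producer.send(new ProducerRecord[String, String]("spark-demo", i.toString, s"hello spark $i"))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}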
2. Committing offsets manually
Scala code:
package com.jiang.streaming_kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Kafka_Demo02 {
  def main(args: Array[String]): Unit = {
    // TODO 0. Prepare the environment
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5)) // one batch every 5 seconds
    // Without this you get: "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    // State is stored in the checkpoint directory
    ssc.checkpoint("./ckp")

    // TODO 1. Load data from Kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],   // deserializer for keys
      "value.deserializer" -> classOf[StringDeserializer], // deserializer for values
      "group.id" -> "sparkdemo",                           // consumer group name
      // earliest: resume from the committed offset if one exists, otherwise start from the earliest message
      // latest:   resume from the committed offset if one exists, otherwise start from the latest message
      // none:     resume from the committed offset if one exists, otherwise throw an error
      "auto.offset.reset" -> "latest",
      // "auto.commit.interval.ms" -> "1000",              // auto-commit interval (not needed here)
      "enable.auto.commit" -> (false: java.lang.Boolean)   // do not commit offsets automatically
    )
    val topics = Array("spark-demo") // topics to subscribe to
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // location strategy recommended by the source code
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // consumer strategy recommended by the source code
    )

    // TODO 2. Process the messages
    // Note on commit timing: offsets should be committed once each small batch has been consumed,
    // and for a DStream a small batch corresponds to one RDD
    kafkaDS.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // consume
        rdd.foreach(record => {
          val topic: String = record.topic()
          val partition: Int = record.partition()
          val offset: Long = record.offset()
          val key: String = record.key()
          val value: String = record.value()
          val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
          println("Consumed record details: " + info)
        })
        // commit
        // extract the offset information from the RDD
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // commit the offsets back to Kafka
        kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        println("This batch has been consumed and its offsets committed manually")
      }
    })

    // TODO 3. Output the result
    // infoDS.print()

    // TODO 4. Start and wait for termination
    ssc.start()
    ssc.awaitTermination() // a streaming app keeps running until it is stopped manually / keeps waiting for data

    // TODO 5. Release resources
    ssc.stop()
  }
}
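Two things are worth keeping in mind with this pattern (both noted in the spark-streaming-kafka-0-10 documentation): the casts to HasOffsetRanges and CanCommitOffsets only succeed on the stream returned directly by createDirectStream, not on a transformed DStream, and commitAsync is asynchronous, so the offsets are written back to Kafka's internal offsets topic some time after the batch finishes. Because the commit happens after processing, a crash between processing and commit simply replays that batch on restart, i.e. at-least-once semantics, so the downstream output should be idempotent.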
3. Committing offsets manually to MySQL
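The OffsetUtil helper in the code below stores offsets in a MySQL table named offset. The original post does not show the table definition, so the one-off setup sketch below (the object name CreateOffsetTable is only for illustration) uses an assumed schema derived from the columns the code reads and writes; the primary key over (topic, partition, groupid) is what lets replace into behave as an upsert:

package com.jiang.streaming_kafka

import java.sql.{Connection, DriverManager, Statement}

// One-off setup (not part of the original post): creates the `offset` table that OffsetUtil expects.
// The schema is an assumption based on the columns the code uses.
object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    val conn: Connection = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/bigdata_test", "root", "123456")
    val stmt: Statement = conn.createStatement()
    stmt.executeUpdate(
      """create table if not exists `offset` (
        |  `topic`     varchar(255) not null,
        |  `partition` int          not null,
        |  `groupid`   varchar(255) not null,
        |  `offset`    bigint       not null,
        |  primary key (`topic`, `partition`, `groupid`)
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}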
Scala code:
package com.jiang.streaming_kafka

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Kafka_Demo03 {
  def main(args: Array[String]): Unit = {
    // TODO 0. Prepare the environment
    val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5)) // one batch every 5 seconds
    // Without this you get: "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    // State is stored in the checkpoint directory
    ssc.checkpoint("./ckp")

    // TODO 1. Load data from Kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.89.15:9092",
      "key.deserializer" -> classOf[StringDeserializer],   // deserializer for keys
      "value.deserializer" -> classOf[StringDeserializer], // deserializer for values
      "group.id" -> "sparkdemo",                           // consumer group name
      // earliest: resume from the committed offset if one exists, otherwise start from the earliest message
      // latest:   resume from the committed offset if one exists, otherwise start from the latest message
      // none:     resume from the committed offset if one exists, otherwise throw an error
      "auto.offset.reset" -> "latest",
      // "auto.commit.interval.ms" -> "1000",              // auto-commit interval (not needed here)
      "enable.auto.commit" -> (false: java.lang.Boolean)   // do not commit offsets automatically
    )
    val topics = Array("spark-demo") // topics to subscribe to
    val offsetsMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("sparkdemo", "spark-demo")
    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetsMap.nonEmpty) {
      println("MySQL has offsets recorded for this consumer group and topic; resuming from the recorded offsets")
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // location strategy recommended by the source code
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetsMap) // consumer strategy, seeded with the stored offsets
      )
    } else {
      println("MySQL has no offsets recorded for this consumer group and topic; starting from latest")
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // location strategy recommended by the source code
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) // consumer strategy recommended by the source code
      )
    }

    // TODO 2. Process the messages
    // Note on commit timing: offsets should be committed once each small batch has been consumed,
    // and for a DStream a small batch corresponds to one RDD
    kafkaDS.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // consume
        rdd.foreach(record => {
          val topic: String = record.topic()
          val partition: Int = record.partition()
          val offset: Long = record.offset()
          val key: String = record.key()
          val value: String = record.value()
          val info: String = s"""topic:${topic},partition:${partition},offset:${offset},key:${key},value:${value}"""
          println("Consumed record details: " + info)
        })
        // commit
        // extract the offset information from the RDD
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // commit the offsets to MySQL instead of back to Kafka
        // kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        OffsetUtil.saveOffsetRanges("sparkdemo", offsetRanges)
        println("This batch has been consumed and its offsets committed manually to MySQL")
      }
    })

    // TODO 3. Output the result
    // infoDS.print()

    // TODO 4. Start and wait for termination
    ssc.start()
    ssc.awaitTermination() // a streaming app keeps running until it is stopped manually / keeps waiting for data

    // TODO 5. Release resources
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }

  object OffsetUtil {
    val url = "jdbc:mysql://localhost:3306/bigdata_test"
    val username = "root"
    val passwd = "123456"

    // Save the offsets to the database
    def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]): Unit = {
      val conn: Connection = DriverManager.getConnection(url, username, passwd)
      // replace into: update the row if it already exists, otherwise insert it
      val ps: PreparedStatement = conn.prepareStatement(
        "replace into offset (`topic`,`partition`,`groupid`,`offset`) values (?,?,?,?)")
      for (o <- offsetRange) {
        ps.setString(1, o.topic)
        ps.setInt(2, o.partition)
        ps.setString(3, groupid)
        ps.setLong(4, o.untilOffset)
        ps.executeUpdate()
      }
      ps.close()
      conn.close()
    }

    // Read the offsets from the database as a Map(topic-partition -> offset)
    def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {
      val conn: Connection = DriverManager.getConnection(url, username, passwd)
      val ps: PreparedStatement = conn.prepareStatement("select * from offset where groupid=? and topic=?")
      ps.setString(1, groupid)
      ps.setString(2, topic)
      val rs: ResultSet = ps.executeQuery()
      // Map(topic-partition -> offset)
      val offsetMap = mutable.Map[TopicPartition, Long]()
      while (rs.next()) {
        offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")
      }
      rs.close()
      ps.close()
      conn.close()
      offsetMap
    }
  }
}