package streaming
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaWordCount {
def main(args: Array[String]): Unit = {
//1、创建StreamingContext
val ssc = new StreamingContext(new SparkContext(new SparkConf().setMaster("local[4]").setAppName("kafka")),Seconds(5))
ssc.sparkContext.setLogLevel("warn")
//2、消费kafka数据
//消费topic
val topics = Array("spark10")
//kafka参数配置
val kafkaParams = Map[String, Object](
//指明kafka borkerlist
"bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
//kafka message key反序列化器
"key.deserializer" -> classOf[StringDeserializer],
//kafka message value反序列化器
"value.deserializer" -> classOf[StringDeserializer],
//消费者组
"group.id" -> "kafka_spark_10",
//指明从什么位置开始消费kafka , latest 从最新的offset开始消费 , earliest 从小的offset开始消费
"auto.offset.reset" -> "latest",
//是否自动提交offset
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val source: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))
//3、数据处理
//数据处理、offset更新
source.foreachRDD(rdd=>{
//打印消息
rdd.map(_.value()).foreach(println(_))
//获取当前消费的offset
val offset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//提交offset
source.asInstanceOf[CanCommitOffsets].commitAsync(offset)
})
//4、启动streaming
ssc.start()
//5、阻塞主线程
ssc.awaitTermination()
}
}
kafka优化
网络和io操作线程配置优化:
# broker处理消息的最大线程数
num.network.threads=xxx
# broker处理磁盘IO的线程数
num.io.threads=xxx
# 加入队列的最大请求数,超过该值,network thread阻塞
queued.max.requests=5000
# server使用的send buffer大小。
socket.send.buffer.bytes=1024000
# server使用的recive buffer大小。
socket.receive.buffer.bytes=1024000
为了大幅度提高producer写入吞吐量,需要定期批量写文件
# 每当producer写入10000条消息时,刷数据到磁盘
log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000
当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。
# 保留三天,也可以更短
log.retention.hours=72
# 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
log.segment.bytes=1073741824
数据重复消费和顺序消费问题:
Kafka直接写入页缓存page cache, 然后在持久化到磁盘
数据不丢失问题:
Kafka宕机,Leader 切换时可能导致数据丢失:
1.必须要求至少一个 Follower 在 ISR 列表里
2.每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功
在broker中,保证数据不丢失主要是通过副本因子(冗余),防止数据丢失
在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失
spark Streaming 优化
对于数据量较小的情况,一般是不会暴露问题的,但是数据量增大后,就会暴露各种问题,这就需要进行一些调优和参数配置。
1. 合理的批处理时间(batchDuration)
sparkStreaming在不断接受数据的同时,需要处理数据的时间,所以如果设置过短的批处理时间,会造成数据堆积,即未完成batch数据越来越多,从而发生阻塞
batchDuration本身不能小于500ms,这会导致sparkStreaming频繁提交作业,造成额外开销,减少整个系统的吞吐量,如果时间过长也会性能
合理的批处理时间,需要根据应用本身,集群资源情况,以及关注和监控spark streaming系统运行情况来调整,重点关注界面中的TotalDelay
2.合理的Kafka拉取量(maxRatePerPartition即Kafka每个partition拉取的数据的上限)
这个值默认是无上限的,即Kafka有多少数据,spark streaming就会一次性全拉出,但是批处理的时间是一定的,不会动态变化,如果Kafka这个数据频率过高就会照成数据堆积,阻塞
数据总量等于Kafka拉取数据量*partition数量调整两个参数 maxRatePerPartition 和 batchDuration 使得数据的拉取和处理能够平衡,尽可能地增加整个系统的吞吐量,可以观察监控界面中的 InputRate 和 ProcessingTime
3.用cache()函数将反复使用数据流Dstream缓存.防止过度调度资源造成网络开销
关注SchedulingDelay参数
4.设置合理的GC方式
对于 Spark 而言,垃圾回收采用 G1GC,而 SparkStreaming 采用 CMS
5.设置合理的parallelism
在spark streaming+Kafka中,我们采用Direct连接方式,spark中的partition和Kafka中的partition是一一对应的,一般默认设置Kafka中partition的数量
6.设置合理的CPU核数,内存,和executor的数量
7.使用高性能算子,Kryo优化序列化
8.在流式处理系统中,我们需要兼顾数据的接收和数据处理,即消费数据的速率要赶上生产数据的速率。当发现生产数据速率过慢时,可以考虑增加并行度,使用更多的接收器(Receiver);如果处理速度过慢,可以考虑加机器、优化程序逻辑及 GC 优化等方式
9.合理的内存管理
Spark 对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于 Shuffle 类操作、join 操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送, 执行类内存可以剥夺存储类内存空间,但是存储类内存空间有一个最低阈值会保证保留。
Jeady·
发布了25 篇原创文章 · 获赞 4 · 访问量 702
私信
关注