kafka整合sparkStreaming及优化

package streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {

  def main(args: Array[String]): Unit = {

    //1、创建StreamingContext
    val ssc = new StreamingContext(new SparkContext(new SparkConf().setMaster("local[4]").setAppName("kafka")),Seconds(5))

    ssc.sparkContext.setLogLevel("warn")
    //2、消费kafka数据
    //消费topic
    val topics = Array("spark10")

    //kafka参数配置
    val kafkaParams = Map[String, Object](
    //指明kafka borkerlist
    "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
    //kafka message key反序列化器
    "key.deserializer" -> classOf[StringDeserializer],
    //kafka message value反序列化器
    "value.deserializer" -> classOf[StringDeserializer],
    //消费者组
    "group.id" -> "kafka_spark_10",
    //指明从什么位置开始消费kafka , latest 从最新的offset开始消费 , earliest 从小的offset开始消费
    "auto.offset.reset" -> "latest",
    //是否自动提交offset
    "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val source: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))
    //3、数据处理
    //数据处理、offset更新
    source.foreachRDD(rdd=>{
      //打印消息
      rdd.map(_.value()).foreach(println(_))
      //获取当前消费的offset
      val offset = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //提交offset
      source.asInstanceOf[CanCommitOffsets].commitAsync(offset)
    })
    //4、启动streaming
    ssc.start()
    //5、阻塞主线程
    ssc.awaitTermination()
  }
}

kafka优化
网络和io操作线程配置优化:
# broker处理消息的最大线程数
    num.network.threads=xxx
#  broker处理磁盘IO的线程数
    num.io.threads=xxx
# 加入队列的最大请求数,超过该值,network thread阻塞
    queued.max.requests=5000
# server使用的send buffer大小。
    socket.send.buffer.bytes=1024000
# server使用的recive buffer大小。
    socket.receive.buffer.bytes=1024000
为了大幅度提高producer写入吞吐量,需要定期批量写文件
# 每当producer写入10000条消息时,刷数据到磁盘
  log.flush.interval.messages=10000
# 每间隔1秒钟时间,刷数据到磁盘
  log.flush.interval.ms=1000
  当kafka server的被写入海量消息后,会生成很多数据文件,且占用大量磁盘空间,如果不及时清理,可能磁盘空间不够用,kafka默认是保留7天。
  # 保留三天,也可以更短 
  log.retention.hours=72
# 段文件配置1GB,有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件)
   log.segment.bytes=1073741824 
   
数据重复消费和顺序消费问题:
Kafka直接写入页缓存page cache, 然后在持久化到磁盘
数据不丢失问题:
Kafka宕机,Leader 切换时可能导致数据丢失:
1.必须要求至少一个 Follower 在 ISR 列表里
2.每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功

在broker中,保证数据不丢失主要是通过副本因子(冗余),防止数据丢失
在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失
spark Streaming 优化
对于数据量较小的情况,一般是不会暴露问题的,但是数据量增大后,就会暴露各种问题,这就需要进行一些调优和参数配置。
1. 合理的批处理时间(batchDuration)
sparkStreaming在不断接受数据的同时,需要处理数据的时间,所以如果设置过短的批处理时间,会造成数据堆积,即未完成batch数据越来越多,从而发生阻塞
batchDuration本身不能小于500ms,这会导致sparkStreaming频繁提交作业,造成额外开销,减少整个系统的吞吐量,如果时间过长也会性能
合理的批处理时间,需要根据应用本身,集群资源情况,以及关注和监控spark streaming系统运行情况来调整,重点关注界面中的TotalDelay
2.合理的Kafka拉取量(maxRatePerPartition即Kafka每个partition拉取的数据的上限)
这个值默认是无上限的,即Kafka有多少数据,spark streaming就会一次性全拉出,但是批处理的时间是一定的,不会动态变化,如果Kafka这个数据频率过高就会照成数据堆积,阻塞
数据总量等于Kafka拉取数据量*partition数量调整两个参数 maxRatePerPartition 和 batchDuration 使得数据的拉取和处理能够平衡,尽可能地增加整个系统的吞吐量,可以观察监控界面中的 InputRate 和 ProcessingTime
3.用cache()函数将反复使用数据流Dstream缓存.防止过度调度资源造成网络开销
关注SchedulingDelay参数
4.设置合理的GC方式
对于 Spark 而言,垃圾回收采用 G1GC,而 SparkStreaming 采用 CMS
5.设置合理的parallelism
在spark streaming+Kafka中,我们采用Direct连接方式,spark中的partition和Kafka中的partition是一一对应的,一般默认设置Kafka中partition的数量
6.设置合理的CPU核数,内存,和executor的数量
7.使用高性能算子,Kryo优化序列化
8.在流式处理系统中,我们需要兼顾数据的接收和数据处理,即消费数据的速率要赶上生产数据的速率。当发现生产数据速率过慢时,可以考虑增加并行度,使用更多的接收器(Receiver);如果处理速度过慢,可以考虑加机器、优化程序逻辑及 GC 优化等方式
9.合理的内存管理
Spark 对于内存的使用主要有两类用途:执行(execution)和存储(storage)。执行类内存主要被用于 Shuffle 类操作、join 操作及排序(sort)和聚合(aggregation)类操作,而存储类内存主要用于缓存数据(caching)和集群间内部数据的传送, 执行类内存可以剥夺存储类内存空间,但是存储类内存空间有一个最低阈值会保证保留。
	

kafka整合sparkStreaming及优化kafka整合sparkStreaming及优化 Jeady· 发布了25 篇原创文章 · 获赞 4 · 访问量 702 私信 关注
上一篇:尝试在android中流式网络摄像机时黑屏


下一篇:Spark学习9 Spark Streaming流式数据处理组件学习