05-spark streaming & kafka

1、如何消费已经被消费过的数据?

    答:采用不同的group

2、如何自定义去消费已经消费过的数据?

    Conosumer.properties配置文件中有两个重要参数

    auto.commit.enable:如果为true,则consumer的消费偏移offset会被记录到zookeeper。下次consumer启动时会从此位置继续消费

    auto.offset.reset 该参数只接受两个常量largest和Smallest,分别表示将当前offset指到日志文件的最开始位置和最近的位置。

    实现自定义消费进度还是挺复杂的!这里略,知道有上面两个参数就行

3、kafka partition和consumer数目关系

    1)如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。

    2)如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀

     最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目-->12 。

    3)如果consumer从多个partition读到数据,不保证数据间的顺序性,

     kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

    4)增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

4、kafka topic 副本分配?

    Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量

    1)如何分配副本:

      Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica(副本)),
      Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其分区目录下的Log中。每个Follower都从Leader pull数据。

    2)Kafka分配Replica的算法如下:

     将所有Broker(假设共n个Broker)和待分配的Partition排序
     将第i个Partition分配到第(i mod n)个Broker上
     将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

    注:Follower只提供读服务,可以供消费者读。但是Fllower不提供写服务,leader供生产者写。

5、kafka如何设置生存周期与清理数据?

    server.properties 找相关配置

6、kafka direct是什么? 为什么要用这个,有什么优点?和其他的有什么区别。

  1)数据不丢失,数据一定会被处理

      Direct的方式是会直接操作kafka底层的元数据信息,这样如果计算失败了,可以把数据重新读一下,重新处理。
      即数据一定会被处理。拉数据,是RDD在执行的时候直接去拉数据。(如果不开启wal,Receiver数据可能会发生丢失,所以有些数据不会被处理)

  2)RDD和kafka的patition是一对一

      由于底层是直接读数据,没有所谓的Receiver,RDD的partition和kafka的partition是一致的。而Receiver的方式,这2个partition是没任何关系的。
      所以读数据的地方,处理数据的地方和驱动数据处理的程序都在同样的机器上,可以边读边写,这样就可以极大的提高性能。

      不足之处是由于RDD和kafka的patition是一对一的,想提高并行度就会比较麻烦。提高并行度还是repartition,即重新分区,因为产生shuffle,很耗时。

  3)不需要开启wal机制

      减少了写文件,极大的提升了效率,还至少能节省一倍的磁盘空间

7、Spark-Streaming获取kafka数据的两种方式,并简要介绍他们的优缺点?

  Receiver方式:

      当一个任务从driver发送到executor执行的时候,这时候,将数据拉取到executor中做操作,但是如果数据太大的话,这时候不能全放在内存中,
      receiver通过WAL,设置了本地存储,他会存放本地,保证数据不丢失,然后使用Kafka高级API通过zk来维护偏移量,保证数据的衔接性,
      其实可以说,receiver数据在zk获取的,这种方式效率低,而且极易容易出现数据丢失

  Direct 方式:

      他使用Kafka底层Api 并且消费者直接连接kafka的分区上,因为createDirectStream创建的DirectKafkaInputDStream每个batch所对应的RDD的分区与kafka分区一一对应,
      但是需要自己维护偏移量,迭代计算,即用即取即丢,不会给内存造成太大的压力,这样效率很高

8、 kafka在高并发的情况下,如何避免消息丢失和消息重复?

  消息丢失解决方案:

      首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=-1,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功

  消息重复解决方案:

      1、消息可以使用唯一id标识
      2、生产者(acks=1 代表至少成功发送一次)
      3、消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
      4、落表(主键或者唯一索引的方式,避免重复数据)
      5、业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

9、kafka怎么保证数据消费一次且仅消费一次

    幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息

    事务(transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚

    流处理EOS:流处理本质上可看成是“读取-处理-写入”的管道。此EOS保证整个过程的操作是原子性。注意,这只适用于Kafka Streams

10、kafka怎么保证数据的一致性和可靠性

    可以通过acks参数设置数据可靠性的级别

    0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
    1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
    -1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证

11、kafka到spark streaming怎么保证数据完整性,怎么保证数据不重复消费?

    Receiver方式

      开启WAL(预写日志),将从kafka中接受到的数据写入到日志文件中,所有数据从失败中可恢复。
    Direct方式

      依靠checkpoint机制来保证。(记录消费者组的偏移量)

    重复消费:

      幂等,事务

12、kafka的消费者高阶和低阶API有什么区别?

    kafka 提供了两套 consumer API:The high-level Consumer API和 The SimpleConsumer API

    high-level consumer API :提供了一个从 kafka 消费数据的高层抽象,

    SimpleConsumer API :则需要开发人员更多地关注细节。

13、kafka的exactly-once

    幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
    事务(transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚
    流处理EOS:流处理本质上可看成是“读取-处理-写入”的管道。此EOS保证整个过程的操作是原子性。注意,这只适用于Kafka Streams

14、如何保证从Kafka获取数据不丢失?

    1.生产者数据的不丢失

        kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。

    2.消费者数据的不丢失

        通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,接着上次的offset进行消费即可。

15、 spark实时作业宕掉,kafka指定的topic数据堆积怎么办?

    应对措施:

      ①spark.streaming.concurrentJobs=10:提高Job并发数,从源码中可以察觉到,这个参数其实是指定了一个线程池的核心线程数而已,没有指定时,默认为1。

      ②spark.streaming.kafka.maxRatePerPartition=2000:设置每秒每个分区最大获取日志数,控制处理数据量,保证数据均匀处理。

      ③spark.streaming.kafka.maxRetries=50:获取topic分区leaders及其最新offsets时,调大重试次数。

      ④在应用级别配置重试

        spark.yarn.maxAppAttempts=5
        # 尝试失败有效间隔时间设置
        spark.yarn.am.attemptFailuresValidityInterval=1h

16、produce向kafka中发送数据产生的offset 怎么算(给你传入几条大小的消息 求offset是多少)

    什么是offset

      offset是consumer position,Topic的每个Partition都有各自的offset.
      消费者需要自己保留一个offset,从kafka 获取消息时,只拉去当前offset 以后的消息。
      Kafka 的scala/java 版的client 已经实现了这部分的逻辑,将offset 保存到zookeeper 上

    1) auto.offset.reset

      如果Kafka没有开启Consumer,只有Producer生产了数据到Kafka中,此后开启Consumer。
      在这种场景下,将auto.offset.reset设置为largest(最近),那么Consumer会读取不到之前Produce的消息,只有新Produce的消息才会被Consumer消费

    2) auto.commit.enable(例如true,表示offset自动提交到Zookeeper)

    3) auto.commit.interval.ms(例如60000,每隔1分钟offset提交到Zookeeper)

    4) offsets.storage

      Select where offsets should be stored (zookeeper or kafka).默认是Zookeeper

    5) 基于offset的重复读

    6) Kafka的可靠性保证(消息消费和Offset提交的时机决定了At most once和At least once语义)

    Kafka默认实现了At least once语义(数据一定会被处理,但可能重复消费!)

上一篇:Neutron:ML2 Core Plugin


下一篇:HDU 1698——Just a Hook——————【线段树区间替换、区间求和】