kafka结合Spark-streming的直连(Direct)方式

转载自 https://www.cnblogs.com/dongxiucai/p/9971868.html

说明:此程序使用的scala编写

在spark-stream+kafka使用的时候,有两种连接方式一种是Receiver连接方式,一种是Direct连接方式。

  两种连接方式简介:

  Receiver接受固定时间间隔的数据(放在内存中),达到固定的时间才进行处理,效率极并且容易丢失数据。通过高阶API,不用管理偏移量,由zk管理,若是拉取的数据超过,executor内存大小,消息会存放到磁盘上面。0.10之后被舍弃。   弊端:效率极并且容易丢失数据

 

                        kafka结合Spark-streming的直连(Direct)方式

  直连(Direct)方式:**********重点 相当于直接连接到了kafka的分区上面,舍弃了高阶API,所以需要自己手动管理偏移量。运用底层API。效率高。需要手动的维护偏移量。企业生产使用。   好处:不会走磁盘了,在拉取数据的时候,会有一个预处理机制。效率高。

 

                              kafka结合Spark-streming的直连(Direct)方式

  两者的区别:  

  Receiver连接方式:他使用的是高级API实现Offset自动管理,不需要我们管理,所以他的灵活性特别差,不好,而且他处理数据的时候,如果某一时刻所传来的数据量特别大那么就会造成磁盘溢写的情况,他通过WALs进行磁盘的写入。   直连方式:他使用的是底层的API实现Offset我们开发人员管理,这样的话,他的灵活性很好,并且可以保证数据的安全性,而且不用孤单行数据量过大。   现在主要使用的Direct直连的方式,而不在使用receiver方式   直连代码如下:  
  1 import kafka.common.TopicAndPartition
  2 import kafka.message.MessageAndMetadata
  3 import kafka.serializer.StringDecoder
  4 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
  5 import org.I0Itec.zkclient.ZkClient
  6 import org.apache.spark.SparkConf
  7 import org.apache.spark.rdd.RDD
  8 import org.apache.spark.streaming.dstream.InputDStream
  9 import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
 10 import org.apache.spark.streaming.{Duration, StreamingContext}
 11 import redis.clients.jedis.Jedis
 12 
 13 object KafkaDirectConsumer {
 14   def main(args: Array[String]): Unit = {
 15     // 创建streaming
 16     val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
 17     val ssc = new StreamingContext(conf, Duration(5000))
 18     // 创建
 19     // 指定消费者组
 20     val groupid = "gp01"
 21     // 消费者
 22     val topic = "tt1"
 23     // 创建zk集群连接
 24     val zkQuorum = "spark101:2181,spark102:2181,spark103:2181"
 25     // 创建kafka的集群连接
 26     val brokerList = "spark101:9092,spark102:9092,spark103:9092"
 27     // 创建消费者的集合
 28     // 在streaming中可以同时消费多个topic
 29     val topics: Set[String] = Set(topic)
 30     // 创建一个zkGroupTopicDir对象
 31     // 此对象里面存放这zk组和topicdir的对应信息
 32     // 就是在zk中写入kafka的目录
 33     // 传入 消费者组,消费者,会根据传入的参数生成dir然后存放在zk中
 34     val TopicDir = new ZKGroupTopicDirs(groupid, topic)
 35     // 获取存放在zk中的dir目录信息 /gp01/offset/tt
 36     val zkTopicPath: String = s"${TopicDir.consumerOffsetDir}"
 37     // 准备kafka的信息、
 38     val kafkas = Map(
 39       // 指向kafka的集群
 40       "metadata.broker.list" -> brokerList,
 41       // 指定消费者组
 42       "group.id" -> groupid,
 43       // 从头开始读取数据
 44       "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
 45     )
 46     // 创建一个zkClint客户端,用host 和 ip 创建
 47     // 用于从zk中读取偏移量数据,并更新偏移量
 48     // 传入zk集群连接
 49     val zkClient = new ZkClient(zkQuorum)
 50     // 拿到zkClient后去,zk中查找是否存在文件
 51     // /gp01/offset/tt/0/10001
 52     // /gp01/offset/tt/1/20001
 53     // /gp01/offset/tt/2/30001
 54     val clientOffset = zkClient.countChildren(zkTopicPath)
 55     // 创建空的kafkaStream 里面用于存放从kafka接收到的数据
 56     var kafkaStream: InputDStream[(String, String)] = null
 57     // 创建一个存放偏移量的Map
 58     // TopicAndPartition [/gp01/offset/tt/0,10001]
 59     var fromOffsets: Map[TopicAndPartition, Long] = Map()
 60     // 判断,是否妇女放过offset,若是存放过,则直接从记录的
 61     // 偏移量开始读
 62     if (clientOffset > 0) {
 63       // clientOffset的数量就是 分区的数目量
 64       for (i <- 0 until clientOffset) {
 65         // 取出 /gp01/offset/tt/i/ 10001 -> 偏移量
 66         val paratitionOffset = zkClient.readData[String](s"${zkTopicPath}/${i}")
 67         // tt/ i
 68         val tp = TopicAndPartition(topic, i)
 69         // 添加到存放偏移量的Map中
 70         fromOffsets += (tp -> paratitionOffset.toLong)
 71       }
 72       // 现在已经把偏移量全部记录在Map中了
 73       // 现在读kafka中的消息
 74       // key 是kafka的kay,为null, value是kafka中的消息
 75       // 这个会将kafka的消息进行transform 最终kafka的数据都会变成(kafka的key,message)这样的tuple
 76       val messageHandlers = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
 77       // 通过kafkaUtils来创建DStream
 78       // String,String,StringDecoder,StringDecoder,(String,String)
 79       //   key,value,key的解码方式,value的解码方式,(接受的数据格式)
 80       kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
 81         ssc, kafkas, fromOffsets, messageHandlers
 82       )
 83     } else { // 若是不存在,则直接从头读
 84       // 根据kafka的配置
 85       kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkas, topics)
 86     }
 87 
 88     // 偏移量范围
 89     var offsetRanges = Array[OffsetRange]()
 90 
 91     kafkaStream.foreachRDD {
 92       kafkaRDD =>
 93         // 得到kafkaRDD,强转为HasOffsetRanges,获得偏移量
 94         // 只有Kafka可以强转为HasOffsetRanges
 95         offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
 96 
 97         // 触发Action,这里去第二个值为真实的数据
 98         val mapRDD = kafkaRDD.map(_._2)
 99         /*=================================================*/      // mapRDD为数据,在这里对数据操作     // 在这里写你自己的业务处理代码代码     // 此程序可以直接拿来使用,经历过层层考验     /*=================================================*/
100 
101         // 存储更新偏移量
102         for (o <- offsetRanges) {
103           // 获取dir
104           val zkPath = s"${zkTopicPath}/${o.partition}"
105           ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
106         }
107     }
108 
109     ssc.start()
110     ssc.awaitTermination()
111 
112   }
113 }

以上为Direct直连方式的代码,直接可以使用的,根据自己的集群,和topic,groupid等配置稍作修改即可。

 
上一篇:2021最新整理阿里云云计算ACP题库1200+


下一篇:警惕Oracle数据库性能“隐形杀手”