066 基于checkpoint的HA机制实现

1.说明

  针对需要恢复的应用场景,提供了HA的的机制

  内部实现原理:基于checkpoint的

  当程序被kill的时候,下次恢复的时候,会从checkpoint对用的文件中进行数据的恢复

2.HA原理

  当job执行的时候,将数据同步到checkpoint设置的对应文件夹中
  同步的数据包括:
    类的信息(包名 + 类名)
    Job DAG执行图(在运行后,代码的DAG图不能进行任何修改,否则下次执行的时候会报错<类型不匹配>; 只要DAG图不变,其它API内部的代码执行逻辑可以随便更改)
  Job执行的源数据

二:程序

1.程序

 package com.stream.it

 import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext} object HAKafkaWordcount {
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
.setAppName("spark-streaming-wordcount")
.setMaster("local[*]")
val sc=SparkContext.getOrCreate(conf)
val checkpointDir = "hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/chkdir02" /**
* 构造StreamingContext对象
*
* @return
*/
def createStreamingContextFunc(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(checkpointDir)
val kafkaParams=Map("group.id"->"stream-sparking-0",
"zookeeper.connect"->"linux-hadoop01.ibeifeng.com:2181/kafka",
"auto.offset.reset"->"smallest"
)
val topics=Map("beifeng"->1)
val dStream=KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
ssc, //给定sparkStreaming的上下文
kafkaParams, //kafka的参数信息,通过kafka HightLevelComsumerApi连接
topics, //给定读取对应的topic的名称以及读取数据的线程数量
StorageLevel.MEMORY_AND_DISK_2 //数据接收器接收到kafka的数据后的保存级别
).map(_._2) val resultWordcount=dStream
.filter(line=>line.nonEmpty)
.flatMap(line=>line.split(" ").map((_,1)))
.reduceByKey(_+_)
resultWordcount.foreachRDD(rdd=>{
rdd.foreachPartition(iter=>iter.foreach(println))
})
ssc
} val ssc = StreamingContext.getOrCreate(
checkpointPath = checkpointDir,
creatingFunc = createStreamingContextFunc
) //启动
ssc.start()
//等到
ssc.awaitTermination()
}
}

2.注意点

  HA第一次执行后,以后如果代码进行改动(创建StreamingContext的代码改动),不会得到反应(会直接从checkpoint中读取数据进行StreamingContext的恢复) ===> 解决SparkStreaming和Kafka集成的时候offset偏移量管理的问题

上一篇:060 关于Hive的调优(本身,sql,mapreduce)


下一篇:Serial interface (RS-232)