Spark Streaming 2.2: Saving Kafka Offsets with Checkpoints

1. Test code

package kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.PropUtil

/**
  * @author yanghongbo
  * @date 2019/7/25 10:03
  * @description Checkpoints can track the offsets of multiple topics at once and ensure the stream resumes from the saved offsets on restart, but any change to the code requires clearing the checkpoint directory
  */

object CheckpointOffset {

  // Load configuration
  val prop = new PropUtil("config.properties")
  val oracleUrl = prop.getProp("ORACLE_URL")
  val oracleUser = prop.getProp("ORACLE_USER")
  val oraclePassword = prop.getProp("ORACLE_PASSWORD")
  val brokers = prop.getProp("KAFKA_BROKERS")
  //  val checkpointDir = prop.getProp("checkpointDir")
  val checkpointDir = "./CheckpointOffset"
  val groupName: String = this.getClass.getName

  def functionToCreateContext(): StreamingContext = {

    // Get an existing SparkSession or create a new one (local mode for testing)
    val spark = SparkSession.builder().appName(groupName).master("local[3]").getOrCreate()
    //  For cluster deployment: val spark = SparkSession.builder().appName("SparkToOracleStatus").getOrCreate()
    val sc = spark.sparkContext
    // Set log level
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(3))
    ssc.checkpoint(checkpointDir)

    // Topics to consume
    val topics = Array("testTopic2", "DC_HISTORY_STATUS_T")

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupName,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    kafkaStream.foreachRDD(kafkaRDD => {

      // Optional (can be removed): print each partition's offset range for this batch
      val offsetRanges: Array[OffsetRange] = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

      for (o <- offsetRanges) {
        println(o)
      }

      // Extract the message values
      val value: RDD[String] = kafkaRDD.map(_.value())

      // Business logic (here: simply print every record)
      value.foreachPartition(records => {
        records.foreach(println)
      })

    })

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context from the checkpoint directory, or create a new one
    val context = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)

    // Start the streaming computation
    context.start()
    context.awaitTermination()
  }
}
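
As the class comment notes, a checkpoint becomes unusable as soon as the application code changes. A common alternative is to have Spark commit the processed offsets back to Kafka via the CanCommitOffsets interface (import org.apache.spark.streaming.kafka010.CanCommitOffsets); this survives code changes but only gives at-least-once semantics. A minimal sketch, using the same kafkaStream as above:

kafkaStream.foreachRDD(kafkaRDD => {
  val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit this batch's offsets back to Kafka asynchronously;
  // they are stored under the consumer group id ("group.id")
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})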

2. Utility class

package utils

import java.io.InputStream
import java.util.Properties

/**
  * Reads values from a properties file on the classpath
  * @param file the properties file name
  */

class PropUtil(val file: String) {
  private val prop = new Properties()

  // Load the file from the classpath once, and actually use the
  // file name passed to the constructor instead of a hard-coded path
  private val ipStream: InputStream = this.getClass.getResourceAsStream("/" + file)
  prop.load(ipStream)
  ipStream.close()

  def getProp(key: String): String = prop.getProperty(key)
}
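
For reference, a hypothetical config.properties under src/main/resources covering the keys read by CheckpointOffset might look like this (all values are placeholders):

# src/main/resources/config.properties -- placeholder values
ORACLE_URL=jdbc:oracle:thin:@//dbhost:1521/orcl
ORACLE_USER=user
ORACLE_PASSWORD=password
KAFKA_BROKERS=broker1:9092,broker2:9092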

3. pom.xml

<properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>3.0.0</hadoop.version>
        <hbase.version>2.0.0</hbase.version>
        <ojdbc7>12.1.0.2</ojdbc7>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.6</version>
        </dependency>
        <!-- Scala library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Hadoop client API version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Required if Spark needs to integrate with Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark Streaming + Kafka 0.10 integration -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>com.github.noraui</groupId>
            <artifactId>ojdbc7</artifactId>
            <version>${ojdbc7}</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
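
The <mainClass> element in the shade plugin above is left empty; assuming it is filled in with kafka.consumer.CheckpointOffset, mvn package produces a runnable fat jar. A sketch of the submit command (master, deploy mode, and jar name are placeholders):

spark-submit \
  --class kafka.consumer.CheckpointOffset \
  --master yarn \
  --deploy-mode cluster \
  target/my-project-1.0-SNAPSHOT.jar

Note that the relative checkpointDir ("./CheckpointOffset") in the test code only makes sense locally; on a cluster the checkpoint directory should point to a fault-tolerant filesystem such as HDFS.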

 
