In an application that integrates Spark Streaming with Spark SQL, creating Spark's core objects in the wrong order can cause runtime errors. Create them in the order SparkConf, SparkContext, StreamingContext, SparkSession: building the StreamingContext from an explicit SparkContext and then calling SparkSession.builder().getOrCreate() reuses that single context, whereas a different order may try to start a second SparkContext in the same JVM and fail (hence the commented-out spark.driver.allowMultipleContexts workaround below).
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

//TODO 1. Create the contexts, in order: SparkConf -> SparkContext -> StreamingContext -> SparkSession
val conf = new SparkConf().setAppName("BoxLogStreamingDeal").setMaster("yarn")
  .set("spark.default.parallelism", "500")
  // Maximum rate (records per second) at which each Kafka partition is consumed
  .set("spark.streaming.kafka.maxRatePerPartition", "500")
  // Use Kryo serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Recommended: compress internal data with Snappy
  .set("spark.io.compression.codec", "snappy")
  // .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(60))
// ssc.checkpoint("/jar/sparkstreaming/boxLog/checkpoint")
val spark = SparkSession.builder()
  .config(conf)
  .config("hive.metastore.uris", "thrift://××××:7004")
  .enableHiveSupport()
  .getOrCreate()

//TODO 2. Prepare the Kafka parameters
val (brokerList, boxLogTopic, groupId) =
  (ConfigrautionUtils.brokerList, ConfigrautionUtils.boxLogTopic, ConfigrautionUtils.groupId)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokerList,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> "false",
  "group.id" -> groupId)

//TODO 3. Consume the Kafka topic and obtain the input stream
val kafkaStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](List(boxLogTopic), kafkaParams)
)
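The snippet above stops after creating the stream and never starts the StreamingContext. The sketch below shows one way the pieces fit together: run Spark SQL over each micro-batch through the SparkSession created above, then commit the Kafka offsets manually (enable.auto.commit is false). The record format, the temp view name, and the Hive table dw.box_log_agg are hypothetical placeholders, not part of the original program.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

//TODO 4. Process each micro-batch with Spark SQL, then commit offsets
kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Capture offset ranges from the original Kafka RDD, before any repartitioning
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    import spark.implicits._
    // Hypothetical record format: "boxId|eventTime|payload"
    val df = rdd.map(_.value().split("\\|"))
      .filter(_.length == 3)
      .map(a => (a(0), a(1), a(2)))
      .toDF("boxId", "eventTime", "payload")

    df.createOrReplaceTempView("box_log_batch")
    // Any Spark SQL over the batch; the target Hive table is a placeholder
    spark.sql("SELECT boxId, COUNT(*) AS cnt FROM box_log_batch GROUP BY boxId")
      .write.mode("append").saveAsTable("dw.box_log_agg")

    // Commit offsets back to Kafka only after the batch has been processed
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

//TODO 5. Start the job and block until it terminates
ssc.start()
ssc.awaitTermination()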
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yzf</groupId>
    <artifactId>boxLog</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <spark.version>2.4.3</spark.version>
        <scala.version>2.11</scala.version>
        <mysql.version>5.1.48</mysql.version>
        <redis.version>3.0.0</redis.version>
        <kudu.version>1.10.0</kudu.version>
        <zk.version>0.10</zk.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-pool2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.16</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark core, SQL and Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Kafka integration for Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Kudu integration for Spark -->
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_${scala.version}</artifactId>
            <version>${kudu.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-pool2</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.3.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Assembly plugin: builds a fat jar with the third-party dependencies bundled in -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- Replace with the fully qualified class containing the main method -->
                            <mainClass>com.yzf.streaming.boxlog.BoxLogStreamingDeal</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind the jar merge to the package phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
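One design note on this build: the Spark, Kudu, Jedis and MySQL dependencies are marked provided, so they are excluded from the jar-with-dependencies assembly and must be supplied by the cluster at runtime (the Spark installation itself, or extra jars passed to spark-submit). Only the unscoped libraries, fastjson and typesafe config here, end up bundled in the fat jar, which keeps it small and avoids version clashes with the cluster's own Spark jars.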