Preface
Spark Streaming can connect to a Flume data source in two ways: Push and Pull. Seen from Spark Streaming's side, Push means Flume pushes the data to Spark, while Pull means Spark pulls the data out of Flume.
Whichever way you choose, the development process is similar: Spark Streaming consumes the Flume data stream, and Flume acts as Spark Streaming's data source. The difference between Push and Pull shows up mainly in the Flume Sink; the Flume Source and Channel are unaffected.
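The difference also shows up directly in the Spark Streaming API. As a minimal sketch (the hosts and port 8888 are taken from the examples later in this post, and the object name is just for illustration), the two FlumeUtils entry points look like this:
import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PushVsPullSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[4]").setAppName("PushVsPullSketch"), Seconds(2))

    // Push: Spark starts an Avro receiver on one executor and Flume's avro sink pushes events to it
    val pushed = FlumeUtils.createStream(ssc, "192.168.237.1", 8888)

    // Pull: each Flume agent buffers events in a SparkSink and Spark polls the sink addresses
    val pulled = FlumeUtils.createPollingStream(
      ssc, Seq(new InetSocketAddress("192.168.237.160", 8888)), StorageLevel.MEMORY_AND_DISK)
  }
}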
1. Push Mode
Flume pushes messages to an executor on a Spark Streaming worker for processing. The drawback is that Flume can only target a single executor, which puts heavy pressure on that one node, so this approach is not recommended.
When using Push mode, run the Spark program first and then start the Flume agent.
a. Spark Streaming code
Linking dependencies
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
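If the project is built with sbt rather than Maven, the same artifacts could be declared roughly as follows (this assumes scalaVersion is a 2.11.x release to match the _2.11 artifact, and that the usual spark-core/spark-streaming dependencies are already in the build):
// build.sbt: same coordinates as the Maven snippet above
libraryDependencies ++= Seq(
  "com.alibaba" % "fastjson" % "1.2.62",
  "org.apache.spark" % "spark-streaming-flume_2.11" % "2.2.0"
)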
Code
package cn.wsj.flume
import com.alibaba.fastjson.JSON
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
case class Shops(goodid: String, title: String, price: Double, shopid: Int, mark: String)

object Push_Flume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName(this.getClass.getName)
    // 2. Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    // 3. Create the StreamingContext with a 2-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
    // 4. Receive the data pushed by Flume's avro sink to this host/port
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc, "192.168.237.1", 8888)
    // 5. Extract the event bodies from the DStream and parse them
    stream.map(x => new String(x.event.getBody.array()).trim).map(x => {
      // pull out the {...} fragment (plus the character that follows it) and map it to a Shops object
      val reg = "(\\{(.*?)\\}).".r
      val sp = reg.findFirstIn(x).get
      JSON.parseObject(sp, classOf[Shops])
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
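To see what the extraction step is doing, here is a tiny standalone sketch. The sample event body is hypothetical (only the field names come from the Shops case class); the string it prints is what the job above then hands to JSON.parseObject.
object RegexDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical event body: a JSON object followed by one trailing character
    val body = """{"goodid":"g001","title":"keyboard","price":59.9,"shopid":1001,"mark":"hot"},"""
    val reg = "(\\{(.*?)\\}).".r
    // findFirstIn returns the whole match: the {...} fragment plus the character after the closing brace
    println(reg.findFirstIn(body).get)
  }
}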
b. Flume configuration
- agent: add a new agent configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source: spooldir
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/log/logs
a1.sources.r1.fileHeader = true
# channel: memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
# sink: avro
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
# the IP address of the local Windows machine where the Spark program runs
a1.sinks.k1.hostname = 192.168.237.1
# the port must match the one used by the local Spark program
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 100
- JAR replacement: add the following JARs to ${FLUME_HOME}/lib:
flume-ng-sdk-1.6.0-cdh5.5.0.jar
spark-streaming-flume_2.11-2.2.0.jar
spark-streaming-flume-sink_2.11-2.2.0.jar
c. Test
Run the Spark program first, then start the Flume agent.
- Start the Flume agent:
[root@sole flume_conf]# flume-ng agent -n a1 -c conf/ -f /opt/flume_conf/sparkpush.conf -Dflume.root.logger=INFO,console
- Watch the local program's console output:
Raw data:
Processed data:
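To feed the pipeline during the test, a new file has to land in the spool directory (/opt/log/logs) while the agent is running. A minimal, purely hypothetical generator for such a file might look like the sketch below (the record format reuses the hypothetical sample from the parsing sketch above); copying any existing log file into that directory works just as well.
import java.io.{File, PrintWriter}

object MakeTestFile {
  def main(args: Array[String]): Unit = {
    // write the record to a temporary file first, then move it into the watched
    // directory so the spooldir source only ever sees a complete file
    val tmp = new File("/tmp/shops_" + System.currentTimeMillis() + ".log")
    val out = new PrintWriter(tmp)
    out.println("""{"goodid":"g001","title":"keyboard","price":59.9,"shopid":1001,"mark":"hot"},""")
    out.close()
    tmp.renameTo(new File("/opt/log/logs/" + tmp.getName))
  }
}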
2. Poll Mode
In Poll mode, Spark Streaming pulls the data from Flume, and multiple Flume addresses can be specified. The data can be processed by any number of workers (the defaults are fine), so no single executor comes under heavy pressure.
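The code in the next section polls a single SparkSink; if several Flume agents each expose a SparkSink, the address Seq in step 4 below can simply list all of them. A small sketch (the second host is made up for illustration):
import java.net.InetSocketAddress

object MultiAgentAddresses {
  // one entry per Flume agent running a SparkSink; the second address is hypothetical
  val addresses: Seq[InetSocketAddress] = Seq(
    new InetSocketAddress("192.168.237.160", 8888),
    new InetSocketAddress("192.168.237.161", 8888)
  )
  // passed to FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK) as in step 4 below
}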
a. Spark Streaming code
package cn.kgc.flume
import java.net.InetSocketAddress
import com.alibaba.fastjson.JSON
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
case class Shops(goodid: String, title: String, price: Double, shopid: Int, mark: String)

object Poll_Flume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName(this.getClass.getName)
    // 2. Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    // 3. Create the StreamingContext with a 2-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
    // Define a set of Flume addresses; data can be pulled from several agents at once
    val address: Seq[InetSocketAddress] = Seq(new InetSocketAddress("192.168.237.160", 8888))
    // 4. Pull the data buffered by Flume's SparkSink
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    // 5. Extract the event bodies from the DStream, parse and print them
    stream.map(x => new String(x.event.getBody.array()).trim).map(x => {
      val reg = "(\\{(.*?)\\}).".r
      val sp = reg.findFirstIn(x).get
      JSON.parseObject(sp, classOf[Shops])
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
b. Flume configuration
- JAR replacement: temporarily disable the old JARs and use the ones shown in the figure (including the scala-library artifact from Maven).
If the JARs are not replaced, the error may look like: java.lang.IllegalStateException: begin() called when transaction is OPEN
If no scala-library JAR is available, the error is: java.lang.NoClassDefFoundError: scala/Function1
- Agent configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/log/logs
a1.sources.r1.fileHeader = true
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 5000
# sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.237.160
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize = 100
c. Test
- The Spark-side processing of the data is the same as in Push mode.
- Start the Flume agent:
[root@sole flume_conf]# flume-ng agent -n a1 -c conf/ -f /opt/flume_conf/sparkpoll.conf -Dflume.root.logger=INFO,console
Processed result data:
PS: If anything here is wrong or could be written better, please leave your valuable comments or suggestions below. And if this post helped you, a like would be much appreciated. Thank you!
Original author: wsjslient