Spark Streaming Integration with Flume: Examples


Preface

Spark Streaming can connect to a Flume data source in two ways: Push and Pull. From Spark Streaming's point of view, Push means the data is pushed to it (Flume pushes events to Spark), while Pull means the data is pulled by it (Spark pulls the output from Flume).

Whichever way you choose, the development process is similar: Spark Streaming consumes the Flume data stream, with Flume acting as the data source for Spark Streaming. The difference between Push and Pull shows up mainly in the Flume sink; the Flume source and channel are unaffected.
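As a quick preview (both full agent configurations are given below), only the sink section of the Flume agent differs between the two approaches:

# Push: Flume pushes events to the Spark receiver through an avro sink
a1.sinks.k1.type = avro

# Pull: Flume buffers events in a SparkSink and Spark Streaming polls it
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink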


1. Push Approach

Flume pushes messages to an executor on a Spark Streaming worker for processing. The drawback is that Flume can only target a single executor, which puts heavy pressure on that one executor on a single node, so this approach is not recommended.

When using the Push approach, start the Spark program first, then start the Flume agent.

a. spark streaming code

  • Linking dependencies
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.62</version>
    </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
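The project is also assumed to declare the usual core Spark dependencies; if it does not, something like the following (versions matching the spark-streaming-flume artifact above) is needed:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>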

  • code
package cn.wsj.flume

import com.alibaba.fastjson.JSON
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Shops(goodid:String,title:String,price:Double,shopid:Int,mark:String)

object Push_Flume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName(this.getClass.getName)
    // 2. Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    // 3. Create the StreamingContext with a 2-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
    // 4. Receive the data pushed from Flume (host/port must match the Flume avro sink)
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc, "192.168.237.1", 8888)
    // 5. Extract the event bodies from the DStream and parse them into Shops records
     stream.map(x =>  new String(x.event.getBody.array()).trim).map(x => {
       val reg="(\\{(.*?)\\}).".r
       val sp = reg.findFirstIn(x).get
       JSON.parseObject(sp,classOf[Shops])
     }).print()

    ssc.start()
    ssc.awaitTermination()


  }
}
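A note on step 4: FlumeUtils.createStream also has an overload that takes an explicit storage level (the default is StorageLevel.MEMORY_AND_DISK_SER_2). A minimal variant, assuming org.apache.spark.storage.StorageLevel is imported:

    val stream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "192.168.237.1", 8888, StorageLevel.MEMORY_AND_DISK)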

b. Flume configuration

  • agent: create a new agent configuration (sparkpush.conf)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source: spooldir
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/log/logs
a1.sources.r1.fileHeader = true

# channel: memory
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000

# sink: avro
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
# this is the address of the local Windows machine running the Spark program
a1.sinks.k1.hostname=192.168.237.1
# the port must match the one used in the local Spark program
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 100
  • JAR replacement: add the following JARs to ${FLUME_HOME}/lib:
flume-ng-sdk-1.6.0-cdh5.5.0.jar
spark-streaming-flume_2.11-2.2.0.jar
spark-streaming-flume-sink_2.11-2.2.0.jar


c. Test

Start the Spark program first, then start the Flume agent.

  • Start the Flume agent:
[root@sole flume_conf]# flume-ng agent -n a1 -c conf/ -f /opt/flume_conf/sparkpush.conf -Dflume.root.logger=INFO,console
  • Check the console of the local Spark program:

Raw data: (screenshot)

Processed data: (screenshot)
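To generate some traffic for the test, drop a log file into the spooling directory watched by the source (the file name here is just an example):

[root@sole ~]# cp /root/shops.log /opt/log/logs/

The spooldir source picks up new files in that directory and renames each file with a .COMPLETED suffix once it has been fully ingested.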


2. Poll (Pull) Approach

Spark Streaming pulls the data from Flume, and multiple Flume addresses can be specified. The workers that process the data can also be spread out (or simply left at the defaults), so no single executor ends up under heavy pressure.

a. spark streaming code

package cn.kgc.flume

import java.net.InetSocketAddress

import com.alibaba.fastjson.JSON
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Shops(goodid:String,title:String,price:Double,shopid:Int,mark:String)

object Poll_Flume {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName(this.getClass.getName)
    // 2. Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    // 3. Create the StreamingContext with a 2-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(2))
    // Define a collection of Flume addresses; data can be received from multiple Flume agents at once
    val address: Seq[InetSocketAddress] = Seq(new InetSocketAddress("192.168.237.160", 8888))
    // 4. Create a polling stream that pulls the data from Flume
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    // 5. Extract the event bodies from the DStream, parse them, and print
    stream.map(x =>  new String(x.event.getBody.array()).trim).map(x => {
      val reg="(\\{(.*?)\\}).".r
      val sp = reg.findFirstIn(x).get
      JSON.parseObject(sp,classOf[Shops])
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
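Since FlumeUtils.createPollingStream takes a Seq of addresses, polling several Flume agents at once only requires extending the address Seq in the code above; the second host below is a hypothetical placeholder:

    // Hypothetical: pull from two Flume agents at the same time
    val addresses: Seq[InetSocketAddress] = Seq(
      new InetSocketAddress("192.168.237.160", 8888),
      new InetSocketAddress("192.168.237.161", 8888)  // placeholder for a second agent
    )
    val multiStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)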


b. Flume configuration

  • JAR replacement: temporarily disable the conflicting older JARs and use the JARs shown in the figure (including scala-library from Maven).

(screenshot of the required JARs)

If the JARs are not replaced, you may see an error like: java.lang.IllegalStateException: begin() called when transaction is OPEN
If no scala-library JAR is available, the error is: java.lang.NoClassDefFoundError: scala/Function1
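The figure is not reproduced here; for reference, the Spark 2.2 Flume integration guide lists the following JARs (exact versions may differ in your environment) as required in ${FLUME_HOME}/lib for the pull-based SparkSink:

spark-streaming-flume-sink_2.11-2.2.0.jar
scala-library-2.11.8.jar
commons-lang3-3.5.jar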


  • agent configuration (sparkpoll.conf)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/log/logs
a1.sources.r1.fileHeader = true

#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000

#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
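# unlike the Push approach, hostname/port here refer to the Flume host itself:
# the SparkSink listens on this address and the Spark job polls it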
a1.sinks.k1.hostname=192.168.237.160
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 100

c. Test

  • The Spark-side data processing is identical to the Push approach.

  • Start the Flume agent

[root@sole flume_conf]# flume-ng agent -n a1 -c conf/ -f /opt/flume_conf/sparkpoll.conf -Dflume.root.logger=INFO,console

Processed result data: (screenshot)


PS: If anything here is wrong or could be written better, please leave your valuable comments and suggestions in the comment section. If this post helped you, I'd appreciate a quick like. Many thanks!

Original author: wsjslient

Author's homepage: https://blog.csdn.net/wsjslient

