实验四 Spark Streaming编程初级实践

一、Flume简介

数据流 :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。

二、Flume安装配置

1.下载安装包

https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2.解压文件夹

通过xshell把安装包上传虚拟机

解压文件,修改文件名称

1.sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt

2.sudo mv ./apache-flume-1.7.0-bin ./flume

3.sudo chown -R root:root  ./flume

3.配置环境变量

输入:

sudo vim ~/.bashrc

在文件里面添加

(根据你自己的安装路径)

export JAVA_HOME=/opt/jdk-1.8;

export FLUME_HOME=/opt/flume                   

export FLUME_CONF_DIR=$FLUME_HOME/conf

export PATH=$PATH:$FLUME_HOME/bin

4.刷新环境变量

source ~/.bashrc

5.修改配置文件

cd /opt/flume/conf

cp ./flume-env.sh.template ./flume-env.sh

修改flume-env.sh文件

在文件开头增加一行,设置JAVA_HOME

vim flume-env.sh


export JAVA_HOME=/opt/jdk-1.8;

三、查看flume版本信息

/opt/flume/bin/flume-ng version

四、使用Avro数据源测试Flume

Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume以后,可以把helloworld.txt中的文本内容显示出来。

cd /opt/flume/conf

1. 新建文件avro.conf

vim avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#定制source,绑定channel、主机以及端口
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

#描述并配置sinks组件
a1.sinks.k1.type = logger

#描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#将sources和sink通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面Avro Source参数说明如下:

 Avro Source的别名是avro,也可以使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro。

  bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。      

a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。

port表示绑定的端口。

a1.sources.r1.port = 4141,表示绑定的端口是4141。

 a1.sinks.k1.type = logger,表示sinks的类型是logger。

2.启动flume agent a1

这个终端不要关闭

 3.新建输入文件

cd /opt/flume

echo “Hello World”>> ./helloworld.txt

4.执行命令

/opt/flume/bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /opt/flume/helloworld.txt

执行之后,我们就可以在前面不让关闭的那个终端看到Hello World了 

五、使用netcat数据源测试Flume

在一个Linux终端(这里称为“Flume终端”)中,启动Flume,在另一个终端(这里称为“Telnet终端”)中,输入命令“telnet localhost 44444”,然后,在Telnet终端中输入任何字符,让这些字符可以顺利地在Flume终端中显示出来。

 1.创建netcat的agent配置

cd /opt/flume

vim netcat.conf

 

    #/usr/local/flume/conf/netcat.conf
    # Name the components on this agent  
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source  
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
        #同上,记住该端口名

    # Describe the sink  
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory  
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel  
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2.启动flume agent

cd /opt/flume/conf

 不要关闭这个终端

/opt/flume/bin/flume-ng agent --conf ./conf --conf-file netcat.conf --name a1 -Dflume.root.logger=INFO,console

 3.新打开一个终端

输入:

telnet localhost 44444

或者

nc localhost 44444

 #在这个终端输入字符串就可以显示在前面那个终端里了,但是中文是不支持的,显示长度也有限

如果报出下面的错误:

解决方法:

安装telnet插件

yum install telnet

在前面的终端查看数据

 

六、使用Flume作为Spark Streaming数据源

Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

1.创建flume-to-spark.conf

cd /opt/flume/conf

vim flume-to-spark.conf

把以下内容输入文件 

        #flume-to-spark.conf: A single-node Flume configuration
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 33333

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port =44444

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000000

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

#说明:

1.Flume suorce类为netcat,绑定到localhost的33333端口,消息可以通过telnet localhost 33333 发送到flume suorce

2.Flume Sink类为avro,绑定44444端口,flume sink通过localhost 44444端口把消息发送出来。而spark streaming程序一直监听44444端口。

#注意!!先不要启动Flume agent,因为44444端口还没打开,sink的消息无处可去,44444端口由spark streaming程序打开。

2.spark准备工作

下载spark-streaming-flume_2.11-2.3.4.jar

官网:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume

镜像网站(下载速度较快)

https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11/

3.放置文件

把这个jar文件放到/opt/spark/jars/flume目录下

把文件拖进去就行了

cd /usr/local/spark/jars

mkdir flume

4.修改spark目录下的

vim spark-env.sh

添加以下内容 

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/opt/hbase/bin/hbase classpath):/usr/local/spark/examples/jars/*:/opt/spark/jars/kafka/*:/usr/local/kafka/libs/*:/opt/spark/jars/flume/*:/opt/flume/lib/*

七、编写spark程序使用Flume数据源

1.创建python文件

cd /opt/spark-2.4.0

mkdir test

cd test

vim FlumeEventCount.py
from __future__ import print_function
 
import sys
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
 
    sc = SparkContext(appName="FlumeEventCount")
    ssc = StreamingContext(sc, 2)
 
    hostname= sys.argv[1]
    port = int(sys.argv[2])
    stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()
 
    ssc.start()
    ssc.awaitTermination()                             

注意:可能需要安装pyspark,命令为:

pip3 install pyspark

2. 测试实际效果

(1)启动Spark streaming程序
./bin/spark-submit --driver-class-path /opt/spark-2.4.0/jars/*:/opt/spark-2.4.0/jars/flume/* ./test/FlumeEventCount.py localhost 44444

 (2)启动Flume Agent

启动一个新的终端

cd /opt/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

 (3)连接33333端口

启动一个新的终端

telnet localhost 33333

现在你可以在最后这个终端里输入一些字符了。在你输入字符后可以看到第一个终端会显示如下的信息

结果:

上一篇:maven本地仓库设置


下一篇:网络安全 | 什么是单点登录SSO?-SSO-技术