Flume+Kafka+SparkStreaming打造实时流处理框架

Flume+Kafka+SparkStreaming打造实时流处理框架

1-1 实时流处理产生背景
时效性高,数据量大

1-2 实时流处理概述
实时计算(秒,毫秒级别)
流式计算(在不断产生的实时数据流计算)7*24

1-3 离线计算与实时计算的对比
1-3-1 数据来源
离线:HDFS,历史数据,数据量比较大
实时:消息队列(kafka)实时新增,修改记录过来的某一笔数据
1-3-2 处理过程
离线:MR Spark
实时:Spark Streaming
1-3-3 处理速度
离线:慢
实时:快
1-3-4 进程
离线:启动+销毁
实时:7*24

1-4 实时流处理框架对比
storm
spark streaming :按照你设置的时间间隔拆成小的批处理
flink

1-5 实时流处理架构与技术选型
web/app ---> WebServers ---> Flume ---> Kafka ---> Spark Streaming
---> RDBMS/HBASE ---> 可视化展示

1-6 实时流处理在企业中的应用
电信行业:流量短信提醒
电商行业:分布式日志收集框架

2.Flume:分布式日志收集框架
2-1 业务现状分析
大量的日志数据如何从其他的Server上移动到hadoop之上
要考虑网络开销,io开销
server ---> Flume --->Hadoop集群

2-2 flume概述

webServer ---> Source ---> channel ---> sink ---> HDFS

2-3 flume 核心组件和架构
source :收集
channel: 聚集
sink: 输出

2-4 安装
2-4-1 安装JDK,配置环境变量,source 生效
2-4-2 安装flume,配置flume-env.sh文件,引入JAVA_HOME

2-5 flume的使用案例
使用flume的关键就是写配置文件
2-5-1 配置source
2-5-2 配置channel
2-5-3 配置sink
2-5-4 把三个组件串起来
*****其中一个source可以对应多个channel
*****但一个sink只能对应一个channel

需求1:从指定的网络端口采集数据,输出到控制台

启动flume
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec_memory_avro.conf \
-Dflume.root.logger=INFO,console

使用telnet进行测试
telnet master 44444
****Event 是Flume数据传输的基本单元


需求2:监控一个文件实时采集新增的数据输出到控制台
Agent选型:exec.source + memory.channel + logger.sink
type=exec command=tail -F /home/data/data.log
shell=/bin/sh -C

需求3:将A服务器上的日志实时采集到B服务器 (最常用的方式)
Agent1:exec.source+memory.channel+avrp.sink
Agent2: arvo.source+memory.channel+logger.sink

配置好两个conf文件,启动两个flume

2-6 日志收集过程
2-6-1 机器A上监控一个文件,当我们访问主站时,会有用户行为日志记录到access.log中
2-6-2 avro sink 把新产生的日志输出到相应的avro.source指定的hostname和port上
2-6-3 通过avro.source对应的agent将我们的日志输出到控制台


3.Kafka :分布式消息队列

3-1 kafka概述
消息中间件,生产者和消费者

3-2 kafka的核心架构
producer 生产者,生产馒头
consumer 消费者,吃馒头
broker 篮子 ,一个broker就是一个kafka
topic 主题,给馒头打一个标签,topic1的馒头是给你吃的,topic2的馒头是给弟弟吃的

3-3 kafka的部署和使用
分为三种,单节点单broker,单节点多broker,多节点多broker
这里以单节点单broker为例,其他两种类似

3-3-1 安装zookeeper 配置环境变量并source生效

3-3-2 配置zoo.cfg dataDir=/home/master/tmp/zookeeper

3-3-3 启动zk ./bin/zkServer.sh start

3-3-4 安装kafka 配置环境变量并source生效

3-3-5 配置server.properties中的hostname,brokerid,log.dirs,listeners,
zookeeper.connect=master:2181

3-3-6 启动kafka
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

3-3-7 创建topic
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic hello_topic

***replication-factor代表着副本系数,有几个从节点就设置为几***

3-3-8 查看topic是否创建成功
kafka-topics.sh --list --zookeeper master:2181

3-3-9 查看所有topic的详细信息
kafka.topics.sh --describe --zookeeper master:2181

3-3-10 producer端发送消息
kafka-console-producer.sh --broker-list master:9092 --topic hello_topic

3-3-11 consumer端消费消息
kafka-console-consumer.sh --zookeeper master:2181 --topic hello_topic \

--from beginning ***这个代表着每次从头消费,可以不写***

3-4 kafka的容错性测试

不管是kill掉leader还是任意一个flower,只要还有一个副本存在,就不会影响到kafka的正常使用


4.Spark Streaming
4-1 Spark Streaming 概述
基于spark core,讲不同的数据源的数据经过spark streaming的处理之后,将结果输出到外部文件系统

特点
低延迟
能从错误中高效的恢复:fault-tolerant
能够运行在成百上千的节点
能够将批处理,机器学习,图计算等子框架和spark streaming结合起来使用

俗称:One stack to rule them all ----- 一栈式解决

4-2 应用场景
电商推荐系统-----最最常用
实时监控系统

4-3 从词频统计功能着手入门 spark streaming
4-3-1 先启动 nc -lk 9999

4-3-2 使用spark-submit方式来提交我们的spark应用程序运行的脚本
spark-submit --master local[2] \
--class org.apache.examples.streaming.NetWorkWordCount \
--name NetWorkWordCount \
--jars $SPARK_HOME/examples/jars/spark-examples_xxx.jar master 9999

4-3-2 使用spark-shell方式来提交(仅测试代码时用)
spark-shell --master local[2] 来启动
将代码copy到shell,import相应的包,直接运行

4-4 spark streaming 工作原理
4-4-1 粗粒度
spark streaming接收到实时的数据流,把数据流按照指定的时间段切成一片片小的数据块,然后把小的数据块
传给spark Engine来处理
4-4-2 细粒度
首先,spark stremaing 应用程序运行在Driver端,
Driver中有一个StreamingContext和SparkContext
Driver会要求在executor中启动一个Receiver,当有数据输入后,Receiver会将数据拆分成一些blocks
存放在内存中,如果设置了多副本,则也会copy这些blocks到其他机器
之后,receiver会把blocks的一些元数据信息告诉StreamingContext,当每隔几秒的周期后,
StreamingContext会通知SparkContext去启动jobs,并分发到executors中执行去处理数据

4-5 SparkStreaming 核心
4-5-1 StreamingContext 有两个副结构体
4-5-1-1 def this(SparkContext,Duration)
4-5-1-2 def this(SparkConf,Duration) -----这个用的多
bathch interval可以根据你应用程序需要的延迟要求,以及集群的可用资源来配置

4-5-2 DStream (Discretized Streams)
一个DStream 代表着一系列不间断的RDD
每一个RDD包含着这一个批次汇总的所有数据
对DStream操作算子,比如map/flatmap。其实底层会被翻译为对Dstream 中的每一个RDD都做相同的
操作,因为一个DStream是由不同批次的RDD所构成

4-5-3 Input DStreams and Receiverss
Input DStream (从输入数据流的源头过来的DStream)
每一个Input DStream都需要关联一个Receiver (文件系统除外)

local模式下不要使用local和local[1],因为若为1,则代表着只有一个核心可以用来接收数据,
但没用核心去处理数据了,常用local[2]

local[n]中的 n > Receivers的个数

4-5-4 Transformation on DStreams
map,flatmap,filter...

4-5-5 Output Operations on DStreams (输出结果)
print,saveAsTextFile,saveAsHadoopFile,foreachRDD...

4-6 实战案例
spark streaming处理socket数据
spark streaming处理hdfs文件数据

4-7 spark streaming进阶
4-7-1 updateStateByKey算子
截止到目前为止xxx的统计
使用这个时必须设置checkpoint
4-7-2 计算目前为止累积出现的单词个数写入到mysql中

建表:create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

4-7-3 基于windows的统计
window:定时的进行一个时间段内的数据处理
window length:窗口长度 -----10m
sliding interval:窗口的间隔 -----10s
代表着:每隔10s统计前10分钟的数据

4-7-4 黑名单过滤

4-8 spark streaming整合kafka实战
分为两种
Receiver方式和Direct Approch (容错性,性能更高)

采用Direct方式
优点:简化并行度,性能更高,0数据丢失,满足只执行一次 Exactly-once
缺点: 不能跟新offset到zk中,需要手动加入更新

direct方式会直接读取kafka底层的元数据信息
kafka就相当于底层的文件系统,
direct 直接作用于Driver端

 

5.使用log4j来模拟生产日志,flume采集到后传递给kafka,再交由spark streaming来处理数据

5-1 配置flume
streaming.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.log-sink.type=logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel

启动 flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console


streaming2.conf
#flume 1.6版本使用此方法

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = master:9092
agent1.sinks.kafka-sink.batchSize = 20
agent1.sinks.kafka-sink.requiredAcks = 1

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

启动 flume
flume-ng agent \
--name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
-Dflume.root.logger=INFO,console


我们现在是在本地进行测试的,在IDEA中运行LoggerGenerator,然后使用Flume,kafka以及Spark Streaming 进行处理操作

在生产上肯定不是这么干的,怎么干呢?
1)打包jar,执行LoggerGenerator类
2)Flume,kafka和测试的是一样的
3)Spark Streaming的代码也需要打成jar包,然后使用spark-submit方式运行

在生产上,整个流处理的流程都是一样的,区别在于业务逻辑的复杂性


crontab -e

*/1 * * * * /home/master/文档/code/log_generator.sh

每隔一分钟执行一次sh文件,即每隔一分钟会产生一批日志写到log里

service crond stop 停止定时文件运行

service crond start 开始定时文件运行

选型:access.log ---> 控制台输出
exec + memory + logger

1.对接python日志产生器输出的日志到Flume
streaming_project.conf

exec-memory-logger.sources=exec-source
exec-memory-logger.sinks=logger-sink
exec-memory-logger.channels=memory-channel

exec-memory-logger.sources.exec-source.type=exec
exec-memory-logger.sources.exec-source.command=tail -F /home/master/文档/data/access.log
exec-memory-logger.sources.exec-source.shell=/bin/sh -c


exec-memory-logger.channels.memory-channel.type=memory

exec-memory-logger.sinks.logger-sink.type=logger


exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.logger-sink.channel=memory-channel

启动flume:

flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project.conf \
-Dflume.root.logger=INFO,console


2.日志===>Flume===>kafka

启动zk ./zkServer.sh start
启动kafka kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

修改flume配置文件,使得flume的日志到kafka
选型:exec + memory + kafka sink

streaming_project2.conf

exec-memory-kafka.sources=exec-source
exec-memory-kafka.sinks=kafka-sink
exec-memory-kafka.channels=memory-channel

exec-memory-kafka.sources.exec-source.type=exec
exec-memory-kafka.sources.exec-source.command=tail -F /home/master/文档/data/access.log
exec-memory-kafka.sources.exec-source.shell=/bin/sh -c


exec-memory-kafka.channels.memory-channel.type=memory


exec-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList=master:9092
exec-memory-kafka.sinks.kafka-sink.topic=streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize=5
exec-memory-kafka.sinks.kafka-sink.requireAcks=5

exec-memory-kafka.sources.exec-source.channels=memory-channel
exec-memory-kafka.sinks.kafka-sink.channel=memory-channel

启动flume:

flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project2.conf \
-Dflume.root.logger=INFO,console


启动消费者
kafka-console-consumer.sh --zookeeper master:2181 --topic streamingtopic


idea写spark streaming对kafka中的数据进行处理

数据清洗操作:从原始日志中取出我们所需要的字段信息
结果如下:
ClickLog(46.30.143.10,20200113032901,145,200,-)
ClickLog(29.87.10.156,20200113032901,131,404,-)
ClickLog(87.10.72.30,20200113032901,145,200,-)

将结果写入到外部数据库中,前端页面调取结果,以供图形化展示

选择HBase来作为结果存储数据库

HBase表设计

create 'course_clickcount','info'
Rowkey设计:根据业务需求
day_courseid


create 'course_search_clickcount',"info"
Rowkey设计:
day_searche_course

清空表
truncate 'course_search_clickcount'

如何使用scala操作hbase

项目打包: mvn clean package -DskipTests


spark-submit --master local[5] \
--class scala/org/example/project/spark/StreamingApp.scala \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 \
/home/master/文档/code/KafkaTest-1.0-SNAPSHOT.jar \
192.168.187.10:2181 test streamingtopic 1


--jars $(echo /usr/local/src/hbase-1.3.1/lib/*.jar | tr ' ' ',') \

 

数据可视化

将抽象的科学或者商业数据,用图像表示出来,帮助理解数据的意义的过程。

Spring Boot构建Web项目

Echarts

 

上一篇:Flume的案例以及测试方法


下一篇:(转)Android 之生成图形验证码