Flume+Kafka+SparkStreaming打造实时流处理框架
1-1 实时流处理产生背景
时效性高,数据量大
1-2 实时流处理概述
实时计算(秒,毫秒级别)
流式计算(在不断产生的实时数据流计算)7*24
1-3 离线计算与实时计算的对比
1-3-1 数据来源
离线:HDFS,历史数据,数据量比较大
实时:消息队列(kafka)实时新增,修改记录过来的某一笔数据
1-3-2 处理过程
离线:MR Spark
实时:Spark Streaming
1-3-3 处理速度
离线:慢
实时:快
1-3-4 进程
离线:启动+销毁
实时:7*24
1-4 实时流处理框架对比
storm
spark streaming :按照你设置的时间间隔拆成小的批处理
flink
1-5 实时流处理架构与技术选型
web/app ---> WebServers ---> Flume ---> Kafka ---> Spark Streaming
---> RDBMS/HBASE ---> 可视化展示
1-6 实时流处理在企业中的应用
电信行业:流量短信提醒
电商行业:分布式日志收集框架
2.Flume:分布式日志收集框架
2-1 业务现状分析
大量的日志数据如何从其他的Server上移动到hadoop之上
要考虑网络开销,io开销
server ---> Flume --->Hadoop集群
2-2 flume概述
webServer ---> Source ---> channel ---> sink ---> HDFS
2-3 flume 核心组件和架构
source :收集
channel: 聚集
sink: 输出
2-4 安装
2-4-1 安装JDK,配置环境变量,source 生效
2-4-2 安装flume,配置flume-env.sh文件,引入JAVA_HOME
2-5 flume的使用案例
使用flume的关键就是写配置文件
2-5-1 配置source
2-5-2 配置channel
2-5-3 配置sink
2-5-4 把三个组件串起来
*****其中一个source可以对应多个channel
*****但一个sink只能对应一个channel
需求1:从指定的网络端口采集数据,输出到控制台
启动flume
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec_memory_avro.conf \
-Dflume.root.logger=INFO,console
使用telnet进行测试
telnet master 44444
****Event 是Flume数据传输的基本单元
需求2:监控一个文件实时采集新增的数据输出到控制台
Agent选型:exec.source + memory.channel + logger.sink
type=exec command=tail -F /home/data/data.log
shell=/bin/sh -C
需求3:将A服务器上的日志实时采集到B服务器 (最常用的方式)
Agent1:exec.source+memory.channel+avrp.sink
Agent2: arvo.source+memory.channel+logger.sink
配置好两个conf文件,启动两个flume
2-6 日志收集过程
2-6-1 机器A上监控一个文件,当我们访问主站时,会有用户行为日志记录到access.log中
2-6-2 avro sink 把新产生的日志输出到相应的avro.source指定的hostname和port上
2-6-3 通过avro.source对应的agent将我们的日志输出到控制台
3.Kafka :分布式消息队列
3-1 kafka概述
消息中间件,生产者和消费者
3-2 kafka的核心架构
producer 生产者,生产馒头
consumer 消费者,吃馒头
broker 篮子 ,一个broker就是一个kafka
topic 主题,给馒头打一个标签,topic1的馒头是给你吃的,topic2的馒头是给弟弟吃的
3-3 kafka的部署和使用
分为三种,单节点单broker,单节点多broker,多节点多broker
这里以单节点单broker为例,其他两种类似
3-3-1 安装zookeeper 配置环境变量并source生效
3-3-2 配置zoo.cfg dataDir=/home/master/tmp/zookeeper
3-3-3 启动zk ./bin/zkServer.sh start
3-3-4 安装kafka 配置环境变量并source生效
3-3-5 配置server.properties中的hostname,brokerid,log.dirs,listeners,
zookeeper.connect=master:2181
3-3-6 启动kafka
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
3-3-7 创建topic
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic hello_topic
***replication-factor代表着副本系数,有几个从节点就设置为几***
3-3-8 查看topic是否创建成功
kafka-topics.sh --list --zookeeper master:2181
3-3-9 查看所有topic的详细信息
kafka.topics.sh --describe --zookeeper master:2181
3-3-10 producer端发送消息
kafka-console-producer.sh --broker-list master:9092 --topic hello_topic
3-3-11 consumer端消费消息
kafka-console-consumer.sh --zookeeper master:2181 --topic hello_topic \
--from beginning ***这个代表着每次从头消费,可以不写***
3-4 kafka的容错性测试
不管是kill掉leader还是任意一个flower,只要还有一个副本存在,就不会影响到kafka的正常使用
4.Spark Streaming
4-1 Spark Streaming 概述
基于spark core,讲不同的数据源的数据经过spark streaming的处理之后,将结果输出到外部文件系统
特点
低延迟
能从错误中高效的恢复:fault-tolerant
能够运行在成百上千的节点
能够将批处理,机器学习,图计算等子框架和spark streaming结合起来使用
俗称:One stack to rule them all ----- 一栈式解决
4-2 应用场景
电商推荐系统-----最最常用
实时监控系统
4-3 从词频统计功能着手入门 spark streaming
4-3-1 先启动 nc -lk 9999
4-3-2 使用spark-submit方式来提交我们的spark应用程序运行的脚本
spark-submit --master local[2] \
--class org.apache.examples.streaming.NetWorkWordCount \
--name NetWorkWordCount \
--jars $SPARK_HOME/examples/jars/spark-examples_xxx.jar master 9999
4-3-2 使用spark-shell方式来提交(仅测试代码时用)
spark-shell --master local[2] 来启动
将代码copy到shell,import相应的包,直接运行
4-4 spark streaming 工作原理
4-4-1 粗粒度
spark streaming接收到实时的数据流,把数据流按照指定的时间段切成一片片小的数据块,然后把小的数据块
传给spark Engine来处理
4-4-2 细粒度
首先,spark stremaing 应用程序运行在Driver端,
Driver中有一个StreamingContext和SparkContext
Driver会要求在executor中启动一个Receiver,当有数据输入后,Receiver会将数据拆分成一些blocks
存放在内存中,如果设置了多副本,则也会copy这些blocks到其他机器
之后,receiver会把blocks的一些元数据信息告诉StreamingContext,当每隔几秒的周期后,
StreamingContext会通知SparkContext去启动jobs,并分发到executors中执行去处理数据
4-5 SparkStreaming 核心
4-5-1 StreamingContext 有两个副结构体
4-5-1-1 def this(SparkContext,Duration)
4-5-1-2 def this(SparkConf,Duration) -----这个用的多
bathch interval可以根据你应用程序需要的延迟要求,以及集群的可用资源来配置
4-5-2 DStream (Discretized Streams)
一个DStream 代表着一系列不间断的RDD
每一个RDD包含着这一个批次汇总的所有数据
对DStream操作算子,比如map/flatmap。其实底层会被翻译为对Dstream 中的每一个RDD都做相同的
操作,因为一个DStream是由不同批次的RDD所构成
4-5-3 Input DStreams and Receiverss
Input DStream (从输入数据流的源头过来的DStream)
每一个Input DStream都需要关联一个Receiver (文件系统除外)
local模式下不要使用local和local[1],因为若为1,则代表着只有一个核心可以用来接收数据,
但没用核心去处理数据了,常用local[2]
local[n]中的 n > Receivers的个数
4-5-4 Transformation on DStreams
map,flatmap,filter...
4-5-5 Output Operations on DStreams (输出结果)
print,saveAsTextFile,saveAsHadoopFile,foreachRDD...
4-6 实战案例
spark streaming处理socket数据
spark streaming处理hdfs文件数据
4-7 spark streaming进阶
4-7-1 updateStateByKey算子
截止到目前为止xxx的统计
使用这个时必须设置checkpoint
4-7-2 计算目前为止累积出现的单词个数写入到mysql中
建表:create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
4-7-3 基于windows的统计
window:定时的进行一个时间段内的数据处理
window length:窗口长度 -----10m
sliding interval:窗口的间隔 -----10s
代表着:每隔10s统计前10分钟的数据
4-7-4 黑名单过滤
4-8 spark streaming整合kafka实战
分为两种
Receiver方式和Direct Approch (容错性,性能更高)
采用Direct方式
优点:简化并行度,性能更高,0数据丢失,满足只执行一次 Exactly-once
缺点: 不能跟新offset到zk中,需要手动加入更新
direct方式会直接读取kafka底层的元数据信息
kafka就相当于底层的文件系统,
direct 直接作用于Driver端
5.使用log4j来模拟生产日志,flume采集到后传递给kafka,再交由spark streaming来处理数据
5-1 配置flume
streaming.conf
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=log-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.log-sink.type=logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.log-sink.channel=logger-channel
启动 flume
flume-ng agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
--name agent1 \
-Dflume.root.logger=INFO,console
streaming2.conf
#flume 1.6版本使用此方法
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
#define channel
agent1.channels.logger-channel.type=memory
#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.brokerList = master:9092
agent1.sinks.kafka-sink.batchSize = 20
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
启动 flume
flume-ng agent \
--name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming2.conf \
-Dflume.root.logger=INFO,console
我们现在是在本地进行测试的,在IDEA中运行LoggerGenerator,然后使用Flume,kafka以及Spark Streaming 进行处理操作
在生产上肯定不是这么干的,怎么干呢?
1)打包jar,执行LoggerGenerator类
2)Flume,kafka和测试的是一样的
3)Spark Streaming的代码也需要打成jar包,然后使用spark-submit方式运行
在生产上,整个流处理的流程都是一样的,区别在于业务逻辑的复杂性
crontab -e
*/1 * * * * /home/master/文档/code/log_generator.sh
每隔一分钟执行一次sh文件,即每隔一分钟会产生一批日志写到log里
service crond stop 停止定时文件运行
service crond start 开始定时文件运行
选型:access.log ---> 控制台输出
exec + memory + logger
1.对接python日志产生器输出的日志到Flume
streaming_project.conf
exec-memory-logger.sources=exec-source
exec-memory-logger.sinks=logger-sink
exec-memory-logger.channels=memory-channel
exec-memory-logger.sources.exec-source.type=exec
exec-memory-logger.sources.exec-source.command=tail -F /home/master/文档/data/access.log
exec-memory-logger.sources.exec-source.shell=/bin/sh -c
exec-memory-logger.channels.memory-channel.type=memory
exec-memory-logger.sinks.logger-sink.type=logger
exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.logger-sink.channel=memory-channel
启动flume:
flume-ng agent \
--name exec-memory-logger \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project.conf \
-Dflume.root.logger=INFO,console
2.日志===>Flume===>kafka
启动zk ./zkServer.sh start
启动kafka kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
修改flume配置文件,使得flume的日志到kafka
选型:exec + memory + kafka sink
streaming_project2.conf
exec-memory-kafka.sources=exec-source
exec-memory-kafka.sinks=kafka-sink
exec-memory-kafka.channels=memory-channel
exec-memory-kafka.sources.exec-source.type=exec
exec-memory-kafka.sources.exec-source.command=tail -F /home/master/文档/data/access.log
exec-memory-kafka.sources.exec-source.shell=/bin/sh -c
exec-memory-kafka.channels.memory-channel.type=memory
exec-memory-kafka.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList=master:9092
exec-memory-kafka.sinks.kafka-sink.topic=streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize=5
exec-memory-kafka.sinks.kafka-sink.requireAcks=5
exec-memory-kafka.sources.exec-source.channels=memory-channel
exec-memory-kafka.sinks.kafka-sink.channel=memory-channel
启动flume:
flume-ng agent \
--name exec-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming_project2.conf \
-Dflume.root.logger=INFO,console
启动消费者
kafka-console-consumer.sh --zookeeper master:2181 --topic streamingtopic
idea写spark streaming对kafka中的数据进行处理
数据清洗操作:从原始日志中取出我们所需要的字段信息
结果如下:
ClickLog(46.30.143.10,20200113032901,145,200,-)
ClickLog(29.87.10.156,20200113032901,131,404,-)
ClickLog(87.10.72.30,20200113032901,145,200,-)
将结果写入到外部数据库中,前端页面调取结果,以供图形化展示
选择HBase来作为结果存储数据库
HBase表设计
create 'course_clickcount','info'
Rowkey设计:根据业务需求
day_courseid
create 'course_search_clickcount',"info"
Rowkey设计:
day_searche_course
清空表
truncate 'course_search_clickcount'
如何使用scala操作hbase
项目打包: mvn clean package -DskipTests
spark-submit --master local[5] \
--class scala/org/example/project/spark/StreamingApp.scala \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 \
/home/master/文档/code/KafkaTest-1.0-SNAPSHOT.jar \
192.168.187.10:2181 test streamingtopic 1
--jars $(echo /usr/local/src/hbase-1.3.1/lib/*.jar | tr ' ' ',') \
数据可视化
将抽象的科学或者商业数据,用图像表示出来,帮助理解数据的意义的过程。
Spring Boot构建Web项目
Echarts