6-spark_streaming

学习目标

  • 说出Spark Streaming的特点
  • 说出DStreaming的常见操作api
  • 能够应用Spark Streaming实现实时数据处理
  • 能够应用Spark Streaming的状态操作解决实际问题
  • 独立实现foreachRDD向mysql数据库的数据写入
  • 独立实现Spark Streaming对接kafka实现实时数据处理

1、sparkStreaming概述

1.1 SparkStreaming是什么

  • 它是一个可扩展,高吞吐具有容错性的流式计算框架

    吞吐量:单位时间内成功传输数据的数量

之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。

实时计算框架对比

Storm

  • 流式计算框架

  • 以record为单位处理数据

  • 也支持micro-batch方式(Trident)

Spark

  • 批处理计算框架

  • 以RDD为单位处理数据

  • 支持micro-batch流式处理数据(Spark Streaming)

对比:

  • 吞吐量:Spark Streaming优于Storm

  • 延迟:Spark Streaming差于Storm

1.2 SparkStreaming的组件

  • Streaming Context

    • 一旦一个Context已经启动(调用了Streaming Context的start()),就不能有新的流算子(Dstream)建立或者是添加到context中

    • 一旦一个context已经停止,不能重新启动(Streaming Context调用了stop方法之后 就不能再次调 start())

    • 在JVM(java虚拟机)中, 同一时间只能有一个Streaming Context处于活跃状态, 一个SparkContext创建一个Streaming Context

    • 在Streaming Context上调用Stop方法, 也会关闭SparkContext对象, 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false

    • 一个SparkContext对象可以重复利用去创建多个Streaming Context对象(不关闭SparkContext前提下), 但是需要关一个再开下一个

  • DStream (离散流)

    • 代表一个连续的数据流

    • 在内部, DStream由一系列连续的RDD组成

    • DStreams中的每个RDD都包含确定时间间隔内的数据

    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作

    • 数据源

      • 基本源

        • TCP/IP Socket

        • FileSystem

      • 高级源

        • Kafka

        • Flume

2、Spark Streaming编码实践

Spark Streaming编码步骤:

  • 1,创建一个StreamingContext

  • 2,从StreamingContext中创建一个数据对象

  • 3,对数据对象进行Transformations操作

  • 4,输出结果

  • 5,开始和停止

利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os
JAVA_HOME = '/usr/local/java/jdk1.8.0_131'
PYSPARK_PYTHON = "/usr/local/python3/python"
SPARK_HOME = "/bigdata/spark-2.1.2-bin-hadoop2.3"
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
# os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext("local[2]", appName="NetworkWordCount")
    # 参数2:指定执行计算的时间间隔
    ssc = StreamingContext(sc, 1)
    # 监听ip,端口上的上的数据
    lines = ssc.socketTextStream('localhost', 9999)
    # 将数据按空格进行拆分为多个单词
    words = lines.flatMap(lambda line: line.split(" "))
    # 将单词转换为(单词,1)的形式
    pairs = words.map(lambda word: (word, 1))
    # 统计单词个数
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    # 打印结果信息,会使得前面的transformation操作执行
    wordCounts.pprint()
    # 启动StreamingContext
    ssc.start()
    # 等待计算结束
    ssc.awaitTermination()

3、Spark Streaming的状态操作

在Spark Streaming中存在两种状态操作

  • UpdateStateByKey

  • Windows操作

使用有状态的transformation,需要开启Checkpoint

  • spark streaming 的容错机制

  • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

3.1 updateStateByKey

Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

无状态:指的是每个时间片段的数据之间是没有关联的。

需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作

一般超过一天都是用RDD或Spark SQL来进行离线批处理

如果没有UpdateStateByKey,我们需要将每一秒的数据计算好放入mysql中取,再用mysql来进行统计计算

Spark Streaming中提供这种状态保护机制,即updateStateByKey

步骤:

  • 首先,要定义一个state,可以是任意的数据类型

  • 其次,要定义state更新函数--指定一个函数如何使用之前的state和新值来更新state

  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除

  • 对于每个新出现的key,也会执行state更新函数

举例:词统计。

案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
​
# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
​
ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")
​
#定义state更新函数
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)
​
lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数
    
counts.pprint()
​
ssc.start()
ssc.awaitTermination()

 

3.2 Windows

  • 窗口长度L:运算的数据量

  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

6-spark_streaming

 

 

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的

  • 窗口的长度控制考虑前几批次数据量

  • 默认为批处理的滑动间隔来确定计算结果的频率

相关函数

6-spark_streaming

  • Smart computation

  • invAddFunc

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[num,Tasks])

func:正向操作,类似于updateStateByKey

invFunc:反向操作

 

例如在热词时,在上一个窗口中可能是热词,这个一个窗口中可能不是热词,就需要在这个窗口中把该次剔除掉

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

6-spark_streaming

 

 

案例

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
​
def get_countryname(line):
    country_name = line.strip()
​
    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'
​
    return (output, 1)
​
if __name__ == "__main__":
    #定义处理的时间间隔
    batch_interval = 1 # base time unit (in seconds)
    #定义窗口长度
    window_length = 6 * batch_interval
    #定义滑动时间间隔
    frequency = 3 * batch_interval
​
    #获取StreamingContext
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batch_interval)
    
    #需要设置检查点
    ssc.checkpoint("checkpoint")
​
    lines = ssc.socketTextStream('localhost', 9999)
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    #调用reduceByKeyAndWindow,来进行窗口函数的调用
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
    #输出处理结果信息
    window_counts.pprint()
​
    ssc.start()
    ssc.awaitTermination()

 



 

上一篇:Spark Streaming(二)—— 创建DStream


下一篇:Oracle体系结构博客链接