Flink 笔录

Flink

文章目录

一、Flink 简介

​ Apache Flink是一个 框架分布式处理引擎,用于对 * 有界 数据流进行 有状态计算

1.1 Flink 的特点

  1. 事件驱动型(event-driven)

    事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。比较典型的就是以 kafka 为代表的消息队列几乎都是事件驱动型应用。

Flink 笔录

  1. 引擎Batch 和 Streaming 一个系统流处理和批处理共用一个引擎

  2. 状态

    应用状态是 Flink 中的一等公民,Flink 提供了许多状态管理相关的特性支持。

    状态计算的 exactly-once 语义:流程序可以在计算过程中维护自定义状态。checkpointing 机制保证了即时在故障发生下也能保障状态的 exactly once 语义。

Flink 笔录

超大数据量状态:Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态。

可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

  1. 时间

    流处理的一个重要方面就是如何衡量时间,即区分事件时间(event-time)和处理时间(processing-time)。

    Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。

    支持 Event Time 和 乱序事件:Flink 支持了流处理和 Event Time 语义的窗口机制。Event time 使得计算乱序到达的事件或可能延迟到达的事件更加简单。

Flink 笔录

  1. 带反压的连续流模型:数据流应用执行的是不间断的(常驻)operators。Flink streaming 在运行时有着天然的流控:慢的数据 sink 节点会反压(backpressure)快的数据源(sources)。

Flink 笔录

  1. 高度灵活的流式窗口:Flink 支持时间窗口,统计窗口,session 窗口,以及数据驱动的窗口。窗口可以通过灵活的触发条件来定制,以支持复杂的流计算模式。

Flink 笔录

  1. 容错性:Flink 的容错机制是基于 Chandy-Lamport distributed snapshots 来实现的。这种机制是非常轻量级的,允许系统拥有高吞吐率的同时还能提供强一致性的保障。

Flink 笔录

  1. 分层 API

Flink 笔录

过程函数(Process Function):允许用户可以*地处理来自一个或多个数据流的事件,并使用一致的容错的状态。用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。

DataStream API(有界或*流数据)和 DataSet API(有界数据集)

DataStream API 为数据处理提供了多种操作,比如转换(transformations),连接(joins),聚合(aggregations),窗口操作(windows)等等,DataStream API 支持 Java 和 Scala 语言。

DataSet API 为有界数据集提供了额外的支持,例如循环与迭代。

Table API 和 SQL

  1. Table API 遵循(扩展的)关系模型:表有二维数据结构(schema)。

  2. Table API 提供可比较的操作,例如 select、project、join、group-by、aggregate 等。

  3. Table API 可以通过多种类型的用户自定义函数(UDF)进行扩展。

  4. Table API 程序在执行之前会经过内置优化器进行优化。

  5. Flink 提供的最高层级的抽象是 SQL ,SQL查询可以直接在 Table API 定义的表上执行。

  6. 库:Flink 栈中提供了提供了很多具有高级 API 和满足不同场景的类库:机器学习、图分析、关系式数据处理。当前类库还在 beta 状 态,并且在大力发展。

    复杂事件处理(CEP):模式检测是事件流处理中的一个非常常见的用例。

    DataSet API:DataSet API 是 Flink 用于批处理应用程序的核心 API。DataSet API 所提供的基础算子包括mapreduce(outer) joinco-groupiterate等。

    Gelly: Gelly 是一个可扩展的图形处理和分析库。Gelly 是在 DataSet API 之上实现的,并与 DataSet API 集成。

1.2 Flink 与 Spark Streaming 对比

数据模型

​ –spark 采用 RDD 模型,spark streaming 的 DStream 实际上也就是一组小批数据 RDD 的集合

​ –flink 基本数据模型是数据流,以及事件(Event)序列

运行时架构

​ –spark 是批计算,将 DAG 划分为不同的 stage,一个完成后才可以计算下一个

​ –flink 是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

二、快速上手

2.1 搭建 maven 工程

导入 pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
    <!-- 该插件用于将Scala代码编译成class文件 -->
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
            <execution>
                <!-- 声明绑定到maven的compile阶段 -->
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2.2 批处理 wordcount

object WordCount {
  def main(args: Array[String]): Unit = {
    // 创建一个执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 利用传入参数来指定 hostname 和 port
    val parameterTool = ParameterTool.fromArgs(args)
    val inputpath = parameterTool.get("inputpath")

    // 从文件中读取数据
    val inputDataSet = env.readTextFile(inputpath)

    // 切分数据得到word,然后再按word做分组聚合
    val wordCountDataSet = inputDataSet.flatMap(_.split("\\W+"))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    wordCountDataSet.print
  }
}

3.3 流处理 wordcount

// 流处理WordCount程序
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 利用传入参数来指定 hostname 和 port
    val paramTool: ParameterTool = ParameterTool.fromArgs(args)
    val host = paramTool.get("host")
    val port = paramTool.getInt("port")
    val outputPath = paramTool.get("outputPath")

    // 接收一个socket文本流
    val dataStream = env.socketTextStream(host, port)

    // 对每条数据进行处理和处理
    val wordCountDataStream = dataStream.flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordCountDataStream.writeAsText(outputPath)

    // 启动executor
    env.execute(this.getClass.getSimpleName)
  }
}

打包上传到 /opt/module/flink-1.7.2/lib/flink-maben-1.0-SNAPSHOT-jar-with-dependencies.jar

创建 /opt/module/flink-1.7.2/datas/ 目录用于输出文件测试

三、Flink 部署

3.1 Yarn 部署

Flink 笔录

​ 基于 yarn 层面的架构类似 spark on yarn 模式,都是由 Client 提交 App 到 RM 上面去运行,然后 RM 分配第一个 container 去运行 AM,然后由 AM 去负责资源的监督和管理。需要说明的是,Flink 的 yarn 模式更加类似 spark on yarn 的 cluster 模式,在 cluster 模式 中,dirver 将作为 AM 中的一个线程去运行,在 Flink on yarn 模式也是会将JobManager 启动在 container 里面,去做个类似于 driver 的 task 调度和分配,YARN AM 与 Flink JobManager 在同一个 Container 中,这样 AM 可以知道 Flink JobManager 的地址,从而 AM 可以申请 Container 去启动 Flink TaskManager类似于 executor。待 Flink 成功运行在 YARN 集群上,Flink YARN Client 就可以提交 Flink Job 到 Flink JobManager,并进行后续的映射、调度和计算处理。

  1. 下载 https://flink.apache.org/downloads.html#apache-flink-172,根据scala版本和hadoop版本选择下载 Apache Flink 1.7.2 with Hadoop® 2.7 for Scala 2.11 (asc, sha512) 版本的 flink

  2. 上传并解压

    tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz -C /opt/module/
    
  3. 配置 flink-conf.yaml

    vim /opt/module/flink-1.7.2/conf/flink-conf.yaml
    
    jobmanager.rpc.address: hadoop102
    
    # The RPC port where the JobManager is reachable.
    jobmanager.rpc.port: 6123
    
    # The heap size for the JobManager JVM
    jobmanager.heap.size: 1024m
    
    # The heap size for the TaskManager JVM
    taskmanager.heap.size: 1024m
    
    # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
    taskmanager.numberOfTaskSlots: 3
    
    # The parallelism used for programs that did not specify and other parallelism.
    parallelism.default: 1
    

    配置 master

    vim /opt/module/flink-1.7.2/conf/masters
    hadoop102:8081
    

    配置 slaves

    vim /opt/module/flink-1.7.2/conf/slaves
    hadoop102
    hadoop103
    hadoop104
    
  4. 启动 yarn-session

    ./yarn-session.sh -n 3 -s 4 -jm 1024 -tm 1024 -nm test -d
    
    

    -n(–container):TaskManager 的数量。

    -s(–slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个 taskmanager 的 slot 的个数为1,有时可以多一些 taskmanager,做冗余。

    -jm:JobManager 的内存(单位MB)。

    -tm:每个 taskmanager 的内存(单位MB)。

    -nm:yarn 的 appName (yarn 的 UI 上的名字)。

    -d:后台执行。

  5. 打开 socket 端口模拟数据源

    [maben996@hadoop102 datas]$ nc -lk 7777
    
    
  6. 执行任务

    bin/flink run  -m yarn-cluster -c com.atguigu.wc.StreamWordCount  /opt/module/flink-1.7.2/lib/flink-maben-1.0-SNAPSHOT-jar-with-dependencies.jar  --host hadoop102 --part 7777 --outputPath /opt/module/flink-1.7.2/datas/output.txt
    
    
  7. 查看实时统计情况

    tail -f /opt/module/flink-1.7.2/datas/output.txt
    
    
  8. 在 yarn UI 上查看任务

    hadoop103:8088
    
    

Flink 笔录

3.2 Kubernetes 部署

​ 容器化部署时目前业界很流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是 Kubernetes(k8s),而 Flink 也在最近的版本中支持了 k8s 部署模式。

  1. 搭建 Kubernetes 集群(略)

  2. 配置各组件的 yaml 文件

    在 k8s 上构建 Flink Session Cluster,需要将 Flink 集群的组件对应的 docker 镜像分别在k8s上启动,包括 JobManager、TaskManager、JobManagerService 三个镜像服务。每个镜像服务都可以从*镜像仓库中获取。

  3. 启动 Flink Session Cluster

    # 启动jobmanager-service 服务
    kubectl create -f jobmanager-service.yaml
    # 启动jobmanager-deployment服务
    kubectl create -f jobmanager-deployment.yaml
    # 启动taskmanager-deployment服务
    kubectl create -f taskmanager-deployment.yaml
    
    
  4. 访问 Flink UI 页面

    集群启动后,就可以通过 JobManagerServicers 中配置的 WebUI 端口,用浏览器输入以下 url 来访问 Flink UI 页面了:http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy

四、Flink 运行时架构

4.1 Flink 运行时组件

  1. JobManager 作业管理器

    控制一个应用程序执行的主进程,每个应用程序都会被一个不同的 JobManager 所控制执行;

    一个应用程序包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的 JAR 包;

    JobManager 会把 JobGraph 转换为“执行图”(ExecutionGraph);

    JobManager 会向 ResourceManager 请求执行任务必要的资源,也就是 TaskManager上的插槽(slot);

    获取到了足够的资源,JobManager 会将 ExecutionGraph 分发到 TaskManager 上;

    在运行过程中,JobManager 会负责所有需要*协调的操作,比如说检查点(checkpoints)的协调。

  2. TaskManager 任务管理器

    Flink 中的工作进程。每一个 TaskManager 都包含了一定数量的插槽(slots);

    插槽的数量决定了 TaskManager 能够执行的任务数量;

    启动之后,TaskManager 会向 ResourceManger 注册它的插槽;

    TaskManager 就会将一个或者多个插槽提供给 JobManager 调用,JobManager 会向插槽分配任务(tasks);

    在执行过程中,一个 TaskManager 可以跟其它运行同一应用程序的 TaskManager 交换数据。

  3. ResourceManger 资源管理器

    ResourceManger 主要负责管理 TaskManager 的插槽(slot),插槽(slot)是 Flink 中定义的处理资源单元;

    Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如 YARN、Mesos、K8s,以及 standalone 部署;

    当 JobManager 申请插槽资源时,ResourceManager 会将有空闲插槽的 TaskManager 分配给 JobManager;

    ResourceManager 没有足够的插槽资源时,会向资源管理器(yarn/mesos)请求提供 TaskManager 运行的容器;

    ResourceManager 还负责终止空闲的 TaskManager,释放计算资源。

  4. Dispatcher 分发器

    可以跨作业运行,它为应用提交提供了 REST 接口;

    当一个应用被提交执行时,分发器就会启动并将应用移交给一个 JobManager;

    Dispatcher 会启动一个 Web UI,用来方便地展示和监控作业执行的信息;

    Dispatcher 在架构中可能并不是必需的,这取决于应用提交运行的方式。

4.2 任务提交流程

Flink 笔录

上图是从一个较为高层级的视角,来看应用中各组件的交互协作。

如果我们将 Flink 集群部署到 YARN 上,会有如下的提交流程:

Flink 笔录

  1. Flink 任务提交后,Client 向 HDFS 上传 Flink 的 Jar 包和配置,之后向 Yarn ResourceManager 提交任务;
  2. ResourceManager 分配 Container 资源并通知对应的 NodeManager 启动 ApplicationMaster;
  3. ApplicationMaster 启动后加载 Flink 的 Jar 包和配置构建环境,然后启动 JobManager;
  4. ApplicationMaster 向 ResourceManager 申请 Container 资源启动 TaskManager;
  5. ResourceManager 分配 Container资源后,由 ApplicationMaster 通知资源所在节点的 NodeManager 启动TaskManager
  6. NodeManager 加载 Flink 的 Jar 包和配置构建环境并启动 TaskManager,TaskManager 启动后向 JobManager 发送心跳包,并等待 JobManager 向其分配任务。

4.3 任务调度原理

Flink 笔录

2. TaskManger 与 Slots

Task Solt:是静态的概念,是指 TaskManager 具有的并发执行能力。

合理的 slots 数量应该和 CPU 核心数相同。

Flink 笔录

  1. Flink 中每一个 TaskManager 都是一个 JVM进程,它可能会在 独立的线程上执行一个或多个 subtask。
  2. 为了控制一个 TaskManager 能接收多少个 task,TaskManager 通过 task slot 来进行控制(一个 worker 至少有一个 task slot)。

slot sharing:Flink 允许 subtasks 共享 slots,即使它们是不同 tasks 的 subtasks,只要它们来自同一个 job。因此,一个 slot 可能会负责这个 job 的整个管道(pipeline)。

Flink 笔录

  1. Flink 集群需要与 job 中使用的最高并行度一样多的 slots。
  2. 通过 slot sharing,将示例中的并行度从 2 增加到 6 可以充分利用 slot 的资源,同时确保繁重的 subtask 在 TaskManagers 之间公平地获取资源。

APIs 还包含了 resource group 机制,它可以用来防止不必要的 slot sharing。

// slot-sharing group "green"
val a: DataStream[A] = env.createInput(...)
.slotSharingGroup("green")
.setParallelism(4)
val b: DataStream[B] = a.map(...)
// slot-sharing group "green" is inherited from a
.setParallelism(4)
// slot-sharing group "yellow"
val c: DataStream[C] = env.createInput(...)
.slotSharingGroup("yellow")
.setParallelism(2)
// slot-sharing group "blue"
val d: DataStream[D] = b.connect(c.broadcast(...)).process(...)
.slotSharingGroup("blue")
.setParallelism(4)
val e = d.addSink()
// slot-sharing group "blue" is inherited from d
.setParallelism(2)

Flink 笔录

​ 可以看到,不同组的 slotSharingGroup 之间是不能共享 slot 的,所以最大的 slot 数就是三个组的最大并行度之和。

3. 程序与数据流(DataFlow)

​ Flink 程序都是由三部分组成的: SourceTransformationSink

Flink 笔录

​ Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出。

​ Flink 上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分。每一个 dataflow 以一个或多个sources 开始,以一个或多个 sinks 结束。dataflow 类似于任意的有向无环图(DAG)。

4. 执行图(ExecutionGraph)

​ Flink 程序直接映射成的数据流图是 StreamGraph,也被称为逻辑流图。

​ Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。

JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构。

物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

5. 并行度(Parallelism)

一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)

Flink 笔录

​ Stream 在算子之间传输数据的形式可以是 one-to-one(forwarding) 的模式也可以是 redistributing 的模式,具体是哪一种形式,取决于算子的种类。

One-to-one:stream(比如在 source 和 map之间)维护着分区以及元素的顺序。那意味着 map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap 等算子都是 one-to-one 的对应关系。类似于 spark 中的 窄依赖

Redistributing:stream(map 跟 keyBy/window 之间或者 keyBy/window 跟 sink 之间) 的分区会发生改变。每一个算子的子任务依据所选择的 transformation 发送数据到不同的目标任务。例如,keyBy() 基于 hashCode 重分区、rescale、broadcast 和 rebalance 也会随机重新分区,这些算子都会引起 redistribute 过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。 类似于 spark 中的 宽依赖

6. 任务链(Operator Chains)

Flink 将并行度相同的 one to one 操作链接在一起形成一个 task,原来的算子成为里面的一部分。把算子链接成 tasks 能够减少线程间切换和缓冲的开销,在降低延迟的同时提高了整体吞吐量。

Flink 笔录

  1. 使用 env.disableOperatorChaining() 取消 Operator Chains 的链接操作,也就是所有的 Operator 都是独立的。

Flink 笔录

  1. 在单独的 Operator 上使用 disableChaingin() 操作,例如: filter.disableChaingin(),意味着 filter 不能与其他 Operator 形成 Operator Chains。

Flink 笔录

  1. 在单独的 Operator 上使用 startNewChain() 操作,例如: filter.startNewChain(),意味着 filter 可以与其后的 Operator 形成 Operator Chains。

Flink 笔录

4.4 网络流控与反压机制

1. 网络流控

​ 网络流控可以在上下游速度不匹配的情况下,防止下游出过载。

​ 网络流控的手段有两种:静态限速动态反压

  1. 静态限速

Flink 笔录

  1. 动态反压

Flink 笔录

Spark Streaming 反压机制

Flink 笔录

2. Flink 的反压机制

Flink 反压传播的两个阶段:TaskManager内部TaskManager之间

Flink 笔录

  1. TaskManager之间的反压过程

Flink 笔录

  1. TaskManager内部的反压过程

Flink 笔录

Flink 1.5 之前时基于 TCP 和 bounded buffer 来实现反压。

TCP-based反压的弊端

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EXidwP6j-1578809749648)(C:\Users\maben\AppData\Roaming\Typora\typora-user-images\1572067321556.png)]

​ 多个Task会复用一个Socket,单个Task的反压会导致整个Socket阻塞,连 Checkpoint barrier 也无法发出;并且压力需要多层buffer传递,生效延时比较大。

Flink 1.5 之后引入 Credit-based 反压机制

Flink 笔录
​ Flink 内部的 ResultSubpartition 在写入时会传递一个backlong, InputChannel 读取数据时会返回一个 credit 表示自己的余量。ResultSubpartition 收到反馈后,调整写入速度,这样就避免了,从Netty到Socket的一层层压力传递,相当于在应用层模拟了 TCP 流控的机制。

思考:有了动态反压,静态限速的是不是就完全没有作用了?

Flink 笔录

解答:NO,一般出现反压要确定动态反压的源头,根据情况调节并行度,来解除压力。但是有一些瓶颈(压力的源头)不会触发动态反压,比如写入ES外部存储,ES 不会反馈压力,那么 Sink 端的压力无法反馈到 Flink 的流中,会导致 Sink 写出数据 timeout 等。此时,就要找到任务的失败的环节,确定最大流量,直接在 Source 端进行静态限速。

五、Flink 流处理 API

5.1 Envrionment

createLocalEnvironment

​ 返回本地执行环境,需要在调用时指定默认的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

createRemoteEnvironment

​ 返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager 的 IP 和端口号,并指定要在集群中运行的 Jar 包。

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")

getExecutionEnvrionment

​ 创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境。

// 批处理环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 流处理环境
val envStream: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

5.2 Source

fromCollection

//1. 从集合读取数据
val stream1 = env
.fromCollection(List(
    SensorReading("sensor_1", 1547718199, 35.8),
    SensorReading("sensor_6", 1547718201, 15.4),
    SensorReading("sensor_7", 1547718202, 6.7),
    SensorReading("sensor_10", 1547718205, 38.1)
))

readTextFile

//2. 从文件读取数据
val stream2 = env.readTextFile("D:\\workspace\\workspace_idea\\flink-maben\\src\\main\\resources\\hello.txt")

addSource 读取 kafka 数据

引入 kafka 连接器

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

//3. 从kafka读取数据
// 定义相关配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

自定义 Source

只需向 addSource() 中传入一个自定的 SourceFunction

//4. 自定义Source
val stream4 = env.addSource(new MySenserSource())

class MySenserSource() extends SourceFunction[SensorReading] {
  // 定义一个标识位,表示数据源是否继续运行
  var running: Boolean = true
  // 生成自定义的传感器数据
  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    // 初始化一个随机数发生器
    val rand = new Random()
    // 初始化10个传感器数据,随机生成
    var curTemp = (1 to 10).map(
      i => ("sensor_" + i, 60 + rand.nextGaussian() * 20)
    )
    // 无限循环,在初始温度值基础上随机波动
    while (running) {
      // 对10个数据更新温度值
      curTemp = curTemp.map(
        data => (data._1, data._2 + rand.nextGaussian())
      )
      // 获取当前的时间戳,包装成样例类
      val curTime = System.currentTimeMillis()
      curTemp.foreach(
        data => ctx.collect(SensorReading(data._1, curTime, data._2))
      )
      // 间隔500ms
      Thread.sleep(500)
    }
  }
  override def cancel(): Unit = {
    running = false
  }
}

5.3 transform

map

Flink 笔录

val streamMap = stream.map { x => x * 2 }

flatMap

val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}

fliter

Flink 笔录

val streamFilter = stream.filter{
    x => x == 1
}

keyBy

DataStream KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以hash 的形式实现的。

Flink 笔录

滚动聚合算子(Rolling Aggregation)

sum() min() max() minBy() maxBy() 这些算子可以对 KeyedStream 的每一个支流做聚合。

reduce

KeyedStream DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
  .map( data => {
    val dataArray = data.split(",")
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
  })
  .keyBy("id")
  .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )

split & select

Flink 笔录

DataStream SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

Flink 笔录

SplitStream → DataStream:从一个 SplitStream 中获取一个或者多个 DataStream。

// 需求:传感器数据按照温度高低(以30度为界),拆分成两个流。
// 2.多留转换算子测试
val splitStream: SplitStream[SensorReading] = dataStream
.split(sensorDate => {
    // 根据温度高低划分不同的流
    if (sensorDate.temperature > 30) Seq("high") else Seq("low")
})
val lowTempStream: DataStream[SensorReading] = splitStream.select("low")
val highTempStream: DataStream[SensorReading] = splitStream.select("high")
val allTempStream: DataStream[SensorReading] = splitStream.select("high", "low")

// 打印输出
lowTempStream.print("low")
highTempStream.print("high")
allTempStream.print("all")

connect & coMap

Flink 笔录

DataStream,DataStream ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保各自的数据和形式不发生任何变化,两个流相互独立。

Flink 笔录

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map 和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap 处理。

// 3.合并两条流
// 高温流处理为元组类型
val warningStream: DataStream[(String, Double)] = highTempStream.map(data => (data.id, data.temperature))

// 合并流中有两种数据类型:元组类型的高温流,样例类类型的低温流
val connectedStreams: ConnectedStreams[(String, Double), SensorReading] = warningStream.connect(lowTempStream)

// 使用模式匹配,匹配类型,对ConnectedStreams上的每个Stream分别进行map操作,将其调整为同样的流。
val coMapStream: DataStream[(String, Double, String)] = connectedStreams.map(
    warningData => (warningData._1, warningData._2, "high tempareture warning"),
    lowData => (lowData.id, lowData.temperature, "healthy")
)
coMapStream.print("coMap stream")

union

Flink 笔录
DataStream DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream元素的新 DataStream。

val lowTempStream: DataStream[SensorReading] = splitStream.select("low")
val highTempStream: DataStream[SensorReading] = splitStream.select("high")
val allTempStream: DataStream[SensorReading] = splitStream.select("high", "low")
// DataStream的元素类型,且可以同时union多个流
val unionStream: DataStream[SensorReading] = highTempStream.union(lowTempStream, allTempStream)

Connect 与 Union 区别

  1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap 中再去调整成为一样的。
  2. Connect 只能操作两个流,Union 可以操作多个。

5.4 UDF(更细粒度的控制流)

1. 函数类(Function Classes)

​ Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如 MapFunction, FilterFunction, ProcessFunction 等等。

// UDF 函数
// 1. 使用外部定义的函数类,定义一个过滤id以 keyword(可定制性) 开头的 filter 操作
dataStream.filter(new MyFilter("sensor_1")).print("filter")
// 2. 也可以写成匿名内部类的形式
dataStream.filter(
      new FilterFunction[SensorReading] {
        override def filter(value: SensorReading): Boolean = {
          value.id.startsWith("sensor_1")
        }
      }
    )

// 在外部定义具体的函数类
class MyFilter(keyword: String) extends FilterFunction[SensorReading] {
    override def filter(value: SensorReading): Boolean = {
        value.id.startsWith(keyword)
    }
}

2. 匿名函数(Lambda Function)

// 简单的逻辑可以写成匿名函数的形式
dataStream.filter(_.id.startsWith("sensor_1")).print("filter")

3. 富函数(Rich Function)

​ 所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。RichMapFunction、RichFlatMapFunction、RichFilterFunction 等

​ Rich Function 有一个生命周期的概念。典型的生命周期方法有:

​ open() 方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter 被调用之前 open() 会被调用。

​ close() 方法是生命周期中的最后一个调用的方法,做一些清理工作。

​ getRuntimeContext() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
	var subTaskIndex = 0
    override def open(configuration: Configuration): Unit = {
        subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
        // 以下可以做一些初始化工作,例如建立一个和HDFS的连接
    }
    override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
        if (in % 2 == subTaskIndex) {
            out.collect((subTaskIndex, in))
        }
    }
    override def close(): Unit = {
        // 以下做一些清理工作,例如断开和HDFS的连接。
    }
}

5.5 Sink

Kafka

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//1. 配置kafka
val properties = new Properties()
properties.setProperty("bootstrap.servers", "hadoop102:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
//2. 读取kafka数据
val inputStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
//3.处理(做一类的包装)
val dataStream: DataStream[String] = inputStream
.map(data => {
    val dataArray = data.split(",")
    // 注意:这里又将包装类转为toString,方便写入kafka时做序列化
    SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble).toString
})
//4. 写入kafka
dataStream.addSink(
    new FlinkKafkaProducer011[String]("hadoop102:9092", "sinkTest", new SimpleStringSchema())
)
//5. 控制台打印,print 也是一种 sink
dataStream.print()
env.execute(this.getClass.getSimpleName)

Redis

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputStream: DataStream[String] = env.readTextFile("D:\\workspace\\workspace_idea\\flink-maben\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })

    // redis 配置
    val conf: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost("hadoop102")
      .setPort(6379)
      .build()
      
    // 写入redis
    dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper()))
    dataStream.print()
    env.execute(this.getClass.getSimpleName)
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading] {
  // 定义保存数据到 redis 的命令,HSET sensor_temperature sensor_id temperature
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }
  override def getKeyFromData(data: SensorReading): String = {
    data.id
  }
  override def getValueFromData(data: SensorReading): String = {
    data.temperature.toString
  }
}

Elasticsearch

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

object ElasticSearchSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputStream = env.readTextFile("D:\\workspace\\workspace_idea\\flink-maben\\src\\main\\resources\\sensor.txt")
    val dataStream = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
    // 定义 ES 的 httpHost 信息
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("hadoop102", 9200))
    // 创建一个esSink的builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          // 用HashMap作为插入的es的数据类型
          val sourceData = new util.HashMap[String, String]()
          sourceData.put("sensor_id", t.id)
          sourceData.put("temperature", t.temperature.toString)
          sourceData.put("ts", t.timestamp.toString)
          // 创建一个index requset
          val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(sourceData)
          // 用indexer发送请求
          requestIndexer.add(indexRequest)
          println(t + "saved successfully")
        }
      })
    //向ElasticSearch写入
    dataStream.addSink(esSinkBuilder.build())
    env.execute(this.getClass.getSimpleName)
  }
}

JDBC 自定义 Sink

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

object JDBCSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputStream: DataStream[String] = env.readTextFile("D:\\workspace\\workspace_idea\\flink-maben\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
    // 写入一个自定义jdbcSink
    dataStream.addSink(new MyJdbcSink())
    env.execute(this.getClass.getSimpleName)
  }
}

// 实现一个自定义的sink function
class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // 定义连接和预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // 在open生命周期中创建连接和预编译
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.12.100:3306/test", "root", "maben996")
    insertStmt = conn.prepareStatement("INSERT INTO temperature (sensor,temp) VALUES (?,?)")
    updateStmt = conn.prepareStatement("UPDATE temperature SET temp = ? WHERE sensor = ?")
  }

  // 调用连接执行sql
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // 可以先执行更新语句
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 判断更新是否有结果,如果没有就插入
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // 关闭资源
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

六、Flink 中的 Window

6.1 Window

​ window是一种 切割无限数据为有限块进行处理 的手段。

窗口分类

  1. TimeWindow:按照时间生成 Window。
  2. CountWindow:按照指定的数据条数生成一个 Window,与时间无关。

6.2 Window API

1. 时间窗口(TimeWindow)

  1. 滚动窗口(Tumbling Windows) 时间对齐,窗口长度固定,没有重叠。

    Flink 默认的时间窗口根据 Processing Time 进行窗口的划分。

    .timeWindow(Time.seconds(15))
    
    
  2. 滑动窗口(Sliding Windows) 时间对齐,窗口长度固定,可以有重叠

    滑动窗口和滚动窗口的函数名是完全一致,传入参数多一个sliding_size。

    .timeWindow(Time.seconds(15), Time.seconds(5))
    
    
  3. 会话窗口(Session Windows) 时间无对齐

    由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口

    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    
    

2. 计数窗口(CountWindow)

  1. 滚动计数窗口(tumbling count window)

    指定窗口大小,当相同 key 元素的数量达到窗口大小时,就会触发窗口的执行。

    .keyBy(_.1)
    .countWindow(window_size)
    
    
  2. 滑动计数窗口(sliding count window)

    每当某一个 key 的个数达到 sliding_size 的时候触发计算,计算该 key 最近 window_size 个元素的内容。

    .keyBy(_.1)
    .countWindow(window_size, sliding_size)
    
    

3. 窗口分配器(window assigner)

​ window() 方法接收的输入参数是一个 WindowAssigner,WindowAssigner 负责将每条输入的数据分发到正确的 window 中。Flink 提供了通用的 WindowAssigner。

// 时间滚动窗口 TumblingEventTimeWindows
.window(TumblingEventTimeWindows)
// 时间滑动窗口 SlidingEventTimeWindows
.window(SlidingEventTimeWindows)
// 时间会话窗口 EventTimeSessionWindows
.window(EventTimeSessionWindows)
// 全局窗口 GlobalWindows
.window(GlobalWindows)

4. 窗口函数(window function)

window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类

  1. 增量聚合函数(incremental aggregation functions)

    每条数据到来就进行计算,保持一个简单的状态,典型的增量聚合函数有 ReduceFunction, AggregateFunction。

  2. 全窗口函数(full window functions)

    先把窗口所有数据收集起来,等到计算的时候会遍历所有数据,ProcessWindowFunction 就是一个全窗口函数。

5. 其他可选 API

stream
.keyBy(...)
.window(...)
[.trigger(...)]		// 触发器:定义 window 什么时候关闭,触发计算并输出结果
[.evictor(...)]		// 移除器:定义移除某些数据的逻辑
[.allowedLateness(...)]		// 允许处理迟到的数据
[.sideOutputLateData(...)]	// 将迟到的数据放入侧输出流
.reduce/aggregate/fold/apply()
[.getSideOutput(...)]		// 获取侧输出流

流的转换

Flink 笔录

七、时间语义与 Watermark

7.1 时间语义

​ • Event Time:事件创建的时间

​ • Ingestion Time:数据进入 Flink 的时间

​ • Processing Time:执行操作算子的本地系统时间,与机器相关

不同的时间语义有不同的应用场合,一般情况下更加关心事件时间(Event Time)

设置时间语义

import org.apache.flink.streaming.api.TimeCharacteristic		// 注意别到错包
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)	// 设置使用EventTime作为时间语义
// processingTime 每隔500毫秒调用 getCurrentWatermark 生成一个 watermark 放入流中
env.getConfig.setAutoWatermarkInterval(500)

7.2 Watermark 概念

​ 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。

​ Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。

Watermark 的特点

Flink 笔录

​ • watermark 是一条特殊的数据记录

​ • watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退

​ • watermark 与数据的时间戳相关

Watermark 的传递

Flink 笔录

​ 整个 Job 的 Watermark 是选择每个分区 Watermark 的最小值,代表整个 Job 在该 Watermark 前的数据已经全部到达,改时间点的 Window 可以关闭。类似于木桶效应。

7.3 Watermark 的引入

  1. 对于排好序的数据,不需要延迟触发,可以指定时间戳

    // 传入的是一个毫秒数,长度是13位,根据时间戳的不同,可能要乘1000
    .assignAscendingTimestamps(_.timestamp * 1000L) 
    
    
  2. 对于乱序的数据,Flink 暴露了 TimestampAssigner 接口,可以实现接口自定义如何从事件数据中抽取时间戳和生成 Watermark。

    // 传入一个 TimestampAssigner 的实现类,用于自定义如何收取时间戳,和生成 Watermark
    // MyAssigner 可以有两种类型,都继承自 TimestampAssigner
    .assignTimestampsAndWatermarks(new MyAssigner())
    
    

    TimestampAssigner 接口

    定义了 抽取时间戳,以及生成 watermark 的方法,有两种类型:

    1. AssignerWithPeriodicWatermarks

      • 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中

      • 默认周期是200毫秒,可以使用 env.getConfig.setAutoWatermarkInterval() 方法进行设置

      • 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性 watermark 的

      • 当 AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法 返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。

      dataStream.assignTimestampsAndWatermarks(
          // 传入 BoundedOutOfOrdernessTimestampExtractor,其参数为生成 watermark 的延迟时长
          new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(500)) {
              // extractTimestamp 方法返回抽取的时间戳
            override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
          })
      
      

      自定义 MyAssigner

      dataStream.assignTimestampsAndWatermarks(new MyAssigner())
      
      
      // 自定义 MyAssiger
      class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading] {
        // 定义一个最大延迟时间
        val bound: Long = 1000L
        // 定义当前最大的时间戳
        var maxTs: Long = Long.MinValue
      
        // env中设置了500毫秒,该方法processingTime每隔500毫秒调用一次,生成watermark
        override def getCurrentWatermark: Watermark = {
          new Watermark(maxTs - bound)
        }
      
        // 提取时间戳
        override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
          // 获取最大时间戳
          maxTs = maxTs.max(element.timestamp * 1000L)
          element.timestamp * 1000L
        }
      }
      
      
    2. AssignerWithPunctuatedWatermarks

      • 没有时间周期规律,一般不会每个数据都生成一个 watermark,性能消耗太大。可以根据每个数据的情况判断是否要生成 watermark

Watermark 的设定

  1. 先确定 watermark 的最大乱序程度,一般延迟时间设定为数据的最大乱序时间。
  2. 如果 watermark 设置的延迟时间太久,收到的数据的延迟时间就很大,解决方法是在水位线到达之前输出一个近似值。
  3. 如果 watermark 设置的延迟时间太早,收到的数据准确性很差,解决方法是使用 .allowedLateness 允许处理延时数据。

allowedLateness 和 侧输出流

stream.assignTimestampsAndWatermarks(
    	// watermark 延时1秒,导致eventTime要延后1秒才能触发窗口关闭
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000L)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })

val minTempPerWindowStream = dataStream
.keyBy(_.id)
.timeWindow(Time.seconds(10), Time.seconds(2))
.allowedLateness(Time.seconds(10))  // 允许窗口处理延时数据,窗口关闭后延迟10秒的数据会再次和之前的数据计算,并输出
.sideOutputLateData(new OutputTag[SensorReading]("side")) // 延迟超过10秒的数据会输出到侧输出流
.minBy("temperature")

八、ProcessFunction API(底层API)

​ DataStream API 提供了一系列的 Low-Level 转换算子。可以 访问时间戳、watermark以及注册定时事件。还可以输出 特定的一些事件,例如超时事件等。

Flink 提供了8个Process Function:

​ • ProcessFunction
​ • KeyedProcessFunction
​ • CoProcessFunction
​ • ProcessJoinFunction
​ • BroadcastProcessFunction
​ • KeyedBroadcastProcessFunction
​ • ProcessWindowFunction
​ • ProcessAllWindowFunction

8.1 KeyedProcessFunction

​ KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为0个、1个或者多个元素。所有的 Process Function 都继承自 RichFunction 接口,所以都有 open()、close() 和 getRuntimeContext() 等方法。

​ KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法: processElementonTimer

  1. processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs)。
  2. onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

8.2 TimerService 和 定时器(Timers)

​ Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:

// 返回当前处理时间
currentProcessingTime(): Long

// 返回当前watermark的时间戳
currentWatermark(): Long 

// 注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerProcessingTimeTimer(timestamp: Long): Unit 

// 注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
registerEventTimeTimer(timestamp: Long): Unit 

// 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteProcessingTimeTimer(timestamp: Long): Unit 

// 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 

当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在 keyedStream 上面使用。

需求:监控温度传感器的温度值,如果温度值在10秒钟之内(processing time)连续上升,则报警。

// 使用KeyedProcessFunction处理KeyedStream
val warningStream = dataStream
    .keyBy(_.id)
    .process(new TempIncreaseWarning())
// 输出预警流(若触发onTimer,就会输出预警信息)
warningStream.print("waring")

class TempIncreaseWarning() extends KeyedProcessFunction[String, SensorReading, String] {

  // 1. 定义一个状态,保存上一次的温度值
  lazy val lastTemp: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp-state", Types.of[Double]))
  // 2. 定义一个状态,保存定时器的触发时间
  lazy val currentTimer: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer-state", Types.of[Long]))

  // 3. 处理每一条数据
  override def processElement(value: SensorReading,
                              ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
                              out: Collector[String]): Unit = {
    // 3.1 取出一条数据温度和当前定时器温度
    val prevTemp = lastTemp.value()
    val curTimerTs = currentTimer.value()

    // 3.2 更新温度
    lastTemp.update(value.timestamp)

    // 3.3 判断温度上升,而且没有设置过定时器,就注册一个定时器(如果有定时器,且温度一直上升,不做任何操作,直到定时器触发)
    if (value.timestamp > prevTemp && curTimerTs == 0) {
      // 设定触发时间:当前时间+10S
      val timerTs = ctx.timerService().currentProcessingTime() + 10000L
      ctx.timerService().registerProcessingTimeTimer(timerTs)
      // 保存触发时间到状态到定时器,到指定时间,定时器触发
      currentTimer.update(timerTs)
    }
    // 如果温度下降,删除定时器
    else if (value.timestamp < prevTemp) {
      ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
      // 清空状态
      currentTimer.clear()
    }
  }

  // 4. 定时器触发
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // 4.1 向流中写入预警信息
    out.collect(ctx.getCurrentKey + "温度10秒内连续上升")
    // 4.2 清空定时器
    currentTimer.clear()
  }
}

8.3 SideOutput 测流输出

​ Process function 的 SideOutput 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 SideOutput 可以定义为 OutputTag[X] 对象,X是输出流的数据类型。Process function 可以通过 Context 对象发射一个事件到一个或者多个SideOutput 。

需求:设定零点温度,当出现低于零点温度时,立即报警

// 使用ProcessFunction处理DataStream
val freezingMoniterStream = dataStream
	.process(new FreezingMoniter(32.0))	// 设定冰点温度为32
// 低温预警(主流)
freezingMoniterStream.print("healthy")
// 输出低温报警(侧流)
freezingMoniterStream
    .getSideOutput(new OutputTag[(String, String)]("freezing-warning"))	// 输出测流
    .print("freezing")

class FreezingMoniter(freezingTemperature: Double) extends ProcessFunction[SensorReading, (String, Double, String)] {
  override def processElement(value: SensorReading,
                              ctx: ProcessFunction[SensorReading, (String, Double, String)]#Context,
                              out: Collector[(String, Double, String)]): Unit = {
    // 1. 定义一个侧输出流
    lazy val freezingAlarmOutput: OutputTag[(String, String)] = 
      						new OutputTag[(String, String)]("freezing-warning")
    // 2. 判断温度是否低于冰点温度
    if (value.temperature < freezingTemperature) {
      // 3. 写出侧输出流
      ctx.output(freezingAlarmOutput, (value.id, "freezing"))
    } else {
      // 4. 温度正常,所有数据输出到主流
      out.collect((value.id, value.temperature, "healthy"))
    }
  }
}

8.4 CoProcessFunction

​ 对于两条输入流,DataStream API 提供了 CoProcessFunction 这样的 low-level 操作。CoProcessFunction 提供了操作每一个输入流的方法:processElement1() 和 processElement2()。

​ 类似于 ProcessFunction,这两种方法都通过 Context 对象来调用。Context 对象可以访问事件数据,定时器时间戳,TimerService,以及 SideOutputs。CoProcessFunction 也提供了 onTimer() 回调函数。

九、状态编程和容错机制

9.1 有状态的算子和应用程序

Opertor state

Flink 笔录

​ 算子状态的作用范围限定为算子任务,算子状态只对同一任务共享,不能由另一个任务访问。

Flink 笔录

  1. 列表状态(List state)

    将状态表示为一组数据的列表.

  2. 联合列表状态(Union list state)

    也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

  3. 广播状态(Broadcast state)

    如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。

Keyed state

Flink 笔录

​ 键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。

Keyed State 之间的继承关系

Flink 笔录

Keyed State 之间的区别

Flink 笔录

Keyed State 简单使用

// 1.声明一个键控状态
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState[Double](
	new ValueStateDescriptor[Double]("lastTemp", Types.of[Double]))
// 2.读取状态
val prevTemp = lastTem.value()
// 3.更新转态
lastTemp.update(value.temperature)

状态的常用场景

去重:记录所有的主键

窗口计算:已进入未触发的窗口数据

机器学习、深度学习:训练的模型以及参数

访问的历史数据:需要与昨日数据进行对比

9.2 后端状态(State backend)

• 每传入一条数据,有状态的算子任务都会读取和更新状态

• 由于有效的状态访问对于处理数据的低延迟至关重要,每个并行任务都会在本地维护其状态,以确保快速的状态访问

• 状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做 状态后端(state backend)

• 状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。

MemoryStateBackend

​ 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在 TaskManager 的 JVM 堆上;而将checkpoint 存储在 JobManager 的内存中。

​ 特点:快速、低延迟,但不稳定,多用于测试。

FsStateBackend

​ 将 checkpoint 存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟 MemoryStateBackend 一样,也会存在 TaskManager 的 JVM 堆上。

​ 特点:同时拥有内存级的本地访问速度,和更好的容错保证。

Flink 笔录

RocksDBStateBackend

​ 将所有状态序列化后,存入本地的RocksDB中存储,数据量大时考虑使用。RocksDB 需要引入依赖

Flink 笔录

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

设置状态后端和失败重启策略

// 配置RocksDB状态后端,设置路径,开启快照增量保存
env.setStateBackend(new RocksDBStateBackend("hdfs://....."), true)	
// 配置文件系统状态后端,设置路径,开启同步快照保存
env.setStateBackend(new FsStateBackend("", true))	
// 配置内存状态后端,开启同步快照保存
env.setStateBackend(new MemoryStateBackend(false))	

// 配置失败重启策略,每5分钟最多重启5次,每次重启之间的间隔至少为10秒
env.setRestartStrategy(
    RestartStrategies.failureRateRestart(5,
                                       org.apache.flink.api.common.time.Time.minutes(5),
                                       org.apache.flink.api.common.time.Time.seconds(10))
)

9.3 Flink 容错机制

Checkpoint

Flink 笔录

​ Flink 故障恢复机制的核心,就是应用状态的一致性检查点。

​ 有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

Checkpoint 恢复

Flink 笔录

​ 在执行流应用程序期间,Flink 会定期保存状态的一致检查点

​ 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态

Flink 笔录

  1. 第一步:重启应用

  2. 第二步:所有的算子从 checkpoint 中读取状态,将状态重置

  3. 第三步:Source 调节 offset 重新开始消费

    检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性。

Checkpoint 算法实现

​ 基于 Chandy-Lamport 算法的分布式快照

检查点分解线(Checkpoint Barrier):Barrier 由 JobManager 产生,进入数据流,Barrier 之前的数据导致的状态更改,都会被包含在当前分界线所属的检查点中。

  1. JobManager 会向每个 Source 发送一条带有检查点ID的消息,通过这种方式来启动检查点。

Flink 笔录

  1. Source 将自身状态写入检查点,状态后端确认存入检查点通知 Source,Source 向 JobManager 确定检查点完成。

    Source 会发出 Barrier 给所有分区的任务。

Flink 笔录

  1. 每一个任务会等待所有输入分区的 Barrier 到达。

    对于 Barrier 已经到达的分区,继续到达的数据会被缓存,而 Barrier 尚未到达的分区,数据会被正常处理。

Flink 笔录

  1. 当收到所有输入分区的 Barrier 时,任务就将其状态保存到状态后端(State backend),Barrier 继续向下游转发。

Flink 笔录

  1. 当 Sink 收到 Barrier ,向 JobManager 确认状态保存到 checkpoint 完毕。

Flink 笔录

Checkpoint 的使用

// 设置每60秒进行一次Checkpoint
env.enableCheckpointing(60000L)
// 设置检查点模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置检查点超时时间90秒
env.getCheckpointConfig.setCheckpointTimeout(90000L)
// 设置检查点最大并行数
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
// 设置当检查点任务失败,作业任务是否也会失败
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
//设置检查点之间的最小间隔,该值会使最大行数失效,默认为1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000L)
// 设置保留检查点在外部的缓存
env.getCheckpointConfig.enableExternalizedCheckpoints(
			ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

保存点(Savepoints)

​ • Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)

​ • 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点

​ • Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作

​ • 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等等。

9.4 Flink 状态一致性

状态一致性

  1. at-most-once (最多一次)

    当任务故障时,最简单的做法时什么都不干,既不恢复丢失的状态。at-most-once 语义是最多处理一次事件。

  2. at-least-once (至少一次)

    在大多数真实场景下,不希望数据丢失,这种类型的保障称为 at-least-once,每天数据至少处理一次,有的数据可以处理多次。

  3. exactly-once (精确一次)

    每一条数据都会被处理一次,且不能出现重复处理。

Flink 内部的 Exactly-Once

​ Flink 内部使用 Checkpoint(Chandy-Lamport 算法) 来实现 exactly-once 状态一致性。但是 Checkpint 只能保证 Flink 内部的 exactly-once,而端到端的 exactly-once 是指整个流处理应用的始终,每一个组件都保证了它自己的一致性。

Flink 端到端的 Exactly-Once

  1. Flink 内部 —— checkpoint
  2. Source 端 —— 可重设数据的 offset
  3. Sink 端 —— 从故障恢复时,数据不会重复写入外部系统

Sink 写入,保证 exactly-once 的两种方法:

幂等写入(Idempotent Writes)

​ 幂等操作,一个操作重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了

​ 比如:1的n次方还是1;e^x 的 n 阶导数还是 e^x。

事务写入(Transactional Writes)

​ 针对于 checkpoint 做事务处理,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

​ 实现方式:1. 预写日志;2. 两阶段提交。

  1. 预写日志(Write-Ahead-Log,WAL)

    把结果数据先当成 状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统;

    该方法简单且易于实现,由于数据提前在状态后端中做了缓存,无论什么 sink 系统,都能用这种方式一批搞定;

    DataStream API 提供了一个模板类:GenericWriteAheadSink,来实现这种事务性 sink。

    缺点:

    1. 由于做了缓存,类似于批处理,且要等待 checkpoint 完成的通知,所以时效性会差一些。
    2. 若 sink 向外部批量写一部分后,开始报错,事务回滚,重新接受 checkpoint,重新写入,造成数据重复。
    3. sink 写入完成后,会返回一个确认信息给 JobManager,JobManager 会使用 CheckpointCommitter 将本次 checkpoint 提交到外部存储,用于故障恢复。但是,若是本次确认信息,到 checkpoint 提交的过程中出现故障,会导致本次 checkpoint 提交失败,那么就会直接恢复 checkpoint 重新写一次,最终造成数据重复。

    以上两种故障可能性比较小,但理论上还是会导致数据重复,故 预写日志(WAL)只能到达 at-least-once 语义。

  2. 两阶段提交(Two-Phase-Commit,2PC)

    对于每个 checkpoint,sink 任务会启动一个事务,并将接下来所有接收的数据添加到事务里

    然后将这些数据写入外部 sink 系统,但不提交它们 —— 这时只是“预提交”

    当它收到 checkpoint 完成的通知时,它才正式提交事务,实现结果的真正写入

    2PC 真正实现了 exactly-once,Flink 也为此提供了 TwoPhaseCommitSinkFunction 接口,方便其逻辑的实现。

    2PC 实现 Exactly-Once 对外部 Sink 系统的要求

    1. 2PC 模式要求外部 Sink 系统必须支持事务在 checkpoint 的间隔时间里,必须能够开启一个事务并接受数据写入
    2. Sink 在收到 checkpoint 完成的通知之前,事务必须是“等待提交”状态。checkpoint 周期(或故障恢复)的时长不能超过外部 Sink 预提交事务等待的最大时长。
    3. Sink 任务必须能够在进程失败后恢复事务,且提交事务必须是一个幂等操作。
Sink类型 Source不可重置Offset Source可重置Offset
任意(Any) At-most-once At-least-once
幂等性写入 At-most-once Exactly-once(故障恢复时会出现暂时不一致)
预写日志(WAL) At-most-once At-least-once
两阶段提交(2PC) At-most-once Exactly-once

Flink+Kafka 实现端到端 Exactly-Once

​ Kafka 在最近的0.11版本中添加了对 事务的支持。这意味着现在通过 Flink 读写 Kafaka,并提供端到端的 Exactly-Once 语义有了必要的支持。在此基础上 Flink 使用 两阶段提交 协议(2PC)实现到 Kafka 的 Exactly-Once。

—— Source:Kafka Consumer 作为 Source,可以保存 Offset,故障恢复时,可直接重置偏移量

—— Flink 程序:利用 Checkpoint 机制,把状态存盘,故障恢复时,保证内部状态一致性

—— Sink:Kafka Producer 作为 Sink,采用两阶段提交 Sink,需要实现一个 TwoPhaseCommitSinkFunction,同时Kafka Producer 要事务隔离级别为 READ COMMITTED (只读取已提交数据)。

  1. 在 Checkpoint 开始的时候,即两阶段提交协议的“预提交”阶段。当 Checkpoint 开始时,Flink 的 JobManager 会将 Checkpoint Brarrier (检查点分界线)注入数据流。

Flink 笔录

  1. Source 保存了消费 Kafka 的 Offset,之后将 Checkpoint Barrier 传递给下一个 Operator。

    Brarrier 在 Operator 之间传递。对于每一个 Operator,它触发 Operator 的状态快照写入到 State Backend。

Flink 笔录

  1. Sink 具有外部状态,在“预提交”阶段,除了将其状态快照写入 State Backend 之外,数据输出端还必须预先提交其外部事务。

Flink 笔录

  1. 当 Checkpoint Barrier 在所有 Operator 都传递了一遍,并且触发的 Checkpoint 回调成功完成时,“预提交”阶段结束。此时的 Checkpoint 是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。

  2. JobManager 为应用程序中的每个 Operator 发出 Checkpoint 已完成的回调。此时进入“提交”阶段,Source 和 Widnow Operator 没有外部状态,因此在提交阶段,这些 Operator 不必执行任何操作。但是,Sink 拥有外部状态,此时 Sink 会提交外部事务。

Flink 笔录

Flink 将两阶段提交(2PC)的通用逻辑提取到抽象类 TwoPhaseCommitSinkFunction

// 在事务开始前,在目标文件系统的临时目录中创建一个临时文件。随后,在处理数据时将数据写入此文件。
beginTransaction
// 在预提交阶段,持续写入数据到文件,之后关闭文件。随后,为下一个 checkpoint 启动一个新的事务.
preCommit
// 在提交阶段,将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
commit 
// 在中止阶段,删除临时文件。
abort 

一般情况:发生任何故障,Flink 会将应用程序的状态恢复到最新的一次 Checkpoint 点。

一种极端的情况:预提交成功了,但在这次 Commit 的通知到达 Operator 之前发生了故障。在这种情况下,Flink 会将 Operator 的状态恢复到已经预提交,但尚未真正提交的状态。

总结:

  1. Flink 的 Checkpoint 机制是支持两阶段提交协议并提供端到端的 Exactly-Once 语义的基础。
  2. Flink 的 TwoPhaseCommitSinkFunction 提取了两阶段提交协议的通用逻辑,基于此将 Flink 和支持事务的外部系统结合,构建端到端的 Exactly-Once 成为可能。
  3. Kafka 在 0.11 版本首次引入了事务,为在 Flink 程序中使用 Kafka Producer 提供 Exactly-Once 提供了可能性。
  4. Kafaka 0.11 Producer 的事务是在 TwoPhaseCommitSinkFunction 基础上实现的,和 At-Least-Once Producer 相比只增加了非常低的开销。

十、Table API 与 SQL

10.1 Table API

引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

简单使用

def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("ECOMMERCE")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
  // 1. 获取tableEnv
  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
  // 2. 封装dataStream为样例类
  val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ 
jsonString => JSON.parseObject(jsonString,classOf[EcommerceLog]) }
  // 3. 获取动态表,流中的数据为样例类,会自动根据类属性映射为 table 的字段
  val ecommerceLogTable: Table = tableEnv.fromDataStream(ecommerceLogDstream)
  // 4. 根据表字段,进行Table API 的操作
  val table: Table = ecommerceLogTable.select("mid,ch").filter("ch ='appstore'")
  // 5. 动态表可以转换为流输出
  val midchDataStream: DataStream[(String, String)] = table.toAppendStream[(String,String)]

  midchDataStream.print()
  env.execute()
}

动态表的字段也可以根据样例类属性的顺序单独命名

// 用一个单引放到字段前面来标识字段名, 如 ‘name , ‘mid ,’amount 等
tableEnv.fromDataStream(ecommerceLogDstream,’mid,’uid  .......)  

10.2 Table API 的窗口集合

简单使用

import org.apache.flink.table.scala._	// 用于手动指定列名 和 table.toAppendStream/toRetractStream

// 计算每10秒中数据渠道为 appstore 的个数
def main(args: Array[String]): Unit = {
  //sparkcontext
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  //时间特性改为eventTime
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("ECOMMERCE")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

  val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{ jsonString =>JSON.parseObject(jsonString,classOf[EcommerceLog]) }
  // 设置watermark
  val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = ecommerceLogDstream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
    override def extractTimestamp(element: EcommerceLog): Long = {
      element.ts
    }
  }).setParallelism(1)
  // 获取tableEnv
  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
  // 把数据流转化成Table
  val ecommerceTable: Table = tableEnv.fromDataStream(
      ecommerceLogWithEventTimeDStream,
      'mid,
      'uid,
      'appid,
      'area,
      'os,
      'ch,
      'logType,
      'vs,
      'logDate,
      'logHour,
      'logHourMinute,
      'ts.rowtime)  // .rowtime 表示将ts作为eventTime来处理
   
  // 通过 Table API 进行操作
  // 每10秒统计一次各个渠道的个数 Table API 解决
  val resultTable: Table = ecommerceTable
    .window(Tumble over 10000.millis on 'ts as 'tt)	// 基于 ts 创建滚动窗口,窗口长度为 10S,别名为 tt
    .groupBy('ch,'tt )	// 基于渠道、eventTime时间分组
    .select( 'ch, 'ch.count)	// 统计10秒内各个渠道的个数

  // 把Table转化成数据流
  val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable
    													.toRetractStream[(String,Long)]
  // 过滤,Boolean为true的数据,表示新数据
  resultDstream.filter(_._1).print()   
  env.execute()
}

关于 group by

  1. 如果了使用 groupby,table 转换为流的时候只能用 toRetractDstream,不能有 toAppendDstream

    // 把Table转化成数据流
    val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable
    														.toRetractStream[(String,Long)]
    
    
  2. toRetractDstream 得到的第一个 Boolean 型字段标识 true 就是新的数据(Insert),false 表示过期老数据(Delete)

    // 过滤,Boolean为true的数据,表示新数据
    resultDstream.filter(_._1).print()
    
    
  3. 如果使用的 API 包括时间窗口,那么窗口的字段必须出现在 groupBy 中

    // 通过 Table API 进行操作
    val resultTable: Table = ecommerceTable
        .window(Tumble over 10000.millis on 'ts as 'tt)	
        .groupBy('ch,'tt )	// 窗口的字段必须出现在 groupBy 中
        .select( 'ch, 'ch.count)	
    
    

关于时间窗口

  1. 用到时间窗口,必须提前声明时间字段,如果是 ProcessTime 直接在创建动态表时进行追加就可以。

    val ecommerceLogTable: Table = tableEnv.fromDataStream(
    		ecommerceLogWithEtDstream,
    		'mid,'uid,'appid,'area,'os,'ch,'logType,
        	'vs,'logDate,'logHour,'logHourMinute,'ps.proctime) // 声明 ProcessTime
    
    
  2. 如果是 EventTime 要将数据中的时间戳声明为 EventTime

    val ecommerceLogTable: Table = tableEnv.fromDataStream(
    		ecommerceLogWithEtDstream,
    		'mid,'uid,'appid,'area,'os,'ch,'logType,
        	'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime) // 声明 EventTime
    
    
  3. 滚动窗口可以使用 Tumble over 10000.millis on 来表示

    val table: Table = ecommerceLogTable.filter("ch ='appstore'")
    	.window(Tumble over 10000.millis on 'ts as 'tt) // 基于ts创建滚动窗口,窗口长度为10S,别名为 tt
    	.groupBy('ch ,'tt)
    	.select("ch,ch.count ")
    
    

10.4 SQL

def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // 时间特性改为eventTime
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("ECOMMERCE")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
  // 封装样例类
  val ecommerceLogDstream: DataStream[EcommerceLog] = dstream.map{
      jsonString =>JSON.parseObject(jsonString,classOf[EcommerceLog])
  }
  // 设置watermark
  val ecommerceLogWithEventTimeDStream: DataStream[EcommerceLog] = ecommerceLogDstream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[EcommerceLog](Time.seconds(0L)) {
    override def extractTimestamp(element: EcommerceLog): Long = {
      element.ts
    }
  }).setParallelism(1)
  // 获取tableEnv
  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

  //把数据流转化成Table
  val ecommerceTable: Table = tableEnv.fromDataStream(ecommerceLogWithEventTimeDStream , 'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)

  // 每10秒 统计一次各个渠道的个数 table api 解决
    
  //通过table api 进行操作
    /*val resultTable: Table = ecommerceTable
      .window(Tumble over 10000.millis on 'ts as 'tt)
      .groupBy('ch,'tt )
      .select( 'ch, 'ch.count)*/
    
 // 通过sql 进行操作
  val resultSQLTable : Table = tableEnv.sqlQuery( "select ch ,count(ch)   from "+ecommerceTable+"  group by ch, Tumble(ts,interval '10' SECOND )")

  //把Table转化成数据流
  /*val appstoreDStream: DataStream[(String, String, Long)] = appstoreTable
    											.toAppendStream[(String,String,Long)]*/
  val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable
    											.toRetractStream[(String,Long)]
  // 过滤,Boolean为true的数据,表示新数据
  resultDstream.filter(_._1).print()
  env.execute()
}

十一、ECP (复杂事件处理)

​ 一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。

Flink 笔录

CEP 支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件 允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

​ Flink CEP 是在 Flink 中实现的复杂事件处理(CEP)库

​ CEP 用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

应用场景:

输入的流数据,尽快产生结果
在2个event流上,基于时间进行聚合类的计算
提供实时/准实时的警告和通知
在多样的数据源中产生关联并分析模式
高吞吐、低延迟的处理

1. 模式的类型

  1. 个体模式(Individual Patterns)

    个体模式包含 单例 和 循环模式 ,单例模式只接收一个事件,而循环模式可以接收多个。

    在个体模式后添加量词,使其变为循环模式,量词代表循环次数

    // 匹配出现4次
    start.times(4)
    // 匹配出现4次或0次
    start.times(4).optional
    // 匹配出现2,3或4次
    start.times(2, 4)
    // 匹配出现2,3或4次,尽可能多的重复匹配
    start.times(2, 4).greedy
    // 匹配出现1次或多次
    start.oneOrMore
    // 匹配出现0次、2次或多次,并尽可能多的重复匹配
    start.timesOrMore(2).optional.greedy
    
    
  2. **组合模式(**Combining Patterns,也叫模式序列)

    模式序列必须以一个“初始模式”开始:begin

    val start = Pattern.begin("start")
    
    
  3. 模式组(Groups of patterns)

​ 处理事件的规则,被叫做“模式”(Pattern)

2. 模式的条件

每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据,

CEP 中的个体模式主要通过调用 .where() .or() 和 .until() 来指定条件。

简单条件一般通过 .where() 方法对事件中的字段进行判断筛选,决定是否接受该事件

start.where(event => event.getName.startsWith("foo"))

3. 模式的序列

严格近邻(Strict Contiguity):next()

所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由 .next() 指定
例如对于模式”a  next b”,事件序列 [a, c, b1, b2] 没有匹配

宽松近邻( Relaxed Contiguity ):followedBy()

允许中间出现不匹配的事件,由 .followedBy() 指定
例如对于模式”a followedBy b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1}

非确定性宽松近邻( Non-Deterministic Relaxed Contiguity ):followedByAny()

进一步放宽条件,之前已经匹配过的事件也可以再次使用,由 .followedByAny() 指定
例如对于模式”a followedByAny b”,事件序列 [a, c, b1, b2] 匹配为 {a, b1},{a, b2}

除以上模式序列外,还可以定义“不希望出现某种近邻关系”

.notNext()  —— 不想让某个事件严格紧邻前一个事件发生
.notFollowedBy() —— 不想让某个事件在两个事件之间发生

注意:

  1. 所有模式序列必须以 .begin() 开始
  2. 模式序列不能以 .notFollowedBy() 结束
  3. “not” 类型的模式不能被 optional 所修饰
  4. 可以为模式指定时间约束,用来要求在多长时间内匹配有效:within()
next.within(Time.seconds(10))

4. 模式的使用

  1. 定义模式

    // 定义一个pattern
    val loginFailPattern = Pattern
        .begin[LoginEvent]("start").where(_.status == "fail")
        .next("next").where(_.status == "fail").within(Time.seconds(2))
    
    
  2. 进行模式检验

    指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配

    调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream

    // 将pattern应用到数据流上
    val patternStream: PatternStream[LoginEvent] = CEP.pattern(dataStream, loginFailPattern)
    
    
  3. 提取匹配事件

    创建 PatternStream 之后,就可以应用 select() 或者 flatselect 方法,从检测到的事件序列中提取事件。

    // 从patternStream中选出符合规律的事件
    val loginFailWarningStream = patternStream.select(new LoginFailDetect())
    
    
  4. 编写 select function

    select() 方法需要输入一个 select function 作为参数,每个成功匹配的事件序列都会调用它。

    select() 以一个 Map[String,Iterable [IN]] 来接收匹配到的事件序列,其中 key 就是每个模式的名称,而 value 就是所有接收到的事件的 Iterable 类型

    class LoginFailDetect() extends PatternSelectFunction[LoginEvent, Warning] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
        val firstFailEvent = map.get("start").iterator().next()
        val secondFailEvent = map.get("next").iterator().next()
        Warning(firstFailEvent.userId, firstFailEvent.timestamp, secondFailEvent.timestamp, "login fail 2 times")
      }
    }
    
    

    注意:Flink 1.8 之后 要继承 PatternProcessFunction

补充:超时事件的提取

当一个模式通过 within 关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能够处理这些超时的部分匹配,select 和 flatSelect API 调用允许指定超时处理程序。

超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个 OutputTag 定义接收到的超时事件序列。

Flink 笔录

Flink 笔录Flink 笔录 马本不想再等了 发布了18 篇原创文章 · 获赞 0 · 访问量 129 私信 关注
上一篇:Python_字符串方法


下一篇:YELP NLP <文本信息提取项目>