《如何使用实时计算对 Flink 任务进行调优》|学习笔记

开发者学堂课程【《实时计算 Flink 版中级课程》:《如何使用实时计算对 Flink 任务进行调优》】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/806/detail/13884


《如何使用实时计算对 Flink 任务进行调优》

一、Flink 任务调优

>Metrics & Monitoring

>状态管理与调优

>检查点管理与调优

>Object reuse 与性能优化 .

>序列化器与性能优化

01. Metrics&Monitoring

内容简介:

1. Flink Metrics 系统概览

2. Metrics 的范围&变量

>System Scope

>Variables

3. Metrics 类型

>Counter

>Gauge

>Meter (rate)

>Histogram

4. VVP平台上 Metrics 链路

5.监控中的关键指标

>是否正在"运行”

Uptime

fullRestarts

>Checkpoint情况

numberOfCompletedCheckpoints

numberOfFailedCheckpoints

lastCheckpointSize

6.重点指标

>Task & Operator级别的吞吐

>numRecords(In|Out)PerSecond

>numRecords(In|Out)

>Progress & Event-Time 延迟 

>currentOutputWatermark

>Source connector 追数据情况

>(Kafka) records-lag-max

>(Kinesis) millisBehindLatest

7.JVM 相关内存指标

>Status.JVM.Memory.

>NonHeap.Committed

>Heap.Used

>Heap.Committed

>Direct.MemoryUsed

>Mapped.MemoryUsed

>Status.Flink.Memory .

>Managed.Used

>Managed.Total

8.JVM 相关 CPU 指标

>指标

>Status JVM.CPU.Load

>Status.JVM.CPU.Time

注意:对于 48 核机器上 1CPU 的 TM 来说0.021 = 100%负载

>Flink 统计的CPU指标仅供参考,无法准确统计非 java 的 CPU 使用情况,准确的CPU 使用可以参照调度系统(例如 YARN )的进程 /container CPU 使用情况

9.延迟问题排查

可以查看每个算子的 latency,通过配置 metrics.latency.interval 启用会引入一定的性能损失.

10.排查反压问题

11.定位反压问题

>上游的 outPoolUsage VS.下游的 inputFloatingBuffersUsage

>连续采样

>其他可能的手段>查看 source 的情况

>网络延迟

( credit based flow control )

>单 subtask 反压

>其他资源瓶颈

 

二.状态管理与调优

1.当你的作业规模扩大时

>可能更需要使用 RocksDB,需要启用增 checkpoint,必要时可以考虑 unaligned checkpoint

>清理 checkpoint 变得更重要

>checkpoint 能更容易失败或者超时

>可能更容易碰到外部系统服务的限制

>需要更优雅地处理恢复

2.Task Manager 内存的分布与 state backend

>基于RocksDB

>基于Java Heap Objects

3.回顾基于 Heap 的 state backend

· state 以 java 对象的形式存储在堆上

·基本以 hash table 的格式存储

·每个 state 一个 hash table

·支持异步 checkpoint

·只有在 state snapshot 和 restore 时候才会进行序列化

·吞吐最高

4.state 性能提升相关的考虑

·选择拥有高效拷贝的 typeSerializer

·尽可能避免拷贝

·GC 相关的 tuning

·单台机上多个 task manager 的方式来进行扩并发

5.回顾基于 RocksDB 的 state backend

·state 以序列化字节的方式存储在堆外内存和磁盘上

·键值对数据库,以 LSM tree 的格式进行存储

·key 会序列化成 <keyGroup,key,namespace>

·LSM 天然支持多版本控制功能

·每次读写的时候都会进行序列化/反序列化

6.RocksDB 的 level 层级架构

·key 和 value 都是以序列化字节形式存储。

·MemTable∶新的 writer 会先写入到内存结构

·sst文件∶不可变的排序文件

7.谈 RocksDB 的 LSM

·设计出发点

>顺序磁盘访问比随机访问快很多

>append-only 的日志写模式可以利用这一点,但是不能提供有效的基于键的访问

>SST 文件

>每个文件均包含一部分有序的数据集合

>由于是不可变文件,当 record 更新或者删除时,重复的 record 总会创建出来。

>读取时,先从 memtable 开始,然后才会检查sst文件,以从新到日的顺序读取。

>sst 文件总是紧凑且有序的。

>可以使用 index 和 bloom filter 来优化 sst 文件中的键值访问

8.浅谈 RocksDB 的资源使用

>RocksDB 的实例是 operator 独享的

>一个state 对应一个column family

>SST 文件,MemTable, block cache 以及 compaction 线程

都是 per column family 的 。

>state.backend.rocksdb.block.cache-size

>默认的 block cache大小(8MB)

>state.backend.rocksdb.writebuffer.size

>默认的单个最大 MemTable 大小(64MB)

state.backend.rocksdb.writebuffer.count

>默认的最多 Memtable 数量(2)

9.浅谈 RocksDB 的资源使用

>RocksDB 的实例是 operator 独享的

>一个 state 对应一个 column family

>SST 文件,MemTable, block cache 以及 compaction 线程

都是 per column family 的

Indexes 和 Bloom filters

>可以通过 ConfigurableOptionsFactory 配置

>可以参考

PredefinedOptions#SPINNING_DISK_OPTIMIZED_HIGH_MEM

10.RocksDB 的性能调优

>写放大∶写入磁盘的数据量/写入 DB 的数据量

>读放大∶每次读取时需要读取磁盘的次数

>空间放大∶磁盘存储的数据量/ DB 的数据量

11.RocksDB 与 Timer

>默认情况下,timer 存储在 RocksDB 上,如果体量不大,可以考

虑放在 java 堆上

>如果想将 timer 存储在 Java 堆上,可以配置∶

state.backend.rocksdb.timer-service.factory: HEAP

>需要注意的是,如果使用 RocksDB 存储 state,但是将 timer 存储在堆上,这些timer 会在 snapshot 的同步阶段进行持久化。

12.增量 checkpointing

·目前只有 RocksDB state backend 支持增量 checkpoint

· Flink 会观察到上次成功创建/删除的 SST 文件,从而实现增量上传的目的。(默认情况下,SST 文件是 snappy 压缩的)

· 开启增量 checkpoint 后,进行恢复时,只需要简单地 re-open RocksDB 即可。

13.RocksDB 相关性能考虑

>尽量使用原生 state 类型

>可以开启 RocksDB native metrics 来帮助进行性能调优

>针对不同的硬件选择不同的 RocksDB 配置

>建议优先使用 SSD 磁盘,对性能有显著提升

14.State 过期清理

>不支持 EventTime 的 state 清理

>不支持 queryable state 的 清理

>不支持对已经存在的 state 添加或者移去 StateTtlConfig

 

三、检查点管理与调优

1.Snapshot,Checkpoint 和 Savepoint

·checkpoin 和 Savepoint 都属于snapshot 概念

·Checkpoint V.S Savepoint

2.Checkpoint 的相关配置

·增量还是全量 Checkpoint∶目前只有 RocksDB 支持增量,建议打开

·Exactly-once v.s at-least-once∶是否有 barrier 对齐,一般情况下,at-least-once能更快速完成 Checkpoint

·Checkpoint 间隔

·间隔越大,恢复时要追的数据越多

·对于 two-phase-commit 机制,间隔越大,事务提交地越慢

3.Checkpoint 的相关配置

>允许保存的 Checkpoint 数目∶默认1

>Checkpoint 超时时间∶默认10min

>Checkpoint 文件是否有压缩∶默认没有,RocksDB 天然支持压缩

>Checkpoint 遇到 error 时是否 fail 整个作业∶默认会 fail

>可以同时进行的 Checkpoint 数目∶默认1

>是否开启 unaligned checkpoint∶ 默认关闭

4.为什么 Checkpoint 会超时失败

·Timeout 对当前的 Checkpoint 来说太短了

-非常大的作业规模

-非常大的 state 或者没有开启增量 Checkpoint

-Checkpoint 的分布式文件存储系统网络访问慢

·反压导致 checkpoint barrier 无法及时传递到-反压的原因千千万

·用户方法内存在同步的对外 IO 操作,外部系统异常时导致 IO 操作慢

·数据量增大,作业响应慢

·如果不想因为反压导致 checkpoint 超时,可以考虑 unaligned checkpoint

5.Unaligned checkpoint

·barrier 可以"超前"被 operator 消费

·checkpoint 触发时机与作业反压与否解耦,不再受限制于当前 channel 处理速度

·增大 了 IO 数据量,增大了对磁盘的数据压力

6.checkpoint 相关 UI 监控

7.如何调查 checkpoint 超时原因

·观察 E2E duration,sync duration,async duration

·E2E duration>>sync duration+async duration,说明大部分时间耗在了barrier 传递到相关节点上,推测作业存在反压,通过相关UI工具去验证是否反压。

·async duration 耗时长,写分布式存储性能有问题

·对齐的 buffered 数据量太大

·作业存在反压或者部分 task 问 题,这个数值越大越有问题

·热点问题

·是否有部分 task 的 state 数据量特别大,或者存在明显的反压情况。

8.如何降低 Checkpoint 超时的可能性

·不要实现存在阻塞操作的相关算子

·根据实际情况合理配置超时时间

·使用 unaligned Checkpoint

·选择升级 Flink 的新特性

-FLIP-27 和 FLINK-10886 将支持 source 端的 event time 对齐

 

四、Object reuse 与性能优化

1.回顾 operator chaining

>Chaining

>发送方 out.collect()直接接入接收

>数据是通过保护性拷贝进行传输(不通过 Flink 的网络层序列化传输)

>只有在 forward 数据传输时才能生效

2. Object Reuse

·默认是关闭的

·当启用时

-流计算∶在 operator chain 中传输的是相同的对象

-批计算∶可以有更复杂的 re-use 模式

3.流计算中 Object Reuse 的特点

·Operator 间的数据传输不再是保护性拷贝

·第二个 operator 可以修改一个 operator 的数值

-第一个 operator 内的 filed 可以修改

-基于 JVM Heap 的 state backend

-在 out.collect()之后,第一个 operator 再 访问数据

·存储的 value 可能是不明确的

· window state

· contextkey

4.新计算中 Object Reuse 的 限制

·在方法调用过程中记住输入对象是不安全的

·不要修改输入对象

·可能会修改了输出对象并且再次输出

· 在使用 Object reuse 时,确保使用不可变类型

 

五.序列化器与性能优化

1.Flink 的序列化系统

·数据导入时的序列化

—在向外部系统读写数据时的序列化(例如 Kafka)

·数据传输的序列化

—在 Flink task 间交互数据时的序列化

·状态的序列化

—checkpoint 写时候的序列化,以及 RocksDB 的读写数据

2.Flink 的序列化系统

·原生支持的类型 TypeExtractor#privateGetForClass

·基本类型(Primitive type)

· Tuples, Scala Case Classes

·Avro 类型

·POJO 类型

·不原生支持的类型都会转为 kryo 进行序列化

3.Flink 的序列化系统

3. 自定义序列化器以提升性能

·通过 ExecutionConfig 来注册 kryo

· registerKryoType(Class<?>)

· 针对相应类提供 kryo 的序列化器(需要继承自:

com.esotericsoftware.kryo.Serializer )

4.自定义序列化器以提升性能

>利用 @TypeInfo 创建用户定义的 TypeInformation

5.自定义序列化器以提升性能

·创建 POJO 类借用 pojo serializer

· 类必须是 public 且不含静态内部类(no non-static inner class)

·类必须有一个无参构造方法 (no-argument constructor )

·所有非静态属性必须是(non- final)public 或者有 public 的 getter 和 setter


六、Object reuse

1.Object reuse 作业练习题

·源码下载

>将摄氏度转换为华氏温度

>利用指数移动平均求平均值

进入指标页面,观察每秒发送的 record 数量,从而获知作业 TPS

2. Object reuse 作业练习题

启动的 class 入口

-并发度设置为 2

·问题1∶如果开启 object reuse

(传入参数--objectReuse true),比较性能区别

·问题2∶注意观察为了保证正确性,MovingAverageSensors 和ConvertToLocalTemperature 类做了哪些努力

>启动的 class 入口

com.ververica.flinktraining.solutions.troubleshoot.0bjectReuseJobSolution1

>并发度设置为 2

问题 3∶

MovingAverageSensors 和 ConvertToLocalTemperature 的传输的对象是否可以用更高效的 serializer 来优化性能 (不用 kryo)?观察 metrics 中的每秒输出 record 数进行调优。

-Hint∶

com.ververica.flinktraining.solutions.troubleshoot.immutable下有提供相关不可变类以及自定义的 serializer


如何使用 AutoPilot 对作业自动调优

目录

· AutoPilot 整体架构

· AutoPilot 自动调优策略

· AutoPilot 应用实操

· AutoPilot 使用注意事项

·智能诊断功能(VVP 2.4.0)

一、AutoPilot 整体架构

1.Agent基本工作流程

《如何使用实时计算对 Flink 任务进行调优》|学习笔记

二、AutoPilot 自动调优策略

1.基于 Cpu 利用率的调优策略

·CPU 利用率高于指定阈值时,进行 scale up

·CPU 利用率长时间低于指定阈值时,进行 s cale down

·IO Bound 类型的作业需要把 CPU 的阈值调小

·优点∶简单高效, 没有额外依赖

·缺点∶算法精度低,收敛速度慢

2.基于内存使用的调优策略

·内存的调优策略主要是根据作业内存实际利用率,以及 GC 的 metric 来动态调整tm 的内存

·整体内存利用率高于指定阈值时,增大 TM 的内存

·整体内存利用率长时间低于指定阈值时,减少 TM 的内存

·TM GC 时 间、GC 频率超过阈值时,增大 TM 的内存

3.基于 Delay 的调优策略

·当 Source 出现延迟时,此时作业处于反压的状态吞吐达到最大

·根据 Delay 的变化率,以及反压状态下的吞吐,可以快速的预估出作业实际需要处理的 TPS 和并发度

·优点∶算法精度高,可以实现作业快速回复健康状态

·缺点∶依赖 delay metric,支持大部分 SQL 作业中用到的 connector

已支持社区 FLIP-33 标准化的 Connector Metrics,覆盖所有实现这一标准 metric体系的 connector

4.基于 Slot 利用率的调优策略

· Flink-1.11 开始完成作业数据处理模型的升级,Mailbox 模式变成默认处理模式,每个任务处理模型简化变成单线程更加高效,为了衡量一个 mail 是否延迟,我们引入 IdleTime 的 metric∶用来统计一个 task 没有处理数据的时间

·综合所有并发的 IdeTime,我们可以快速衡量一个 vertex 是否空闲,能不能进讲行并发度的回收

· 优点∶算法精度高,可以准确的衡量每 个 vertex 的空闲程度

能根据slot利用率来进行 scale up/scale down 作业的并发度

5.基于 JobException 的调优策略

· 作业在启动或运行过程中,因为内存不足或者其它原因会造成作业持续 failover

· Autopilot 会对 job Exception 进行根因诊断,通过调整作业资源来保持作业的稳定

· 优点∶

自动恢复因为内存配置不足的作业

有效解决 JM OOM 的场景

6.策略冲突怎么办?

AutoPilot 会智能判断选择!

7.AutoPilot 三种工作形态

·Active

· Monitor(默认开启)

·Disabled (非常不建议)

8.AutoPilot 高级配置

· 限制调整资源的上下限

· 调整各个 trigger 指标的阈值

· 调整 scale up/down 动作的 ratio

· 限制调整并发的上下限

四、AutoPilot 应用实操

1. 开启 AutoPilot

2.查看作业的当前状态

3.查看历史操作记录

4.未开启/开启 AutoPilot 效果

五、 AutoPilot 使用注意事项

1.AutoPilot 使用注意事项

· AutoPilot 修改并发度是通过默认的并发度来实现的,因此作业每个节点都不能设置并发度,否则就无法实现动态调节

· AutoPilot 触发后需要重启作业,会导致作业短暂停止处理数据

· AutoPilot 策略对作业的处理模式假设∶

1.流量平滑变化、不能有数据倾斜、每个算子的吞吐能力能够随并发度线性拓展

2.当作业 pattern 严重偏离这几个假设时,可能会存在作业异常,没有触发 scale,或者算法无法收敛,作业持续重启等异常场景,此时需要关闭 AutoPilot 手动调优

· AutoPilot 目前无法识别外部系统的问题∶当外部系统故障,访问变慢时,会导致作业 scale up,导致输出压力变大,

会加重外部系统的压力,导致外部系统雪崩

· Autopilot 目前不支持 session cluster 部署的作业

· Deployment 后需要手动重启作业

>作业状态

> Failover 汇总

>Check Poin t 状态

>数据倾斜

>数据过滤

>K8S 资源

> Slot 利用率

>SQL聚合优化

>TM/JM cpu/mem 状态

3.下图左侧就是作业的信息

上一篇:quidway secpath 下的防火墙配置


下一篇:二维绘图(3.1.5)