Flink-v1.12官方网站翻译-P004-Flink Operations Playground

Flink-v1.12官方网站翻译-P004-Flink Operations Playground

 

 

 

Flink操作训练场

在各种环境中部署和操作Apache Flink的方法有很多。无论这种多样性如何,Flink集群的基本构件保持不变,类似的操作原则也适用。

在这个操场上,你将学习如何管理和运行Flink Jobs。您将看到如何部署和监控应用程序,体验Flink如何从Job故障中恢复,并执行日常操作任务,如升级和重新缩放。

这个游乐场的构造

这个游乐场由一个长寿的Flink Session Cluster和一个Kafka Cluster组成。

一个Flink Cluster总是由一个JobManager和一个或多个Flink TaskManagers组成。JobManager负责处理Job提交,监督Job以及资源管理。Flink TaskManagers是工人进程,负责执行构成Flink Job的实际任务。在这个游戏场中,你将从一个单一的TaskManager开始,但以后会扩展到更多的TaskManager。此外,这个游乐场还带有一个专门的客户端容器,我们使用它来提交Flink Job,并在以后执行各种操作任务。客户端容器不是Flink Cluster本身需要的,只是为了方便使用才包含在里面。

Kafka集群由一个Zookeeper服务器和一个Kafka Broker组成。
Flink-v1.12官方网站翻译-P004-Flink Operations Playground

 

 

 

当游乐场启动时,一个名为Flink Event Count的Flink Job将被提交给JobManager。此外,还会创建两个Kafka主题输入和输出。

 Flink-v1.12官方网站翻译-P004-Flink Operations Playground

 

 

 

该作业从输入主题中消耗ClickEvents,每个ClickEvents都有一个时间戳和一个页面。然后按页面对事件进行键入,并在 15 秒的窗口中进行计数。结果被写入输出主题。

有6个不同的页面,我们在每个页面和15秒内产生1000个点击事件。因此,Flink作业的输出应该显示每个页面和窗口有1000个浏览量。

启动游乐场

游戏场环境的设置只需几步。我们将引导你完成必要的命令,并展示如何验证一切都在正确运行。

我们假设你的机器上安装了Docker(1.12+)和docker-compose(2.1+)。

所需的配置文件可以在flink-playgrounds仓库中找到。检查一下,然后旋转环境。

 

git clone --branch release-1.12 https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d

之后,你可以用以下命令检查正在运行的Docker容器。

 

docker-compose ps

                    Name                                  Command               State                   Ports                
-----------------------------------------------------------------------------------------------------------------------------
operations-playground_clickevent-generator_1   /docker-entrypoint.sh java ...   Up       6123/tcp, 8081/tcp                  
operations-playground_client_1                 /docker-entrypoint.sh flin ...   Exit 0                                       
operations-playground_jobmanager_1             /docker-entrypoint.sh jobm ...   Up       6123/tcp, 0.0.0.0:8081->8081/tcp    
operations-playground_kafka_1                  start-kafka.sh                   Up       0.0.0.0:9094->9094/tcp              
operations-playground_taskmanager_1            /docker-entrypoint.sh task ...   Up       6123/tcp, 8081/tcp                  
operations-playground_zookeeper_1              /bin/sh -c /usr/sbin/sshd  ...   Up       2181/tcp, 22/tcp, 2888/tcp, 3888/tcp

这表明客户端容器已经成功提交了Flink Job(Exit 0),所有集群组件以及数据生成器都在运行(Up)。

您可以通过调用来停止游乐场环境。

 

docker-compose down -v

进入游乐场

在这个游乐场中,有很多东西你可以尝试和检查。在下面的两节中,我们将向您展示如何与Flink集群进行交互,并展示Flink的一些主要功能。

Flink WebUI 

观察你的Flink集群最自然的出发点是在http://localhost:8081 下暴露的 WebUI。如果一切顺利,你会看到集群最初由一个任务管理器组成,并执行一个名为Click Event Count的Job。

 Flink-v1.12官方网站翻译-P004-Flink Operations Playground

 

 

 

Flink WebUI包含了很多关于Flink集群和它的工作的有用和有趣的信息(JobGraph, Metrics, Checkpointing Statistics, TaskManager Status, ...)。

日志

JobManager

JobManager的日志可以通过docker-compose进行尾随。

docker-compose logs -f jobmanager

在初始启动后,你应该主要看到每一个检查点完成的日志信息。

 

TaskManager

 

任务管理器的日志也可以用同样的方式进行尾随。

docker-compose logs -f taskmanager


在初始启动后,你应该主要看到每一个检查点完成的日志信息。

 

Flink CLI可以在客户端容器中使用。例如,要打印Flink CLI的帮助信息,你可以运行以下命令

 

docker-compose run --no-deps client flink --help

Flink REST API

Flink REST API通过主机上的localhost:8081或客户端容器中的jobmanager:8081暴露出来,例如,要列出所有当前正在运行的作业,你可以运行。

 

curl localhost:8081/jobs

Kafka Topics

 

你可以通过运行以下命令来查看写入Kafka主题的记录

 

//input topic (1000 records/s)
docker-compose exec kafka kafka-console-consumer.sh   --bootstrap-server localhost:9092 --topic input

//output topic (24 records/min)
docker-compose exec kafka kafka-console-consumer.sh   --bootstrap-server localhost:9092 --topic output

Time to Play!开玩

现在你已经学会了如何与Flink和Docker容器进行交互,让我们来看看一些常见的操作任务,你可以在我们的操场上尝试一下。所有这些任务都是相互独立的,即你可以以任何顺序执行它们。大多数任务可以通过CLI和REST API来执行。

 

Listing Running Jobs列出正在运行的作业

命令1

docker-compose run --no-deps client flink list

预期输出1

Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

JobID在提交时分配给作业,并且需要通过CLI或REST API对作业执行操作。

 

命令2

curl localhost:8081/jobs

预期输出2

{
  "jobs": [
    {
      "id": "<job-id>",
      "status": "RUNNING"
    }
  ]
}

JobID在提交时分配给作业,并且需要通过CLI或REST API对作业执行操作。

观察故障和恢复

Flink在(部分)失败下提供了精确的一次处理保证。在这个游戏场中,你可以观察并--在一定程度上--验证这种行为。

步骤1:观察输出

如上所述,在这个游戏场中的事件是这样生成的,每个窗口正好包含一千条记录。因此,为了验证Flink是否成功地从TaskManager故障中恢复,而没有数据丢失或重复,你可以跟踪输出主题,并检查--恢复后--所有的窗口都存在,而且计数是正确的。

为此,从输出主题开始读取,并让这个命令运行到恢复后(步骤3)。

docker-compose exec kafka kafka-console-consumer.sh   --bootstrap-server localhost:9092 --topic output


步骤2:引入故障

为了模拟部分故障,你可以杀死一个任务管理器。在生产设置中,这可能对应于TaskManager进程、TaskManager机器的丢失,或者仅仅是框架或用户代码抛出的一个短暂的异常(例如,由于外部资源的暂时不可用)。

docker-compose kill taskmanager

几秒钟后,JobManager会注意到任务管理器的丢失,取消受影响的Job,并立即重新提交它进行恢复。当Job被重新启动后,它的任务仍然处于SCHEDULED状态,紫色的方块表示(见下面的截图)。

 Flink-v1.12官方网站翻译-P004-Flink Operations Playground

 

 

 注意:即使作业的任务处于SCHEDULED状态,还没有RUNNING,作业的整体状态也会显示为RUNNING。

 

此时,Job的任务不能从SCHEDULED状态转为RUNNING状态,因为没有资源(TaskManagers提供的TaskSlots)来运行任务。在新的任务管理器可用之前,Job将经历一个取消和重新提交的循环。

同时,数据生成器会不断地将ClickEvents推送到输入主题中。这类似于真正的生产设置,在生产数据的同时,要处理数据的Job却停机了。

 

步骤3:恢复

一旦你重新启动任务管理器,它就会重新连接到JobManager。

docker-compose up -d taskmanager

当JobManager被通知到新的任务管理器时,它将恢复中的Job的任务调度到新的可用任务槽。重新启动后,任务会从故障前最后一次成功的检查点恢复其状态,并切换到RUNNING状态。

Job将快速处理来自Kafka的全部积压输入事件(在故障期间积累的),并以更高的速度(>24条记录/分钟)产生输出,直到到达流的头部。在输出中,你会看到所有的键(页)都存在于所有的时间窗口中,而且每个计数都是精确的一千。由于我们是在 "至少一次 "模式下使用FlinkKafkaProducer,所以你有可能会看到一些重复的输出记录。

 注意:大多数生产设置依赖于资源管理器(Kubernetes、Yarn、Mesos)来自动重启失败的进程。

升级和重新缩放一个作业

升级Flink作业总是涉及两个步骤。首先,用一个保存点优雅地停止Flink Job。保存点是在一个明确定义的、全局一致的时间点(类似于检查点)上的完整应用状态的一致快照。其次,升级后的Flink Job从Savepoint开始。在这种情况下,"升级 "可以意味着不同的事情,包括以下内容。

  • 配置的升级(包括作业的并行性)。
  • 对工作的拓扑结构进行升级(增加/删除操作员)。
  • 对Job的用户定义功能进行升级。

在开始升级之前,你可能要开始尾随输出主题,以观察在升级过程中没有数据丢失或损坏。

 

docker-compose exec kafka kafka-console-consumer.sh   --bootstrap-server localhost:9092 --topic output

步骤1:停止作业

要优雅地停止作业,您需要使用 CLI 或 REST API 的 "stop "命令。为此,您需要该作业的JobID,您可以通过列出所有正在运行的Job或从WebWeb中获得。有了JobID,您就可以继续停止该作业。

第一种:CLI方式
命令

docker-compose run --no-deps client flink stop <job-id>

预期输出

Suspending job "<job-id>" with a savepoint.
Savepoint completed. Path: file:<savepoint-path>

Savepoint已经被存储到flink-conf.yaml中配置的state.savepoint.dir中,它被安装在本地机器的/tmp/flink-savepoints-directory/下。在下一步,你将需要这个Savepoint的路径。

 第二种:REST方式
请求

{
  "request-id": "<trigger-id>"
}

预期输出

{
  "request-id": "<trigger-id>"
}

请求

# 检查停止动作的状态并检索保存点路径
 curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>

预期输出

{
  "status": {
    "id": "COMPLETED"
  },
  "operation": {
    "location": "<savepoint-path>"
  }
}

 步骤2A:重新启动作业而不做更改

现在您可以从该保存点重新启动升级后的作业。为了简单起见,您可以在不做任何更改的情况下重新启动它。

 第一种方式:CLI方式
命令

docker-compose run --no-deps client flink run -s <savepoint-path>   -d /opt/ClickCountJob.jar   --bootstrap.servers kafka:9092 --checkpointing --event-time

预期输出 

Job has been submitted with JobID <job-id>

一旦作业再次运行,您将在输出主题中看到,在作业处理中断期间积累的积压时,记录以更高的速度产生。此外,您还会看到在升级过程中没有丢失任何数据:所有窗口都存在,数量正好是一千。

 

第二种方式:REST方式
请求

# 从客户端容器上传JAR
docker-compose run --no-deps client curl -X POST -H "Expect:"   -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

预期输出

{
  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
  "status": "success"
}

请求

# 提交工作
curl -X POST http://localhost:8081/jars/<jar-id>/run   -d ‘{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}‘

预期输出

{
  "jobid": "<job-id>"
}

一旦作业再次运行,您将在输出主题中看到,在作业处理中断期间积累的积压时,记录以更高的速度产生。此外,您还会看到在升级过程中没有丢失任何数据:所有窗口都存在,数量正好是一千。

 

步骤2B:以不同的并行度重新启动作业(重新缩放)

另外,您也可以在重新提交时通过传递不同的并行性,从这个保存点重新缩放作业。

第一种方式:CLI方式
命令

docker-compose run --no-deps client flink run -p 3 -s <savepoint-path>   -d /opt/ClickCountJob.jar   --bootstrap.servers kafka:9092 --checkpointing --event-time

预期输出

Starting execution of program
Job has been submitted with JobID <job-id>

第二种方式:REST方式
请求

# 从客户端容器上传JAR
docker-compose run --no-deps client curl -X POST -H "Expect:"   -F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload

预期输出

{
  "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
  "status": "success"
}

请求

# 提交工作
curl -X POST http://localhost:8081/jars/<jar-id>/run   -d ‘{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}‘

预期输出

{
  "jobid": "<job-id>"
}

现在,作业已经被重新提交,但它不会启动,因为没有足够的TaskSlots在增加的并行度下执行它(2个可用,3个需要)。有了

docker-compose scale taskmanager=2


你可以在Flink集群中添加一个带有两个任务槽的第二个任务管理器,它将自动注册到JobManager中。添加任务管理器后不久,该任务应该再次开始运行。

一旦Job再次 "RUNNING",你会在输出Topic中看到在重新缩放过程中没有丢失数据:所有的窗口都存在,计数正好是一千。

查询作业的指标

JobManager通过其REST API公开系统和用户指标。

端点取决于这些指标的范围。可以通过 jobs/<job-id>/metrics 来列出一个作业的范围内的度量。指标的实际值可以通过get query参数进行查询。

 请求

curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"

预期输出

[
  {
    "id": "lastCheckpointSize",
    "value": "9378"
  }
]

REST API不仅可以用来查询指标,还可以检索运行中的作业状态的详细信息。

 请求

# find the vertex-id of the vertex of interest
curl localhost:8081/jobs/<jod-id>

预期输出

{
  "jid": "<job-id>",
  "name": "Click Event Count",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1564467066026,
  "end-time": -1,
  "duration": 374793,
  "now": 1564467440819,
  "timestamps": {
    "CREATED": 1564467066026,
    "FINISHED": 0,
    "SUSPENDED": 0,
    "FAILING": 0,
    "CANCELLING": 0,
    "CANCELED": 0,
    "RECONCILING": 0,
    "RUNNING": 1564467066126,
    "FAILED": 0,
    "RESTARTING": 0
  },
  "vertices": [
    {
      "id": "<vertex-id>",
      "name": "ClickEvent Source",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066423,
      "end-time": -1,
      "duration": 374396,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 5033461,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 166351,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "Timestamps/Watermarks",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066441,
      "end-time": -1,
      "duration": 374378,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 5066280,
        "read-bytes-complete": true,
        "write-bytes": 5033496,
        "write-bytes-complete": true,
        "read-records": 166349,
        "read-records-complete": true,
        "write-records": 166349,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "ClickEvent Counter",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066469,
      "end-time": -1,
      "duration": 374350,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 5085332,
        "read-bytes-complete": true,
        "write-bytes": 316,
        "write-bytes-complete": true,
        "read-records": 166305,
        "read-records-complete": true,
        "write-records": 6,
        "write-records-complete": true
      }
    },
    {
      "id": "<vertex-id>",
      "name": "ClickEventStatistics Sink",
      "parallelism": 2,
      "status": "RUNNING",
      "start-time": 1564467066476,
      "end-time": -1,
      "duration": 374343,
      "tasks": {
        "CREATED": 0,
        "FINISHED": 0,
        "DEPLOYING": 0,
        "RUNNING": 2,
        "CANCELING": 0,
        "FAILED": 0,
        "CANCELED": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0
      },
      "metrics": {
        "read-bytes": 20668,
        "read-bytes-complete": true,
        "write-bytes": 0,
        "write-bytes-complete": true,
        "read-records": 6,
        "read-records-complete": true,
        "write-records": 0,
        "write-records-complete": true
      }
    }
  ],
  "status-counts": {
    "CREATED": 0,
    "FINISHED": 0,
    "DEPLOYING": 0,
    "RUNNING": 4,
    "CANCELING": 0,
    "FAILED": 0,
    "CANCELED": 0,
    "RECONCILING": 0,
    "SCHEDULED": 0
  },
  "plan": {
    "jid": "<job-id>",
    "name": "Click Event Count",
    "nodes": [
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEventStatistics Sink",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "FORWARD",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEvent Counter",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "HASH",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "Timestamps/Watermarks",
        "inputs": [
          {
            "num": 0,
            "id": "<vertex-id>",
            "ship_strategy": "FORWARD",
            "exchange": "pipelined_bounded"
          }
        ],
        "optimizer_properties": {}
      },
      {
        "id": "<vertex-id>",
        "parallelism": 2,
        "operator": "",
        "operator_strategy": "",
        "description": "ClickEvent Source",
        "optimizer_properties": {}
      }
    ]
  }
}

请参考REST API参考资料,了解可能查询的完整列表,包括如何查询不同范围的指标(如TaskManager指标)。

 

变体

你可能已经注意到,Click Event Count应用程序总是以--checkpointing和--event-time程序参数启动。通过在docker-compose.yaml的客户端容器的命令中省略这些,你可以改变Job的行为。

  • --checkpointing启用了checkpoint,这是Flink的容错机制。如果你在没有它的情况下运行,并通过故障和恢复,你应该会看到数据实际上已经丢失了。
  • --event-time 启用了你的 Job 的事件时间语义。当禁用时,作业将根据挂钟时间而不是ClickEvent的时间戳将事件分配给窗口。因此,每个窗口的事件数量将不再是精确的一千。

Click Event Count应用程序还有另一个选项,默认情况下是关闭的,你可以启用这个选项来探索这个作业在背压下的行为。你可以在docker-compose.yaml的客户端容器的命令中添加这个选项。

  • -backpressure在作业中间增加了一个额外的操作者,在偶数分钟内会造成严重的背压(例如,在10:12期间,但在10:13期间不会)。这可以通过检查各种网络指标(如 outputQueueLength 和 outPoolUsage)和/或使用 WebUI 中的背压监控来观察。

 

 

Flink-v1.12官方网站翻译-P004-Flink Operations Playground

上一篇:【AtCoder】AtCoder Grand Contest 039 解题报告


下一篇:C++栈类小练习