Flink Source Code Analysis (8): The Checkpoint Lifecycle

The checkpoint lifecycle

1.1 Let's walk through the source code to see how Flink actually implements the checkpoint lifecycle.

Since the SocketSource that Flink ships does not support checkpoints, I will use FlinkKafkaConsumer010 as the source function here.

1.2.1 Triggering a checkpoint
To complete a checkpoint, the first step is naturally to issue a checkpoint request. So where does this request come from, how is it sent, and who controls it?
Remember that when we enable checkpointing we have to specify a checkpoint interval? For a feature triggered at a fixed interval there should be something like a scheduler behind it, and in Flink the class responsible for triggering checkpoints is the CheckpointCoordinator.
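
For reference, here is a minimal sketch of what setting that interval looks like on the user side (the numbers are illustrative, not recommendations):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // checkpoint every 60 seconds; this becomes the interval the scheduler below fires at
    env.enableCheckpointing(60_000);
    // wait at least 30s between the end of one checkpoint and the start of the next
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
    // abort a checkpoint that has not completed within 10 minutes
    env.getCheckpointConfig().setCheckpointTimeout(600_000);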

When a job is submitted, Flink starts this class's startCheckpointScheduler method, shown below:

    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    new ScheduledTrigger(),
                    baseInterval, baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint.", e);
            }
        }
    }

Once started, it calls the triggerCheckpoint() method at the configured interval. The method is long, so here is roughly what it does:

Check the conditions for triggering a checkpoint; if they are not met (for example, periodic checkpoints are disabled, or the minimum pause between checkpoints has not yet elapsed), return immediately.
Check that every task that needs to be checkpointed, and every task that must acknowledge the checkpoint (the ACK is part of the two-phase commit, covered later), is in the RUNNING state; otherwise return.
If everything checks out, execute checkpointID = checkpointIdCounter.getAndIncrement(); to generate a new id, then create a PendingCheckpoint. A PendingCheckpoint is a checkpoint that has been started but not yet acknowledged; once every task has acknowledged it, it is converted into a CompletedCheckpoint.
Register a timeout callback that cancels the checkpoint if it has been running for too long without completing.
Trigger the MasterHooks, with which users can attach extra behavior to the checkpoint (such as preparing and cleaning up external resources).
Then comes the core logic:

    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }

This calls the triggerCheckpoint method of Execution; an Execution is the actual executor of an ExecutionVertex. Let's look at this method:

    public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            // the TaskManagerGateway is the component used to communicate with the TaskManager
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
                "no longer running.");
        }
    }

Following the call any further takes us into the Task class, which is the subject of the next section. This section covered how the CheckpointCoordinator triggers a checkpoint; as the name says, it is the checkpoint coordinator.

1.2.2 Checkpoint preparation at the Task level
Let's start with the Task class. It creates a CheckpointMetaData object and an anonymous Runnable that performs the checkpoint, then triggers that Runnable asynchronously:

public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

            ......

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    }
                    
                    ......
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
    }

The invokable in the code above is in fact our StreamTask. The Task class delegates the checkpoint to a more concrete class, and StreamTask delegates further still, all the way down to the user code.
StreamTask implements it as follows:

If the task is still running, the checkpoint can go ahead: first broadcast a barrier to every downstream output, then trigger the state snapshot of this task.
If the task has already finished, we must tell the downstream tasks to cancel this checkpoint, which is done by sending a CancelCheckpointMarker, another kind of message similar to a barrier.
Note that from here on, barriers start to appear along the execution path; it is worth cross-reading this with the earlier discussion of the fault-tolerance principles.

   private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {

        synchronized (lock) {
            if (isRunning) {
            
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            else {

                ......
                
            }
        }
    }

After broadcastCheckpointBarrier completes, StreamTask does quite a bit more work inside the checkpointState() method:

    public void executeCheckpointing() throws Exception {

        ......

        try {
            // this is the entry point where each StreamOperator is asked to snapshot its state
            for (StreamOperator<?> op : allOperators) {
                checkpointStreamOperator(op);
            }

            // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

            owner.cancelables.registerCloseable(asyncCheckpointRunnable);
            // this registers a Runnable that, once the checkpoint is done, sends a
            // CompletedCheckpoint message to the JobManager; this is part of the
            // two-phase commit in the fault-tolerance protocol
            owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);

            ......

        }
    }

When checkpoints come to mind, the most intuitive image is surely the state of our aggregating operators being saved, such as the running total of a sum or the value of a count. That is exactly what the StreamOperator part is about to snapshot. But as we can see, beyond those intuitive operator states, Flink's checkpoint does a great deal of other work.

Next, let's turn our attention to the operators' checkpoint mechanism.

1.2.3 Operator state snapshots and barrier propagation
In part six we already went through the StreamOperator class hierarchy, so here we continue straight from the checkpointStreamOperator(op) method of the previous section.
Incidentally, as mentioned before, each operator executes an initializeState method when it is initialized, before any checkpoint runs; if the task is recovering from a failure, its saved state is restored there.

Forwarding the barrier happens before this operator's state snapshot is taken. The logic is much the same as emitting a record: build a CheckpointBarrier object and write it into every streamOutput:

 public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while broadcasting checkpoint barrier");
        }
    }

When a downstream operator receives this barrier, it triggers its own checkpoint.

After StreamTask has executed broadcastCheckpointBarrier, the barrier travels on downstream. Our current word-count program contains two operator chains:

kafka source -> flatmap
keyed aggregation -> sink

Let's walk through the checkpoint process in that order.

1. The checkpoint process of the kafka source

   public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }

The Kafka snapshot logic simply records the offsets currently consumed, packs them into (partition, offset) tuples, and puts them into a StateBackend. The StateBackend is the interface Flink abstracts out for storing state.

2. The checkpoint process of the FlatMap operator
Nothing much to say here: its snapshotState() method simply gets called. For a user function that wants to carry its own state through this call, implementing CheckpointedFunction is enough, as sketched below.
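
Here is a hedged sketch (the class and state names are my own, not Flink source or the word-count job) of what such a stateful user function can look like; its snapshotState/initializeState pair hooks into exactly the machinery described in this section:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.util.Collector;

    public class BufferingFlatMap implements FlatMapFunction<String, String>, CheckpointedFunction {

        private transient ListState<String> checkpointedBuffer; // managed operator state
        private final List<String> buffer = new ArrayList<>();  // working copy on the heap

        @Override
        public void flatMap(String value, Collector<String> out) {
            buffer.add(value);
            out.collect(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // invoked when the barrier reaches this operator, just like the kafka source above
            checkpointedBuffer.clear();
            for (String element : buffer) {
                checkpointedBuffer.add(element);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // invoked on (re)start; this is the restore path mentioned earlier in this section
            checkpointedBuffer = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("buffer", String.class));
            if (context.isRestored()) {
                for (String element : checkpointedBuffer.get()) {
                    buffer.add(element);
                }
            }
        }
    }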

3. The state persistence of this operator chain
Careful readers will have noticed that each operator's snapshot method only saves its own state into the StateBackend; there is no write-out for persistence. That part is placed in AbstractStreamOperator, where Flink takes care of persistence uniformly. We could guess this even without the source: persistence is nothing more than writing this data through a stream to disk or somewhere else. Let's see whether that is the case:

    // still inside the snapshotState method of AbstractStreamOperator.java
    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }

So how does this operatorStateBackend save the state? First it makes a deep copy of every operator's state; then it asynchronously runs an inner-class Runnable whose run method follows a template method: open the stream, write the data, then close the stream. Let's look at the method that writes the data:

 public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {
                    long asyncStartTime = System.currentTimeMillis();

                    CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;

                    // get the registered operator state infos ...
                    List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    ......
                    
                }

The comments are clear enough, so I won't say more.

4. The checkpoint process of the downstream operator chain
As said before, checkpoints in a Flink stream are triggered wherever a barrier flows past. In step 1 above, the upstream node already emitted the barrier, so in our keyed aggregation -> sink operator chain the first thing to happen is capturing that barrier.

Capturing the barrier is really just part of processing the input data, in the StreamInputProcessor.processInput() method, which we already covered in part six. A brief recap:

            // this logic runs for every element; if the next item is a buffer, the outer while
            // loop goes on to handle the user data; barriers are handled quietly inside this method
            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }

Handling the barrier does not show up in this snippet because it is hidden inside the getNextNonBlocked() method; here is the core logic of that method:

// the BarrierBuffer.getNextNonBlocked method
            else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                }
            }
            else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
            }

A quick aside: remember that the CancelCheckpointMarker came up earlier as well? This is exactly where it gets matched and handled.

Handling the barrier is a fiddly business in itself. Think back to the barrier diagram in section 5.1: an operator may only start its own checkpoint after it has received the barrier with the same id from every input channel. If one channel is ahead, the data arriving after its barrier must be buffered; and if an input channel has been closed, no more barriers will ever arrive from it:


private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        final long barrierId = receivedBarrier.getId();

        // fast path for single channel cases
        if (totalNumberOfInputChannels == 1) {
            if (barrierId > currentCheckpointId) {
                // new checkpoint
                currentCheckpointId = barrierId;
                notifyCheckpoint(receivedBarrier);
            }
            return;
        }

        // -- general code path for multiple input channels --

        if (numBarriersReceived > 0) {
            // this is only true if some alignment is already progress and was not canceled

            if (barrierId == currentCheckpointId) {
                // regular case
                onBarrier(channelIndex);
            }
            else if (barrierId > currentCheckpointId) {
                // we did not complete the current checkpoint, another started before
                LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                        "Skipping current checkpoint.", barrierId, currentCheckpointId);

                // let the task know we are not completing this
                notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

                // abort the current checkpoint
                releaseBlocksAndResetBarriers();

                // begin the new checkpoint
                beginNewAlignment(barrierId, channelIndex);
            }
            else {
                // ignore trailing barrier from an earlier checkpoint (obsolete now)
                return;
            }
        }
        else if (barrierId > currentCheckpointId) {
            // first barrier of a new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // either the current checkpoint was canceled (numBarriers == 0) or
            // this barrier is from an old subsumed checkpoint
            return;
        }

        // check if we have all barriers - since canceled checkpoints always have zero barriers
        // this can only happen on a non canceled checkpoint
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            // actually trigger checkpoint
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received all barriers, triggering checkpoint {} at {}",
                        receivedBarrier.getId(), receivedBarrier.getTimestamp());
            }

            releaseBlocksAndResetBarriers();
            notifyCheckpoint(receivedBarrier);
        }
    }

In short, once all the barriers have been received, notifyCheckpoint() is triggered, which in turn calls StreamTask's triggerCheckpoint, just as with the operator before.

If there are more operators downstream, exactly the same cycle repeats, so I won't go over it again.

5. Reporting checkpoint completion
After an operator has saved its checkpoint data, it starts an asynchronous AsyncCheckpointRunnable to report that the checkpoint is complete; the concrete logic lives in reportCompletedSnapshotStates. That method in turn delegates the job to the RpcCheckpointResponder class:

checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);

As the class name suggests, its job is to report the event by remotely invoking the corresponding JobManager method via RPC, which underneath is implemented with Akka.
So who answers this RPC call? The JobMaster of this job.

 //JobMaster.java
    public void acknowledgeCheckpoint(
            final JobID jobID,
            final ExecutionAttemptID executionAttemptID,
            final long checkpointId,
            final CheckpointMetrics checkpointMetrics,
            final TaskStateSnapshot checkpointState) {

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            checkpointState);

        if (checkpointCoordinator != null) {
            getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                } catch (Throwable t) {
                    log.warn("Error while processing checkpoint acknowledgement message");
                }
            });
        } else {
            log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
                    jobGraph.getJobID());
        }
    }

The JobMaster promptly turns around and RPCs the work onward to the CheckpointCoordinator.receiveAcknowledgeMessage() method.

As mentioned earlier, when the coordinator triggered the checkpoint it created a PendingCheckpoint, which holds the ids of all the operators.

When the PendingCheckpoint receives an operator's checkpoint-complete message, it moves that operator from the set of unacknowledged nodes into the acknowledged set. Once all operators have reported completion, the CheckpointCoordinator triggers the completePendingCheckpoint() method, which does the following (a simplified sketch of this bookkeeping follows the list):

Convert the PendingCheckpoint into a CompletedCheckpoint
Add the CompletedCheckpoint to the set of completed checkpoints and remove it from the set of pending ones
Send one more RPC to every operator, notifying it that the checkpoint is complete
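
As a simplified illustration of that acknowledgement bookkeeping (plain Java, not Flink source; task ids reduced to Strings):

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch of the PendingCheckpoint bookkeeping described above.
    class PendingCheckpointSketch {

        private final Set<String> notYetAcknowledged;
        private final Set<String> acknowledged = new HashSet<>();

        PendingCheckpointSketch(Set<String> tasksToAck) {
            this.notYetAcknowledged = new HashSet<>(tasksToAck);
        }

        // returns true once every task has acknowledged, i.e. the pending
        // checkpoint may be converted into a CompletedCheckpoint
        boolean acknowledgeTask(String taskId) {
            if (notYetAcknowledged.remove(taskId)) {
                acknowledged.add(taskId);
            }
            return notYetAcknowledged.isEmpty();
        }
    }
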
In this article, the recipients of the completion notification from the last step are our two operator chains; let's look at their logic:

   public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (lock) {
            if (isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());

                for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
                    if (operator != null) {
                        operator.notifyCheckpointComplete(checkpointId);
                    }
                }
            }
            else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            }
        }
    }

All that remains is notifying the corresponding operators layer by layer so they can react.
With this, Flink's entire two-phase-commit checkpoint logic is complete.
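
As a side note, that second phase is what user functions can hook into via the CheckpointListener interface; it is how the kafka consumer above commits its pendingOffsetsToCommit. Here is a hedged sketch with an imaginary sink (the class name and buffering logic are illustrative, not Flink source):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class TwoPhaseSinkSketch extends RichSinkFunction<String> implements CheckpointListener {

        // batches staged per in-flight checkpoint, keyed by checkpoint id
        private final Map<Long, List<String>> pending = new HashMap<>();
        private final List<String> currentBatch = new ArrayList<>();

        @Override
        public void invoke(String value) {
            currentBatch.add(value); // phase one: just accumulate
        }

        // would be called from snapshotState(): stage the batch under this checkpoint's id
        void stage(long checkpointId) {
            pending.put(checkpointId, new ArrayList<>(currentBatch));
            currentBatch.clear();
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // phase two: the coordinator has confirmed the checkpoint, so it is
            // now safe to commit the staged batch to the external system
            List<String> batch = pending.remove(checkpointId);
            if (batch != null) {
                // commit(batch) would go here
            }
        }
    }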

1.3 The abstractions that carry checkpoint data: State & StateBackend
State is the carrier of the snapshot data; StateBackend is the abstraction of how a snapshot is stored.

State divides into KeyedState and OperatorState, which, as the names say, correspond to keyed streams and to all other operators respectively. By who manages it, state can also be split into raw state and managed state: managed state is managed by Flink, raw state by the user. Managed state further breaks down into ValueState, ListState, ReducingState, AggregatingState, FoldingState and MapState, each doing what its name suggests. A minimal sketch of managed keyed state follows.
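
The sketch below keeps a per-key running sum in a ValueState; the class and state names are my own, not from any Flink example:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class RunningSum extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> sumState; // one value per key, checkpointed by Flink

        @Override
        public void open(Configuration parameters) {
            sumState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("sum", Long.class));
        }

        @Override
        public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = sumState.value();             // null on the first record for this key
            long updated = (current == null ? 0L : current) + in.f1;
            sumState.update(updated);
            out.collect(Tuple2.of(in.f0, updated));
        }
    }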

As for StateBackend, three backends are currently provided: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend, again a know-it-from-the-name series. Choosing one is a single call on the environment, as sketched below.
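
A hedged configuration sketch (the HDFS path is a placeholder):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BackendConfig {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // keep checkpoint data on a distributed filesystem instead of the JobManager heap
            env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        }
    }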

The State interface, the StateBackend interface and their implementations are all fairly simple, so I won't paste the code; State in particular is essentially just a layer of container wrapping.
