【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

1.概述

转载:Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型

相似文章:【Flink】Flink 基于 MailBox 实现的 StreamTask 线程模型

Flink 1.10 对内部事件处理的线程模型做了一个大的改进,采用了类似 Actor 的信箱模型。这篇文章我们将深入 Flink 内部 Mailbox 线程模型的设计即实现。

2.背景

在之前的线程模型中,StreamTask 中可能存在多个潜在的线程会修改内部的状态,因此需要通过加锁的方式来确保线程安全的状态,这个全局的锁就是著名的 checkpointLock。通过 checkpointLock 控制线程间的并发会让程序代码变得很复杂,并且锁对象还通过一些 API 暴露给了用户(例如 SourceFunction#getCheckpointLock()),如果没有正确加锁很容易引发线程安全问题。

为了解决这个问题,社区提出了基于 Mailbox 的线程模型,见 FLINK-12477。Mailbox 机制借鉴了 Actor 模型,通过单个 Mailbox 线程配合阻塞队列的方式,将内部状态的修改交由单个线程完成,从而避免多线程的问题。相比于使用 checkpointLock,Mailbox 模型另一个好处是方便控制事件处理的优先级,通过锁竞争很难达到类似的效果。

在原始的线程模型中,checkpointLock 主要用在三个地方:

  • 事件处理:包括 events, watermarks, barriers, latency markers 的处理和发送

  • checkpoint 触发:通过 RPC 调用触发 checkpoint(在 Source 中)、通知 checkpoint 的完成情况,(注:对下游来说,checkpoint 触发和取消是通过 barrier 触发的,归为第一种情况)

  • Processing Time Timers: 处理时间定时器是通过 ScheduledExecutor 异步执行的(事件事件定时器触发是通过 watermark 触发的,归为第一种情况)

在新的改进方案中,对锁的替换不仅仅要做到排他的效果,对于事件处理还需要保证原子性。

3.改进方案

Mailbox 模型的核心思想其实比较简单,其底层就是 FIFO 的队列 + 一个单线程的循环事件处理。所有需要处理的事件都封装成一个 Mail 投递到 Mailbox 中,然后按先后顺序由单线程加以处理,从而简化了并发访问问题。

在使用 Mailbox 以前,StreamTask 的核心逻辑是在 StreamTask#run() 中,内部是一个循环的事件处理。除此以外,checkpoint triggerprocessing time timer 在其它线程中运行。

在改进方案中,StreamTask 的基础逻辑大致如下(伪代码,来自设计文档):

BlockingQueue<Runnable> mailbox = ...

void runMailboxProcessing() {
    //TODO: can become a cancel-event through mailbox eventually
    Runnable letter;
    while (isRunning()) { 
        while ((letter = mailbox.poll()) != null) {
            letter.run();
        }

        defaultAction();
    }
}

void defaultAction() {
    // e.g. event-processing from an input
}

上面只是核心代码的大致逻辑,具体的实现还有一些优化,比如队列的公平性。之前的抢锁操作是完全没有任何公平性而言的。

在这个模型下,事件处理的循环被移到了 Mailbox 处理线程中,因此以往在 StreamTask#run() 中的循环逻辑就不再需要了。但这里会有个问题,因为历史原因,Flink Source Function 的核心逻辑是一个循环,这个循环不能和 Mailbox 的事件循环穿插执行,因此需要进行兼容性处理。在 FLIP-27 提出的新的 Source 接口中,已经可以比较好地和 Mailbox 模型进行兼容了。

对于 checkpoint trigger 和 processing time timer,只需要将对应的操作封装为 Mail 投递到 Mailbox 中,等待 Mailbox 线程进行处理即可

4.具体实现

4.1 整体设计

下面这张图展示了 Mailbox 线程模型中的核心抽象。
【Flink】Flink 源码阅读笔记(20)- Flink 基于 Mailbox 的线程模型
Mail 中封装了需要处理的消息和相应的动作,checkpoint trigger 和 processing time timer 就是通过 Mail 触发的TaskMailbox 用于存储 Mail(需要处理的消息);MailboxProcessor 负责从 TaskMailbox 中取出信件并处理;其它的调用方通过 MailboxExecutor 向 TaskMailbox 中投递信件。

MailboxDefaultAction 则是 MailboxProcessor 的默认动作,如前所述,MailboxDefaultAction 主要负责处理基础的 stream event、barrier、watermark 等。在 Mailbox 主线程的循环中,处理完新的 Mail 后就会执行该动作。MailboxDefaultAction 通过一个 MailboxControllerMailbox 进行交互,可以借此获悉所有的事件都处理完毕,或者临时暂停 MailboxDefaultAction

4.2 Mailbox

TaskMailbox 的内部使用了一个普通的 Deque 存储写入的 Mail,对 Deque 读写通过一个 ReentrantLock 来加以保护。Mailbox 的一个主要特性是可以做优先级控制,每一个 Mail 都有其优先级,从 TaskMailbox 获取 Mail 时可以指定优先级,实际实现时就是通过遍历队列元素比较优先级

为了减少读取队列时的同步开销TaskMailbox 支持创建一个 batch 后续消费,相当于把队列中的元素存入一个额外的队列,后续消费时就避免了加锁的操作。

4.3 MailboxProcessor

MailboxProcessot 核心就是前面提过的事件循环,在这个事件循环中,除了处理 TaskMailbox 中的事件外,还有一个 MailboxDefaultAction 用做默认的行为

MailboxDefaultActionTaskMailbox 内部的 Mail 的区别在于,Mail 通常用于一些控制类的消息处理,例如 checkpoint 触发,而 MailboxDefaultAction用于数据流上的普通消息处理(如正常的数据记录,barrier)等。数据流上的消息数据量比较大,通过邮箱内部队列进行处理显然开销比较大。

public class MailboxProcessor implements Closeable {
    //邮箱
    protected final TaskMailbox mailbox;

    // 默认行为,用于普通的数据流上的消息数据处理
    protected final MailboxDefaultAction mailboxDefaultAction;

 /**
     * Runs the mailbox processing loop. This is where the main work is done. This loop can be
     * suspended at any time by calling {@link #suspend()}. For resuming the loop this method should
     * be called again.
     *
     *  // 运行邮箱处理循环。 这是完成主要工作的地方。
     */
    public void runMailboxLoop() throws Exception {
        suspended = !mailboxLoopRunning;

        final TaskMailbox localMailbox = mailbox;

        // 检查当前运行线程是否是 mailbox 线程,只有 mailbox 线程能运行该方法
        //确保当前调用必须发生在 Mailbox 的事件处理线程中
        checkState(
                localMailbox.isMailboxThread(),
                "Method must be executed by declared mailbox thread!");

		// mailbox 状态必须是 OPEN
        assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

        final MailboxController defaultActionContext = new MailboxController(this);

        // TODO:邮箱里有邮件,就进行处理. 邮件就是类似map之类的任务...
        while (isNextLoopPossible()) {
            // The blocking `processMail` call will not return until default action is available.
            // 在默认操作可用之前,阻塞的`processMail`调用将不会返回。
            // 处理事件,这是一个阻塞方法,如果默认行为不可用,方法不会返回
            processMail(localMailbox, false);
            // 再做一次检查,因为上面的 mail 处理可能会改变运行状态
            if (isNextLoopPossible()) {
                // TODO: 执行一个默认的动作 邮箱默认操作在StreamTask构造器中指定,为 processInput
                mailboxDefaultAction.runDefaultAction(
                        // 根据需要在默认操作中获取锁
                        defaultActionContext); // lock is acquired inside default action as needed
            }
        }
    }

4.4 MailboxExecutor

MailboxExecutor 的主要作用是向 TaskMailbox 中投递 Mail,这个接口被设计为类似 java.util.concurrent.Executor 接口。提交 Mail 的行为可以在任意线程中进行,因为 TaskMailbox 内部有基于锁的同步控制

除了提交 Mail 外,MailboxExecutor 还有一个比较重要的作用体现在 MailboxExecutor#yield 方法中。yield 这个词在程序设计语言中非常常见,但其含义往往又让人摸不着头脑。从字面解释来看,yield 有“让出”,“屈服”之意,在一些场景下也有“生成”的意思。这里我们不纠结这个,还是来看看这个方法设计的意图的什么。

Mailbox 模型中所有的事件都是在单个事件处理线程中处理的,排除掉优先级的因素,所有的事件按照 FIFO 的顺序加以处理。正常情况下,这种处理顺序是没有问题的。但是考虑到一种特殊的情况,如果要完成对事件A的处理需要等待一个条件,只有在处理完事件B之后这个条件才能满足,但是事件B在队列里的顺序是在事件A之后的,这样某种程度上来说就造成了一种 “死锁”。

yield 方法就是为了解决上面的问题,yield 会从队列中取出下一个事件进行处理,看上去像是暂时“让出”了对当前事件的处理。

说起来有点抽象,看一个示例:

MailboxExecutor mailboxExecutor = ....

mailboxExecutor.executr(() -> {
    // ...
    // 当前事件处理的逻辑,要完成,需要依赖后面某个事件的处理
    while (resource not available) {
        // 取出下一个事件处理
        mailboxExecutor.yield();
    }
    // 继续处理当前事件
    // ...
})

注意,为了不破坏 Mailbox 模型单线程执行的特性,这个方法必须在 Mailbox 事件处理线程中调用。这是一个阻塞方法,因此可能会阻塞事件处理线程。有些场景下可能还需要依赖事件处理线程来提交新的事件,因此也提供了非阻塞的 tryYield 方法。

5.StreamTask 如何应用 Mailbox 模型

StreamTask 的核心是处理消息流中的 StreamRecord,这个处理逻辑是 MailboxProcessor 的默认行为,即:

class StreamTask {
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        InputStatus status = inputProcessor.processInput(); //处理输入
        if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
            return;
        }
        if (status == InputStatus.END_OF_INPUT) {
            // 没有后续的输入了,告知 MailboxDefaultAction.Controller 
            controller.allActionsCompleted();
            return;
        }

        // 暂时没有输入的情况
        TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
        TimerGauge timer;
        CompletableFuture<?> resumeFuture;
        if (!recordWriter.isAvailable()) {
            timer = ioMetrics.getBackPressuredTimePerSecond();
            resumeFuture = recordWriter.getAvailableFuture();
        } else {
            timer = ioMetrics.getIdleTimeMsPerSecond();
            resumeFuture = inputProcessor.getAvailableFuture();
        }
        // 一旦有输入了,就告知 controller 要恢复 MailboxDefaultAction 的处理
        assertNoException(
                resumeFuture.thenRun(
                        // 首先会暂停 MailboxDefaultAction 的处理
                        new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
    }

    private static class ResumeWrapper implements Runnable {
        private final Suspension suspendedDefaultAction;
        private final TimerGauge timer;

        public ResumeWrapper(Suspension suspendedDefaultAction, TimerGauge timer) {
            this.suspendedDefaultAction = suspendedDefaultAction;
            timer.markStart();
            this.timer = timer;
        }

        @Override
        public void run() {
            timer.markEnd();
            suspendedDefaultAction.resume();
        }
    }
}

对于 checkpoint 的触发,是通过 MailboxExecutor 提交一个 Mail 来实现的:

@Override
    public Future<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {

        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mainMailboxExecutor.execute(
                () -> {
                    try {
                        // 触发Checkpoint操作 这里可以看到,其实现跟方案设计中的是一致,Checkpoint trigger
                        // 这里的操作就是向 MailBox 提交一个 Task,等待 MailBox 去处理。
                        result.complete(
                                triggerCheckpointAsyncInMailbox(
                                        checkpointMetaData, checkpointOptions));
                    } catch (Exception ex) {
                        // Report the failure both via the Future result but also to the mailbox
                        result.completeExceptionally(ex);
                        throw ex;
                    }
                },
                "checkpoint %s with %s",
                checkpointMetaData,
                checkpointOptions);
        return result;
    }

checkpoint 完成或者放弃的通知也是提交到 Mailbox 中运行的:

class StreamTask {
    // checkpoint 完成或者失败的回调通知操作
    private Future<Void> notifyCheckpointOperation(
            RunnableWithException runnable, String description) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        mailboxProcessor
                .getMailboxExecutor(TaskMailbox.MAX_PRIORITY)
                .execute(
                        () -> {
                            try {
                                runnable.run();
                            } catch (Exception ex) {
                                result.completeExceptionally(ex);
                                throw ex;
                            }
                            result.complete(null);
                        },
                        description);
        return result;
    }
}

对于 processing time timer 的触发也是类似的:

class streamTask {
    public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
        return mailboxExecutor ->
                new ProcessingTimeServiceImpl(
                        timerService,
                        callback -> deferCallbackToMailbox(mailboxExecutor, callback));
    }

    ProcessingTimeCallback deferCallbackToMailbox(
            MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {
        return timestamp -> {
            // 提交到 mailbox 中运行
            mailboxExecutor.execute(
                    () -> invokeProcessingTimeCallback(callback, timestamp),
                    "Timer callback for %s @ %d",
                    callback,
                    timestamp);
        };
    }
}

6.Legacy Source 的兼容处理

前面提到,因为历史遗留的问题,SourceFunction 被设计成一个无限的循环,这个循环不能和 Mailbox 的事件循环穿插执行,因此需要进行兼容性处理。

SourceStreamTask 被设计为 StreamTask 的子类,会启动另外一个独立的线程 LegacySourceFunctionThread 运行 SourceFunction 中的循环。这样相当于有两个线程在同时运行

  1. 一个是 SourceFunction 中生成数据流中的数据
  2. 另一个是 Mailbox 中的事件处理线程。

为了防止这两个线程发生冲突,在 SourceStreamTask 中保留了 checkpoint lock,用于在这两个线程间进行并发控制。

为了达到这样的效果,Flink 提供了一个 StreamTaskActionExecutor 的封装,用来运行 Runnable。正常情况下,StreamTaskActionExecutor 的实现就是直接去运行 Runnable;同时也提供了一个 SynchronizedStreamTaskActionExecutor 的实现,在运行 Runnable 的时候会进行加锁控制,这样就把获取锁的操作引入到 Mailbox 处理线程中了:

class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {
    private final Object mutex;

    public SynchronizedStreamTaskActionExecutor(Object mutex) {
        this.mutex = mutex;
    }

    @Override
    public void run(RunnableWithException runnable) throws Exception {
        synchronized (mutex) {
            runnable.run();
        }
    }
}

7.小结

Mailbox 模型是常见的用来控制并发的一种设计,通过引入 Mailbox 的线程模型,Flink 简化了 StreamTask 的代码逻辑,规避了多线程竞争带来的并发问题。

通过对 Mailbox、 MailboxProcessor、MailboxExecutor 这几个接口的设计进行分析,可以看出 Flink 的 Mailbox 模型设计还是比较优雅的,在使用方面也比较简单,很值得我们在开发其它项目的时候参考。

上一篇:Flink流处理基础之watermark


下一篇:架构