spring cloud stream 3.1.2 源码搭配rocketmq学习 (三)

(二)中介绍了函数的注册, 这篇介绍一下函数的初始化

这文章涉及到了大量响应式编程的方式, reactor 需要补一下

前言

这个 functionInitializer 其实是 channelfunction bean的绑定

响应式的doOn

同步钩子方法,在subscriber触发一系列事件的时候触发

先来熟悉一下doOn系列的方法. 这个方法在subscriber的时候如果没触发对应的钩子, 是不会执行的.

doOn资料传送门

热身

@Bean
public Function<Flux<Message<String>>, Mono<Void>> demo() {
    return flux -> flux.map(message -> {
        System.out.println("接收到了: " + message);
        return message;
    }).then();
}

@Component
static class DemoRunner implements CommandLineRunner {
    @Autowired
    Wrapper wrapper;

    @Override
    public void run(String... args) throws Exception {
        InputChannel inputChannel = new InputChannel();
        Flux<Message<String>> input = Flux.defer(() -> {
            Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError();
            System.out.println("初始化了inputChannel");
            MessageHandler messageHandler = message -> {
                System.out.println("处理信息");
                sink.tryEmitNext((Message<String>) message);
            };
            inputChannel.subscribe(messageHandler);
            return sink.asFlux().doOnCancel(() -> {
                // ...
            });
        });

        Mono<Void> result = wrapper.apply(input);

        // 上面这一段操作等同于 操作 flux 合并成了一个大的响应式
//          Mono<Void> result = Flux.defer(() -> {
//                Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError();
//                System.out.println("初始化了inputChannel");
//                MessageHandler messageHandler = message -> {
//                    System.out.println("处理信息");
//                    sink.tryEmitNext((Message<String>) message);
//                };
//                inputChannel.subscribe(messageHandler);
//                return sink.asFlux().doOnCancel(() -> {
//                    // ...
//                });
//            }).map(message -> {
//                System.out.println("接收到了: " + message);
//                return message;
//            }).then()
//            .doOnSubscribe(message -> {
//                System.out.println("在Wrapper.apply我加入了");
//            });

        result.subscribe();
        inputChannel.handle(MessageBuilder.withPayload("aaaa").build());
    }
}

static class InputChannel {
    final List<MessageHandler> messageHandlers = new ArrayList<>();

    public void subscribe(MessageHandler messageHandler) {
        messageHandlers.add(messageHandler);
    }

    public void handle(Message<String> message) {
        messageHandlers.get(0).handleMessage(message);
    }
}

@Component
static class Wrapper {
    @Autowired
    Function<Flux<Message<String>>, Mono<Void>> demo;

    public Mono<Void> apply(Flux<Message<String>> input) {
        System.out.println("---------");
        return demo.apply(input).doOnSubscribe(message -> {
            System.out.println("在Wrapper.apply我加入了");
        });
    }
}

这一段简单的响应式, 是functionInitializer核心的部分.

先组装flux然后调用我们注册的Bean把初始化的东西传入并生成一个总的响应式, 类似于合体一样. 上面注释部分的result就是最终生成的响应式.

functionInitializer就是把注册的Function Bean的调用某些注册方法加入到channel中和增加一些响应式的钩子达到统一处理某些信息的注册.

下面我们一起来看看源码

functionInitializer

初始化了一个这样的Bean--new FunctionConfiguration.FunctionToDestinationBinder

public void afterPropertiesSet() throws Exception {
    Map<String, BindableProxyFactory> beansOfType = this.applicationContext.getBeansOfType(BindableProxyFactory.class);
}

首先把BindableProxyFactory.class的Bean都取出来了.

看到BindableProxyFactory是不是很熟悉, 点进去发现, 他是BindableFunctionProxyFactory的父类.

BindableFunctionProxyFactory是不是(二)中用definition注册的Bean.

接着我们看到下面的这个bindFunctionToDestinations函数

只有这个函数不是提供者的时候才能绑定函数到目的地

if (function != null && !function.isSupplier()) {
    this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition);
}

从下述代码发现inputs/outputs 就是(二)中注册的Input/Output

Set<String> inputBindingNames = bindableProxyFactory.getInputs();
Set<String> outputBindingNames = bindableProxyFactory.getOutputs();

public Set<String> getInputs() {
    return this.inputHolders.keySet();
}

我们看到其中有一段关键的代码

SubscribableChannel, 是不是在(二)中注册的DirectWithAttributesChannel的Bean.
把对应inputBindingName的取了出来并做了对应的封装.
组合成一个Publisher

SubscribableChannel inputChannel = (SubscribableChannel)this.applicationContext.getBean(inputBindingName, SubscribableChannel.class);

IntegrationReactiveUtils.messageChannelToFlux(inputChannel);

进入messageChannelToFlux方法我们发现会调用adaptSubscribableChannelToPublisher

 private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
    return Flux.defer(() -> {
        Many<Message<T>> sink = Sinks.many().unicast().onBackpressureError();
        MessageHandler messageHandler = (message) -> {
            while(true) {
                switch(sink.tryEmitNext(message)) {
                case FAIL_NON_SERIALIZED:
                case FAIL_OVERFLOW:
                    LockSupport.parkNanos(1000L);
                    break;
                case FAIL_ZERO_SUBSCRIBER:
                    throw new IllegalStateException("The [" + sink + "] doesn't have subscribers to accept messages");
                case FAIL_TERMINATED:
                case FAIL_CANCELLED:
                    throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink for message channel: " + inputChannel);
                default:
                    return;
                }
            }
        };
        inputChannel.subscribe(messageHandler);
        return sink.asFlux().doOnCancel(() -> {
            inputChannel.unsubscribe(messageHandler);
        });
    });
}

会发现有一行

inputChannel.subscribe(messageHandler);

把处理message的处理器注册进了inputChannel中

因为这个inputChannel就是DirectWithAttributesChannel, 所以我们直接关注到DirectWithAttributesChannel的subscibe方法.

 MessageDispatcher dispatcher = this.getRequiredDispatcher();
        boolean added = dispatcher.addHandler(handler);

把这个handler加进了dispatcher中, 那这个dispatcher是一个什么呢?

我们查阅继承关系发现DirectChannel这个类初始化的时候初始化了一个dispathcher

public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
    this.dispatcher = new UnicastingDispatcher();
    ...
}

这样 messageHander 就注册进了DirectWithAttributesChannel的dispatcher中.

我们回到bindFunctionToDestinations中, 然后我们关注到这一行代码

Object resultPublishers = ((Function)functionToInvoke).apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray(inputPublishers));

functionToInvoke 就是FunctionWrapper, 所以我们看看FunctionInvocationWrapper的apply方法
点进去看看

public Object apply(Object input) {
    // ...
    Object result = this.doApply(input);
    // ...
    return result;
}

看到doApply中, 因为我们注册的Bean是Function类型的, 所以我们直接看到 invokeFunction
发现有关键的一行 invokeFunctionAndEnrichResultIfNecessary

result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput);

private Object invokeFunctionAndEnrichResultIfNecessary(Object value) {
    //...

    // target就是注册的Function Bean的函数.
    // 在此处我们对他进行调用并把输入传入.
    // intputValue是对inputChannel内的信息进行了处理并封装成了Message
    // 想知道怎么处理的朋友可以看看源码, 就在这个函数里
    Object result = ((Function)this.target).apply(inputValue);

    //...
}

那这个target是什么呢, 这个是时候我们可以打个断点看看, 发现他就是我们注册的Function.

然后他调用了apply, 证明调用了这个方法, 并且传入了inputValue

然后我们发现functionToInvoke.apply这个函数将上述封装的inputChannel响应式进行传入, 并调用对应的function Bean, 得到完整的响应式函数. 合并了两段响应式函数.

这里的resultPublishers实际上就是我们配置的Function调用后的返回的值.

接着对resultPublishers进行判断, 是否有输出需要处理, 有的话做个doOnNext的钩子, 并封装对应的发送和错误处理逻辑.
没有则进行subscribe, 让之前的inputChannel的调用进行消费注册.

((Iterable)resultPublishers).forEach((publisher) -> {
    Flux flux = Flux.from((Publisher)publisher);
    if (!CollectionUtils.isEmpty(outputBindingNames)) {
       //  ...发送逻辑
    }
    // 如果不是消费者 则消费.
    // 这会subscribe上面配置的Flux, 进行对应的初始化.
    // 但是doOn的方法是钩子, 这边只是简单的subscribe所以不会被触发
    if (!function.isConsumer()) {
        flux.subscribe();
    }
});

至此, 我们才完整的注册了一个Function Bean.

总结

  1. 找到(二)中注册的Bean
  2. 找到(二)中注册的对应的Input/Output的Bean
  3. 将channel和这个Function bean绑定到一起, 并加入统一的处理方法

ps 响应式其实不是直接调用, 是配置了一堆东西, 等同 于配置文件. 等到一个命令来的时候例如类被new的时候, 再进行统一的执行.

好, 这能这篇文章比较干, 也可能比较乱, 如果有不好的地方, 欢迎讨论改进, 谢谢!

然后channel的注册 --- (二)
function Bean 和 channel 的绑定 (三) 已经说完了
那是不是还缺一个channel 和外部消息中间件的绑定呢, 我们下一篇文章继续!

Wish. Do.

上一篇:“赢”战2020!阿里、字节:一套高效的iOS面试真题!


下一篇:[非专业翻译] Mapster - 拷贝与合并