Spring Cloud Stream 用来做什么
Spring Cloud Stream provides unified abstractions of message middleware configurations, and puts forward concepts such as publish-subscribe, consumer groups and partition.Spring Cloud Stream 提供了统一的抽象消息中间件配置,提出了发布订阅、消费组、和分区概念。
Spring Cloud Stream 中 Binder 是什么
Binder: A component used to integrate with external message middleware, and is used to create binding. Different message middleware products have their own binder implementations.For example, Kafka uses KafkaMessageChannelBinder, RabbitMQ uses RabbitMessageChannelBinder, while RocketMQ uses RocketMQMessageChannelBinder.一个用来集成外部消息中间件的组件,用来创建 Binding,不同的消息中间件产品有它们自己的 binder 实现。Kafka uses KafkaMessageChannelBinder, RabbitMQ uses RabbitMessageChannelBinder, while RocketMQ uses RocketMQMessageChannelBinder.
Spring Cloud Stream 中 Binding 能够做什么
Binding: Includes Input Binding and Output Binding.Binding serves as a bridge between message middleware and the provider and consumer of the applications. Developers only need to use the Provider or Consumer to produce or consume data, and do not need to worry about the interactions with the message middleware.Binding 服务是一个在消息中间件和应用程序生产和消费的桥梁,开发者只需要使用生产者生产数据、消费者消费数据,不需要担心消息中间件的交互,在 SpringBoot 分布式应用 Spring Cloud Stream 和消息中间件流程大致如下:
RocketMQBinder实现
官方实现流程
官方文档
RocketMQMessageChannelBinder实现了 Binder 的标准,它的内部构建了 RocketMQInboundChannelAdapter 和 RocketMQMessageHandler。而RocketMQMessageHandler将基于Springboot 配置文件 application.yml 中的 binding配置构建RocketMQTemplate。RocketMQTemplate模板类可以发送会将Spring-messaging 中封装的Message 转换为RocketMQ API 中的 Message,并且发送它。RocketMQInboundChannelAdapter将基于 配置文件(Springboot 中application.yml文件)中binding配置构RocketMQListenerBindingContainer,容器将会启动 RocketMQ 中的Consumer消费消息。RocketMQMessageChannelBinder is a standard implementation of Binder, it will build RocketMQInboundChannelAdapterand RocketMQMessageHandler internally. RocketMQMessageHandler will construct RocketMQTemplate based on the Binding configuration. RocketMQTemplate will convert the org.springframework.messaging.Message message class of spring-messaging module to the RocketMQ message class org.apache.rocketmq.common .message.Message internally, then send it out. RocketMQInboundChannelAdapter will also construct RocketMQListenerBindingContainer based on the Binding configuration, and RocketMQListenerBindingContainer will start the RocketMQ Consumer to receive the messages.
Binder 类结构图
Binder 分析
使用 MessageChannel 收发消息
发送到 RocketMQ Broker 的消息和接收 RocketMQ Broker 的消息都通过 MessageChannel 处理。- 生产消息:SendingHandler 处理来自 MessageChannel 处理器(订阅关系),当有新的消息要发送时,将消息发送到 MessageChannel 即可
- 消费消息:StreamListenerMessageHandler订阅 MessageChannel 处理器,当收到新的消息时,处理MessageChannel发来的消息即可
Binder源码分析
Binder实现规范接口说明
Binder规范
位于 org.springframework.cloud.stream中,包含绑定消费者和绑定生产者package org.springframework.cloud.stream.binder; /** * A strategy interface used to bind an app interface to a logical name. The name is * intended to identify a logical consumer or producer of messages. This may be a queue, a * channel adapter, another message channel, a Spring bean, etc. * * @param <T> the primary binding type (e.g. MessageChannel). * @param <C> the consumer properties type. * @param <P> the producer properties type. * @author Mark Fisher * @author David Turanski * @author Gary Russell * @author Jennifer Hickey * @author Ilayaperumal Gopinathan * @author Marius Bogoevici * @since 1.0 */ public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> { /** * Bind the target component as a message consumer to the logical entity identified by * the name. * @param name the logical identity of the message source * @param group the consumer group to which this consumer belongs - subscriptions are * shared among consumers in the same group (a <code>null</code> or empty String, must * be treated as an anonymous group that doesn't share the subscription with any other * consumer) * @param inboundBindTarget the app interface to be bound as a consumer * @param consumerProperties the consumer properties * @return the setup binding */ Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties); /** * Bind the target component as a message producer to the logical entity identified by * the name. * @param name the logical identity of the message target * @param outboundBindTarget the app interface to be bound as a producer * @param producerProperties the producer properties * @return the setup binding */ Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties); }
RocketMQ对Spring Cloud Stream Binder 代码实现(部分)
package com.alibaba.cloud.stream.binder.rocketmq; /** * @author <a href="mailto:fangjian0423@gmail.com">Jim</a> */ public class RocketMQMessageChannelBinder extends AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner> implements ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> { private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties(); private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties; private final RocketMQProperties rocketMQProperties; private final InstrumentationManager instrumentationManager; //创建生产消息处理器 @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<RocketMQProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception { ... } //创建消息消费端点 @Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties) throws Exception { ... } }
RocketMQMessageChannelBinder创建生产者说明:
使用RocketMQMessageChannelBinder::createProducerMessageHandler方法创建生产者。根据配置文件配置,创建一个RocketMQMessageHandler作为消息生产处理器,功能:消息生产发送给 RocketMQ Broker。创建RocketMQMessageHandler在RocketMQMessageChannelBinder::createProducerMessageHandler方法,截取代码如下:RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), instrumentationManager);RocketMQ中RocketMQMessageHandler中启动消息处理器,handleMessageInternal()方法处理 RocketMQ消息,部分代码如下:
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle { @Override public void start() { if (!transactional) { instrumentationManager .addHealthInstrumentation(new Instrumentation(destination)); try { rocketMQTemplate.afterPropertiesSet(); instrumentationManager.getHealthInstrumentation(destination) .markStartedSuccessfully(); } catch (Exception e) { instrumentationManager.getHealthInstrumentation(destination) .markStartFailed(e); log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage()); throw new MessagingException(MessageBuilder.withPayload( "RocketMQTemplate startup failed, Caused by " + e.getMessage()) .build(), e); } } running = true; } @Override protected void handleMessageInternal(org.springframework.messaging.Message<?> message) throws Exception { try { final StringBuilder topicWithTags = new StringBuilder(destination); String tags = Optional .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("") .toString(); if (!StringUtils.isEmpty(tags)) { topicWithTags.append(":").append(tags); } SendResult sendRes = null; if (transactional) { sendRes = rocketMQTemplate.sendMessageInTransaction(groupName, topicWithTags.toString(), message, message.getHeaders() .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG)); log.debug("transactional send to topic " + topicWithTags + " " + sendRes); } else { int delayLevel = 0; try { Object delayLevelObj = message.getHeaders() .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0); if (delayLevelObj instanceof Number) { delayLevel = ((Number) delayLevelObj).intValue(); } else if (delayLevelObj instanceof String) { delayLevel = Integer.parseInt((String) delayLevelObj); } } catch (Exception e) { // ignore } if (sync) { sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message, rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel); log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { rocketMQTemplate.asyncSend(topicWithTags.toString(), message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.debug("async send to topic " + topicWithTags + " " + sendResult); } @Override public void onException(Throwable e) { log.error( "RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); if (getSendFailureChannel() != null) { getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy .buildErrorMessage( new MessagingException( message, e), null)); } } }); } } if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) { if (getSendFailureChannel() != null) { this.getSendFailureChannel().send(message); } else { throw new MessagingException(message, new MQClientException("message hasn't been sent", null)); } } } catch (Exception e) { log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage()); if (getSendFailureChannel() != null) { getSendFailureChannel().send(this.errorMessageStrategy .buildErrorMessage(new MessagingException(message, e), null)); } else { throw new MessagingException(message, e); } } } }
RocketMQMessageChannelBinder创建消费者说明
使用RocketMQMessageChannelBinder::createConsumerEndpoint方法创建消费者。根据配置文件,创建RocketMQInboundChannelAdapter,适配器中的listenerContainer会启动容器中的消费者DefaultMQPushConsumer,消费 RocketMQ Broker消息。创建适配器RocketMQInboundChannelAdapter在RocketMQMessageChannelBinder::createConsumerEndpoint方法,截取代码如下://RocketMQMessageChannelBinder中createConsumerEndpoint方法部分代码: RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager);RocketMQInboundChannelAdapter适配器中的容器,启动RocketMQ 消费者,截取代码如如下:
package com.alibaba.cloud.stream.binder.rocketmq.consuming; /** * @author <a href="mailto:fangjian0423@gmail.com">Jim</a> */ public class RocketMQListenerBindingContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle { private final static Logger log = LoggerFactory .getLogger(RocketMQListenerBindingContainer.class); private String nameServer; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private RocketMQListener rocketMQListener; private DefaultMQPushConsumer consumer; private boolean running; @Override public void start() { if (this.isRunning()) { throw new IllegalStateException( "container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } }