Spring Cloud Stream Binder 实现

Spring Cloud Stream Binder 实现

JMS 实现 ActiveMQ

1.增加Maven依赖

<!-- 整合 Sprig Boot Starter ActiveMQ -->
        <!-- 间接依赖:
            spring jms
            jms api
            activemq
            spring boot jms
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

2.启动ActiveMQ broker


activemq console

3.原生API:生产消息
请注意启动后的控制台输出

private static void sendMessage() throws Exception {
        // 创建 ActiveMQ 链接,设置 Broker URL
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创造 JMS 链接
        Connection connection = connectionFactory.createConnection();
        // 创建会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的 - Queue 名称为 "TEST"
        Destination destination = session.createQueue("TEST");
        // 创建消息生产者
        MessageProducer producer = session.createProducer(destination);
        // 创建消息 - 文本消息
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText("Hello,World");
        // 发送文本消息
        producer.send(message);

        // 关闭消息生产者
        producer.close();
        // 关闭会话
        session.close();
        // 关闭连接
        connection.close();
    }

4.原生API:消费消息

 private static void receiveMessage() throws Exception {

        // 创建 ActiveMQ 链接,设置 Broker URL
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创造 JMS 链接
        Connection connection = connectionFactory.createConnection();
        // 启动连接
        connection.start();
        // 创建会话 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建消息目的 - Queue 名称为 "TEST"
        Destination destination = session.createQueue("TEST");
        // 创建消息消费者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        // 获取消息
        Message message = messageConsumer.receive(100);

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("消息消费内容:" + textMessage.getText());
        }

        // 关闭消息消费者
        messageConsumer.close();
        // 关闭会话
        session.close();
        // 关闭连接
        connection.stop();
        connection.close();
    }

Spring Boot JMS+ActiveMQ

1.Maven依赖

<!-- 整合 Sprig Boot Starter ActiveMQ -->
        <!-- 间接依赖:
            spring jms
            jms api
            activemq
            spring boot jms
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

2.配置ActiveMQ属性
application.properties

## ActiveMQ 配置
spring.activemq.brokerUrl = tcp://localhost:61616

3.配置JMS属性
application.properties

## JMS 配置
spring.jms.template.defaultDestination = sf-users-activemq

4.改造user-service-client:实现ActiveMQ User对象消息生产
UserServiceClientController.java

@Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/user/save/message/activemq")
    public boolean saveUserByActiveMQMessage(@RequestBody User user) throws Exception {
        jmsTemplate.convertAndSend(user);
        return true;
    }

5.启动user-service-client
预先启动“eureka-server”以及“config-server”
6.改造user-service-provider:实现ActiveMQ User对象信息消费

  @Autowired
    private JmsTemplate jmsTemplate;

    @GetMapping("/user/poll")
    public Object pollUser() {
        // 获取消息队列中,默认 destination = sf-users-activemq
        return jmsTemplate.receiveAndConvert();
    }

ActiveMQ Spring Cloud Stream Binder实现

创建spring-cloud-stream-binder-activemq 工程
引入Maven
实现Binder接口-仅实现消息发送

package com.segumentfault.spring.cloud.stream.binder.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;

/**
 * Active MQ MessageChannel Binder 实现
 *
 * @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
 * @since 0.0.1
 */
public class ActiveMQMessageChannelBinder implements
        Binder<MessageChannel, ConsumerProperties, ProducerProperties> {

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * 接受 ActiveMQ 消息
     *
     * @param name
     * @param group
     * @param inboundBindTarget
     * @param consumerProperties
     * @return
     */
    @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inboundBindTarget, ConsumerProperties consumerProperties) {
        // TODO: 实现消息消费
        return () -> {
        };
    }

    /**
     * 负责发送消息到 ActiveMQ
     *
     * @param name
     * @param outputChannel
     * @param producerProperties
     * @return
     */
    @Override
    public Binding<MessageChannel> bindProducer(String name, MessageChannel outputChannel, ProducerProperties producerProperties) {
        Assert.isInstanceOf(SubscribableChannel.class, outputChannel,
                "Binding is supported only for SubscribableChannel instances");

        SubscribableChannel subscribableChannel = (SubscribableChannel) outputChannel;

        subscribableChannel.subscribe(message -> {
            // 接受内部管道消息,来自于 MessageChannel#send(Message)
            // 实际并没有发送消息,而是此消息将要发送到 ActiveMQ Broker
            Object messageBody = message.getPayload();
            jmsTemplate.convertAndSend(name, messageBody);

        });

        return () -> {
            System.out.println("Unbinding");
        };
    }
}

实现 Spring Cloud Stream Binder 自动装配
package com.segumentfault.spring.cloud.stream.binder.activemq;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ActiveMQ Stream Binder 自动装配
 *
 */
@Configuration
@ConditionalOnMissingBean(Binder.class)
public class ActiveMQStreamBinderAutoConfiguration {

    @Bean
    public ActiveMQMessageChannelBinder activeMQMessageChannelBinder() {
        return new ActiveMQMessageChannelBinder();
    }
}

2.配置META-INF/spring.binders

activemq :\
com.segumentfault.spring.cloud.stream.binder.activemq.ActiveMQStreamBinderAutoConfiguration

3.整合消息生产者user-service-client
引入 ActiveMQ Spring Cloud Stream Binder Maven 依赖

 <!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
        <dependency>
            <groupId>com.segumentfault</groupId>
            <artifactId>spring-cloud-stream-binder-activemq</artifactId>
            <version>${project.version}</version>
        </dependency>

4.配置ActiveMQ Spring Cloud Stream Binder 属性

## Spring Cloud Stream 默认 Binder
spring.cloud.stream.defaultBinder=rabbit

### 消息管道 activemq-out 配置
spring.cloud.stream.bindings.activemq-out.binder = activemq
spring.cloud.stream.bindings.activemq-out.destination = sf-users-activemq

5.实现Binder接口 - 实现消息消费

 @Override
    public Binding<MessageChannel> bindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties consumerProperties) {

        ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
        try {
            // 创造 JMS 链接
            Connection connection = connectionFactory.createConnection();
            // 启动连接
            connection.start();
            // 创建会话 Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建消息目的
            Destination destination = session.createQueue(name);
            // 创建消息消费者
            MessageConsumer messageConsumer = session.createConsumer(destination);

            messageConsumer.setMessageListener(message -> {
                // message 来自于 ActiveMQ
                if (message instanceof ObjectMessage) {
                    ObjectMessage objectMessage = (ObjectMessage) message;
                    try {
                        Object object = objectMessage.getObject();
                        inputChannel.send(new GenericMessage<Object>(object));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }

        return () -> {
        };
    }

6.整合消息消费者- user-service-provider
引入 ActiveMQ Spring Cloud Stream Binder Maven 依赖

<!-- 引入 Active MQ Spring Cloud Stream Binder 实现 -->
        <dependency>
            <groupId>com.segumentfault</groupId>
            <artifactId>spring-cloud-stream-binder-activemq</artifactId>
            <version>${project.version}</version>
        </dependency>

7.配置ActiveMQ Spring Cloud Stream Binder 属性

## Spring Cloud Stream 默认 Binder
spring.cloud.stream.defaultBinder=rabbit

### 消息管道 activemq-out 配置
spring.cloud.stream.bindings.activemq-in.binder = activemq
spring.cloud.stream.bindings.activemq-in.destination = sf-users-activemq

8.实现User消息监听

 @StreamListener("activemq-in")
    public void onUserMessage(User user) throws IOException {
        System.out.println("Subscribe by @StreamListener");
        userService.saveUser(user);
    }

    // 监听 ActiveMQ Stream
    userMessage.activeMQIn().subscribe(message -> {

        if (message instanceof GenericMessage) {
            GenericMessage genericMessage = (GenericMessage) message;
            User user = (User) genericMessage.getPayload();
            userService.saveUser(user);
        }
    });
上一篇:【整理】组建 MySQL 集群的几种方案


下一篇:Spring Cloud消息驱动整合