Spring Boot使用Redis进行消息的发布与订阅

Redis 不仅提供一个NoSQL数据库,同时还提供了一套消息系统。
下面我将Spring Boot使用Redis进行消息的发布与订阅具体的流程分享给大家

首先引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.7.RELEASE</version>
</dependency>
1.发送消息
@SpringBootApplication
@EntityScan(basePackages= {"cn.sh.sttri.app.messagepush"})
@EnableCaching
@EnableScheduling
public class MessagepushApplication {

    public static void main(String[] args) {

        ApplicationContext ct=SpringApplication.run(MessagepushApplication.class, args);

        StringRedisTemplate template = ct.getBean(StringRedisTemplate.class);
        template.convertAndSend("messagepush", "Hello message !");
        template.convertAndSend("messagepush3", "Hello message3 !");

    }
}

发送消息我们使用StringRedisTemplate来发送键和值均为字符串的消息。在main()方法中我们创建一个Spring应用的Context,初始化消息监听者容器,开始监听消息。然后获取StringRedisTemplate的实例,往messagepush和messagepush3两个主题发送消息

2.Redis 配置消息通道

创建一个Redis消息配置类
连接工程我们使用Spring Boot默认的RedisConnectionFactory
我们将在listenerAdapter方法中定义的Bean注册为一个消息监听者,它将监听messagepush和messagepush3两个主题的消息。

因为Receiver类是一个POJO,要将它包装在一个消息监听者适配器(实现了MessageListener接口),这样才能被监听者容器RedisMessageListenerContainer的addMessageListener方法添加到连接工厂中。有了这个适配器,当一个消息到达时,就会调用receiveMesage()方法进行响应。

@Configuration
public class RedisChannelConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅主题messagepush和messagepush3
        container.addMessageListener(listenerAdapter, new PatternTopic("messagepush"));
        container.addMessageListener(listenerAdapter, new PatternTopic("messagepush3"));
        //这个container 可以添加多个 messageListener
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceive receiver) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean //注入操作数据的template(这里不需要操作redis数据,和消息队列功能无关)
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

}
3.处理MessageReceive 消息接收类的receiveMessage()处理业务

/**redis 消息处理器*/
@Component
public class MessageReceive {

    @Autowired
    private MessageReceiveHandler messageReceiveHandler;
    /**接收消息的方法*/
    public void receiveMessage(String message){
        //System.out.println(message);

        messageReceiveHandler.messagePush(message);
    }

}

到MessageReceiveHandler 类中调用messagePush()

@Component
public class MessageReceiveHandler {
  public void messagePush(String message){
     
   System.out.println("----------收到消息了message:"+message);

 }
}

到这里一个完整的Spring Boot使用Redis进行消息的发布与订阅过程就打通了


Spring Boot使用Redis进行消息的发布与订阅
图片.png
上一篇:DataWorks百问百答44:如何同步JSON里的部分数据?


下一篇:P2P线上模式倒逼平台做大数据风控