【spring boot】【redis】spring boot 集成redis的发布订阅机制

一.简单介绍

1.redis的发布订阅功能,很简单。
  消息发布者和消息订阅者互相不认得,也不关心对方有谁。
  消息发布者,将消息发送给频道(channel)。
  然后是由 频道(channel)将消息发送给对自己感兴趣的 消息订阅者们,进行消费。

2.redis的发布订阅和专业的MQ相比较

  1>redis的发布订阅只是最基本的功能,不支持持久化,消息发布者将消息发送给频道。如果没有订阅者消费,消息就丢失了。
  2>在消息发布过程中,如果客户端和服务器连接超时,MQ会有重试机制,事务回滚等。但是Redis没有提供消息传输保障。
  3>简单的发布订阅可以使用redis,根据业务需求选择。

二.spring boot 集成[spring boot 2.x]

1.pom.xml文件

<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- 使用redis的LUA脚本 需要序列化操作的jar-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

2.redis的config

为redis添加消息适配器,绑定消息处理器

消息适配器 可以添加多个

package com.sxd.swapping.config;

import com.sxd.swapping.redisReceiver.RedisReceiver;
import com.sxd.swapping.redisReceiver.RedisReceiver2;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer; /**
* @author sxd
* @date 2019/5/27 16:13
*/
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig { /**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter,
MessageListenerAdapter listenerAdapter2)
{
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory); //可以添加多个 messageListener
//可以对 messageListener对应的适配器listenerAdapter 指定本适配器 适配的消息类型 是什么
//在发布的地方 对应发布的redisTemplate.convertAndSend("user",msg); 那这边的就对应的可以消费到指定类型的 订阅消息
container.addMessageListener(listenerAdapter, new PatternTopic("user"));
container.addMessageListener(listenerAdapter2, new PatternTopic("goods")); return container;
} /**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
*
* receiveMessage 是默认监听方法 一般不变
* @param redisReceiver redis消息处理器,自定义的
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
System.out.println("消息适配器1进来了");
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
} /**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
*
* receiveMessage 是默认监听方法 一般不变
* @param redisReceiver2 redis消息处理器,自定义的
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter2(RedisReceiver2 redisReceiver2) {
System.out.println("消息适配器2进来了");
return new MessageListenerAdapter(redisReceiver2, "receiveMessage");
} //使用默认的工厂初始化redis操作模板
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
} @Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
RedisSerializer keySerializer = new StringRedisSerializer();
// RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
//key采用字符串反序列化对象
redisTemplate.setKeySerializer(keySerializer);
//value也采用字符串反序列化对象
//原因:管道操作,是对redis命令的批量操作,各个命令返回结果可能类型不同
//可能是 Boolean类型 可能是String类型 可能是byte[]类型 因此统一将结果按照String处理
redisTemplate.setValueSerializer(keySerializer);
return redisTemplate;
} }

3.创建几个消息处理器[处理消息订阅者  接收到消息后的业务]

package com.sxd.swapping.redisReceiver;

import org.springframework.stereotype.Service;

/**
*
* redis 订阅发布 消息接收器/处理器
* @author sxd
* @date 2019/5/30 17:12
*/
@Service
public class RedisReceiver { public void receiveMessage(String message) {
System.out.println("消息处理器1>我处理用户信息:"+message);
//这里是收到通道的消息之后执行的方法
//此处执行接收到消息后的 业务逻辑
}
}
package com.sxd.swapping.redisReceiver;

import org.springframework.stereotype.Service;

/**
* redis 订阅发布 消息接收器/处理器2
* @author sxd
* @date 2019/5/30 17:15
*/
@Service
public class RedisReceiver2 { public void receiveMessage(String message) {
System.out.println("消息处理器2>我处理商品信息:"+message);
//这里是收到通道的消息之后执行的方法
//此处执行接收到消息后的 业务逻辑
}
}

4.消息发布controller

@Autowired
RedisTemplate redisTemplate; /**
* redis 发布订阅pubsub
*/
@RequestMapping(value = "/redisPubSub")
public void redisPubSub(String msg){
if (msg.contains("用户")){
redisTemplate.convertAndSend("user",msg);
}else {
redisTemplate.convertAndSend("goods",msg);
}
}

测试:

发送请求:http://localhost:9666/redistest/redisPubSub?msg=用户---德玛西亚的用户

结果:

消息处理器1>我处理用户信息:用户---德玛西亚的用户

发送请求:http://localhost:9666/redistest/redisPubSub?msg=goods---德玛西亚的用商品

结果:

消息处理器2>我处理商品信息:goods---德玛西亚的用商品

上一篇:python之上下文管理、redis的发布订阅、rabbitmq


下一篇:Java技术总结