Redis 消息订阅(直接订阅SpringBoot 配置的Redis 和 自定义Redis连接)

如果发布的redis是当前springboot配置的Redis,那就直接进行订阅!

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;


@Configuration
public class RedisSubscriber extends JedisPubSub{


@Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个叫chat 的通道
        container.addMessageListener(new MessageListener(){
            @Override
            public void onMessage(Message message, byte[] pattern) {
                String msg = new String(message.getBody());
                System.out.println(new String(pattern) + "主题发布:" + msg);
                //获取到通道消息进行解析msg,并处理
                
            }
        }, new PatternTopic("xxx"));  //xxx:具体的通道名。如果是多个通道,那就直接复制setConnectionFactory就好
        return container;
    }


}

如果订阅的消息不是,当前配置的Redis,那就需要从新起一个线程,不然直接获取连接,会导致线程阻塞,因为redis本身是但线程的。

port org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;


@Configuration
public class RedisSubscriber extends JedisPubSub{


    //在yaml里面的自定义的redis的链接进行获取

    @Value("${video.host}")  //ip
    private String host;

    @Value("${video.port}")  //端口
    private String port;


    @Value("${video.password}")  //密码
    private String password;


@Bean
    public void redisMessageListenerContainer() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(100);
        jedisPoolConfig.setMaxWaitMillis(10000);
        jedisPoolConfig.setMinIdle(8);
        jedisPoolConfig.setMaxTotal(500);
        jedisPoolConfig.setJmxEnabled(true);
        jedisPoolConfig.setTestOnBorrow(true);
        jedisPoolConfig.setTestOnReturn(true);
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, Integer.valueOf(port), 100000, password);
        SubThread subThread=new SubThread(jedisPool);
        subThread.start();
   }

}
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SubThread extends Thread {

    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    //订阅通道
    private final String Channel1 = "Channel1";
    private final String Channel2= "Channel2";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();   //取出一个连接
            jedis.subscribe(subscriber, Channel1,Channel2) 【通道可配置单个或者多个,底层源码有体现】
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisPubSub;


@Component
public class  Subscriber extends JedisPubSub {

    // 这样写是因为 service无法注入的原因

    public static XxxxService xxxxxService;

    @Autowired
    public void setSenderService(XxxxService xxxxxService){
        Subscriber.XxxxService = xxxxxService;
    }

    

    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        //视频告警
        //System.out.println("【"+channel + "】主题发布:" + message);
        if("Channel1".equals(channel)){
           //具体逻辑编写
        }
    }

}

上一篇:学习Spring5必知必会(1)~未使用spring前的麻烦


下一篇:如何找到Spring框架的官方API?