发布订阅-jedis

~~~~~~~~~~~~~~~~分割线,正文可直接跳过~~~~~~~~~~~~~~~~

  前段时间做二维码支付项目的时候,扫码枪扫到二维码后需要给用户反馈支付结果,属于双向通信。双向通信常用的手段是客户端轮询和websocet,考虑到用户体验和服务端的压力,最终选择了websocket。由于服务是多节点,websocket只会和其中一个节点保持长连接,远程服务扫描到支付码需要调用支付服务的扣款接口,根据扣款的成功与否给用户通知,支付服务需要发通知的时候并不知道要给哪个长连接发送。

  解决方案是每个节点保存一份websocket的Session,断开连接时清除Session,服务端需要发送通知时哪个节点有此用户的Session就说明客户端是跟此节点保持的长连接。这里判断哪个节点含有用户的Session就需要检查所有的服务节点,此时就引入了发布/订阅,不管远程服务调用了支付服务的哪个节点,都去检查一遍所有的服务节点。

  下图表示远程服务调用支付服务后的流程示意图:

发布订阅-jedis

 

 

 

~~~~~~~~~~~~~~~~~~~~~~分割线~~~~~~~~~~~~~~~~~~~

订阅频道:

JedisPool jedisPool = RedisHelper.getJedisPool();
SubscriberThread subscriberThread = new SubscriberThread(CHANNEL_QRCODE, jedisPool);
subscriberThread.setDaemon(true);
subscriberThread.start();
//订阅频道是阻塞的,需要开启一个线程
public class SubscriberThread extends Thread { private Subscriber subscriber = new Subscriber(); private String channel; private JedisPool jedisPool; public SubscriberThread(String channel, JedisPool jedisPool) { this.channel = channel; this.jedisPool = jedisPool; } @Override public void run() {while(true){ Jedis redis = this.jedisPool.getResource(); try{ log.info(">>>>>>>>>>订阅频道:{}", channel); redis.subscribe(subscriber, channel); }catch(Exception e){ e.printStackTrace(); } } } }

 

发送消息:

public class PublisherBuilder {
private String channel;
    private JedisPool jedisPool;

    public PublisherBuilder(String channel, JedisPool jedisPool) {
        this.channel = channel;
        this.jedisPool = jedisPool;
    }

    public void publisher(String message) {
        log.info("publisher==>message:{}", message);

        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.publish(channel, message);
            log.info("成功发布消息:{}", message);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("发布消息({})出错了, channel:", message, channel);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

}

 

订阅消息:

public class Subscriber extends JedisPubSub {
public Subscriber() {
    }

    @Override
    public void onMessage(String channel, String message) {       
     //收到消息会调用 log.info(String.format("收到订阅消息, channel %s, message %s", channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) {
//订阅了频道会调用 log.info(">>>>>>>>>频道 {} 订阅成功:{}<<<<<<<<<<<", channel, subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) {
//取消订阅 会调用 log.info(">>>>>>>>>取消频道 {} 订阅成功: {}<<<<<<<<<<<", channel, subscribedChannels); } }

 

上一篇:Redis-高级篇


下一篇:使用java操作redis