~~~~~~~~~~~~~~~~分割线,正文可直接跳过~~~~~~~~~~~~~~~~
前段时间做二维码支付项目的时候,扫码枪扫到二维码后需要给用户反馈支付结果,属于双向通信。双向通信常用的手段是客户端轮询和websocet,考虑到用户体验和服务端的压力,最终选择了websocket。由于服务是多节点,websocket只会和其中一个节点保持长连接,远程服务扫描到支付码需要调用支付服务的扣款接口,根据扣款的成功与否给用户通知,支付服务需要发通知的时候并不知道要给哪个长连接发送。
解决方案是每个节点保存一份websocket的Session,断开连接时清除Session,服务端需要发送通知时哪个节点有此用户的Session就说明客户端是跟此节点保持的长连接。这里判断哪个节点含有用户的Session就需要检查所有的服务节点,此时就引入了发布/订阅,不管远程服务调用了支付服务的哪个节点,都去检查一遍所有的服务节点。
下图表示远程服务调用支付服务后的流程示意图:
~~~~~~~~~~~~~~~~~~~~~~分割线~~~~~~~~~~~~~~~~~~~
订阅频道:
JedisPool jedisPool = RedisHelper.getJedisPool(); SubscriberThread subscriberThread = new SubscriberThread(CHANNEL_QRCODE, jedisPool); subscriberThread.setDaemon(true); subscriberThread.start();
//订阅频道是阻塞的,需要开启一个线程
public class SubscriberThread extends Thread { private Subscriber subscriber = new Subscriber(); private String channel; private JedisPool jedisPool; public SubscriberThread(String channel, JedisPool jedisPool) { this.channel = channel; this.jedisPool = jedisPool; } @Override public void run() {while(true){ Jedis redis = this.jedisPool.getResource(); try{ log.info(">>>>>>>>>>订阅频道:{}", channel); redis.subscribe(subscriber, channel); }catch(Exception e){ e.printStackTrace(); } } } }
发送消息:
public class PublisherBuilder { private String channel; private JedisPool jedisPool; public PublisherBuilder(String channel, JedisPool jedisPool) { this.channel = channel; this.jedisPool = jedisPool; } public void publisher(String message) { log.info("publisher==>message:{}", message); Jedis jedis = null; try { jedis = jedisPool.getResource(); jedis.publish(channel, message); log.info("成功发布消息:{}", message); } catch (Exception e) { e.printStackTrace(); log.error("发布消息({})出错了, channel:", message, channel); } finally { if (jedis != null) { jedis.close(); } } } }
订阅消息:
public class Subscriber extends JedisPubSub { public Subscriber() { } @Override public void onMessage(String channel, String message) {
//收到消息会调用 log.info(String.format("收到订阅消息, channel %s, message %s", channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) {
//订阅了频道会调用 log.info(">>>>>>>>>频道 {} 订阅成功:{}<<<<<<<<<<<", channel, subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) {
//取消订阅 会调用 log.info(">>>>>>>>>取消频道 {} 订阅成功: {}<<<<<<<<<<<", channel, subscribedChannels); } }