技术研究背景
由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作。
通过相关的技术调研后,决定挑选基于Redis实现消息系统。
具体技术选型原因:
- 团队内部已经有搭建相关的Redis服务,并且具备一定的运维能力,可以节省技术成本
- 业界有较多关于Redis搭建消息系统方面的技术文章
- 目前的系统的整体吞吐量并不高,接入消息系统的主要目的只是为了实现系统之间的解耦
为了方便让读者们从0到1地学习这块内容,我将会从环节搭建开始介绍起。
基本环境的搭建
基于redis6.0.6版本搭建一套简单的消息队列系统。 环境部署:
docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6
- 参数解释: -d 后台启动 -p 端口映射 -name 容器名称
如果本地没有相关镜像,可以尝试通过搭建下方命令进行镜像的拉取:
docker pull redis:6.0.6
当redis的基础环境配置好了之后,接下来便是基于redis内置的一些基本功能开发一款消息队列组件了。
下边我将分三种不同的技术方案来介绍如何实现一款轻量级的消息队列。
基于常规的队列结构来实现消息队列
这块的实现比较简单,主要是基于Redis内部的List结构来落地的,发送方将消息从队列的左边写入,然后消费方从队列的右边读取。
package org.idea.mq.redis.framework.mq.list; import com.alibaba.fastjson.JSON; import org.idea.mq.redis.framework.bean.MsgWrapper; import org.idea.mq.redis.framework.mq.IMQTemplate; import org.idea.mq.redis.framework.redis.IRedisService; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Author linhao * @Date created in 3:09 下午 2022/2/7 */ @Component public class RedisListMQTemplate implements IMQTemplate { @Resource private IRedisService iRedisService; @Override public boolean send(MsgWrapper msgWrapper) { try { String json = JSON.toJSONString(msgWrapper.getMsgInfo()); iRedisService.lpush(msgWrapper.getTopic(),json); return true; }catch (Exception e){ e.printStackTrace(); } return false; } }
问题思考
这里存在几个问题点需要思考下:
多个服务之间如何订阅同一个消息
这里我建议可以按照系统的项目名称前缀+业务标识来组织。
例如:用户系统中需要发布一条 会员已升级 的消息给到下游系统,此时可以将这条消息写入到名为:user-service:member-upgrade-list 的List集合中。
如果订单系统希望访问用户系统的消息,则需要在redis的key里指定user-service:member-upgrade-list关键字。
消息的监听机制如何实现?
对于List的消息可以采用轮询的方式获取,例如下边这段案例代码:
/** * 轮询的方式获取数据 * * @param msgWrapper */ private void pollingGet(MsgWrapper msgWrapper) { while (true) { String value = iRedisService.rpop(msgWrapper.getTopic()); if (!StringUtils.isEmpty(value)) { System.out.println(value); } //减少访问压力,定期睡眠一段时间 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } }
但是轮询的方式比较消耗性能,所以可以尝试使用Redis的阻塞式弹出指令,例如下边这种方式来监听消息的触发行为:
/** * 阻塞的方式获取数据 */ private void blockGet(MsgWrapper msgWrapper) { while (true) { List<String> values = iRedisService.brpop(msgWrapper.getTopic()); if (!CollectionUtils.isEmpty(values)) { values.forEach(value -> { System.out.println(value); }); } } }
消息的可靠性传输如何确保?
在设计消息队列的时候,我们非常看重的就是消息的可靠性保证。当一条消息发送到消费端之后,如果出现了异常,希望消息能够实现重新发送的效果。
对于这种场景的设计我们可以尝试使用 BRPOPLPUSH 这条指令,这条指令可以帮助我们在Redis内部将数据弹出时写入到另一个备份队列中,这样即使弹出的消息消费失败了,备份队列中还有一份备用消息可以使用,而且弹出和写入备份队列操作在Redis内部做了封装,外界调用可以视作为一个原子操作。
是否可以支持广播的模式?
从List集合的实现原理来看,Redis弹出的元素只能返回给一个客户端链接,因此无法支持广播这种效果的实现。
基于发布订阅功能实现消息队列
Redis的内部提供了一个叫做发布订阅的功能,通过subscibe命令和publish指令可以帮助我们实现关于消息发布和通知的功能。
使用subscibe/publish命令实现的效果和List结构最大的不同在于它的传输方式:
- list更多的是实现点对点方式的传输(P2P方式)
- subscibe/publish则是可以实现广播的方式和订阅者进行通信
publish部分的案例代码:
@Override public boolean publish(String channel, String content) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.publish(channel, content); return true; } catch (Exception e) { throw new RuntimeException(e); } }
subscibe部分的代码:
@Override public boolean subscribe(JedisPubSub jedisPubSub, String... channel) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.subscribe(jedisPubSub, channel); return true; } catch (Exception e) { throw new RuntimeException(e); } }
监听的部分可以通过额外开启一个线程来实现这部分效果:
@Component public class RedisSubscribeMQListener implements IMQListener { @Resource private IRedisService iRedisService; class TestChannel extends JedisPubSub { @Override public void onMessage(String channel, String message) { super.onMessage(channel, message); System.out.println("channel " + channel + " 接收到消息:" + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", channel, subscribedChannels)); } } //所有频道的消息都监听 @Override public void onMessageReach(MsgWrapper msgWrapper) { Thread thread = new Thread(new Runnable() { @Override public void run() { iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic()); } }); thread.start(); } }
要注意,回调通知的时候需要注入一个JedisPubSub的对象,这个对象的内部定义了接收消息之后的处理操作。