官方:消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。
- 我个人觉得消息中间件也可以用共享单车模式形容,平台将共享单车放在固定的某些地方,然后大家有需要的自己用对应的app扫描对应的二维码就可以开锁使用。
- 固定地方就像是中间件存放消息的地方Broker,共享单车就是Message消息,平台-Producer生产者,大家-Consumer消费者,而对应的app就像是Topic(如果用美团app扫描,就是使用美团单车,如果用支付宝扫描,就是使用哈罗单车),每个单车的编号其实就是组成Topic的更小单元Queue。
大概的执行逻辑是这样滴:
为什么要使用消息队列?
主要的应用场景解耦、异步、削峰
解耦:
- 传统模式的系统间耦合性太强,比如系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
- 中间件模式的优点是将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。
异步:
- 传统模式的一些非必要的业务逻辑以同步的方式运行,太耗费时间。
- 中间件模式的优点是将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰:
- 传统模式并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
- 中间件模式系统可以慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
上代码瞅瞅,我之前项目上用过的
1、引入依赖:
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.8.Final</version> </dependency>
2、yml配置文件添加生产者属性:
#配置rocketmq rocketmq producer producerId ***** #生产者id(旧版本是生产者id,新版本是groupid),替换成自己的 msgTopic alarmStatus #生产主题,替换成自己的 accessKey XXX #连接通道,替换成自己的 secretKey XXX #连接秘钥,替换成自己的 onsAddr http //onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet #生产者ons接入域名,替换成自己的
3、 初始化生产者:
package com.iwhalecloud.citybrain.ducha.ism.utils; import java.util.Properties; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; /** * rocketmq生产者启动初始化类 * @author lkf * @Date 2021年8月9日 * */ public class RocketmqProducerInit { "${rocketmq.producer.producerId}") ( private String producerId; "${rocketmq.producer.accessKey}") ( private String accessKey; "${rocketmq.producer.secretKey}") ( private String secretKey; "${rocketmq.producer.onsAddr}") ( private String ONSAddr; private static Producer producer; /* //当无法注入实例的时候可以使用此方法进行实例初始化 private static class ProducerHolder { private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit(); } private RocketmqProducerInit (){ } public static final RocketmqProducerInit getInstance() { return ProducerHolder.INSTANCE; }*/ public void init(){ System.out.println("初始化启动生产者!"); // producer 实例配置初始化 Properties properties = new Properties(); //您在控制台创建的Producer ID properties.setProperty(PropertyKeyConst.GROUP_ID, producerId); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.setProperty(PropertyKeyConst.AccessKey, accessKey); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.setProperty(PropertyKeyConst.SecretKey, secretKey); //设置发送超时时间,单位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取 properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr); producer = ONSFactory.createProducer(properties); // 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown producer.start(); } /** * 初始化生产者 * @return */ public Producer getProducer(){ return producer; } }
4、发送消息:
package com.iwhalecloud.citybrain.ducha.ism.utils; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.OnExceptionContext; import com.aliyun.openservices.ons.api.SendCallback; import com.aliyun.openservices.ons.api.SendResult; import com.sixmonth.rocketmq.common.rocketmq.init.RocketmqProducerInit; /** * 消息生产者,可与消费者分离 * @author lkf * @Date 2021年8月9日 * */ public class RocketmqProducerService { private Logger logger = LoggerFactory.getLogger(RocketmqProducerService.class); "${rocketmq.producer.msgTopic}") ( private String msgTopic; private RocketmqProducerInit rocketmqProducerInit; public String tag = "*";//生产标签,可自定义,默认通配 /** * 异步发送消息 * 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式; * 特点:速度快;有结果反馈;数据可靠; * 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等; * @param msg * @return */ public boolean sendMsgAsy(String msg) { Long startTime = System.currentTimeMillis(); Message message = new Message(msgTopic, tag, msg.getBytes()); rocketmqProducerInit.getProducer().sendAsync(message, new SendCallback() { public void onSuccess(SendResult sendResult) { ///消息发送成功 System.out.println("send message success. topic=" + sendResult.getMessageId()); } public void onException(OnExceptionContext context) { //消息发送失败 System.out.println("send message failed. execption=" + context.getException()); } }); Long endTime = System.currentTimeMillis(); System.out.println("单次生产耗时:"+(endTime-startTime)/1000); return true; } }
5、消费者初始化:
package com.iwhalecloud.citybrain.ducha.ism.utils; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Properties; public class RocketMQ3 { private static Logger log = LoggerFactory.getLogger(RocketMQ3.class); /** * 创建的Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。 */ private static Consumer consumer; /** * 消费警情数据 * */ public void init() { log.info("初始化启动消费者"); Properties p = new Properties(); // accessKey 阿里云身份验证,在阿里云服务器管理控制台创建 p.setProperty(PropertyKeyConst.AccessKey, "****"); //您在控制台创建的 CONSUMER_ID p.setProperty(PropertyKeyConst.ConsumerId, "****");//CID_alarmInfo_jwzt // secretKey 阿里云身份验证,在阿里云服务器管理控制台创建//rYwdAimwMeov5xEKLkOBHq1V3I89lc p.setProperty(PropertyKeyConst.SecretKey, "*******");//kbpnLBVkSWUjCr1RPaHLa23MaLcaMQ // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看 // p.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"); p.setProperty(PropertyKeyConst.NAMESRV_ADDR, "15.***.***.242:8080"); //设置发送超时时间,单位毫秒 p.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); consumer = ONSFactory.createConsumer(p); //订阅多个 Tag alarmStatus consumer.subscribe("alarmStatus", "*", new RocketMQListener()); consumer.start(); log.info("消费者启动成功"); } }
6、消费者消费/处理消息:
package com.iwhalecloud.citybrain.ducha.ism.utils; import com.alibaba.fastjson.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.iwhalecloud.citybrain.ducha.ism.mapper.PoliceOrgMapper; import com.iwhalecloud.citybrain.ducha.ism.service.impl.JWZTUtilsImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; public class RocketMQListener implements MessageListener { private static Logger log = LoggerFactory.getLogger(RocketMQListener.class); public Action consume(Message message, ConsumeContext consumeContext) { log.info("Receive: {}", message); String messageBodyStr = new String(message.getBody()); log.info("message消息为: {}", messageBodyStr); System.out.println("message消息为:" + messageBodyStr); try { JSONObject jsonObject = JSONObject.parseObject(messageBodyStr); //数据处理 //。。。。 //请求接收日志 } catch (Exception e) { log.error("MQ警情事件数据录字符串格式异常Exception", e); // 记录字符串格式不正确 return Action.CommitMessage; } return Action.CommitMessage; } }
简单的使用大概就是这样的,其他的比如
- 消息队列如何选型?
- 如何保证消息队列是高可用的?
- 如何保证消息不被重复消费?
- 如何保证消费的可靠性传输?
- 如何保证消息的顺序性?
- 生产端和消费端的集群负载均衡
想了解更多相关知识的,可以去这儿