经常会有人提及到“消息中间件/消息队列”,那这到底是个啥呢?


官方:消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统


    • 我个人觉得消息中间件也可以用共享单车模式形容,平台共享单车放在固定的某些地方,然后大家有需要的自己用对应的app扫描对应的二维码就可以开锁使用。
    • 固定地方就像是中间件存放消息的地方Broker共享单车就是Message消息平台-Producer生产者大家-Consumer消费者,而对应的app就像是Topic(如果用美团app扫描,就是使用美团单车,如果用支付宝扫描,就是使用哈罗单车),每个单车的编号其实就是组成Topic的更小单元Queue


大概的执行逻辑是这样滴:

经常会有人提及到“消息中间件/消息队列”,那这到底是个啥呢?


为什么要使用消息队列?

主要的应用场景解耦、异步、削峰经常会有人提及到“消息中间件/消息队列”,那这到底是个啥呢?


解耦:

    • 传统模式的系统间耦合性太强,比如系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!
    • 中间件模式的优点是将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。


异步:

    • 传统模式的一些非必要的业务逻辑以同步的方式运行,太耗费时间。
    • 中间件模式的优点是将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度


削峰:

    • 传统模式并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
    • 中间件模式系统可以慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

上代码瞅瞅,我之前项目上用过的

1、引入依赖:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.Final</version>
</dependency>

2、yml配置文件添加生产者属性:

#配置rocketmq      
rocketmq:
  producer:
  producerId: ***** #生产者id(旧版本是生产者id,新版本是groupid),替换成自己的
  msgTopic: alarmStatus #生产主题,替换成自己的
  accessKey: XXX  #连接通道,替换成自己的
  secretKey: XXX  #连接秘钥,替换成自己的
  onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet  #生产者ons接入域名,替换成自己的

3、 初始化生产者:

package com.iwhalecloud.citybrain.ducha.ism.utils;
 
import java.util.Properties;
 
import javax.annotation.PostConstruct;
 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
 
/**
 * rocketmq生产者启动初始化类
 * @author lkf
 * @Date 2021年8月9日
 *
 */
@Component
public class RocketmqProducerInit {
 
   @Value("${rocketmq.producer.producerId}")
   private String producerId;
  
   @Value("${rocketmq.producer.accessKey}")
   private String accessKey;
  
   @Value("${rocketmq.producer.secretKey}")
   private String secretKey;
  
   @Value("${rocketmq.producer.onsAddr}")
   private String ONSAddr;
  
   private static Producer producer;
  
   /* 
   
   //当无法注入实例的时候可以使用此方法进行实例初始化
   private static class ProducerHolder {
     private static final RocketmqProducerInit INSTANCE = new RocketmqProducerInit();
   }
  
   private RocketmqProducerInit (){
    
   }
  
   public static final RocketmqProducerInit getInstance() {
     return ProducerHolder.INSTANCE;
   }*/
  
   @PostConstruct
   public void init(){
    System.out.println("初始化启动生产者!");
     // producer 实例配置初始化
     Properties properties = new Properties();
     //您在控制台创建的Producer ID
     properties.setProperty(PropertyKeyConst.GROUP_ID, producerId);
     // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
     properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
     // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
     properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
     //设置发送超时时间,单位毫秒
     properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
     // 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
     properties.setProperty(PropertyKeyConst.ONSAddr, ONSAddr);
     producer = ONSFactory.createProducer(properties);
     // 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
     producer.start();
   }
  
   /**
    * 初始化生产者
    * @return
    */
   public Producer getProducer(){
     return producer;
   }
}

4、发送消息:

package com.iwhalecloud.citybrain.ducha.ism.utils;
 
import java.util.Date;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
 
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.sixmonth.rocketmq.common.rocketmq.init.RocketmqProducerInit;
 
/**
 * 消息生产者,可与消费者分离
 * @author lkf
 * @Date 2021年8月9日
 *
 */
@Service
public class RocketmqProducerService {
 
 private Logger logger = LoggerFactory.getLogger(RocketmqProducerService.class);
 
 @Value("${rocketmq.producer.msgTopic}")
 private String msgTopic;
 
 @Autowired
 private RocketmqProducerInit rocketmqProducerInit;
 
 public String tag = "*";//生产标签,可自定义,默认通配
   
   /**
    * 异步发送消息
    * 可靠异步发送:发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式;
    * 特点:速度快;有结果反馈;数据可靠;
    * 应用场景:异步发送一般用于链路耗时较长,对 rt响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等;
    * @param msg
    * @return
    */
   public boolean sendMsgAsy(String msg) {
    Long startTime = System.currentTimeMillis();
    Message message = new Message(msgTopic, tag, msg.getBytes());
    rocketmqProducerInit.getProducer().sendAsync(message, new SendCallback() {
   @Override
   public void onSuccess(SendResult sendResult) {
    ///消息发送成功
             System.out.println("send message success. topic=" + sendResult.getMessageId());
   }
 
   @Override
   public void onException(OnExceptionContext context) {
    //消息发送失败
    System.out.println("send message failed. execption=" + context.getException());
   }
    });
       Long endTime = System.currentTimeMillis();
    System.out.println("单次生产耗时:"+(endTime-startTime)/1000);
    return true;
   }
   
}

5、消费者初始化:

package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Properties;
@Component
public class RocketMQ3 {
    private static Logger log = LoggerFactory.getLogger(RocketMQ3.class);
    /**
     * 创建的Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
     */
    private static Consumer consumer;
    /**
     * 消费警情数据
     *
     */
    @PostConstruct
    public void init() {
        log.info("初始化启动消费者");
        Properties p = new Properties();
        // accessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        p.setProperty(PropertyKeyConst.AccessKey, "****");
        //您在控制台创建的 CONSUMER_ID
        p.setProperty(PropertyKeyConst.ConsumerId, "****");//CID_alarmInfo_jwzt
        // secretKey 阿里云身份验证,在阿里云服务器管理控制台创建//rYwdAimwMeov5xEKLkOBHq1V3I89lc
        p.setProperty(PropertyKeyConst.SecretKey, "*******");//kbpnLBVkSWUjCr1RPaHLa23MaLcaMQ
        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
//        p.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
        p.setProperty(PropertyKeyConst.NAMESRV_ADDR, "15.***.***.242:8080");
        //设置发送超时时间,单位毫秒
        p.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        consumer = ONSFactory.createConsumer(p);
        //订阅多个 Tag  alarmStatus
        consumer.subscribe("alarmStatus", "*", new RocketMQListener());
        consumer.start();
        log.info("消费者启动成功");
    }
}


6、消费者消费/处理消息:

package com.iwhalecloud.citybrain.ducha.ism.utils;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.iwhalecloud.citybrain.ducha.ism.mapper.PoliceOrgMapper;
import com.iwhalecloud.citybrain.ducha.ism.service.impl.JWZTUtilsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
@Service
public class RocketMQListener implements MessageListener {
    private static Logger log = LoggerFactory.getLogger(RocketMQListener.class);
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        log.info("Receive: {}", message);
        String messageBodyStr = new String(message.getBody());
        log.info("message消息为: {}", messageBodyStr);
        System.out.println("message消息为:" + messageBodyStr);
        try {
            JSONObject jsonObject = JSONObject.parseObject(messageBodyStr);
            //数据处理
            //。。。。
            //请求接收日志
        } catch (Exception e) {
            log.error("MQ警情事件数据录字符串格式异常Exception", e);
            // 记录字符串格式不正确
            return Action.CommitMessage;
        }
        return Action.CommitMessage;
    }
}


简单的使用大概就是这样的,其他的比如

  1. 消息队列如何选型?
  2. 如何保证消息队列是高可用的?
  3. 如何保证消息不被重复消费?
  4. 如何保证消费的可靠性传输?
  5. 如何保证消息的顺序性?
  6. 生产端和消费端的集群负载均衡

想了解更多相关知识的,可以去这儿

上一篇:Android--从系统Camera和Gallery获取图片优化


下一篇:微信接口