RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

RocketMQ使用教程相关系列 目录


目录


第一节:介绍


限制:


第二节:批量消息-生产者和消息者步骤说明


批量消息生产者代码实现步骤


批量消息消费者代码实现步骤


第三节:批量消息生产者-小于4MB


效果:


第四节:批量消息消费者


效果:


第五节:批量消息生产者-大于4MB


消息拆分工具类


生产者


效果:


第六节:批量消息消费者-大于4MB


效果:


第一节:介绍

批量发送消息能显著提高传递小消息的性能。


限制:

应该有相同的topic,相同的waitStoreMsgOK

不能是延时消息

这一批消息的总大小不应超过4MB(默认配置:DefaultMQProducer的maxMessageSize参数,可在broker*.properties配置文件中修改)。

第二节:批量消息-生产者和消息者步骤说明

批量消息生产者代码实现步骤

1.创建消息生产者producer,并制定生产者组名


2.指定Nameserver地址


3.启动producer


4.创建消息对象集合,指定主题Topic、Tag和消息体


5.发送集合消息


6.关闭生产者producer


批量消息消费者代码实现步骤

1.创建消费者Consumer,制定消费者组名


2.指定Nameserver地址


3.订阅主题Topic和Tag


4.设置回调函数,处理消息


5.启动消费者consumer


注意:消费者的 Topic 和 Tag 需要和生产者保持一致


第三节:批量消息生产者-小于4MB

public class Producer {
 
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      List<Message> msgs = new ArrayList<Message>();
 
      // 4.创建消息对象,指定主题Topic、Tag和消息体
      /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
      for (int i = 0; i < 20; i++) {
         Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
         msgs.add(msg);
      }
 
      // 5.发送消息
      SendResult result = producer.send(msgs);
      // 发送状态
      SendStatus status = result.getSendStatus();
 
      System.out.println("发送结果:" + result);
 
      // 线程睡1秒
      TimeUnit.SECONDS.sleep(1);
 
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
 
}

RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.88.131:9876");
        
        //消息拉取最大条数
        consumer.setConsumeMessageBatchMaxSize(2);
        //3.订阅主题Topic和Tag
        consumer.subscribe("Topic_batch_demo", "*");
 
        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //接受消息内容
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : msgs) {
 
                    try {
                        //获取主题
                        String topic = msg.getTopic();
                        //获取标签
                        String tags = msg.getTags();
                        //获取信息
                        byte[] body =  msg.getBody();
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+
                                ",result"+ result);
 
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        //重试
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

public class ListSplitter implements Iterator<List<Message>> {
   private int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
 
   public ListSplitter(List<Message> messages) {
      this.messages = messages;
   }
 
   public ListSplitter(List<Message> messages, DefaultMQProducer mqProducer) {
      this.messages = messages;
      this.SIZE_LIMIT = mqProducer.getMaxMessageSize();
   }
 
   @Override
   public boolean hasNext() {
      return currIndex < messages.size();
   }
 
   @Override
   public List<Message> next() {
      int nextIndex = currIndex;
      int totalSize = 0;
      for (; nextIndex < messages.size(); nextIndex++) {
         Message message = messages.get(nextIndex);
         int tmpSize = message.getTopic().length() + message.getBody().length;
         Map<String, String> properties = message.getProperties();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
         }
         // 增加日志的开销20字节
         tmpSize = tmpSize + 20;
 
         if (tmpSize > SIZE_LIMIT) {
            // 单个消息超过了最大的限制
            // 忽略,否则会阻塞分裂的进程
            if (nextIndex - currIndex == 0) {
               // 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
               nextIndex++;
            }
            break;
         }
         if (tmpSize + totalSize > SIZE_LIMIT) {
            break;
         } else {
            totalSize += tmpSize;
         }
 
      }
      List<Message> subList = messages.subList(currIndex, nextIndex);
      currIndex = nextIndex;
      return subList;
   }
 
   @Override
   public void remove() {
 
   }
 
}

生产者

public class ProducerMoreThan4M {
 
   public static void main(String[] args) throws Exception {
      // 1.创建消息生产者producer,并制定生产者组名
      DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group");
      // 2.指定Nameserver地址
      producer.setNamesrvAddr("192.168.88.131:9876");
      // 3.启动producer
      producer.start();
      System.out.println("生产者启动");
      List<Message> msgs = new ArrayList<Message>();
 
      // 4.创建消息对象,指定主题Topic、Tag和消息体
      /**
       * 参数一:消息主题Topic
       * 参数二:消息Tag
       * 参数三:消息内容
       */
      for (int i = 0; i < 20; i++) {
         Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes());
         msgs.add(msg);
      }
      // 5.发送消息
      // 发送批量消息:把大的消息分裂成若干个小的消息
      ListSplitter splitter = new ListSplitter(msgs, producer);
      while (splitter.hasNext()) {
         try {
            List<Message> listItem = splitter.next();
            SendResult result = producer.send(listItem);
            System.out.println("发送结果:" + result);
         } catch (Exception e) {
            e.printStackTrace();
            // 处理error
         }
      }
 
      // 线程睡1秒
      TimeUnit.SECONDS.sleep(1);
 
      // 6.关闭生产者producer
      producer.shutdown();
      System.out.println("生产者关闭");
   }
 
}

RocketMQ 第六章:手把手教老婆实现-批量消息生产者和消费者

上一篇:Vue核心思想:数据驱动、组件化


下一篇:【数据挖掘】关联规则挖掘 Apriori 算法 ( 关联规则性质 | 非频繁项集超集性质 | 频繁项集子集性质 | 项集与超集支持度性质 )