RocketMQ使用教程相关系列 目录
目录
第一节:介绍
限制:
第二节:批量消息-生产者和消息者步骤说明
批量消息生产者代码实现步骤
批量消息消费者代码实现步骤
第三节:批量消息生产者-小于4MB
效果:
第四节:批量消息消费者
效果:
第五节:批量消息生产者-大于4MB
消息拆分工具类
生产者
效果:
第六节:批量消息消费者-大于4MB
效果:
第一节:介绍
批量发送消息能显著提高传递小消息的性能。
限制:
应该有相同的topic,相同的waitStoreMsgOK
不能是延时消息
这一批消息的总大小不应超过4MB(默认配置:DefaultMQProducer的maxMessageSize参数,可在broker*.properties配置文件中修改)。
第二节:批量消息-生产者和消息者步骤说明
批量消息生产者代码实现步骤
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象集合,指定主题Topic、Tag和消息体
5.发送集合消息
6.关闭生产者producer
批量消息消费者代码实现步骤
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
注意:消费者的 Topic 和 Tag 需要和生产者保持一致
第三节:批量消息生产者-小于4MB
public class Producer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group"); // 2.指定Nameserver地址 producer.setNamesrvAddr("192.168.88.131:9876"); // 3.启动producer producer.start(); System.out.println("生产者启动"); List<Message> msgs = new ArrayList<Message>(); // 4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ for (int i = 0; i < 20; i++) { Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes()); msgs.add(msg); } // 5.发送消息 SendResult result = producer.send(msgs); // 发送状态 SendStatus status = result.getSendStatus(); System.out.println("发送结果:" + result); // 线程睡1秒 TimeUnit.SECONDS.sleep(1); // 6.关闭生产者producer producer.shutdown(); System.out.println("生产者关闭"); } }
public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group"); //2.指定Nameserver地址 consumer.setNamesrvAddr("192.168.88.131:9876"); //消息拉取最大条数 consumer.setConsumeMessageBatchMaxSize(2); //3.订阅主题Topic和Tag consumer.subscribe("Topic_batch_demo", "*"); //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerOrderly() { //接受消息内容 @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt msg : msgs) { try { //获取主题 String topic = msg.getTopic(); //获取标签 String tags = msg.getTags(); //获取信息 byte[] body = msg.getBody(); String result = new String(body, RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer消费信息:topic:" + topic + ",tags:"+tags+ ",result"+ result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); //重试 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } }); //5.启动消费者consumer consumer.start(); } }
public class ListSplitter implements Iterator<List<Message>> { private int SIZE_LIMIT = 1024 * 1024 * 4; private final List<Message> messages; private int currIndex; public ListSplitter(List<Message> messages) { this.messages = messages; } public ListSplitter(List<Message> messages, DefaultMQProducer mqProducer) { this.messages = messages; this.SIZE_LIMIT = mqProducer.getMaxMessageSize(); } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List<Message> next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } // 增加日志的开销20字节 tmpSize = tmpSize + 20; if (tmpSize > SIZE_LIMIT) { // 单个消息超过了最大的限制 // 忽略,否则会阻塞分裂的进程 if (nextIndex - currIndex == 0) { // 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 nextIndex++; } break; } if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } @Override public void remove() { } }
生产者
public class ProducerMoreThan4M { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("demo_producer_batch_group"); // 2.指定Nameserver地址 producer.setNamesrvAddr("192.168.88.131:9876"); // 3.启动producer producer.start(); System.out.println("生产者启动"); List<Message> msgs = new ArrayList<Message>(); // 4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic * 参数二:消息Tag * 参数三:消息内容 */ for (int i = 0; i < 20; i++) { Message msg = new Message("Topic_batch_demo", "Tag_batch_demo", ("Hello 虚竹,这是批量消息" + i).getBytes()); msgs.add(msg); } // 5.发送消息 // 发送批量消息:把大的消息分裂成若干个小的消息 ListSplitter splitter = new ListSplitter(msgs, producer); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); SendResult result = producer.send(listItem); System.out.println("发送结果:" + result); } catch (Exception e) { e.printStackTrace(); // 处理error } } // 线程睡1秒 TimeUnit.SECONDS.sleep(1); // 6.关闭生产者producer producer.shutdown(); System.out.println("生产者关闭"); } }