1、在官网下载RocketMQ,下载完解压即可
2、添加环境变量:
ROCKETMQ_HOME="D:\rocketmq" NAMESRV_ADDR="localhost:9876"
3、启动Name Server
mqnamesrv.cmd
4、启动Broker,如果启动失败,删除C:\Users\"当前系统用户名"\store下的所有文件,注意要添加autoCreateTopicEnable=true 否则在创建消息组时会报错:MQClientException: No route info of this topic
mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
5、创建Maven工程,添加RocketMQ依赖,注意版本要和自己的服务器(下载的办法一致),否则可能会出现MQClientException: No route info of this topic这个错误
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.6.0</version> </dependency>
6、provider:
package provider; import com.sun.xml.internal.bind.api.impl.NameConverter; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class Pro { public static void main(String [] args){ DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo"); defaultMQProducer.setNamesrvAddr("localhost:9876"); try { defaultMQProducer.start(); for(int i = 0;i<100;i++){ try { Message message = new Message("Topic1","Tag1", ("Hello World"+i).getBytes("UTF-8")); SendResult sendResult = defaultMQProducer.send(message); //defaultMQProducer.sendOneway(message); } catch (Exception e) { e.printStackTrace(); } } defaultMQProducer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } } }
7、consumer
package consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String [] args){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo"); consumer.setNamesrvAddr("localhost:9876"); try { consumer.subscribe("Topic1","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for(MessageExt messageExt :list){ String str = new String(messageExt.getBody()); System.out.println(str); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // consumer.registerMessageListener(new MessageListenerOrderly() { // public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { // for(MessageExt messageExt :list){ // String str = new String(messageExt.getBody()); // System.out.println(str); // } // return ConsumeOrderlyStatus.SUCCESS; // } // }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
7、输出结果,可以看到输出的结果并不是很一致。
先看看RocketMQ架构图
1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。