win10 RocketMQ

1、在官网下载RocketMQ,下载完解压即可

2、添加环境变量:

ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"

3、启动Name Server

mqnamesrv.cmd

4、启动Broker,如果启动失败,删除C:\Users\"当前系统用户名"\store下的所有文件,注意要添加autoCreateTopicEnable=true 否则在创建消息组时会报错:MQClientException: No route info of this topic

mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

5、创建Maven工程,添加RocketMQ依赖,注意版本要和自己的服务器(下载的办法一致),否则可能会出现MQClientException: No route info of this topic这个错误

  <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.6.0</version>
        </dependency>

 

6、provider:

package provider;

import com.sun.xml.internal.bind.api.impl.NameConverter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Pro {
    public static void main(String [] args){
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        try {
            defaultMQProducer.start();
            for(int i = 0;i<100;i++){
                try {
                    Message message = new Message("Topic1","Tag1",
                            ("Hello World"+i).getBytes("UTF-8"));
                    SendResult sendResult = defaultMQProducer.send(message);
                    //defaultMQProducer.sendOneway(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            defaultMQProducer.shutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

7、consumer

package consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public  static  void main(String [] args){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            consumer.subscribe("Topic1","*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus
                consumeMessage(List<MessageExt> list,
                               ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for(MessageExt messageExt :list){
                        String str  =  new String(messageExt.getBody());
                        System.out.println(str);
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
//            consumer.registerMessageListener(new MessageListenerOrderly() {
//                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
//                    for(MessageExt messageExt :list){
//                        String str  =  new String(messageExt.getBody());
//                        System.out.println(str);
//                    }
//                    return  ConsumeOrderlyStatus.SUCCESS;
//                }
//            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

 

7、输出结果,可以看到输出的结果并不是很一致。

win10  RocketMQ

先看看RocketMQ架构图

win10  RocketMQ

 

 

1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

win10 RocketMQ

上一篇:C#_数据类型


下一篇:在Linux下运行YY,WINE方式,主要注册表修改点及字体文件列表