《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码

1.1.2 Eclipse调试RocketMQ源码

本节将展示在Eclipse中启动NameServer、Broker,并运行消息发送与消息消费示例程序。
1.启动NameServer
Step1:展开namesrv模块,右键NamesrvStartup.java,移动到Debug As,选中Debug Configurations,弹出Debug Configurations对话框,如图1-14所示。
Step2:选中Java Application条目并单击右键,选择New弹出Debug Configurations对话框,如图1-15所示。
Step3:设置RocketMQ运行主目录。选择Environment选项卡,添加环境变量ROCKET_HOME。
Step4:在RocketMQ运行主目录中创建conf、logs、store三个文件夹,如图1-16所示。
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码
Step5:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件则只需修改日志文件的目录,broker.conf文件内容如下所示。
代码清单1-3 broker.conf文件
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

nameServer地址,分号分割

namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

存储路径

storePathRootDir=D:\rocketmq\store

commitLog 存储路径

storePathCommitLog=D:\rocketmq\store\commitlog

消费队列存储路径

storePathConsumeQueue=D:\rocketmq\store\consumequeue

消息索引存储路径

storePathIndex=D:\rocketmq\store\index

checkpoint 文件存储路径

storeCheckpoint=D:\rocketmq\store\checkpoint

abort 文件存储路径

abortFile=D:\rocketmq\store\abort
Step6:在Eclipse Debug中运行NamesrvStartup,并输出“The Name Server boot success. Serializetype=JSON”。
2.启动Broker
Step1:展开broker模块,右键BrokerStartup.java,移动到Debug As,选中Debug Configurations,弹出如图1-17所示的对话框,选择arguments选项卡,配置-c属性指定broker配置文件路径。

《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码
Step2:切换选项卡Environment,配置RocketMQ主目录,如图1-18所示。

《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码
Step3:以Debug模式运行BrokerStartup.java,查看${ROCKET_HOME}/logs/broker.log文件,未报错则表示启动成功。
代码清单1-4 broker启动日志截图
2018-03-22 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
3.使用RocketMQ提供的实例验证消息发送与消息消费
Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer地址。
代码清单1-5 消息发送示例程序
public class Producer {

public static void main(String[] args) throws MQClientException, 
                InterruptedException {
    DefaultMQProducer producer = new 
                DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            Message msg = new Message("TopicTest"/* Topic */,"TagA"/* Tag */,
                ("Hello RocketMQ " + i).getBytes
                    (RemotingHelper.DEFAULT_CHARSET)/* Message body */
                );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

}
Step2:运行该示例程序,查看运行结果,如果输出代码清单1-6所示结果则表示消息发送成功。
代码清单1-6 消息发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer地址。
代码清单1-7 消息消费示例程序
public class Consumer {

public static void main(String[] args) throws InterruptedException, 
        MQClientException {
    DefaultMQPushConsumer consumer = new 
        DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", 
                Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");
}

}
Step4:运行消息消费者程序,如果输出如下所示则表示消息消费成功。
代码清单1-8 消息消费结果
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,
storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,
bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,
storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,
commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419,
UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]
消息发送与消息消费都成功,则说明RocketMQ调试环境已经成功搭建了,可以直接Debug源码,探知RocketMQ的实现奥秘了。

上一篇:《RocketMQ技术内幕:RocketMQ架构设计与实现原理》一导读


下一篇:《语义Web编程》一导读