RocketMQ 介绍与基本使用

介绍

RocketMQ是阿里巴巴自研的第三代分布式消息中间件,是阿里系下开源的一款分布式、队列模型的消息中间件,原名Metaq,3.0 版本名称改为RocketMQ,是阿里参照kafka设计思想使用Java实现的一套MQ。同时将阿里系内部多款MQ产品(Notify、metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下MQ的架构。

2016年11月,阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。阿里称会将其打造成*项目。

2017年2月20日,RocketMQ正式发布4.0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。

相关地址

小试牛刀

可通过自己下载源码编译或下载编译好的文件,地址见上。

假设是自己下载源码进行编译

下载源码并进行编译

> git clone https://github.com/apache/incubator-rocketmq.git
> cd incubator-rocketmq
> mvn clean package install -Prelease-all assembly:assembly -U
> cd target/apache-rocketmq-all/

Start Name Server

> nohup sh bin/mqnamesrv &
> tailf nohup.out

Start Broker

> nohup sh bin/mqbroker -n localhost:9876 &
> tailf nohup.out

注意如果这里启动失败,看一下内存是否足够,可以看一下“runbroker.sh”这个文件,对应的修改参数,如下

JAVA_OPT="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"

测试发送与接收

 > export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

关闭服务

> sh bin/mqshutdown broker
> sh bin/mqshutdown namesrv

在Java项目中的使用

pom.xml

<properties>
<rocketmq_ver>4.0.0-incubating</rocketmq_ver>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq_ver}</version>
</dependency>
</dependencies>

生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr(Config.ADDR);
try {
producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for push1.".getBytes()); SendResult result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for push2.".getBytes()); result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "pull", "1", "Just for pull.".getBytes()); result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}

消费者

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; public class Consumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("PushConsumer");
consumer.setNamesrvAddr(Config.ADDR);
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext Context) {
for (Message msg : msgs) {
System.out.println(new String(msg.getBody()) + ":" + msg.toString());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

结果

id:C0A801663174723279CF77AF3C6E0000 result:SEND_OK
id:C0A801663174723279CF77AF3C7B0001 result:SEND_OK
id:C0A801663174723279CF77AF3C7D0002 result:SEND_OK
Just for push1.:MessageExt [queueId=2, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772974, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775615, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EDE8, commitLogOffset=191976, bodyCRC=1396413800, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=1, CONSUME_START_TIME=1490348782880, UNIQ_KEY=C0A801663174723279CF77AF3C6E0000, WAIT=true, TAGS=push}, body=15]]
Just for push2.:MessageExt [queueId=3, storeSize=184, queueOffset=14, sysFlag=0, bornTimestamp=1490348772987, bornHost=/192.168.127.1:53238, storeTimestamp=1490348775620, storeHost=/192.168.127.128:10911, msgId=C0A87F8000002A9F000000000002EEA0, commitLogOffset=192160, bodyCRC=2014758571, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=15, KEYS=2, CONSUME_START_TIME=1490348782882, UNIQ_KEY=C0A801663174723279CF77AF3C7B0001, WAIT=true, TAGS=push}, body=15]]

参考

上一篇:Testin云测试平台初体验


下一篇:java 结束程序进程 代码