MetaQ 消息中间件介绍及使用
简介
MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:
- 有push、pull两种消费模式
- 支持严格的消息顺序
- 亿级别的堆积能力
- 支持消息回溯
- 多维度消息的查询
MetaQ的发展历史可以分成如下三个阶段:
初期:2011年基于Kafka的设计,重写并推出了MetaQ 1.0
中期:2012年对MetaQ重构升级,推出MetaQ 2.0
后期:基于RocketMQ3.0, 使用拉模型解决顺序消息和海量堆积问题,推出MetaQ 3.0
MetaQ与RocketMQ的关系
阿里内部称为MetaQ,外部称RocketMQ
基本用法
1.客户端接入
<dependency>
<groupId>com.taobao.metaq.finalgroupId>
<artifactId>metaq-clientartifactId>
<version>4.2.6.Finalversion>
dependency>
2.订阅普通消息
public class PushConsumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从MetaQ服务器推到了应用客户端。
* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法
*/
public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ConsumerGroupName需要由应用来保证唯一
* ConsumerGroupName在生产环境需要申请,非生产环境不需要
*/
MetaPushConsumer consumer = new MetaPushConsumer("RebalanceTest_Consumer_Group");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
consumer.setConsumeMessageBatchMaxSize(3);
/**
* 订阅指定topic下所有消息
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 1、默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
* 2、如果设置为批量消费方式,要么都成功,要么都失败。
* 3、此方法由MetaQ客户端多个线程回调,需要应用来处理并发安全问题
* 4、抛异常与返回ConsumeConcurrentlyStatus.RECONSUME_LATER等价
* 5、每条消息失败后,会尝试重试,重试16次都失败,则丢弃
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
// for (MessageExt msg : msgs) {
// if (msg.getTags().equals("TagA")) {
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// }
// }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
3.发送普通消息
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ProducerGroupName需要由应用来保证唯一
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
MetaProducer producer = new MetaProducer("manhongTestPubGroup");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
try {
for (int i = 0; i < 20; i++) {
{
Message msg = new Message("Jodie_topic_1023",// topic
"TagA",// tag
"OrderID001",// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}
} catch (Exception e) {
e.printStackTrace();
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}
}
4.主动pull
public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
PullResult pullResult;
try {
pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
}
5.除以上功能外,MetaQ还支持发送顺序消息、订阅顺序消息、订阅广播消息、单元化消息订阅与发送等
常用功能及说明
1. MetaQ的控制台操作地址
2. 查询环境地址:curl http://jmenv.tbsite.net:8080/env
3. 关于消息消费的两种方式
一种是pull,即消费者主动去broke拉取;一种是push,主动推送给消费者。
关于两者的详细区别可以参考:原文地址
总结
MetaQ是一款功能强大的消息中间件,基于消费者订阅及发布模式,支持Topic管理、Message查询、消息的消费和生产、监控警报等功能。
笔者在实际使用过程中,发现MetaQ有着易于上手使用便捷的特点,对开发中较为友好,但同时发现其在使用场景上也有需要注意的点,譬如MetaQ不保证消息不重复、消息进入死信队列就不会继续投递应用等。
如果浩鲸的同僚想进一步深入了解MetaQ,可以去查看官方文档,结合实际场景使用它,实践出真知!