MetaQ 消息中间件介绍及使用

MetaQ 消息中间件介绍及使用

简介

MetaQ是阿里云中间件团队设计和研发的一款分布式、队列模型的消息中间件。有如下几个特点:
  1. 有push、pull两种消费模式
  2. 支持严格的消息顺序
  3. 亿级别的堆积能力
  4. 支持消息回溯
  5. 多维度消息的查询
MetaQ的发展历史可以分成如下三个阶段:

初期:2011年基于Kafka的设计,重写并推出了MetaQ 1.0

中期:2012年对MetaQ重构升级,推出MetaQ 2.0

后期:基于RocketMQ3.0, 使用拉模型解决顺序消息和海量堆积问题,推出MetaQ 3.0

MetaQ与RocketMQ的关系

阿里内部称为MetaQ,外部称RocketMQ

基本用法

1.客户端接入

<dependency>

   <groupId>com.taobao.metaq.finalgroupId>

   <artifactId>metaq-clientartifactId>

   <version>4.2.6.Finalversion>

dependency>

2.订阅普通消息

public class PushConsumer {

   /**

    * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从MetaQ服务器推到了应用客户端。

    * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法

    */

   public static void main(String[] args) throws InterruptedException, MQClientException {

       /**

        * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例

        * 注意:ConsumerGroupName需要由应用来保证唯一

        * ConsumerGroupName在生产环境需要申请,非生产环境不需要

        */

       MetaPushConsumer consumer = new MetaPushConsumer("RebalanceTest_Consumer_Group");

       /**

        * 订阅指定topic下tags分别等于TagA或TagC或TagD

        */

       consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

       consumer.setConsumeMessageBatchMaxSize(3);

       /**

        * 订阅指定topic下所有消息

        * 注意:一个consumer对象可以订阅多个topic

        */

       consumer.subscribe("TopicTest2", "*");

       consumer.registerMessageListener(new MessageListenerConcurrently() {

           /**

            * 1、默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

            * 2、如果设置为批量消费方式,要么都成功,要么都失败。

            * 3、此方法由MetaQ客户端多个线程回调,需要应用来处理并发安全问题

            * 4、抛异常与返回ConsumeConcurrentlyStatus.RECONSUME_LATER等价

            * 5、每条消息失败后,会尝试重试,重试16次都失败,则丢弃

            */

           @Override

           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

                                                           ConsumeConcurrentlyContext context) {

               System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

               // for (MessageExt msg : msgs) {

               // if (msg.getTags().equals("TagA")) {

               // return ConsumeConcurrentlyStatus.RECONSUME_LATER;

               // }

               // }

               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

           }

       });

       /**

        * Consumer对象在使用之前必须要调用start初始化,初始化一次即可

        */

       consumer.start();

       System.out.println("Consumer Started.");

   }

}

3.发送普通消息

public class Producer {

   public static void main(String[] args) throws MQClientException, InterruptedException {

       /**

        * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例

        * 注意:ProducerGroupName需要由应用来保证唯一

        * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

        * 因为服务器会回查这个Group下的任意一个Producer

        */

       MetaProducer producer = new MetaProducer("manhongTestPubGroup");

       /**

        * Producer对象在使用之前必须要调用start初始化,初始化一次即可

        * 注意:切记不可以在每次发送消息时,都调用start方法

        */

       producer.start();

       /**

        * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

        * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,

        * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,

        * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

        */

       try {

           for (int i = 0; i < 20; i++) {

               {

                   Message msg = new Message("Jodie_topic_1023",// topic

                           "TagA",// tag

                           "OrderID001",// key,消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。

                           ("Hello MetaQ").getBytes());// body

                   SendResult sendResult = producer.send(msg);

                   System.out.println(sendResult);

               }

               {

                   Message msg = new Message("TopicTest2",// topic

                           "TagB",// tag

                           "OrderID0034",// key

                           ("Hello MetaQ").getBytes());// body

                   SendResult sendResult = producer.send(msg);

                   System.out.println(sendResult);

               }

               {

                   Message msg = new Message("TopicTest3",// topic

                           "TagC",// tag

                           "OrderID061",// key

                           ("Hello MetaQ").getBytes());// body

                   SendResult sendResult = producer.send(msg);

                   System.out.println(sendResult);

               }

           }

       } catch (Exception e) {

           e.printStackTrace();

       }

       /**

        * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

        * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

        */

       producer.shutdown();

   }

}

4.主动pull

public class PullConsumer {

   private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

   public static void main(String[] args) throws MQClientException {

       MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5");

       consumer.start();

       Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");

       for (MessageQueue mq : mqs) {

           System.out.println("Consume from the queue: " + mq);

           PullResult pullResult;

           try {

               pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

               System.out.println(pullResult);

               putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

               switch (pullResult.getPullStatus()) {

                   case FOUND:

                       // TODO

                       break;

                   case NO_MATCHED_MSG:

                       break;

                   case NO_NEW_MSG:

                       break;

                   case OFFSET_ILLEGAL:

                       break;

                   default:

                       break;

               }

           } catch (Exception e) {

               e.printStackTrace();

           }

       }

       consumer.shutdown();

   }

   private static long getMessageQueueOffset(MessageQueue mq) {

       Long offset = offseTable.get(mq);

       if (offset != null)

           return offset;

       return 0;

   }

   private static void putMessageQueueOffset(MessageQueue mq, long offset) {

       offseTable.put(mq, offset);

   }

}

5.除以上功能外,MetaQ还支持发送顺序消息、订阅顺序消息、订阅广播消息、单元化消息订阅与发送等

常用功能及说明

1. MetaQ的控制台操作地址
2. 查询环境地址:curl http://jmenv.tbsite.net:8080/env

MetaQ 消息中间件介绍及使用

3. 关于消息消费的两种方式

一种是pull,即消费者主动去broke拉取;一种是push,主动推送给消费者。

MetaQ 消息中间件介绍及使用

关于两者的详细区别可以参考:原文地址

总结

MetaQ是一款功能强大的消息中间件,基于消费者订阅及发布模式,支持Topic管理、Message查询、消息的消费和生产、监控警报等功能。

笔者在实际使用过程中,发现MetaQ有着易于上手使用便捷的特点,对开发中较为友好,但同时发现其在使用场景上也有需要注意的点,譬如MetaQ不保证消息不重复、消息进入死信队列就不会继续投递应用等。

如果浩鲸的同僚想进一步深入了解MetaQ,可以去查看官方文档,结合实际场景使用它,实践出真知!

上一篇:【转载】高并发的核心技术-幂等的实现方案


下一篇:shell脚本学习笔记 -1