RocketMQ

RocketMQ

作用

  • 应用解耦
  • 流量削峰
  • 数据分发

缺点

  • 系统的可用性降低
  • 系统复杂度提高
  • 一致性问题

和其他MQ的比较

RocketMQ
开发语言:java、单机吞吐量:10万级、时效性:ms级、可用性:非常高(分布式架构)、功能特性:MQ功能比较完备,扩展性佳。

集群搭建

producer:消息的发送者
consumer:消息接受者
broker:暂存和传输消息
nameServer:管理broker
topic:区分消息的种类
message queue:相当于topic的分区,用于并行发送和接受消息


nameServer集群:无状态的,每一个broker在启动时都需要向每一个nameServer上报消息。简单的说,nameServer和broker是多对多的关系。
producer集群:没有数据同步关系
consumer集群:没有数据同步关系
broker集群:主从关系,一主多从。
producer集群会和nameServer集群建立一个长连接,consumer集群也会和nameServer集群建立一个长连接。


集群模式(broker集群)

  • 单Master模式

风险较大,一旦broker重启或者宕机,会导致整个服务不可用。

  • 多Master模式

全部都是Master,没有Slave。
优点:配置简单,单个Master宕机或者重启对应用没有影响,性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响。

  • 多Master多Slave模式(异步)

优点:即使磁盘损坏,消息丢失的非常少,且实时性不会受影响。master宕机后可以从slave消费,此过程无需人工干预,性能和多master几乎一样。
缺点:master宕机,磁盘损毁情况下会丢失少量消息。

  • 多Master多Slave模式(同步)

只有主备都写成功,才向应用返回成功。
优点:数据和服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性和数据可用性都非常高。
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的往返时间会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

消息发送基本样例

  • 消息生产者步骤分析
  1. 创建消息生产者producer,并制定生产者组名
  2. 指定NameServer地址
  3. 启动producer
  4. 创建消息对象,指定主题Topic、Tag和消息体
  5. 发送消息
  6. 关闭生产者producer
  • 消息消费者步骤分析
  1. 创建消费者Consunmer,制定消费者组名
  2. 指定NameServer地址
  3. 订阅主题Topic和Tag
  4. 设置回调函数,处理消息
  5. 启动消费者Consumer

基本样例

  • 发送同步消息

对时延不敏感的场景可以使用这种方式。

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        producer.start();
        for (int i = 0;i < 10;++i){
            Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
            SendResult result = producer.send(msg);
            SendStatus status = result.getSendStatus();
            String msgId = result.getMsgId();
            int queueId = result.getMessageQueue().getQueueId();
            System.out.println("发送状态:+" + result + ",消息ID" + msgId + ", 队列" + queueId);
            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}
  • 发送异步消息

对时延敏感的场景使用这种方式。

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        producer.start();
        producer.setRetryTimesWhenSendFailed(1);
        for (int i = 0;i < 10;++i){
            Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功!");
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送异常!" + throwable);
                }
            });
            
            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}
  • 单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        producer.start();
        for (int i = 0;i < 10;++i){
            Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
            producer.sendOneway(msg);
            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}

消费消息

  • 基本流程
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        consumer.subscribe("Topic","Tag");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    System.out.println(new String((msg.getBody())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
  • 负载均衡

多个消费者分担消息消费压力,即一个消息只能被消费一次。

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        consumer.subscribe("Topic","Tag");
        
        // 设定消费模式:负载均衡(默认)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    System.out.println(new String((msg.getBody())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
  • 广播模式

多个消费者分别对同一个消息均消费一次,即一个消息能被多个消费者消费。

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        consumer.subscribe("Topic","Tag");

        // 设定消费模式:广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    System.out.println(new String((msg.getBody())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

顺序消息

消费消息的顺序需要很生产消息的顺序保持一致。
Broker内部存在多个队列,要保证消息消费的顺序必须把消息放到同一个队列中,保证局部消息顺序。没有必要保证全局消息顺序。

public class orderProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");

        List<Order> orderList = new ArrayList<>();
        orderList.add(new Order(1,"yes"));
        orderList.add(new Order(2,"yes"));


        producer.start();
        for (int i = 0;i < 10;++i){
            Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                /**
                 *
                 * @param mqs 队列集合
                 * @param msg 消息对象
                 * @param arg 业务标识的参数
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long orderId = (long) arg;
                    long index = orderId % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getId());

            System.out.println("发送结果 : " + sendResult);
        }
        producer.shutdown();
    }

    public static class Order{
        Integer id;
        String desc;

        public Order(){

        }
        public Order(Integer i,String d){
            id = i;
            desc = d;
        }

        public Integer getId() {
            return id;
        }

        public void setId(Integer id) {
            this.id = id;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }
    }
}

延时消息

消费者延时消费。
使用限制:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
从1s到2h分别对应等级1到18。

public class delayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        producer.start();
        for (int i = 0;i < 10;++i){
            Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
            // 延时1m
            msg.setDelayTimeLevel(5);
            
            producer.sendOneway(msg);
            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}

批量消息

一次性发送多条消息,通过发送消息集合的方式发送;
每次发送消息不能超过4M。

public class batchProducer {
    public static void main(String[] args) {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
            producer.start();
        
            List<Message> msgs = new ArrayList<>();
            Message msg1 = new Message("Topic1","Tag1",("消息内容1").getBytes());
            Message msg2 = new Message("Topic2","Tag2",("消息内容2").getBytes());
            Message msg3 = new Message("Topic3","Tag3",("消息内容3").getBytes());
            msgs.add(msg1);
            msgs.add(msg2);
            msgs.add(msg3);
            
            SendResult result = producer.send(msgs);
            TimeUnit.SECONDS.sleep(1);
            
            producer.shutdown();
        }
    }
}

过滤消息

可以根据某些规则过滤

  • 根据Tag过滤
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
        
        //过滤出Tag1 和 Tag2中的消息
        consumer.subscribe("Topic","Tag1 || Tag2");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs){
                    System.out.println(new String((msg.getBody())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
  • 根据SQL语法过滤


事务消息

生产者向Server发送半消息,之后消费者还必须进行事务的提交,此时消息才对消费者可见;回滚则消息对消费者不可见。
提交状态、回滚状态、中间状态。

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");

        // 设置事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中进行本地事务
             * @param message
             * @param o
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o){
                if (StringUtils.equals("tagA",message.getTags())){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                else if (StringUtils.equals("tagB",message.getTags())){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                else if (StringUtils.equals(("tagC",message.getTags())){
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 在该方法时进行消息事务状态回查
             * @param messageExt
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("消息的tag:" + messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        String[] tags = {"tagA","tagB","tagC"};
        for (int i = 0;i < 3;++i){
            Message msg = new Message("Topic",tags[i],("消息内容" + i).getBytes());
            TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, null);

            TimeUnit.SECONDS.sleep(1);
        }
        producer.shutdown();
    }
}



上一篇:kafka教程


下一篇:Java 客户端访问kafka