rocketmq的使用及高级特性

MQ的使用场景

1:跨系统的数据传递
2:高并发流量削峰
3:异步处理…

常用的消息中间件

ActiveMq、RabbitMq、Kafka、RockeMq,本文主要讲述RockeMq,个中区别,请自行百度

RockeMq是由阿里捐赠给Apache的一款分布式,队列模型的开源消息中间件,经历了淘宝双十一的考验,性能和功能完备性,同级最强,使用java语言开发。

RockeMq的基本概念

Producer:消息生产者,负责生产消息
Consumer:消息消费者,负责消费消息
ProducerGroup:生产者组,这类生产者通常发送一类消息,且发送逻辑一致
ConsumerGroup:消费者组,这类消费者通常消费一类消息,且消费逻辑一致
NameSrv:一个无状态的名称服务,可以部署集群,每一个Broker启动的时候都会向名称服务注册,接收客户端的请求并返回路由信息
Broker:消息中转角色,负责存储,发送和接收消息
Topic:消息的主题,生产者往主题上发送,消费者订阅该主题
Message:消息实例,在生产者和消费者之间传递,一个Message必须属于一个Topic
Offset:偏移量,记录了每一次的消费的位置
Tag:用于对消息进行过滤,一个Message可以有不同的Tag
Key:消息键,每条消息唯一

RockeMq的安装和启动,自行百度,这里不做过多介绍
官网地址:http://rocketmq.apache.org/ 自行下载

注意事项说明:

1:使用RockeMq,需启动NameSrv和Broker,这两者均是java程序,有顺序要求,需保证先启动NameSrv
2:如启动后本地连接不上,请先检查对应的端口是否开放,如使用阿里云服务器,需将对应端口添加至安全组,如使用虚拟机,直接关闭防火墙即可。

RockeMq的特性

1:原生分布式
2:两种消息拉取
3:严格的顺序消息
4:特有的分布式协调器
5:亿级消息对接
6:组(GROUP)

RockeMq的使用

maven依赖

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
 </dependency>

第一个入门示例

生产者发送消息:

public class SyncProducer {
    public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 1. 创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr(NAME_SERVER_ADDR);

        // 3. 启动生产者
        producer.start();

        // 4. 生产者发送消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult result = producer.send(message);

            System.out.printf("发送结果:%s%n", result);
        }

        // 5. 停止生产者
        producer.shutdown();
    }

消费者消费消息

public class Consumer {

    public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";

    public static void main(String[] args) throws MQClientException {
        // 1. 创建消费者(Push)对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");

        // 2. 设置NameServer的地址
        consumer.setNamesrvAddr(NAME_SERVER_ADDR);
        consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次
        // 3. 订阅对应的主题和Tag
        consumer.subscribe("TopicTest", "*");

        // 4. 注册消息接收到Broker消息后的处理接口
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    MessageExt messageExt = list.get(0);
                    System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)
        consumer.start();

        System.out.println("已启动消费者");
    }
}
上一篇:RocketMQ入门


下一篇:Kafka Producer 核心知识