MQ的使用场景
1:跨系统的数据传递
2:高并发流量削峰
3:异步处理…
常用的消息中间件
ActiveMq、RabbitMq、Kafka、RockeMq,本文主要讲述RockeMq,个中区别,请自行百度
RockeMq是由阿里捐赠给Apache的一款分布式,队列模型的开源消息中间件,经历了淘宝双十一的考验,性能和功能完备性,同级最强,使用java语言开发。
RockeMq的基本概念
Producer:消息生产者,负责生产消息
Consumer:消息消费者,负责消费消息
ProducerGroup:生产者组,这类生产者通常发送一类消息,且发送逻辑一致
ConsumerGroup:消费者组,这类消费者通常消费一类消息,且消费逻辑一致
NameSrv:一个无状态的名称服务,可以部署集群,每一个Broker启动的时候都会向名称服务注册,接收客户端的请求并返回路由信息
Broker:消息中转角色,负责存储,发送和接收消息
Topic:消息的主题,生产者往主题上发送,消费者订阅该主题
Message:消息实例,在生产者和消费者之间传递,一个Message必须属于一个Topic
Offset:偏移量,记录了每一次的消费的位置
Tag:用于对消息进行过滤,一个Message可以有不同的Tag
Key:消息键,每条消息唯一
RockeMq的安装和启动,自行百度,这里不做过多介绍
官网地址:http://rocketmq.apache.org/ 自行下载
注意事项说明:
1:使用RockeMq,需启动NameSrv和Broker,这两者均是java程序,有顺序要求,需保证先启动NameSrv
2:如启动后本地连接不上,请先检查对应的端口是否开放,如使用阿里云服务器,需将对应端口添加至安全组,如使用虚拟机,直接关闭防火墙即可。
RockeMq的特性
1:原生分布式
2:两种消息拉取
3:严格的顺序消息
4:特有的分布式协调器
5:亿级消息对接
6:组(GROUP)
RockeMq的使用
maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
第一个入门示例
生产者发送消息:
public class SyncProducer {
public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1. 创建生产者对象
DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");
// 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
producer.setNamesrvAddr(NAME_SERVER_ADDR);
// 3. 启动生产者
producer.start();
// 4. 生产者发送消息
for (int i = 0; i < 10; i++) {
Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
System.out.printf("发送结果:%s%n", result);
}
// 5. 停止生产者
producer.shutdown();
}
消费者消费消息
public class Consumer {
public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
public static void main(String[] args) throws MQClientException {
// 1. 创建消费者(Push)对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST");
// 2. 设置NameServer的地址
consumer.setNamesrvAddr(NAME_SERVER_ADDR);
consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次
// 3. 订阅对应的主题和Tag
consumer.subscribe("TopicTest", "*");
// 4. 注册消息接收到Broker消息后的处理接口
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
MessageExt messageExt = list.get(0);
System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)
consumer.start();
System.out.println("已启动消费者");
}
}