RocketMQ
作用
- 应用解耦
- 流量削峰
- 数据分发
缺点
- 系统的可用性降低
- 系统复杂度提高
- 一致性问题
和其他MQ的比较
RocketMQ
开发语言:java、单机吞吐量:10万级、时效性:ms级、可用性:非常高(分布式架构)、功能特性:MQ功能比较完备,扩展性佳。
集群搭建
producer:消息的发送者
consumer:消息接受者
broker:暂存和传输消息
nameServer:管理broker
topic:区分消息的种类
message queue:相当于topic的分区,用于并行发送和接受消息
nameServer集群:无状态的,每一个broker在启动时都需要向每一个nameServer上报消息。简单的说,nameServer和broker是多对多的关系。
producer集群:没有数据同步关系
consumer集群:没有数据同步关系
broker集群:主从关系,一主多从。
producer集群会和nameServer集群建立一个长连接,consumer集群也会和nameServer集群建立一个长连接。
集群模式(broker集群)
- 单Master模式
风险较大,一旦broker重启或者宕机,会导致整个服务不可用。
- 多Master模式
全部都是Master,没有Slave。
优点:配置简单,单个Master宕机或者重启对应用没有影响,性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响。
- 多Master多Slave模式(异步)
优点:即使磁盘损坏,消息丢失的非常少,且实时性不会受影响。master宕机后可以从slave消费,此过程无需人工干预,性能和多master几乎一样。
缺点:master宕机,磁盘损毁情况下会丢失少量消息。
- 多Master多Slave模式(同步)
只有主备都写成功,才向应用返回成功。
优点:数据和服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性和数据可用性都非常高。
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的往返时间会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
消息发送基本样例
- 消息生产者步骤分析
- 创建消息生产者producer,并制定生产者组名
- 指定NameServer地址
- 启动producer
- 创建消息对象,指定主题Topic、Tag和消息体
- 发送消息
- 关闭生产者producer
- 消息消费者步骤分析
- 创建消费者Consunmer,制定消费者组名
- 指定NameServer地址
- 订阅主题Topic和Tag
- 设置回调函数,处理消息
- 启动消费者Consumer
基本样例
- 发送同步消息
对时延不敏感的场景可以使用这种方式。
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
producer.start();
for (int i = 0;i < 10;++i){
Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
SendResult result = producer.send(msg);
SendStatus status = result.getSendStatus();
String msgId = result.getMsgId();
int queueId = result.getMessageQueue().getQueueId();
System.out.println("发送状态:+" + result + ",消息ID" + msgId + ", 队列" + queueId);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
- 发送异步消息
对时延敏感的场景使用这种方式。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
producer.start();
producer.setRetryTimesWhenSendFailed(1);
for (int i = 0;i < 10;++i){
Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功!");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常!" + throwable);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
- 单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
producer.start();
for (int i = 0;i < 10;++i){
Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
producer.sendOneway(msg);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
消费消息
- 基本流程
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
consumer.subscribe("Topic","Tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String((msg.getBody())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
- 负载均衡
多个消费者分担消息消费压力,即一个消息只能被消费一次。
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
consumer.subscribe("Topic","Tag");
// 设定消费模式:负载均衡(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String((msg.getBody())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
- 广播模式
多个消费者分别对同一个消息均消费一次,即一个消息能被多个消费者消费。
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
consumer.subscribe("Topic","Tag");
// 设定消费模式:广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String((msg.getBody())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
顺序消息
消费消息的顺序需要很生产消息的顺序保持一致。
Broker内部存在多个队列,要保证消息消费的顺序必须把消息放到同一个队列中,保证局部消息顺序。没有必要保证全局消息顺序。
public class orderProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
List<Order> orderList = new ArrayList<>();
orderList.add(new Order(1,"yes"));
orderList.add(new Order(2,"yes"));
producer.start();
for (int i = 0;i < 10;++i){
Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
*
* @param mqs 队列集合
* @param msg 消息对象
* @param arg 业务标识的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getId());
System.out.println("发送结果 : " + sendResult);
}
producer.shutdown();
}
public static class Order{
Integer id;
String desc;
public Order(){
}
public Order(Integer i,String d){
id = i;
desc = d;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
}
延时消息
消费者延时消费。
使用限制:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
从1s到2h分别对应等级1到18。
public class delayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
producer.start();
for (int i = 0;i < 10;++i){
Message msg = new Message("Topic","Tag",("消息内容" + i).getBytes());
// 延时1m
msg.setDelayTimeLevel(5);
producer.sendOneway(msg);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
批量消息
一次性发送多条消息,通过发送消息集合的方式发送;
每次发送消息不能超过4M。
public class batchProducer {
public static void main(String[] args) {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
producer.start();
List<Message> msgs = new ArrayList<>();
Message msg1 = new Message("Topic1","Tag1",("消息内容1").getBytes());
Message msg2 = new Message("Topic2","Tag2",("消息内容2").getBytes());
Message msg3 = new Message("Topic3","Tag3",("消息内容3").getBytes());
msgs.add(msg1);
msgs.add(msg2);
msgs.add(msg3);
SendResult result = producer.send(msgs);
TimeUnit.SECONDS.sleep(1);
producer.shutdown();
}
}
}
过滤消息
可以根据某些规则过滤
- 根据Tag过滤
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
//过滤出Tag1 和 Tag2中的消息
consumer.subscribe("Topic","Tag1 || Tag2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String((msg.getBody())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
- 根据SQL语法过滤
略
事务消息
生产者向Server发送半消息,之后消费者还必须进行事务的提交,此时消息才对消费者可见;回滚则消息对消费者不可见。
提交状态、回滚状态、中间状态。
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.25.135;192.168.25.136:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中进行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o){
if (StringUtils.equals("tagA",message.getTags())){
return LocalTransactionState.COMMIT_MESSAGE;
}
else if (StringUtils.equals("tagB",message.getTags())){
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if (StringUtils.equals(("tagC",message.getTags())){
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* 在该方法时进行消息事务状态回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息的tag:" + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
String[] tags = {"tagA","tagB","tagC"};
for (int i = 0;i < 3;++i){
Message msg = new Message("Topic",tags[i],("消息内容" + i).getBytes());
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(msg, null);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}