后续内容
JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例
1. 最佳实践综述
本次最佳实践,将结合JAVA代码对消息队列RocketMQ版(简称RocketMQ)的使用原理进行分析。
RocketMQ 是企业级互联网架构的核心产品,具备低延迟、高并发、高可用、高可靠,可支撑万亿级数据洪峰的分布式消息中间件。可通过RocketMQ控制台创建RocketMQ实例,无需安装包,省去繁杂的手续。对RocketMQ消息服务消息可视化可以按Topic、MessageID或Topic不同维度查询发送的消息、按消息轨迹功能展示发送和消费关系、消息是否成功消费等信息。其中资源报表可以快速的统计RocketMQ在一定时间段内发送和订阅消息的TPS数。
本次最佳实践的内容主要包含:
(1)消息同步和异步发送的JAVA示例代码及原理分析。
(2)针对同步和异步发送的区别选择适用的消息发送方式满足需求。
(3)对消息发送可以分Topic,更细粒化标签tag消息进行归类。
(4)通过Topic和Tag选择过滤消费消息。
(5)对消息发送失败有进行消息重试处理。
(6)结合JAVA代码对集群和广播订阅消息消费原理进行详述。
2. 最佳实践代码设计
2.1 生产者发送消息
本章对生产者发送消息的两种模式进行代码的说明。
2.1.1 同步发送消息
同步发送原理:
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
1.在pom.xml文件导入依赖包。
<dependency>
<groupId>************ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置文件application.properties连接mq的参数值。# POC2专有云MQ配置
mq.accessKey=**************1HaI
mq.secretKey=*****************1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.*************-d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA
3.同步发送示例代码,针对性适配后面MQ性能压测场景代码,内容包含发送每条消息数据大小50Kb,Topic和Tag消息更细粒化分类,消息发送失败进行重试处理。
package com.aliware.edas.com.aliware.edas.rocketmq;
import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Properties;
/**
* @author liuhuihui
*/
@Component("simpleMQProduce")
@ RefreshScope
public class SimpleMQProduce extends ProducerEntry
{
StringBuilder content = new StringBuilder();
public void sendMsg()
{
for(int i = 0; i < 6400; i++)
{
content.append(String.valueOf("A"));
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
// 设置 TCP 接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
Message msg = new Message(
// Message 所属的 Topic
this.getTopic(),
// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
this.getTag(),
// Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
(content.toString()).getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_" + 1);
try
{
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功
if(sendResult != null)
{
System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
}
}
catch(Exception e)
{
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
e.printStackTrace();
}
// 在应用退出前,销毁 Producer 对象
// 注意:如果不销毁也没有问题
producer.shutdown();
}
}
2.1.2 同步发送应用场景
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
2.1.3 异步发送消息
异步发送原理:
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要用户实现异步发送回调接口。
1.在pom.xml文件导入依赖包。
<dependency>
<groupId>c***********ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置文件application.properties连接mq的参数值。# POC2专有云MQ配置
mq.accessKey=*************1qc
mq.secretKey=***************R1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.**********-d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA
3.异步发送示例代码,针对性适配后面MQ性能压测场景代码,内容包含发送每条消息数据大小50Kb ,Topic和Tag消息更细粒化分类,消息发送失败进行重试处理。
package com.aliware.edas.com.aliware.edas.rocketmq;
import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Properties;
/**
* @author liuhuihui
*/
@Component("asyncSimpleMQProduce")
@ RefreshScope
public class AsyncSimpleMQProduce extends ProducerEntry
{
StringBuilder content = new StringBuilder();
public void sendMsg()
{
for(int i = 0; i < 6400; i++)
{
content.append(String.valueOf("A"));
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
// 设置 TCP 接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
Message msg = new Message(
// Message 所属的 Topic
this.getTopic(),
// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
this.getTag(),
// Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
(content.toString()).getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_" + 1);
while(true)
{
producer.sendAsync(msg, new SendCallback()
{
@Override
public void onSuccess(SendResult sendResult)
{
System.out.println(new Date() + "消息长度:" + content.length() + "--发送消息内容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
}
@Override
public void onException(OnExceptionContext context)
{
System.out.println("发送失败!");
}
});
}
// 在应用退出前,销毁 Producer 对象
// 注意:如果不销毁也没有问题
// producer.shutdown();
}
}
2.1.4 异步发送应用场景
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
2.2 消费者订阅消息
本章对消费者订阅消息的两种模式进行代码的说明。
2.2.1 集群订阅
集群订阅原理:
同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。
1.在pom.xml文件导入依赖包。
<dependency>
<groupId>**************ices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置文件application.properties连接mq的参数值。
3.集群订阅示例代码,适配后面MQ性能压测场景代码。
@Component("simpleMQConsumer")
@ RefreshScope
public classSimpleMQConsumerextendsProducerEntry
{
public void receive()
{
Properties properties = new Properties();
//您在控制台创建的GroupID
properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());
//AccessKey阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
//SecretKey阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//设置TCP接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
//集群订阅方式(默认)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
//订阅另外一个Topic
consumer.subscribe(this.getTopic(), "*", new MessageListener()
{ //订阅全部Tag
@Override
public Action consume(Message message, ConsumeContext context)
{
System.out.println("Receive:" + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
2.2.2 广播订阅
广播订阅原理:
同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
1.在pom.xml文件导入依赖包.
<dependency>
<groupId>**************ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.广播消费示例代码。@Component("simpleMQConsumer")
@ RefreshScope
public classSimpleMQConsumerextendsProducerEntry
{
public void receive()
{
Properties properties = new Properties();
//您在控制台创建的GroupID
properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());
//AccessKey阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
//SecretKey阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//设置TCP接入域名,到控制台的实例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
//广播订阅方式(默认)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(this.getTopic(), "*", new MessageListener()
{ //订阅全部Tag
@Override
public Action consume(Message message, ConsumeContext context)
{
System.out.println("Receive:" + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
我们是阿里云智能全球技术服务-SRE团队,我们致力成为一个以技术为基础、面向服务、保障业务系统高可用的工程师团队;提供专业、体系化的SRE服务,帮助广大客户更好地使用云、基于云构建更加稳定可靠的业务系统,提升业务稳定性。我们期望能够分享更多帮助企业客户上云、用好云,让客户云上业务运行更加稳定可靠的技术,您可用钉钉扫描下方二维码,加入阿里云SRE技术学院钉钉圈子,和更多云上人交流关于云平台的那些事。