RocketMQ调优过程
RocketMQ调优过程
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。
RocketMQ是阿里巴巴双十一官方指定消息产品,支撑阿里巴巴集团所有的消息服务,历经十余年高可用与高可靠的严苛考验,是阿里巴巴交易链路的核心产品。
在此主要记录对于指定任务的调优过程,对于RocketMQ的基础知识不再赘述。
一.任务描述
建立一条信息传输通道,初始端是一个Tcp服务器,负责生产固定频率(每100ms生产1条数据)的数据(带有八个字段的自定义对象信息),中间是一个Tcp客户端,负责接收Tcp服务器传来的数据并启动一个RocketMQ的生产者将数据发送到RocketMQ的broker中等待消费者消费,最后建立一个RocketMQ的消费者用以消费broker中的数据,该任务最关键的点是保证消息在生产者到消费者之间传输速度,不能有太大延迟。
二.基本结构
下面展示的都是最终程序的代码,可以直接cv,具体调优过程在第三章讲。
根据任务要求,需要有一个Tcp服务器端生产数据,我这里使用的就是Java的原生网络编程接口,以本地的30777端口作为服务器端口。为了满足多个Tcp客户端同时请求,这里还加入一个多线程的Channel内部类,每当有一个客户端建立连接,就新开一个Channel线程,具体代码如下,其中Student就是一个定义了八个字段的pojo类:
public class TcpServer{
//服务器端要绑定的端口号
private static int port = 30777;
private static ServerSocket serverSocket = null;
private static List<Student> lists = new ArrayList<>();
public static void main(String[] args) throws IOException {
for(int i = 0; i < 10; i++){
Student student = new Student("名字",18,"男",1,"学校","地址","学院",123);
lists.add(student);
}
Student student = new Student("名字",18,"男",0,"学校","地址","学院",123);
lists.add(student);
serverSocket = new ServerSocket(port);
log.info("TCP服务器建立成功,等待客户端连接");
while(true){
//一直等待客户端请求连接
Socket socket = serverSocket.accept();
System.out.println("一个客户端建立连接了");
Channel channel = new Channel(socket);
new Thread(channel).start();
}
}
//内部通道类
static class Channel implements Runnable {
private DataInputStream bis;
private DataOutputStream bos;
private ObjectInputStream ois;
private ObjectOutputStream oos;
private Socket socket;
private boolean isRunning;
public Channel(Socket socket) {
this.socket = socket;
try {
bis = new DataInputStream(socket.getInputStream());
bos = new DataOutputStream(socket.getOutputStream());
oos = new ObjectOutputStream(socket.getOutputStream());
isRunning = true;
} catch (IOException e) {
release();
}
}
@SneakyThrows
@Override
public void run() {
//发送一条成功连接的消息
//send("成功连接TCP服务器");
//连接成功后 定时发送消息 每100ms发送一条
int code = 0;
Scanner scanner = new Scanner(System.in);
while(true){
long startTime = System.currentTimeMillis(); //获取开始时间
System.out.println("程序运行时间:" + startTime + "ms"); //输出程序运行时间
for(; code < 10; code++){
send(lists.get(code));
Thread.sleep(0);
}
send(lists.get(code));
code = 0;
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void send(Student student) {
try {
oos.writeObject(student);
oos.flush();
} catch (IOException e) {
release();
}
}
//释放资源
public void release(){
isRunning = false;
try {
bos.close();
bis.close();
socket.close();
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
其次,需要一个Tcp客户端接收服务器传来的消息,并以RocketMQ的生产者身份将消息发布出去,使用时new一个类启动线程即可,代码如下:
public class TcpClient implements Runnable {
//服务器的主机地址
private static String host = "localhost";
//服务器的端口号
private int port = 30777;
private Socket socket = null;
private DataInputStream bis = null;
private ObjectInputStream ois = null;
private String name = "";
//主机地址(ip:port)
private static String nameServerAddr = "192.168.40.136:9876";
//group名字
private static String groupName = "producer8";
//topic名字
private static String topicName = "topic_test2";
//tag名字
private static String tagName = "TagC";
private static DefaultMQProducer producer = null;
private List<Message> messages = new ArrayList<>();
public TcpClient(String name){
this.name = name;
//向服务器请求连接
try {
socket = new Socket("localhost", 30777);
bis = new DataInputStream(socket.getInputStream());
ois = new ObjectInputStream(socket.getInputStream());
} catch (IOException e) {
release();
e.printStackTrace();
}
}
@Override
public void run() {
try {
System.out.println("===RocketMQ生产端===");
//实例化生产者
producer = new DefaultMQProducer(groupName);
//设置NameServer的地址
producer.setNamesrvAddr(nameServerAddr);
//启动Producer
producer.start();
}catch(Exception e) {
e.printStackTrace();
}
while(true){
Student receiveObject = null;
try {
receiveObject = (Student) ois.readObject();
//发送同步消息 设置topic和tag
if(receiveObject.getCode() != 0) {
//发送数据时的时间
long time = System.currentTimeMillis();
System.out.println("时间:" + (time) + "ms");
Message msg = new Message(topicName, tagName, objectToBytes(receiveObject));
messages.add(msg);
}else{
MessageBatch msgBatch = MessageBatch.generateFromList(messages);
msgBatch.setBody(msgBatch.encode());
producer.send(msgBatch, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
throwable.printStackTrace();
}
});
messages.clear();
System.out.println("发送消息成功");
}
} catch (Exception e) {
release();
e.printStackTrace();
}
}
}
//释放资源
public void release(){
try {
bis.close();
ois.close();
socket.close();
producer.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 对象转字节数组
*/
public byte[] objectToBytes(Object obj) throws IOException {
try(ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream sOut = new ObjectOutputStream(out);
){
sOut.writeObject(obj);
sOut.flush();
byte[] bytes = out.toByteArray();
return bytes;
}
}
}
最后,就是用RocketMQ的consumer消费信息,具体代码如下:
public class RocketMqConsumer {
//主机地址(ip:port)
private static String nameServerAddr = "192.168.40.136:9876";
//group名字
private static String groupName = "consumer8";
//topic名字
private static String topicName = "topic_test2";
//tag名字
private static String tagName = "TagC";
public static void main(String[] args) throws Exception {
System.out.println("===RocketMQ消费端===");
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
// 设置NameServer的地址
consumer.setNamesrvAddr(nameServerAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setPullBatchSize(32);
//consumer.setConsumeThreadMax(20);
//consumer.setConsumeThreadMin(15);
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe(topicName, tagName);
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(msgs.size());
for (MessageExt msg : msgs) {
Student data = null;
try {
data = (Student) bytesToObject(msg.getBody());
}catch (Exception e){
e.printStackTrace();
}
log.info("===接收数据之后时间: {} rocketMQ接收消息为{}",System.currentTimeMillis(),data);
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
}
/**
* 字节数组转对象
*/
public static Object bytesToObject(byte[] bytes) throws IOException, ClassNotFoundException {
try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream sIn = new ObjectInputStream(in);
) {
return sIn.readObject();
}
}
}
上述代码使用时,需要注意的点就是把ip换成你自己搭建的单机或者集群环境的ip地址,我这里是本地局域网内部署的一个环境。
三.调优过程
任务要求的是控制消费者和生产者之间的消息传输速度,加上我接触MQ时间比较短,因此,我主要是对生产者生产消息的过程进行了一次调优摸索。
1.同步发送
刚开始做这个任务时,我是先查看了一下RocketMQ提供的简单实例,想着先把整个流程跑起来的思路,就使用了consumer提供的最简单、最可靠同时也是最费时间的同步方法发布新消息,因为同步方法每往broker发送一条消息就会阻塞,直到broker返回确认信息才能继续往下执行,这种方法对于追求高可靠信息的应用场景是非常适用的,但是显然不适合本次任务,果断放弃。
我在本机电脑上做的测试,同步发送十条上述要求的消息大概需要花费1800多ms,确实是非常慢的。
2.异步发送
通过上述可知,我们不想每次都等待上一条消息确认收到之后再发送下一条消息,所以,很自然的就想到是不是有异步发送的方法,异步发送简单的说就是生产者发送消息之后不需要等待可以继续发送,而当broker返回确认信息时,程序可以调用预先设定的一个回调函数对其进行处理,还是查文档,学习新技术的最好方式就是查官方文档,所幸,异步发送消息的方式文档也写的明明白白,第二章消费者代码中使用的就是异步发送方法。
同样是上面的十条消息,通过异步方法我测试的大概是在4-15ms之间,竟然比同步发送快了数倍,这里我自己感觉可能是使用的电脑性能问题,但是异步发送确实是比同步发送快很多的。
3.批量发送
批量发送即可以在同步发送实现,也可以在异步发送中实现。这一点的实际测试效果其实并没有好很多,但我觉得理论上肯定是比单独用异步发送要快一点的。
昨天晚上本来是想恶补一下RocketMQ的基础知识,就在b站上随机点开了一个教学视频,当课程讲到RocketMQ的通信实现是基于Netty服务器时,我才恍然大悟,明白了自己之前忽略了一个最重要的问题,就是每次生产者发送消息时就是进行一次网络I/O的操作,如果我们只是异步发送,那十条消息还是需要十次的网络I/O操作,如果把十次的数据放在一起发送呢,显然就少了9次网络I/O操作,如果数据量大的话,这笔操作开销还是挺大的。
只知道使用批量发送会提高速度还不行,还得看RocketMQ支持不支持,毕竟现在还没有造*的能力,同样是查看文档发现RocketMQ是支持批量发送的。如果是同步发送,只需要将要批量发送的信息放到一个List集合里面即可;如果是异步发送,就需要对信息进行稍微的处理,上面的生产者代码中也给出了具体的操作方法。
四.总结
除了人生,万事皆有定数。对于新事物,只要找到规律并对其归纳,总能掌握的。