## 系统简介
参与创业的第二年,产品的DAU终于突破了1000万,对于一个后端&大数据开发来说,这是一件多么刺激的事情。准确来说是两款产品,一款是走路类的产品(DAU是800万),另外一款是feed流资讯类产品(DAU是300万)。新上线的一个功能是客户端的埋点,用于产品和用户行为分析,数据丢失不敏感。
目前客户端接入层的大概实现如下:(大数据计算层的先省略掉,以后再介绍)
![3.png](http://www.itrensheng.com/upload/2019/11/3-f90cd13178c549769e6e3f75e67b4ff1.png)
其中客户端接入层api是部署了6台服务器,kafka topic是12个partition,MySQL是3个Master
## 上线后的问题
下午4点10分打开了客户端的自动升级以后,到下午7点之前系统并未见任何异常。但是7点之后线上kafka出现告警,当天kafka consumer group消费的埋点topic出现lag大于50万,且lag值在继续扩大。也就是说线上的kafka消费性能出现了瓶颈,导致线上kafka数据的消费存在积压。
### 1.我随即查看了阿里云slb对应的流量走势图如下:
![2.png](http://www.itrensheng.com/upload/2019/11/2-80553758303845818ce2444d404d7b26.png)
从上面的走势图可以看出来,流量从4点10分打开客户端自动升级至7点半的时间段之内,QPS达到了2000,甚至一度将要冲破4000。
### 2.我接下来查看了应用服务器,监控走势图如下:
![5.png](http://www.itrensheng.com/upload/2019/11/5-a3cf136e475949cb88c43616f6b2a29e.png)
我浏览了6台应用服务器,监控走势都差不多如上图,也就是瓶颈不在应用服务器
### 3.我再看了一下线上mysql所在物理机的监控图如下:
![4.png](http://www.itrensheng.com/upload/2019/11/4-e3cb5e417d4d44c684c8942c432fb09d.png)
可以看出mysql服务器的cpu有所增长,但是在可控范围之内,而内存占用也就差不多20%。很奇怪的两张图是磁盘的读写字节数和读写请求数,从右下角的两张图可以看出来,流量刚开始增长的时候,磁盘是只有写入,没有读取的(而这是服务埋点系统的,因为所有的mysql操作都是insert,并没有select)。**但是接下来从7点半开始,磁盘的读请求书激增,同事读字节数甚至超过了写入字节数**。(而这点也恰恰是我忽略的一点)
### 4.kafka consumer消费代码同步改为异步
目前线上kafka是同步消费,且手动提交offset的方式,具体代码如下:
```java
@KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC)
public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment){
try {
String key = consumerRecord.key().toString();
String message = consumerRecord.value().toString();
int partition = consumerRecord.partition();
long offset = consumerRecord.offset();
int runTimes = SystemConstant.CONSUMER_ITERATE_TIMES;
boolean commitOffsets = false;
while (!commitOffsets) {
runTimes--;
log.info(String.format("consum partition{%s}, offset{%s}, key{%s}",partition,offset,key));
commitOffsets = invokeLand(message);
if(!commitOffsets && runTimes <=0){
commitOffsets = true;
}
}
if (commitOffsets) {
//处理成功则提交offset
acknowledgment.acknowledge();
}
}catch (Exception ex){
log.error("receive throw exception",ex);
}
}
```
以上代码是同步消费kafka的方式,我先改动为异步消费如下:
```java
static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "data-consumer-" + mThreadNum.getAndIncrement());
return t;
}
}
private ExecutorService executorService =new ThreadPoolExecutor(10, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new NameTreadFactory(), new ThreadPoolExecutor.AbortPolicy());
@KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC)
public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment){
try {
String message = consumerRecord.value().toString();
executorService.execute(() ->{
invokeLand(message);
});
acknowledgment.acknowledge();
}catch (Exception ex){
log.error("receive throw exception",ex);
}
}
```
### 5.Kafka lag消息减少
监控了一下系统,发现kafka lag的消息在20分钟作用从70万降到了2000,我分别观察了应用服务器和数据库服务器的情况,此时监控系统并未发现线程池未出现被拒绝的情况各项指标正常,数据库服务器各项指标也正常,也就是说系统负载目前可以支撑当前的QPS。
### 6.MySQL服务器告警
第二天早上7点半左右收到mysql服务器告警,CPU使用率接近90%。看了一下实时QPS稳定在3000左右。3台数据库服务器的CPU和IO都持续攀升,CPU都将近90%,**写入速度每天大概是55M/s,读取速度大概是110M/s**。(我第二次忽略了关键问题,为何只有insert的系统,读取速度是写入熟读的2倍)
![6.png](http://www.itrensheng.com/upload/2019/11/6-08aa19de2f8e4d18be937b7925bf7e81.png)
### 7.改进kafka为批量消费
从4中可以看到,此时insert到数据库依然是单笔插入的,现在改为批量插入
```java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000);
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024);
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50 * 1024);
propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);
return propsMap;
}
@KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC,containerFactory = "kafkaListenerContainerFactory")
public void receive(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment){
try {
...
acknowledgment.acknowledge();
}catch (Exception ex){
log.error("receive throw exception",ex);
}
}
其中的mapper改为了
@Insert({
"<script>",
"INSERT INTO data_report(id, app_id, create_time, command_id, header, user_id, trx_time, content, partition_key) VALUES ",
"<foreach collection='reportDataDtos' item='item' index='index' separator=','>",
"(#{item.id}, #{item.appId}, #{item.createTime}, #{item.commandId}, #{item.header,typeHandler=com.datareport.mapper.handler.MybatisJsonTypeHandler}, #{item.userId}, #{item.trxTime}, #{item.content,typeHandler=com.datareport.mapper.handler.MybatisJsonTypeHandler}, #{item.partitionKey})",
"</foreach>",
"</script>"
})
void batchInsert(@Param(value = "reportDataDtos") List<ReportDataDto> reportDataDtos);
```
做了如上改动之后,实时监控入库操作,发现批次并不是我所设设定的FETCH_MIN_BYTES_CONFIG和FETCH_MAX_WAIT_MS_CONFIG(线上消息大概每条400Bytes,所以我设定每个批次至少是100条消息)。实际情况是每个批次1到20不等,而数据库服务器的CPU和IO负载有所降低
### 8.应用服务设定一个队列
根据7中所要设定的批次,虽然也会多笔同时插入数据库,但是数据库IO的请求次数并未达到理性状况。此时在应用服务器做了一个双端阻塞队列,当队列长度达到100的时候,提交一个异步线程到数据库,做批量插入操作。
```java
BlockingDeque<ReportDataDto> blockingDeque = new LinkedBlockingDeque<ReportDataDto>();
@KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC,containerFactory = "kafkaListenerContainerFactory")
public void receive(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment){
try {
for(ConsumerRecord<String, String> consumerRecord : consumerRecords){
String message = consumerRecord.value();
if(StringUtils.isNotEmpty(message)){
ReportDataDto reportDataDto = GsonUtil.build().fromJson(message,ReportDataDto.class);
if(blockingDeque.size() >= 100){
final List<ReportDataDto> pendingList = new LinkedList<ReportDataDto>();
while (!blockingDeque.isEmpty()){
pendingList.add(blockingDeque.takeFirst());
}
executorService.execute(() -> {
invokeLand(pendingList);
});
blockingDeque.putLast(reportDataDto);
}else {
blockingDeque.putLast(reportDataDto);
}
}
}
acknowledgment.acknowledge();
}catch (Exception ex){
log.error("receive throw exception",ex);
}
}
```
监控数据库服务器,各项指标有所降低,CPU使用率从80%降到了60%,读写请求次数降低了一半。
![7.png](http://www.itrensheng.com/upload/2019/11/7-b18c441ad2464ebebef2730539dae79d.png)
**此时,从读写字节数的走势图来看,读请求的字节数依然是写请求的两倍**(第三次,终于引起了我的注意)。从3中的截图看出来,读请求并非是随着写入请求同步持续走高的,中间是存在一个差值的时间段,大概是从7点到9点这1个小时的时间
![8.png](http://www.itrensheng.com/upload/2019/11/8-9276940b11f040bc9431a76f0ecd8978.png)
### 9.上面的8中数据库读字节数为何在一个小时的时间之内激增,甚至超过了写字节数?
查看系统的写入,其中主键使用的是一个char类型的UUID(至于为什么用一个char类型作为主键,是因为消费者写入的不仅仅是mysql,还要写入到mongodb和HBase*其他系统使用),代码如下:
```java
String snowId = String.valueOf(UUID.randomUUID()).replace("-", "");
```
InnoDB采用的是B+ 树的数据结构,而B+ 树为了维护索引有序性,在插入新值的时候需要做必要的维护。如果表ID采用自增主键的插入数据模式,每次插入一条新记录,都是追加操作,都不涉及到挪动其他记录,也不会触发叶子节点的分裂。但是现在使用的是UUID,不仅会使B+数页分裂,还影响数据页的利用率。原本放在一个页的数据,现在分到两个页中,整体空间利用率降低大约 50%。
那改造的目标就是使ID保持一个自增长的方式,综合考虑,使用雪花算法来替换UUID的方案,代码改动如下:
```java
String snowId = SnowflakeIdUtil.getInstance().nextId();
```
监控系统截图如下:
![9.png](http://www.itrensheng.com/upload/2019/11/9-cb08d5cc79ed4c559596822211d7eb21.png)
从监控可以看出,CPU再次从70%降到了10%,而读写字节数也从47M/s降到了4M/s。综合来看,雪花算法替换UUID的方案,在高并发场景下,性能有10倍提升