内核调优 | 如何提升Elasticsearch master调度性能40倍

作者:兴丰__阿里云Elasticsearch团队 高级开发工程师

本文字数:1299
阅读时间:2~5分钟

以下是正文


背景

我们在协助某Elasticsearch用户准备将自建集群迁往阿里云Elasticsearch的过程中发现,自建集群从ES6.3.2版本升级到7.4.0版本后,master变得特别卡,创建索引和删除耗时超过1分钟。该集群当时有3个专有主节点、10个热节点、2个冷节点,超过5万个shard,绝大部分索引/shard都是关闭的,当索引过期移动到冷节点就close掉,需要查询时再调用open命令打开。同时,在试过6.x到7.x的多个版本后,发现自7.2.0后的版本都有问题,而即使把专有主节点升级到32c64g的规格,还是不行。

思考

由于是自建的线上生产集群,登录机器和查看集群状态极为不便,也有一定的风险。因此计划先从Elasticsearch代码的变更入手,查找版本7.2的哪一个pull request最有可能改变master的调度行为,很快发现pr#39499引入的“支持对已关闭索引进行shard数据复制“有最大的嫌疑。

在引入pr#39499之前,一个索引关闭之后,数据节点对应的engine会关闭,不再提供查询和写入服务,当然也不能进行relocate和数据的拷贝,当下掉一个数据节点后,上面已关闭索引的shard数据就会丢失。

引入pr#39499之后,master会在cluster state中继续保留关闭的索引,对索引的shard进行调度,数据节点使用NoOpEngine打开索引,不支持查询和写入,和常规的engine相比开销很小。所以就转化为,集群状态存在很多shard时,master调度很慢的问题。

复现

搭建一个最小化测试环境

  • Elasticsearch版本: 7.4.0
  • 专有主节点: 3 * 16c64g
  • 数据节点: 2 * 16c64g

先创建5000个索引,每个索引有5个primary,0个replica,总共25000个shard。然后测试发现每次创建新索引需要58s。master所在机器有一个cpu利用率一直处于100%,通过top -Hp $ES_PID, 得到忙碌的线程ID。通过和jstack获取master节点的调用栈信息对比,发现是masterServices线程一直调用shardsWithState导致的。


"elasticsearch[iZ2ze1ymtwjqspsn3jco0tZ][masterService#updateTask][T#1]" #39 daemon prio=5 os_prio=0 cpu=150732651.74ms elapsed=258053.43s tid=0x00007f7c98012000 nid=0x3006 runnable  [0x00007f7ca28f8000]

  java.lang.Thread.State: RUNNABLE
       at java.util.Collections$UnmodifiableCollection$1.hasNext(java.base@13/Collections.java:1046)
       at org.elasticsearch.cluster.routing.RoutingNode.shardsWithState(RoutingNode.java:148)
       at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.sizeOfRelocatingShards(DiskThresholdDecider.java:111)
       at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.getDiskUsage(DiskThresholdDecider.java:345)
       at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.canRemain(DiskThresholdDecider.java:290)
       at org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain(AllocationDeciders.java:108)
       at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.decideMove(BalancedShardsAllocator.java:668)
       at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.moveShards(BalancedShardsAllocator.java:628)
       at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.allocate(BalancedShardsAllocator.java:123)
       at org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:405)
       at org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:370)
       at org.elasticsearch.cluster.metadata.MetaDataIndexStateService$1$1.execute(MetaDataIndexStateService.java:168)
       at org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:47)
       at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:702)
       at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:324)
       at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:219)
       at org.elasticsearch.cluster.service.MasterService.access$000(MasterService.java:73)
       at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:151)
       at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)
       at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)
       at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)
       at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)
       at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13/ThreadPoolExecutor.java:1128)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13/ThreadPoolExecutor.java:628)
       at java.lang.Thread.run(java.base@13/Thread.java:830)

分析&解决

通过阅读对应的代码,发现所有触发reroute操作的请求,比如创建、删除、更新集群状态等,都会调用BalancedShardsAllocator来遍历集群的所有started shard,再计算shard所在节点被relocated shard的磁盘占用大小。这需要找到节点所有处于INITIALIZING且正在relocated的shard,当前的实现是遍历节点的所有shard。

最外层对shard的遍历是O(n), 内层对每个节点的shard的循环是O(n/m), 其中n是集群shard总数,m是节点个数,整体复杂度是O(n^2/m),对一个固定节点个数的集群来说,m可以认为是常数,则每轮reroute的调度复杂度是O(n^2)。

考虑到每次都要重新遍历节点所有shard,寻找处于INITIALIZING和RELOCATING的shard,这部分可以在初始化时算一次,然后在每次shard有状态变化时,简单更新下即可,那么整体复杂度降为O(n)。对ES7.4.0的代码做了简单的修改、打包和测试,创建索引等触发reroute操作的请求时间从之前的58s降为1.2s,ES的hot_threads api和jstack都显示shardsWithState不再是热点,效果显著。

内核调优 | 如何提升Elasticsearch master调度性能40倍

临时解决方案

master内部使用MasterService类来管理集群任务管理工作,为了保证状态的一致性,任务是单线程串行处理的,所以不能通过提升master节点的机器规格来解决。

当前的ES集群碰到这个问题,可以通过设置cluster.routing.allocation.disk.include_relocations为false来绕过,让master调度时不考虑正在relocating的shard磁盘占用。但是这会导致磁盘使用被错误估计,有反复触发relocate的风险。

相关活动


内核调优 | 如何提升Elasticsearch master调度性能40倍

扫码关注公众号‘Elasticsearch技术’,收获大咖最佳行业应用经验

内核调优 | 如何提升Elasticsearch master调度性能40倍

往期好文

【产品解读】阿里云 Elasticsearch 在日志场景中实现“低成本高性能”
年度盘点 | “三年磨一剑” 阿里云Elasticsearch干货手册
Elasticsearch大咖说|携程旅行:从日志分析平台到综合性 Elasticsearch 管理平台

上一篇:【入门到精通,23天掌握Elasticsearch大数据实时搜索与分析能力】eBay技术大牛阮一鸣邀您免费听课


下一篇:从业务需求到能力扩展 | 阿里云Elasticsearch向量检索能力的创变史