ES笔记三:集群管理

1. 集群中的节点角色

Ingest Node

如何分配?

如何避免脑裂问题?

2. 分片及副本

3. 集群健康及监控

3.1 cat API

3.2 Cluster API

3.2.1 _cluster

3.2.2 _nodes

3.2.3 _remote

3.2.4 _tasks

3.3 X-Pack

4. 扩容

5. 故障转移

6. Thread Pool


1. 集群中的节点角色

每当启动ElasticSearch实例时,都会启动一个节点。连接的节点集合称为集群。如果您运行的是单个ElasticSearch节点,那么您拥有一个由一个节点组成的集群。

集群中的每个节点默认都可以处理HTTP 和Transport流量。Transport专门用于节点和 Java TransportClient之间的通信;HTTP层仅由外部REST客户端使用。

Master Node

将node.master设置为true的节点。使其有资格被选为控制集群的主节点。

Data Node

将node.data设置为true的节点。数据节点保存数据并执行与数据相关的操作,如CRUD、搜索和聚合。

Ingest node

将node.intrupt设置为true的节点。摄取节点能够将 ingest pipeline应用于文档,以便在索引前转换和丰富文档。对于大吞吐下,使用专用的摄取节点并将主节点和数据节点标记为 node.ingest: false 是有意义的。

Tribe node 部落节点,通过 tribe.* 配置,是一种特殊类型的协调节点,可以连接到多个集群,并在所有连接的集群上执行搜索和其他操作。

默认情况下,节点是一个Master节点和一个Data节点,并且它可以通过摄取管道预处理文档。这对于小型集群非常方便,但是随着集群的增长,考虑将专用的Master节点与专用的Data节点分离变得非常重要。Indexing 和 searching 数据是CPU、内存和I/O密集型工作,这会给节点的资源带来压力。为了确保主节点稳定且不受压力,在更大的集群中,最好在专用的符合主节点条件的节点和专用的数据节点之间划分角色。

尽管主节点也可以充当协调节点,并将搜索和索引请求从客户端路由到数据节点,但最好不要为此目的使用专用的主节点。主合格节点的工作越少,对于集群的稳定性就越重要。

node.master: true                 //默认是true
node.data: false                 //默认是true(启用node.data role)
node.ingest: false                 //默认是true(启用node.ingest role)
search.remote.connect: false     //默认是true(启用cross-cluster search)

当集群的节点数量较大时(比如超过30个节点),集群的管理工作会变得复杂很多。此时应该创建专有master节点,这些节点只负责集群管理,不存储数据,不承担数据读写压力;其他节点则仅负责数据读写,不负责集群管理的工作。这样把集群管理和数据的写入/查询分离,互不影响,防止因读写压力过大造成集群整体不稳定。 将专有master节点和数据节点的分离,需要修改ES的配置文件,然后滚动重启各个节点。

当index/shard数量过多(在一个20个节点的集群里,创建了4w+个shard),导致新建一个index需要60s+才能完成,可以考虑从以下几方面改进:

  • 降低数据量较小的index的shard数量
  • 把一些有关联的index合并成一个index
  • 数据按某个维度做拆分,写入多个集群

Ingest Node

ingest node是5.0新增的特性,它用来在真正对文档进行索引之前做预处理。所有的节点都是默认支持ingest的,任何节点都可以处理ingest请求,也可以创建一个专门的Ingest nodes。它产生的价值在于,对文档进行索引之前对文件进行预处理,通过定义包含了多个process的pipeline来实现。每个process(ES内置的数据处理器)实现了对文档的某种转换,如移除某个字段,重命名某个字段等。

ES笔记三:集群管理

Ingest APIs

Put Pipeline API,新增或更新pipeline

Get Pipeline API, 获取pipeline
Delete Pipeline API,删除pipeline
Simulate Pipeline API,模拟对pipeline的调用
Processors,内置的非常多
Convert Processor,将现有字段的值转换为其他类型,例如将字符串转换为整数。如果字段值是数组,则将转换所有成员。
Date Processor,分析字段中的日期,然后使用日期或时间戳作为文档的时间戳。默认情况下,日期处理器将解析的日期添加为一个名为@timestamp的新字段
Remove Processor,删除现有字段。如果一个字段不存在,将引发异常
Rename Processor,重命名现有字段。如果字段不存在或新名称已被使用,将引发异常

Grok Processor,从文档中的单个文本字段中提取结构化字段。它非常适合于syslog日志、apache和其他Web服务器日志、mysql日志,而且一般来说,任何日志格式都是为人类而非计算机消耗编写的。相信用过Logstash的同学一定不会陌生。

{//源文档
  "message": "55.3.244.1 GET /index.html 15824 0.043"
}
{//管道定义
  "description" : "...",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
      }
    }
  ]
}
{//处理后的文档
  "message": "55.3.244.1 GET /index.html 15824 0.043",
  "client": "55.3.244.1",
  "method": "GET",
  "request": "/index.html",
  "bytes": 15824,
  "duration": "0.043"
}

如何分配?

  • 小规模集群(数据量级几千万到数十亿级别),不需严格区分
  • 中大规模集群(十个以上节点),应考虑单独的角色充当。特别并发查询量大,查询的合并量大,可以增加独立的协调节点。角色分开的好处是分工分开,不互影响。如不会因协调角色负载过高而影响数据节点的能力。

如何避免脑裂问题?

一个集群中只有一个A主节点,A主节点因为需要处理的东西太多或者网络过于繁忙,从而导致其他从节点ping不通A主节点,这样其他从节点就会认为A主节点不可用了,就会重新选出一个新的主节点B。过了一会A主节点恢复正常了,这样就出现了两个主节点,导致一部分数据来源于A主节点,另外一部分数据来源于B主节点,出现数据不一致问题,这就是脑裂。

尽量避免脑裂,需要添加最小数量的主节点配置:discovery.zen.minimum_master_nodes: (有master资格节点数/2) + 1,它控制的是,选举主节点时需要看到最少多少个具有master资格的活节点,才能进行选举。官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量。一般Master 和 dataNode 角色分开,配置奇数个master,如3 。

discovery.zen.ping.multicast.enabled: false 关闭多播发现机制,默认是关闭的
discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"] 配置单播发现的主节点ip地址,其他从节点要加入进来,就得去询问单播发现机制里面配置的主节点我要加入到集群里面了,主节点同意以后才能加入,然后主节点再通知集群中的其他节点有新节点加入
discovery.zen.ping_timeout: 30(默认值是3秒) 其他节点ping主节点多久时间没有响应就认为主节点不可用了
discovery.zen.minimum_master_nodes: 2 选举主节点时需要看到最少多少个具有master资格的活节点,才能进行选举

2. 分片及副本

索引在被创建时默认情况下会被分配5个主分片(每个分片一个副本),更多用法

分片(shard):一个ES的index由多个shard组成,每个shard承载index的一部分数据。分片数指定后不可变,除非重索引。每个分片本质上就是一个Lucene索引, 因此会消耗相应的文件句柄, 内存和CPU资源。

副本(replica):index也可以设定副本数(numberofreplicas),也就是同一个shard有多少个备份。对于查询压力较大的index,可以考虑提高副本数(numberofreplicas),通过多个副本均摊查询压力。副本分片数可以随时修改。

shard数量(numberofshards)设置过多或过低都会引发一些问题:shard数量过多,则批量写入/查询请求被分割为过多的子写入/查询,导致该index的写入、查询拒绝率上升;对于数据量较大的inex,当其shard数量过小时,无法充分利用节点资源,造成机器资源利用率不高或不均衡,影响写入/查询的效率。

多少分片才算合理?

ElasticSearch推荐的最大JVM堆空间是30~32G, 所以把你的分片最大容量限制为30GB, 然后再对分片数量做合理估算。例如, 你认为你的数据能达到200GB, 推荐你最多分配7到8个分片。在开始阶段, 一个好的方案是根据你的节点数量按照1.5~3倍的原则来创建分片。

对于基于日期的索引需求, 并且对索引数据的搜索场景非常少。也许这些索引量将达到成百上千, 但每个索引的数据量只有1GB甚至更小. 对于这种类似场景, 建议只需要为索引分配1个分片。

应该设置几个副本?

副本的用途是备份数据保证高可用数据不丢失,高并发的时候参与数据查询。一般一个分片有1-2个副本即可保证高可用。副本多浪费存储空间、占用资源、影响性能。

为保证高可用,副本数设置为2即可。要求集群至少要有3个节点,来分开存放主分片、副本。如发现并发量大时,查询性能会下降,可增加副本数,来提升并发查询能力。

注意:新增副本时主节点会自动协调,然后拷贝数据到新增的副本节点

有推荐设置?

  • 对于数据量较小(100GB以下)的index,往往写入压力查询压力相对较低,一般设置3~5个shard,numberofreplicas设置为1即可(也就是一主一从,共两副本)
  • 对于数据量较大(100GB以上)的index:
  1. 一般把单个shard的数据量控制在(20GB~50GB)
  2. 让index压力分摊至多个节点:可通过index.routing.allocation.totalshardsper_node参数,强制限定一个节点上该index的shard数量,让shard尽量分配到不同节点上
  3. 综合考虑整个index的shard数量,如果shard数量(不包括副本)超过50个,就很可能引发拒绝率上升的问题,此时可考虑把该index拆分为多个独立的index,分摊数据量,同时配合routing使用,降低每个查询需要访问的shard数量。

3. 集群健康及监控

为了简单,可以搭建一个空集群,进入elasticsearch-5.5.2\bin,启动elasticsearch.bat。

3.1 cat API

JSON非常适合计算机。所有cat命令都接受一个查询字符串参数help,以查看它们提供的所有头和信息,并且/_cat命令单独列出了所有可用的命令。

_cat
    基本参数
        verbose: 显示列名, 请求参数为v
            示例: curl http://localhost:9200/_cat/health?v
        help: 显示当前命令的各列含义, 请求参数为help. 某些命令部分列默认不显示,可通过help该命令可显示的所有列
            示例: curl http://localhost:9200/_cat/master?help
        bytes: 数值列还原为原始值. 如diskSize, 默认转为以kb/mb/gb表示, 打开后还原为原始值
            示例: curl http://localhost:9200/_cat/indices?bytes=b
        header: 显示指定列的信息, 请求参数为h
            示例: curl http://localhost:9200/_cat/indices?h=i,tm(显示集群各索引的内存使用)

/_cat/allocation 查看单节点的shard分配整体情况
/_cat/shards              
/_cat/shards/{index}  
查看各shard的详细情况,包括shard的分布, 当前状态, doc数量, 磁盘占用情况, shard的访问情况,
/_cat/master 查看集群中的master节点
/_cat/nodes 查看集群各个节点的当前状态, 包括节点的物理参数(包括os/jdk版本, uptime, 当前mem/disk/fd使用情况等), 请求访问情况(如search/index成功和失败的次数)等详细信息
/_cat/indices
/_cat/indices/{index}
查看集群中所有index的详细信息,包括index状态,shard个数(primary/replica),doc个数等

/_cat/segments

/_cat/segments/{index}

查看各index的segment详细信息,包括segment名, 所属shard, 内存/磁盘占用大小, 是否刷盘, 是否merge为compound文件等
/_cat/count  
/_cat/count/{index}
查看当前集群的doc数量
/_cat/recovery                
/_cat/recovery/{index}
查看集群内每个shard的recovery过程. 调整replica,恢复snapshot或者节点启动都会触发shard的recover.
/_cat/health

查看集群当前状态, 包括data节点个数,primary shard个数等基本信息,status字段指示着当前集群在总体上是否工作正常。它的三种颜色含义如下:

"unassigned_shards": 3,表示没有被分配到任何节点的副本数(同一个节点上既保存原始数据又保存副本是没有意义的,因为一旦失去了,我们也将丢失该节点上的所有副本数据)  

  • green,有的主分片和副本分片都正常运行。 
  • yellow,所有的主分片都正常运行,但不是所有的副本分片都正常运行。 
  • red,有主分片没能正常运行。
/_cat/pending_tasks 查看当前集群的pending task

/_cat/aliases

/_cat/aliases/{alias}

所有alias信息,包括alias对应的index, 路由配置等
/_cat/thread_pool 查看集群各节点内部不同类型的threadpool的统计信息, 覆盖了es对外所有请求的threadpool(活跃线程数和任务队列大小以及拒绝数量)
/_cat/plugins 查看集群各个节点上的plugin信息
/_cat/fielddata
/_cat/fielddata/{fields}
查看集群fielddata内存占用情况

3.2 Cluster API

大多数集群级API允许指定要在哪个节点上执行(例如,获取节点的节点状态)。节点可以使用其内部节点ID、节点名称、地址、自定义属性或仅接收请求的本地节点在API中标识。

3.2.1 _cluster

Cluster Health

它允许获得集群运行状况的非常简单的状态

GET /_cluster/health
GET /_cluster/health?wait_for_status=yellow&timeout=50s
GET /_cluster/health/twitter?level=shards

Cluster State

它允许获取整个集群的全面状态信息,metrics包含version、master_node、nodes、routing_table、metadata、blocks。

GET /_cluster/state/{metrics}/{indices} 

GET /_cluster/state
GET /_cluster/state/metadata,routing_table/foo,bar
GET /_cluster/state/blocks

Cluster Stats

它允许从集群范围的角度检索统计信息。API返回基本索引度量(shard数、存储大小、内存使用情况)和有关构成集群的当前节点的信息(number、roles、操作系统、JVM版本、内存使用情况、CPU和已安装插件)

GET /_cluster/stats?human&pretty

Pending cluster tasks

它返回尚未执行的任何集群级别更改(例如,创建索引、更新映射、分配或失败碎片)的列表。

GET /_cluster/pending_tasks

Cluster Reroute

它允许手动更改集群中单个分片的分配。例如,可以显式地将分片从一个节点移动到另一个节点,可以取消分配,并且可以显式地将未分配的分片分配给特定的节点

Cluster Update Settings

允许更新群集范围内的特定设置。更新的设置可以是持久的(在重新启动时应用),也可以是暂时的(在所以的群集重新启动后将失效)

PUT /_cluster/settings
{
    "persistent" : {
        "indices.recovery.max_bytes_per_sec" : "50mb"
    }
}

Cluster Allocation Explain API

它目的是为集群中的shard分配提供解释。对于未分配的分片,explain api提供了未分配分片的原因的解释。对于分配的分片,explain api提供了一个解释,解释为什么分片保留在其当前节点上,并且没有移动或重新平衡到另一个节点。当试图诊断分片未分配的原因,或者当您可能希望分片继续保留在当前节点上时,此API非常有用。

GET /_cluster/allocation/explain
{
  "index": "myindex",
  "shard": 0,
  "primary": true
}

3.2.2 _nodes

Nodes Info

它允许检索一个或多个(或全部)群集节点信息

GET /_nodes
GET /_nodes/nodeId1,nodeId2
GET /_nodes/plugins
GET /_nodes/ingest

Nodes Stats

它允许检索一个或多个(或全部)群集节点统计信息

GET /_nodes/stats
GET /_nodes/nodeId1,nodeId2/stats
GET /_nodes/stats/indices
GET /_nodes/stats/os,process
GET /_nodes/10.0.0.1/stats/process
GET /_nodes/stats/indices/fielddata?fields=field1,field2
GET /_nodes/stats?groups=_all
GET /_nodes/stats/indices?groups=foo,bar

Nodes Feature Usage

它允许检索有关每个节点功能使用情况的信息

GET _nodes/usage
GET _nodes/nodeId1,nodeId2/usage

Nodes hot_threads

允许获取群集中每个节点上当前热线程的API

GET    /_nodes/hot_threads
GET    /_nodes/{nodesIds}/hot_threads

3.2.3 _remote

Remote Cluster Info允许检索所有配置的远程集群信息。此命令返回由配置的远程集群别名键控的连接和端点信息。

GET /_remote/info

3.2.4 _tasks

Task Management API 允许检索有关当前在群集中一个或多个节点上执行的任务的信息。这个API是新增的,应该仍然被视为beta特性。API可能会以不向后兼容的方式更改。

GET _tasks 
GET _tasks?nodes=nodeId1,nodeId2 
GET _tasks?nodes=nodeId1,nodeId2&actions=cluster:*

3.3 X-Pack

ElasticSearch是X-pack监控的核心。在所有情况下,x-pack监视文档都只是普通的JSON文档,它是通过在某个轮询间隔(默认为10秒)监视每个弹性堆栈组件而构建的,然后将这些文档索引到监视集群中。考虑到x-pack是收费的,这里就不展开讲。

4. 扩容

水平扩容,添加并启动新节点即可,扩容允许每个节点上存在一个分片。上面讲过也可以通过增加副本来扩容。

5. 故障转移

cluster.name:只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播主机列表。 详细信息请查看最好使用单播代替组播。当第二个节点加入到集群后,3个副本分片将会分配到这个节点上,每个主分片对应一个副本分片(2个节点中任何一个节点出现问题时,数据都完好无损),cluster-health中status值为green,表示都在正常运行。

比如我们关闭主节点,意味着失去主分片1和2,此时集群状态red。而Node 2和Node 3上对应的副本分片提升为主分片,此时集群的状态将变为为yellow,当缺失的副本分片分配完成(还是9个)集群的状态会变为green。

6. Thread Pool

一个节点包含多个线程池,以改进如何在一个节点中管理线程内存消耗。其中许多池还具有与其关联的队列,这些队列允许挂起的请求被保留而不是丢弃。

generic 对于一般操作(例如后台节点发现)。线程池类型是scaling
index 用于index/delete操作。线程池类型固定为可用处理器的大小,队列大小为200。此池的最大大小为available_processors+1。
search 用于count/search/suggest操作。线程池类型是fixed_auto_queue_size,大小为int(available_processors*3)/2)+1,初始队列_大小为1000。
get 用于get操作。线程池类型固定为available_processors,队列大小为1000。
bulk 用于bulk操作。线程池类型固定为available_processors,队列大小为200。此池的最大大小为available_processors+1。
snapshot 用于snapshot/restore操作。线程池类型是scaling,保持活动状态为5m,最大值为min(5,available_processors/2)。
warmer 用于segment warm-up(段预热)操作。线程池类型是scaling,保持活动状态为5m,最大值为min(5,available_processors/2)。
refresh 用于refresh操作。线程池类型是scaling,保持活动状态为5m,最大值为min(10,available_processors/2)。
listener 主要用于当侦听器线程设置为true时Java客户端执行操作。线程池类型是scaling,默认最大值min(10,available_processors/2)。
上一篇:面试系列10 es生产集群的部署架构


下一篇:处理 unassigned shard