ElasticSearch详解

什么是ElasticSearch

ElasticSearch是一款非常强大的、基于Lucene的开源搜素及分析引擎;它是一个实时的分布式搜索分析引擎。
它通常被用作全文检索、结构化搜索、分析及这三个功能的组合。

除了搜索,Elastic Stack(ELK)还被广泛运用在大数据近实时分析领域,包括:日志分析、指标监控、信息安全等。
它可以帮助你探索海量结构化、非结构化数据,按需创建可视化报表,对监控数据设置报警阈值,通过使用机器学习,自动识别异常状况。

基本概念

集群(Cluster)

一个集群由一个唯一的名字标识,默认为"elasticsearch"。集群名称非常重要,具有相同集群名的节点才会组成一个集群。集群名称可以在配置文件中指定。
集群由一到多个节点组成,这些节点共同存储了所有数据,并提供了跨节点的联合索引和搜索功能。

节点(Node)

存储集群的数据,参与集群的索引和搜索功能。像集群有名字,节点也有自己的名称,默认在启动时会以一个随机的UUID的前七个字符作为节点的名字,你可以为其指定任意的名字。通过集群名在网络中发现同伴组成集群。一个节点也可是集群。
一个节点(node)就是一个Elasticsearch实例,而一个集群(cluster)由一个或多个节点组成,它们具有相同的 cluster.name ,它们协同工作,分享数据和负载。
当加入新的节点或者删除 一个节点时,集群就会感知到并平衡数据。
集群中一个节点会被选举为主节点(master),它将临时管理集群级别的一些变更,例如新建或删除索引、增加或移除节点等。主节点不参与文档级别的变更或搜索,这意味着在流量增长的时候,该主节点不会成为集群的瓶颈。
任何节点都可以成为主节点。

分片(Shard)

在创建一个索引时可以指定分成多少个分片来存储。每个分片本身也是一个功能完善且独立的"索引",可以被放置在集群的任意节点上。
分片有两个重要作用:

提供了容量水平扩展的能力;
多个分片云允许分布式并发操作,可以大大提高性能;
数据存储在哪个分片和搜索时文档聚合的机制完全由ES负责,这点对于用户来说是透明的。

索引(Index)

Index 有两层意思,作为动词表示建立索引 ,名词表示建立好的索引文件。
索引时具有某些类似特征的文档集合。例如客户索引、产品索引、订单索引等。

索引由一个全小写的名称标识,对数据的添加、删除、更新、搜索等操作,均需指定索引名称。

单个集群中可以创建任意数量的索引。

类型(Type)

之前的版本中,索引和文档中间还有个类型的概念,每个索引下可以建立多个类型,文档存储时需要指定index和type。
从6.0.0开始单个索引中只能有一个类型,7.0.0以后将将不建议使用,8.0.0 以后完全不支持。

类型用于定义文档属性,包括类型、分词器等。

文档(Document)

文档是构建索引的基本单元。例如一条客户数据、一条产品数据、一条订单数据都可以是一个文档。
文档以json格式表示,json是一种普遍使用的互联网数据格式。

备份(Replication)

在网络/云环境中随时可能发生故障,如果能在这些异常(不管什么原因导致分片下线或丢失)发生时有一定的容错机制,那真真是极好的。因此,ES 允许我们为分片生成一个或多个副本。

副本有两个重要作用:

  • 服务高可用:分片异常时,可以通过副本继续提供服务。因此分片副本不会与主分片分配到同一个节点;
  • 扩展性能:由于查询操作可以在分片副本上执行,因此可以提升系统的查询吞吐量;

Elastic Stack生态(ELK)

Beats + LogStash + ElasticSearch + Kibana

Beats

Beats是一个面向轻量型采集器的平台,这些采集器可以从边缘机器向Logstash、ElasticSearch发送数据,它是由Go语言进行开发的,运行效率方面比较快。

LogStash

Logstash是动态数据收集管道,拥有可扩展的插件生态系统,支持从不同来源采集数据,转换数据,并将数据发送到不同的存储库中。
它具有如下特性:

  1. 实时解析和转换数据
  2. 可扩展,具有200多个插件
  3. 可靠性、安全性。LogStash会通过持久化队列来保证至少将运行中的事件送达一次,同时将数据进行传输加密。
  4. 监控

ElasticSearch

ElasticSearch对数据进行搜索、分析和存储,其是基于JSON的分布式搜索和分析引擎,专门为实现水平可扩展性、高可靠性和管理便捷性而设计的。

它的实现原理主要分为以下几个步骤:

  1. 首先用户将数据提交到ElasticSearch数据库中。
  2. 再通过分词控制器将对应的语句分词
  3. 将分词结果及其权重一并存入,以备用户在搜索数据时,根据权重将结果排名和打分,将返回结果呈现给用户。

kibana

kibana实现数据可视化,kibana能够以图表的形式呈现数据,并且具有可扩展的用户界面,可以全方面的配置和管理ElasticSearch。

ElasticSearch文档索引过程

  • 协调节点默认使用文档ID参与计算(也支持routing),以便为路由提供合适的分片。
  • 当分片所在的节点接收到来自协调节点的请求后,会将请求写入到Memory Buffer,然后定时(默认是每隔1秒)写入到FileSystem Cache,这个从Memory Buffer到FileSystem Cache的过程就叫做refresh。
  • 当然在某些情况下,存在Memory Buffer和FileSystem Cache的数据可能会丢失,ES是通过translog的机制来保证数据的可靠性的。其实现机制是接收到请求后,同时也会写入到translog中,当FileSystem Cache中的数据写入到磁盘中时,才会清除掉,这个过程叫做flush。
  • 在flush过程中,内存中的缓冲将被清除,内存被写入一个新段,段的fsync将创建一个新的提交点,并将内容刷新到磁盘,旧的translog将被删除并开始一个新的translog。flush触发的时机是定时触发(默认30分钟)或者translog变得太大(默认为512M)时。

数据持久化过程

write过程

一个新文档过来,会存储在in-memory buffer内存缓存区中,顺便会记录translog(ElasticSearch增加了一个translog,或者叫事务日志,在每一次对ElasticSearch进行操作时均进行了日志记录)

这时候数据还没到segment,是搜不到这个新文档的。数据只有被refresh后,才可以被搜索到。

refresh过程

refresh默认1秒钟,执行一次refresh流程。ES是支持修改这个值的,通过index.refresh_interval设置refresh间隔时间。
refresh流程大致如下:

  1. in-memory buffer中的文档写入到新的segment中,但segment是存储在文件系统的缓存中。此时文档可以被搜索到。
  2. 清空in-memory buffer。注意:translog没有被清空,为了将segment数据写到磁盘
  3. 文档经过refresh后,segment暂时写到文件系统缓存,这样避免了性能IO操作,又可以使文档搜索到。
    refresh执行1秒执行一次,性能损耗太大。一般建议稍微延长这个refresh时间间隔,比如5s。因此ES其实就是准实时,达不到真正的实时。

flush过程

每隔一段时间(例如translog变得越来越大),索引被刷新;一个新的translog被创建,并且一个全量提交被执行。
上个过程中segment在文件系统缓存中,会有意外故障文档丢失。那么,为了保证文档不会丢失,需要将文档写入磁盘。那么文档从文件缓存写入磁盘的过程就是flush.写入磁盘后,清空translog。具体过程如下:

  1. 所有在内存缓冲区的文档都被写入一个新的段
  2. 缓冲区被清空
  3. 一个Commit Point被写入磁盘
  4. 文件系统缓存通过fsync被刷新(flush)
  5. 老的translog被删除

merge过程

由于自动刷新流程每秒会创建一个新的段,这样会导致短时间内的段数量暴增。而段数目太多会带来较大的麻烦。每一个段都会消耗文件句柄、内存和CPU运行周期。更重要的是,每个搜素请求都必须轮流检查每个段;所以段越多,搜索也就越慢。
ElasticSearch通过在后台进行Merge Segment来解决这个问题。小的段被合并到大的段,然后这些大的段再次被合并到更大的段。
当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。合并进程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会中断索引和搜索。

一旦合并结束,老的段被删除,新的段被刷新(flush)到了磁盘。

合并大的段需要消耗大量的I/O和CPU资源,如果任其发展会影响搜索性能。ElasticSearch在默认情况下会对合并流程进行资源限制,所以搜欧索仍然有足够的资源很好的执行。

ElasticSearch读取文档过程

单个文档

以下是从主分片或者副本分片检索文档的步骤顺序:

  1. 客户端向Node1发送获取请求。
  2. 节点使用文档的_id来确定文档属于分片0。分片0的副本分片存在于所有的三个节点上。在这种情况下,它将请求转发给Node2。
  3. Node2将文档返回给Node1,然后将文档返回给客户端。

在处理读取请求时,协调节点在每次请求的时候都会通过轮询所有的副本分片来达到负载均衡。

在文档被检索时,已经被索引的文档可能已经存在于主分片上但是还没有复制到副本分片。在这种情况下,副本分片可能会报告文档不存在,但是主分片可能成功返回文档。一旦索引请求成功返回给用户,文档在主分片和副本分片都是可用的。

多个文档

以下是使用单个mget请求取回多个文档所需的步骤顺序:

  1. 客户端向Node1发送mget请求
  2. Node1为每个分片构建多文档获取请求,然后并行转发这些请求到托管在每个所需的主分片或者副本分片的节点上。一旦收到所有答复,Node1构建响应并将其返回给客户端。

文档读取过程详解

所有的搜索系统一般都是两阶段查询,第一阶段查询到匹配的DocID,第二阶段再查询DocID对应的完整文档,这种在ElasticSearch中称为query_then_fetch
  1. 在初始查询阶段,查询会广播到索引中每一个分片拷贝(主分片或者副本分片)。每个分片在本地执行搜索并构建一个匹配文档的大小为from+size的优先队列
    注:在2.搜索的时候是会查询FileSystem Cache的,但是有部分数据还在Memory Buffer,所以搜索是近实时的。

  2. 每个分片返回各自优先队列中所有文档的ID和排序值给协调节点,它合并这些值到自己的优先队列来产生一个全局排序后的结果列表。

  3. 接下来就是取回阶段,协调节点辨别出哪些文档需要被取回并向相关的分片提交多个GET请求。每个分片加载并丰富文档,如果有需要的话,接着返回文档给协调节点,一旦所有的文档都被取回了,协调节点返回结果给客户端。

ES结构化字段查询

ElasticSearch match,match_phrase,term区别

  1. term结构化字段查询,匹配一个值,且输入的值不会被分词器分词。
    比如查询条件是:
{
	"query":{
		"term":{
			"foo":"hello world"
		}
	}
}

那么只有在字段中存储了"hello world"的数据才会被返回,如果在存储时,使用了分词,原有的文本"I say hello world"会被分词进行存储,不会存在"hello world"这整个词,那么不会返回任何值。

  1. match模糊匹配,先对输入进行分词,对分词后的结果进行查询,文档只要包含match查询条件的一部分就会被返回。

  2. query_string语法查询,同match_phase的相同点在于,输入的查询条件会被分词,但是不同之处在于与文档中的数据可以不用和query_string中的查询条件有相同的顺序。

ES分布式一致性原理剖析----节点

ES集群构成

一个ElasticSearch集群是由许多节点构成的,Node可以有不同的类型,通过以下配置,可以产生四种不同类型的Node

conf/elasticsearch.yml
node.master: true/false
node.data: true/false

当node.master为true时,其表示这个node是一个master的候选节点,可以参与选举,在ES的文档中常被称作master-eligible node,类似于MasterCandidate。
ES正常运行时只能有一个master,多于一个时会发生脑裂。

当node.data为true时,这个节点作为一个数据节点,会存储分配在该node上的shard的数据并负责这些shard的写入、查询等。
此外,任何一个集群内的node都可以执行任何请求,其会负责将请求转发给对应的node进行处理。

当node.master和node.data都为false时,这个节点可以作为一个类似proxy的节点,接受请求并进行转发、结果聚合等。

上图是一个ES集群的示意图,其中Node_A是当前集群的Master,Node_B和Node_C是Master的候选节点,
其中Node_A和Node_B同时也是数据节点(DataNode),此外,Node_D是一个单纯的数据节点,Node_E是一个proxy节点。

节点发现

conf/elasticsearch.yml
	discovery.zen.unicast.host:[1.1.1.1 1.1.1.2 1.1.1.3]

这个配置可以看作是,在本节点到每个hosts中的节点建立一条边,当整个句群所有的node形成一个连通图时,所有节点都可以知道集群中有哪些节点,不会形成孤岛。

Master选举

集群中可能会有多个master-eligible node,此时就要进行master选举,保证只有一个当选master。
如果有多个node当选master,则集群会出现脑裂,脑裂会破坏数据的一致性,导致集群行为不可控,产生各种非预期的影响。

为了避免产生脑裂,ES采用了常见的分布式系统思路,保证选举出的master被多数派(quorum)的master-eligible node认可,以此来保证只有一个master。
这个quorum通过以下配置进行配置:

conf/elasticsearch.yml
	discovery.zen.minimum_master_nodes:2

1.master选举谁发起,什么时候发起?

master选举是由master-eligible节点发起,当一个master-eligible节点发现满足以下条件时发起选举:

1.该master-eligible节点的当前状态不是master。
2.该master-eligible节点通过ZenDiscovery模块的ping操作询问其已知的集群其他节点,没有任何节点连接到master。
3.包括本节点在内,当前已有超过minimum_master_node个节点没有连接到master。

总结:当一个节点发现包括自己在内的多数派的master-eligible节点认为集群没有master时,就可以发起master选举。

2.当需要选举master时,选举谁

选举的是排序后的第一个MasterCandidate(即master-eligible node)。

选举源码:

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
    assert hasEnoughCandidates(candidates);
    List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
    sortedCandidates.sort(MasterCandidate::compare);
    return sortedCandidates.get(0);
}

排序规则:根据节点的clusterStateVersion比较,clusterStateVersion越大,优先级越高。
clusterStateVersion相同时,进入compareNodes,其内部按照节点的ID比较(ID为节点第一次启动时随机生成)。

排序源码:

public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
    // list, so if c2 has a higher cluster state version, it needs to come first.
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}

总结:
1.当clusterVersion越大,优先级越高。这是为了保证新master拥有最新的clusterState(即集群的meta),避免已经commit的meta变更丢失。
因为Master当选后,就会以这个版本的clusterState为基础进行更新。(一个例外是集群全部重启,所有节点都没有meta,需要先选出一个master,然后master再通过持久化的数据进行meta恢复,再进行meta同步)。

2.当clusterVersion相同时,节点的id越小,优先级越高。即总是倾向于选择Id小的Node,这个Id是节点第一次启动时生成的一个随机字符串。
之所以这么设计,应该是为了让选举结果尽可能稳定,不要出现都想当master而选不出来的情况。

3.怎么算选举成功

当一个master-eligible node(我们假设为Node_A)发起一次选举时,它会按照上述策略选出一个它认为的master。

假设Node_A选Node_B当Master:
Node_A会向Node_B发送join请求,那么此时
1.如果Node_B已经成为了Master,Node_B就会把Node_A加入到集群中,然后发布最新的cluster_state,最新的cluster_state就会包含Node_A的信息。相当于一次正常情况的新节点加入。
对于Node_A,等新的cluster_state发布到Node_A的时候,Node_A也就完成了join。
2.如果Node_B在竞选Master,那么Node_B会把这次join当作一张选票。对于这种情况,Node_A会等待一段时间,看Node_B是否能成为真正的Master,直到超时或者有别的Master竞选成功。
3.如果Node_B认为自己不是Master(现在不是,将来也选不上),那么Node_B会拒绝这次join。对于这种情况,Node_A会开启下一轮选举。

假设NOde_A选自己当Master:
此时NodeA会等别的node来join,即等待别的node的选票,当收集到超过半数的选票时,认为自己成为master,然后变更cluster_state中的master node为自己,并向集群发布这一消息。

选举过程源码:

if (transportService.getLocalNode().equals(masterNode)) {
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() {
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }

                        @Override
                        public void onFailure(Throwable t) {
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }

            );
        } else {
            // process any incoming joins (they will fail because we are not the master)
            nodeJoinController.stopElectionContext(masterNode + " elected");

            // send join request
            final boolean success = joinElectedMaster(masterNode);

            synchronized (stateMutex) {
                if (success) {
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                        // a valid master.
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // update cluster state
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }

                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }

假如集群中有3个master-eligible node,分别为Node_A、 Node_B、 Node_C, 选举优先级也分别为Node_A、Node_B、Node_C。
三个node都认为当前没有master,于是都各自发起选举,选举结果都为Node_A(因为选举时按照优先级排序,如上文所述)。
于是Node_A开始等join(选票),Node_B、Node_C都向Node_A发送join,当Node_A接收到一次join时,加上它自己的一票,就获得了两票了(超过半数),于是Node_A成为Master。
此时cluster_state(集群状态)中包含两个节点,当Node_A再收到另一个节点的join时,cluster_state包含全部三个节点。

4.选举怎么保证不脑裂
基本原则还是多数派的策略,如果必须得到多数派的认可才能成为Master,那么显然不可能有两个Master都得到多数派的认可。

上述流程中,master候选人需要等待多数派节点进行join后才能真正成为master,就是为了保证这个master得到了多数派的认可。但是我这里想说的是,上述流程在绝大部份场景下没问题,听上去也非常合理,但是却是有bug的。

因为上述流程并没有限制在选举过程中,一个Node只能投一票,那么什么场景下会投两票呢?比如Node_B投Node_A一票,但是Node_A迟迟不成为Master,Node_B等不及了发起了下一轮选主,这时候发现集群里多了个Node_0,Node_0优先级比Node_A还高,那Node_B肯定就改投Node_0了。假设Node_0和Node_A都处在等选票的环节,那显然这时候Node_B其实发挥了两票的作用,而且投给了不同的人。

那么这种问题应该怎么解决呢,比如raft算法中就引入了选举周期(term)的概念,保证了每个选举周期中每个成员只能投一票,如果需要再投就会进入下一个选举周期,term+1。假如最后出现两个节点都认为自己是master,那么肯定有一个term要大于另一个的term,而且因为两个term都收集到了多数派的选票,所以多数节点的term是较大的那个,保证了term小的master不可能commit任何状态变更(commit需要多数派节点先持久化日志成功,由于有term检测,不可能达到多数派持久化条件)。这就保证了集群的状态变更总是一致的。

而ES目前(6.2版本)并没有解决这个问题,构造类似场景的测试case可以看到会选出两个master,两个node都认为自己是master,向全集群发布状态变更,这个发布也是两阶段的,先保证多数派节点“接受”这次变更,然后再要求全部节点commit这次变更。很不幸,目前两个master可能都完成第一个阶段,进入commit阶段,导致节点间状态出现不一致,而在raft中这是不可能的。那么为什么都能完成第一个阶段呢,因为第一个阶段ES只是将新的cluster_state做简单的检查后放入内存队列,如果当前cluster_state的master为空,不会对新的cluster_state中的master做检查,即在接受了Node_A成为master的cluster_state后(还未commit),还可以继续接受Node_B成为cluster_state。这就使Node_A和Node_B都能达到commit条件,发起commit命令,从而将集群状态引向不一致。当然,这种脑裂很快会自动恢复,因为不一致发生后某个master再次发布cluster_state时就会发现无法达到多数派条件,或者是发现它的follower并不构成多数派而自动降级为candidate等。

这里要表达的是,ES的ZenDiscovery模块与成熟的一致性方案相比,在某些特殊场景下存在缺陷,下面讲ES的meta变更流程时也会分析其他的ES无法满足一致性的场景。

错误检测

1.MasterFaultDetection与NodesFaultDetection

这里的错误检测可以理解为类似心跳的机制,有两类错误检测,一类是Master定期检测集群内其他的Node,另一类是集群内其他的Node定期检测当前集群的Master。检查的方法就是定期执行ping请求。

如果Master检测到某个Node连不上了,会执行removeNode的操作,将节点从cluster_state中移除,并发布新的cluster_state。当各个模块apply新的cluster_state时,就会执行一些恢复,比如选择新的primaryShard或者replica,执行数据复制等。

如果某个Node发现Master连不上了,会清空pending在内存中还未commit的new cluster_state,然后发起rejoin,重新加入集群(如果达到选举条件则触发新master选举)。

2.rejoin

除了上述两种情况,还有一种情况是Master发现自己已经不满足多数派条件(>=minimumMasterNodes)了,需要主动退出master状态(退出master并执行rejoin)以避免脑裂的发生,那么master如何发现自己需要rejoin呢?

当有节点连不上时,会执行removeNode。在执行removeNode时判断剩余的Node是否满足多数派条件,如果不满足,则执行rejoin。

if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
                final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
                rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                         masterNodes, electMasterService.minimumMasterNodes()));
                return resultBuilder.build(currentState);
            } else {
                return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
            }

在publish新的cluster_state时,分为send阶段和commit阶段,send阶段要求多数派必须成功,然后再进行commit。如果在send阶段没有实现多数派返回成功,然后可能是有了新的master或者是无法连接到多数派个节点 等,则master需要执行rejoin。

try {
            publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
        } catch (FailedToCommitClusterStateException t) {
            // cluster service logs a WARN message
            logger.debug("failed to publish cluster state version [{}](not enough nodes acknowledged, min master nodes [{}])",
                newState.version(), electMaster.minimumMasterNodes());

            synchronized (stateMutex) {
                pendingStatesQueue.failAllStatesAndClear(
                    new ElasticsearchException("failed to publish cluster state"));

                rejoin("zen-disco-failed-to-publish");
            }
            throw t;
        }

在对其他节点进行定期的ping时,发现有其他节点也是master,此时会比较本节点与另一个master节点的cluster_state的version,谁的version大谁成为master,version小的执行rejoin。

if (otherClusterStateVersion > localClusterState.version()) {
            rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
        } else {
            // TODO: do this outside mutex
            logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
            try {
                // make sure we're connected to this node (connect to node does nothing if we're already connected)
                // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
                // in the past (after a master failure, for example)
                transportService.connectToNode(otherMaster);
                transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                    @Override
                    public void handleException(TransportException exp) {
                        logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), exp);
                    }
                });
            } catch (Exception e) {
                logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e);
            }
        }

集群扩缩容

1.扩容DataNode

假设一个ES集群存储或者计算资源不够了,我们需要进行扩容,这里我们只针对DataNode,即配置为:

conf/elasticsearch.yml:
    node.master: false
    node.data: true

然后需要配置集群名、节点名等其他配置,为了让该节点能够加入集群,我们把discovery.zen.ping.unicast.hosts配置为集群中的master-eligible node。

conf/elasticsearch.yml:
    cluster.name: es-cluster
    node.name: node_Z
    discovery.zen.ping.unicast.hosts: ["x.x.x.x", "x.x.x.y", "x.x.x.z"]

然后启动节点,节点会自动加入到集群中,集群会自动进行rebalance,或者通过reroute api进行手动操作。

2.缩容DataNode

假设一个ES集群使用的机器数太多了,需要缩容,我们怎么安全的操作来保证数据安全,并且不影响可用性呢?

首先,我们选择需要缩容的节点,注意本节只针对DataNode的缩容,MasterNode缩容涉及到更复杂的问题,下面再讲。

然后,我们需要把这个Node上的Shards迁移到其他节点上,方法是先设置allocation规则,禁止分配Shard到要缩容的机器上,然后让集群进行rebalance。

PUT _cluster/settings
{
  "transient" : {
    "cluster.routing.allocation.exclude._ip" : "10.0.0.1"
  }
}

等这个节点上的数据全部迁移完成后,节点可以安全下线。

3.扩容MasterNode

假如我们想扩容一个MasterNode(master-eligible node), 那么有个需要考虑的问题是,上面提到为了避免脑裂,ES是采用多数派的策略,需要配置一个quorum数:

conf/elasticsearch.yml:
    discovery.zen.minimum_master_nodes: 2

假设之前3个master-eligible node,我们可以配置quorum为2,如果扩容到4个master-eligible node,那么quorum就要提高到3。

所以我们应该先把discovery.zen.minimum_master_nodes这个配置改成3,再扩容master,更改这个配置可以通过API的方式:

curl -XPUT localhost:9200/_cluster/settings -d '{
    "persistent" : {
        "discovery.zen.minimum_master_nodes" : 3
    }
}'

这个API发送给当前集群的master,然后新的值立即生效,然后master会把这个配置持久化到cluster meta中,之后所有节点都会以这个配置为准。

但是这种方式有个问题在于,配置文件中配置的值和cluster meta中的值很可能出现不一致,不一致很容易导致一些奇怪的问题,比如说集群重启后,在恢复cluster meta前就需要进行master选举,此时只可能拿配置中的值,拿不到cluster meta中的值,但是cluster meta恢复后,又需要以cluster meta中的值为准,这中间肯定存在一些正确性相关的边界case。

总之,动master节点以及相关的配置一定要谨慎,master配置错误很有可能导致脑裂甚至数据写坏、数据丢失等场景。

4.缩容MasterNode

缩容MasterNode与扩容跟扩容是相反的流程,我们需要先把节点缩下来,再把quorum数调下来,不再详细描述。

使用Zookeeper实现ES的上述功能

1.节点发现:每个节点的配置文件中配置一下Zookeeper服务器的地址,节点启动后到Zookeeper中某个目录中注册一个临时的znode。
当前集群的master监听这个目录的子节点增减事件,当发现有新节点时,将新节点加入集群。

2.master选举:当一个master-eligible node启动时,都尝试到固定位置注册一个名为master的临时znode,如果注册成功,即成为master,如果注册失败则监听这个znode的变化。
当master出现故障时,由于是临时znode,会自动删除,这时集群中其他的master-eligible node就会尝试再次注册。使用Zookeeper后其实是把选master变成了抢master。

3.错误检测:由于节点的znnode和master的znode都是临时znode,如果节点故障,会与Zookeeper断开session,znode自动删除。集群的master只需要监听znode变更事件即可,如果master故障,其他的候选master则会监听到master znode被删除的事件,尝试成为新的master。

4.集群扩缩容:扩缩容将不再需要考虑minimum_master_nodes配置的问题,会变得更容易。

ES分布式一致性原理剖析----Meta

Master如何管理集群

既然要管理集群,那么Master节点必然需要以某种方式通知其他节点,从而让其他节点执行相应的动作,来完成某些事情。
比如建立一个新的Index就需要将其Shard分配在某些节点上,在这些节点上需要创建出对应Shard的目录,并在内存中创建对应Shard的一些结构等。

在ES中,Master节点是通过发布ClusterState来通知其他节点的。Master会将新的ClusterState发布给其他的所有节点,当节点收到新的ClusterState后,会把新的ClusterState发给相关的各个模块,各个模块根据新的ClusterState判断是否要做什么事情,比如创建Shard等。即这是一种通过Meta数据来驱动各个模块工作的方式。

在Master进行Meta变更并通知所有节点的过程中,需要考虑Meta变更的一致性问题,假如这个过程中Master挂掉了,那么可能只有部分节点按照新的Meta执行了操作。当选举出新的Master后,需要保证所有节点都要按照最新的Meta执行操作,不能回退,因为已经有节点按照新的Meta执行操作了,再回退就会导致不一致。

ES中只要新Meta在一个节点上被commit,那么就会开始执行相应的操作。因此我们要保证一旦新Meta在某个节点上被commit,此后无论谁是master,都要基于这个commit来产生更新的meta,否则就可能产生不一致。

上一篇:CentOS6.x 64位下mysql5.5.25安装图解教程


下一篇:Python3 读、写Excel文件