Distributed Systems-Paxos

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/feilengcui008/article/details/50347281

本文主要提炼了《Paxos Make Simple》中的一些主要观点,然后加上自己的理解,使用通俗的语言尝试做一些解释。

  • 关于Paxos算法背景和一致性相关问题可以参见原论文

  • 算法涉及的主要对象

    • action 对一条记录(某个变量)的一次操作(这一点只是本人便于后面理解加上的)
      • 这里选用操作这个词,而不是值,因为一个在对某个变量达成某个值的共识前可能已经经过多个更新操作,所以为了区别,使用操作作为每次proposal的对象,而操作的值代表具体的修改动作,而且这也算是状态机复制(SMR)的一个基本组成单元,个人感觉更易于理解。比如action(log_id, log_content),log_id全局标识了此action的唯一性,log_content通常是针对某条记录的修改,可看做action的值。
    • proposer
      • 发起proposal的机器,注意在Paxos算法中允许多台机器同时发起proposal,并且有可能由于并发获取”需要达成一致的下一操作(action)”,从而使得不同的proposal针对同一个”需要达成一致的下一操作”达成共识,但是算法保证了其达成共识的action的值相同。
    • acceptor
      • 接受来自proposer的proposal,并根据对于proposer的prepare和accept消息做出响应。
    • learner
      • 从错误中恢复的机器,需要重新学习出错之前最后一次accpet的proposal id之后的所有proposal
  • Paxos instance

    • 针对某个”需要达成一致的操作(action)”运行一次Paxos算法的完整过程。
  • 算法推导逻辑

    • P0. To ensure data integrity under fault tolerence, a proposal is succeeded only when more than half machines accepted the proposal.
    • notice: P1[a] stands for requirement and algorithm for acceptors; P2[abc] stands for requirement and algorithm for proposer.
    • P1. An acceptor must accept the first proposal that it receives
      => problem : maybe two proposal are proposed at the same time and two less-than-half machine quorums receive separately these two different proposals, then these two proposals can not be succeeded.
      => so we must allow each acceptor to receive multiple proposals for the same value
      => so we must give each proposal a global unique and increasing id
    • P1a. An acceptor can accept a proposal numbered n iff it has not responed to a prepare request having a number greater than n
      => P1a -> P1 because this can ensure acceptor do not receive the before proposals which arrive later
      => so we ignore these proposal whose id is <= accepted and prepared id of acceptors
    • P2. If a proposal with value v is chosen, then every higher-numbered proposal that is chosen has value v
      => this is because we must ensure there is only a specific value chosen for a specific paxos instance which may contains multiple proposals
    • P2a. if a proposal with value v is chosen, then every higher-numbered proposal accepted by any acceptor has value v
      => notice: P2a -> P2
    • P2b. if a proposal with value v is chosen, then every higher-numbered proposal issued by any proposer has value v
      => notice: P2b -> P2a -> P2
    • P2c. for any v and n, if a proposal with value v and number n is issued, then there is a set S consisting of a majority of acceptors such that either

      • (a) no acceptor in S has accepted any proposal numbered less than n
      • (b) v is set to the value of the highest-numbered proposal among all proposals numbered less than n and accepted by the acceptors in S
        => notice: P2c -> P2b -> P2a -> P2
        => this is the specific algorithm for proposer in prepare phase
    • 总结

      • 根据上面的推导,核心的就两点,P1a和P2c,P1a规定了acceptor的行为,P2c规定了proposer的行为,由于P2c的需求,决定了需要有prepare阶段,这阶段主要是为了accept阶段为当前proposal设置正确的值。
  • 算法基本流程
    论文上主要有prepare和accept两个阶段,省略了选action(值)和选proposal id的阶段

    • 0.数据结构
      • 每台机器需要记录最大accpeted的proposal id(latest_accepted_id)和对应的accepted的操作(latest_accepted_action)以及最大promised的proposal id(latest_promised_id),这些数据需要刷盘。
    • 1.选择需要达成一致的操作
      • 来自客户端的请求,比如 Action{write A 12} => 通常这作为需要达成的某个操作的值,还需要一个全局唯一的id标识这个操作,比如对于某个log记录达成一致,需要寻找下一次需要记录的log id,这就需要向其他节点询问其记录的最近log id,并取最大值+1作为下一次需要达成一致的”记录日志这个操作(action)”的action(log) id。而这个过程可能会产生并发问题,即不同的机器可能针对同一个log id发起proposal,这一点后面阶段保证了一旦达成了proposal,则后续所有proposal都以相同的操作(值)达成。
    • 2.选择proposal id
      • proposal id需要保证全局唯一递增(这个后面补充)。
    • 3.prepare

      • 假设2中选择的proposal id为n,proposer发送prepare(n)给大多数机器
        • 对于acceptor,如果(n > latest_promised_id) /\ (n > latest_promised_id)
          • 如果acceptor已经有latest_accepted_id(说明之前对于同一个操作已经达到proposal了),则返回对应于latest_accepted_id的操作的值,为了accept阶段保证当前的proposal和以前已经达成的proposal最终操作值一样。
          • 如果acceptor没有latest_accepted_id(说明之前还未达成proposal),则不用返回值(accept阶段可以使用任意proposed的值) 
          • 令latest_accepted_id = latest_promised_id = n,并保证不再接受proposal id小于latest_promised_id的proposal
        • 否则acceptor返回拒绝,重新开始算法。
    • 4.accept

      • proposer收到大多数的机器对prepare的回复
        • 如果返回消息中latest_accepted_action集合不为空,则将当前proposal的action设置为对应于最大latest_accepted_id的latest_accepted_action,发送accept(n, action)消息
        • 如果返回消息中latest_accepted_action集合为空,则直接使用当前proposal的action(也就是论文中所说的any value),发送accept(n, action)消息 
      • 如果acceptor收到accept(n, action)消息时
        • latest_promised_id > n(说明有更新的),则放弃当前proposal,重新进入算法。
        • 否则接受proposal,完成此次proposal
      • 如果proposer收到大多数acceptor的成功消息,则成功返回给客户端,否则重新进入算法,由于liveness requirement,一个proposed的value必须eventually chosen,所以要么客户端返回成功,要么客户端请求超时,对于超时,客户端需要重新发起读的请求,此时可能已经成功了,否则继续重新发超时请求。
  • 几点辅助理解的说明

    • 多数是为了保证至少会有一台机器记录了上次达成的proposal的值,这样保证在不多于n/2台机器挂掉的条件下,在每次proposal的过程中,至少有一台机器有前面所有的proposal值的记录,从而保证所有的数据的完整。
    • 一轮paxos instance 是针对某个变量的一次操作的,而不是同一个变量。比如针对同一变量的一次操作打一次log,而这个log id应当是唯一的,而且针对这条log可能会有多次proposal,但是只要有一次proposal已经达成,那么针对这条log的proposal只能使用相同的log值更新(这也是为什么在prepare返回阶段,如果有一个acceptor已经达成过proposal,则返回其值替换当前proposal值)。
    • 对某个唯一的记录比如log或者变量的某次操作达成一致,那么proposer在发起proposal之前必定要到某个地方取下次需要达成一致的值,比如下一条日志记录的id,某个变量的下一个版本(某个变量的下一次操作)。而由于proposer可能有多个,那么在并发发起proposal时,不同的proposal可能会针对相同的某次操作,这时对于后达成的proposal来说,只能将其propose的值换为已经达成的proposal的值,而这个过程是通过prepare阶段accptor返回的结果集是否空来判断的。如果结果集不为空,说明针对此次操作,之前已经达成了一致,则后续proposal只能使用相同值;如果为空,那么可以使用此次proposal的值(也就是论文中所说的any value)。另外,在accept阶段,如果有accptor的最小promise id大于当前proposal id,那么说明已经有更更大proposal id的proposal先到达了(此时不管之前是否已经达成一致),此时需要放弃当前次的proposal

下面给一个一轮Paxos算法的伪代码:

# 一轮Paxos协议的流程
# 此处为了清晰将proposer和acceptor的逻辑分开写了,实际上原论文中一个server既可以做proposer也可以做acceptor

# 对每一个值(比如logid唯一的一条日志)可能会同时发起写请求(比如两个客户端并发访问qorumn里面的两个server)
# 所以此时这两个server都是proposer,针对同一条logid发起不同ballot number的决议请求。此时,如果是ballot number 
# 小的那个决议请求先达到多数派,那么应该保证后到的ballot number的请求使用相同的值。所以Acceptor需要做的事情如下:
# prepare phase:
# 1.如果请求的req_ballot_id比当前server已经应答过的last_ballot_id小,此时直接忽略,因为有更新的投票决议。
# 2.如果请求的req_ballot_id大于等于当前server已经应答过的last_ballot_id,此时使用req_ballot_id更新last_ballot_id,并返回last_voted_value,注意这个可以是空,说明要么是当前这个server以前未参与此值的多数派投票,要么是此值还未达成过多数派投票。
# commit phase:
# 1.如果commit消息数据中的ballot_id与last_ballot_id不同,则放弃
# 2.否则更新相应的值,并写日志
class Acceptor(object):
    last_ballot_id = None #我正在等着
    last_voted_value = None
    last_voted_ballot_id = None 
    servers = []
    def handleProposalRequest(self, reqData):
        req_ballot_id = reqData.ballot_id
        if req_ballot_id < self.last_ballot_id:
            pass # return nothing 
        else:
            self.last_ballot_id = req_ballot_id 
            return (self.last_ballot_id, self.last_voted_value, self.last_voted_ballot_id)

    def handleCommitRequest(self, reqData):
        commit_ballot_id = reqData.last_sent_ballot_id
        client_value = reqData.client_req_value 
        if commit_ballot_id != self.last_ballot_id:
            pass 
        self.last_ballot_id = self.last_voted_ballot_id = reqData.last_sent_ballot_id
        self.last_voted_value = client_value 
        writelog((self.last_ballot_id, self.last_voted_ballot_id, self.last_voted_value))

    def writelog(self, data):
        pass 

# 1.如果有acceptor接收到其他proposal发出的更大ballot_id的决议请求,那么放弃此次决议 
# 2.如果为达到多数派,放弃此次决议
# 3.如果acceptor中返回的last_voted_value不为空,则将当前proposal的值设置为相同值,进入commit阶段,否则直接用client_req_value作为值进入commit
class Proposer(object):
    last_sent_ballot_id = None 
    client_req_value = None 
    res_data = []
    servers = []
    quorumn_number = 5

    def sendRequest(self, data):
        pass 
    def sendProposal(self):
        self.last_sent_ballot_id += 1
        reqData.ballot_id = self.last_sent_ballot_id
        for i in self.servers:
            sendRequest(i, reqData)

    def readData(server):
        pass
    def handleEachProposalResponse(self, server):
        resData = self.readData(server)
        self.res_data.append((resData.last_ballot_id, resData.last_voted_value, resData.last_voted_ballot_id))
    def handleProposalResponse(self):
        for i in servers:
            handleEachProposalResponse(servers[i])
        res_ballot_id = max([i[0] for i in self.res_data])
        if res_ballot_id > self.last_sent_ballot_id:
            pass # maybe another proposer has finished a proposal for the value, what should we give back to client? 
        elif len(res_data) < (self.quorumn_number / 2 + 1):
            pass # failed, maybe timeout due to network or server crash?
        else:
            voted_data = [(i[1], i[2]) for i in self.res_data]
            voted_data.sort()
            if voted_data[0][1] is not None:
                self.client_req_value = voted_data[0][1] 
            self.commit((self.last_sent_ballot_id, self.client_req_value))

    def commit(self, reqData):
        pass 

上一篇:java单例的几种实现方法:


下一篇:【高并发简单解决方案】redis队列缓存 + mysql 批量入库 + php离线整合