分布式系统
概要
分布式系统的核心是通过网络来协调,共同完成一致任务的一些计算机。我们在本课程中将会重点介绍一些案例,包括:大型网站的储存系统、大数据运算,如 MapReduce、以及一些更为奇妙的技术,比如点对点的文件共享。这是我们学习过程中的一些例子。分布式计算之所以如此重要的原因是,许多重要的基础设施都是在它之上建立的,它们需要多台计算机或者说本质上需要多台物理隔离的计算机。
如果你可以在一台计算机上解决,而不需要分布式系统,那你就应该用一台计算机解决问题。在选择使用分布式系统解决问题前,你应该要充分尝试别的思路
人们使用大量的相互协作的计算机驱动力是:
- 人们需要获得更高的计算性能。可以这么理解这一点,(大量的计算机意味着)大量的并行运算,大量CPU、大量内存、以及大量磁盘在并行的运行。
- 另一个人们构建分布式系统的原因是,它可以提供容错(tolerate faults)。比如两台计算机运行完全相同的任务,其中一台发生故障,可以切换到另一台。
- 第三个原因是,一些问题天然在空间上是分布的。例如银行转账,我们假设银行A在纽约有一台服务器,银行B在伦敦有一台服务器,这就需要一种两者之间协调的方法。所以,有一些天然的原因导致系统是物理分布的。
- 最后一个原因是,人们构建分布式系统来达成一些安全的目标。比如有一些代码并不被信任,但是你又需要和它进行交互,这些代码不会立即表现的恶意或者出现bug。你不会想要信任这些代码,所以你或许想要将代码分散在多处运行,这样你的代码在另一台计算机运行,我的代码在我的计算机上运行,我们通过一些特定的网络协议通信。所以,我们可能会担心安全问题,我们把系统分成多个的计算机,这样可以限制出错域。
这门课程中,主要会讨论前两点:性能和容错。剩下两点我们会通过对某些案例的研究来学习。
- 因为系统中存在很多部分,这些部分又在并发执行,你会遇到并发编程和各种复杂交互所带来的问题,以及时间依赖的问题(比如同步,异步)。这让分布式系统变得很难。
- 另一个导致分布式系统很难的原因是,分布式系统有多个组成部分,再加上计算机网络,你会会遇到一些意想不到的故障。如果你只有一台计算机,那么它通常要么是工作,要么是故障或者没电,总的来说,要么是在工作,要么是没有工作。而由多台计算机组成的分布式系统,可能会有一部分组件在工作,而另一部分组件停止运行,或者这些计算机都在正常运行,但是网络中断了或者不稳定。所以,局部错误也是分布式系统很难的原因。
- 最后一个导致分布式系统很难的原因是,人们设计分布式系统的根本原因通常是为了获得更高的性能,比如说一千台计算机或者一千个磁盘臂达到的性能。但是实际上一千台机器到底有多少性能是一个棘手的问题,这里有很多难点。所以通常需要倍加小心地设计才能让系统实际达到你期望的性能。
实验组成部分
-
MapReduce
-
Raft
- 这是一个理论上通过复制来让系统容错的算法,具体是通过复制和出现故障时自动切换来实现
-
KV server
- 需要使用你的Raft算法实现来建立一个可以容错的KV服务
-
分片式KV服务-sharded key value service sharding
- 需要把你写的KV服务器分发到一系列的独立集群中,这样你会切分你的KV服务,并通过运行这些独立的副本集群进行加速。同时,你也要负责将不同的数据块在不同的服务器之间搬迁,并确保数据完整。这里我们通常称之为分片式KV服务。分片是指我们将数据在多个服务器上做了分区,来实现并行的加速。
分布式系统的抽象和实现工具(Abstraction and Implementation)
这门课程是有关应用的基础架构的。所以,贯穿整个课程,我会以分离的方式介绍:第三方的应用程序,和这些应用程序所基于的,我们课程中主要介绍的一些基础架构。基础架构的类型主要是存储,通信(网络)和计算。
最关注的是存储,因为这是一个定义明确且有用的抽象概念,并且通常比较直观。人们知道如何构建和使用储存系统,知道如何去构建一种多副本,容错的,高性能分布式存储实现。
让用户认为这个分布式上不是一个分布式。
(让用户认为一个多线程的用户是一个单线程的任务)
RPC,Threads,concurrent,
- RPC(Remote Procedure Call)。RPC的目标就是掩盖我们正在不可靠网络上通信的事实。
- 另一个我们会经常看到的实现相关的内容就是线程。这是一种编程技术,使得我们可以利用多核心计算机。对于本课程而言,更重要的是,线程提供了一种结构化的并发操作方式,这样,从程序员角度来说可以简化并发操作。
- 因为我们会经常用到线程,我们需要在实现的层面上,花费一定的时间来考虑并发控制,比如锁。
可扩展性(Scalability)
另一个就是性能。
构建分布式系统的目的是为了获取人们常常提到的可扩展的加速。所以,我们这里追求的是可扩展性(Scalability)。而我这里说的可扩展或者可扩展性指的是,如果我用一台计算机解决了一些问题,当我买了第二台计算机,我只需要一半的时间就可以解决这些问题,或者说每分钟可以解决两倍数量的问题。两台计算机构成的系统如果有两倍性能或者吞吐,就是可扩展性。
当你只有1-2个用户时,一台计算机就可以运行web服务器和数据,或者一台计算机运行web服务器,一台计算机运行数据库。但是有可能你的网站一夜之间就火了起来,你发现可能有一亿人要登录你的网站。你该怎么修改你的网站,使它能够在一台计算机上支持一亿个用户?你可以花费大量时间极致优化你的网站,但是很显然你没有那个时间。所以,为了提升性能,你要做的第一件事情就是购买更多的服务器,然后把不同用户分到不同服务器上。这样,一部分用户可以去访问第一台web服务器,另一部分去访问第二台web服务器。所有的用户最终都需要看到相同的数据。所以,所有的web服务器都与后端数据库通信。这样,很长一段时间你都可以通过添加web服务器来并行的提高web服务器的代码效率。
很可能在某个时间点你有了10台,20台,甚至100台web服务器,它们都在和同一个数据库通信。现在,数据库突然成为了瓶颈,并且增加更多的web服务器都无济于事了。所以就可能需要很多个DB服务器了,但是很难将多个DB进行同步。
可用性(Availability)
大型分布式系统中有一个大问题,那就是一些很罕见的问题会被放大。例如在我们的1000台计算机的集群中,总是有故障,要么是机器故障,要么是运行出错,要么是运行缓慢,要么是执行错误的任务。
大规模系统会将一些几乎不可能并且你不需要考虑的问题,变成一个持续不断的问题。
所以,因为错误总会发生,必须要在设计时就考虑,系统能够屏蔽错误,或者说能够在出错时继续运行。同时,因为我们需要为第三方应用开发人员提供方便的抽象接口,我们的确也需要构建这样一种基础架构,它能够尽可能多的对应用开发人员屏蔽和掩盖错误。这样,应用开发人员就不需要处理各种各样的可能发生的错误。
对于容错,有很多不同的概念可以表述。这些表述中,有一个共同的思想就是可用性(Availability)。某些系统经过精心的设计,这样在特定的错误类型下,系统仍然能够正常运行,仍然可以像没有出现错误一样,为你提供完整的服务。
某些系统通过这种方式提供可用性。比如,你构建了一个有两个拷贝的多副本系统,其中一个故障了,另一个还能运行。当然如果两个副本都故障了,你的系统就不再有可用性。所以,可用系统通常是指,在特定的故障范围内,系统仍然能够提供服务,系统仍然是可用的。如果出现了更多的故障,系统将不再可用。
除了可用性之外,另一种容错特性是自我可恢复性(recoverability)—如果出现了问题,服务会停止工作,不再响应请求,之后有人来修复,并且在修复之后系统仍然可以正常运行,就像没有出现过问题一样。这是一个比可用性更弱的需求,因为在出现故障到故障组件被修复期间,系统将会完全停止工作。但是修复之后,系统又可以完全正确的重新运行,所以可恢复性是一个重要的需求。
为了实现这些特性,有很多工具。其中最重要的有两个:
- 一个是非易失存储(non-volatile storage,类似于硬盘)。这样当出现类似电源故障,甚至整个机房的电源都故障时,我们可以使用非易失存储,比如硬盘,闪存,SSD之类的。我们可以存放一些checkpoint或者系统状态的log在这些存储中,这样当备用电源恢复或者某人修好了电力供给,我们还是可以从硬盘中读出系统最新的状态,并从那个状态继续运行。所以,这里的一个工具是非易失存储。因为更新非易失存储是代价很高的操作,所以相应的出现了很多非易失存储的管理工具。同时构建一个高性能,容错的系统,聪明的做法是避免频繁的写入非易失存储。在过去,甚至对于今天的一个3GHZ的处理器,写入一个非易失存储意味着移动磁盘臂并等待磁碟旋转,这两个过程都非常缓慢。有了闪存会好很多,但是为了获取好的性能,仍然需要许多思考。
- 对于容错的另一个重要工具是复制(replication),不过,管理复制的多副本系统会有些棘手。任何一个多副本系统中,都会有一个关键的问题,比如说,我们有两台服务器,它们本来应该是有着相同的系统状态,现在的关键问题在于,这两个副本总是会意外的偏离同步的状态,而不再互为副本。对于任何一种使用复制实现容错的系统,我们都面临这个问题。lab2都是通过管理多副本来实现容错的系统,你将会看到这里究竟有多复杂。
一致性(Consistency)
假设我们在构建一个分布式存储系统,并且这是一个KV服务。这个KV服务只支持两种操作,其中一个是put操作会将一个value存入一个key;另一个是get操作会取出key对应的value。整体表现就像是一个大的key-value表单。
一致性就是用来定义操作行为的概念。因为,从性能和容错的角度来说,我们通常会有多个副本。在一个非分布式系统中,你通常只有一个服务器,一个表单。虽然不是绝对,但是通常来说对于put/get的行为不会有歧义。直观上来说,put就是更新这个表单,get就是从表单中获取当前表单中存储的数据。
但是在一个分布式系统中,由于复制或者缓存,数据可能存在于多个副本当中,于是就有了多个不同版本的key-value对。现在某个客户端发送了一个put请求,并希望将key 1改成值21。这个put请求发送给了第一台服务器,之后会发送给第二台服务器(某种策略),因为相同的put请求需要发送给两个副本,这样这两个副本才能保持同步。但是就在客户端准备给第二台服务器发送相同请求时,这个客户端故障了。这时候一个是20,一个是21,这是一个很不好的结果。
现在某人通过get读取key为1的值,那么他可能获得21,也可能获得20,取决于get请求发送到了哪个服务器。即使规定了总是把请求先发送给第一个服务器,那么我们在构建容错系统时,如果第一台服务器故障了,请求也会发给第二台服务器。
强一致(Strong Consistency): get请求可以得到最近一次完成的put请求写入的值。
弱一致 (Weak Consistency):不保证get请求可以得到最近一次完成的put请求写入的值。
事实上,构建一个弱一致的系统也是非常有用的。
人们对于弱一致感兴趣的原因是,虽然强一致可以确保get获取的是最新的数据,但是实现这一点的代价非常高。如果我们要实现强一致,简单的方法就是同时读两个副本,如果有多个副本就读取所有的副本,并使用最近一次写入的数据。但是这样的代价很高,因为需要大量的通信才能得到一个数据,而且失去了分布式的本意,工作量并没有分摊。所以,为了尽可能的避免通信,尤其当副本相隔的很远的时候,人们会构建弱一致系统,并允许读取出旧的数据。当然,为了让弱一致更有实际意义,人们还会定义更多的规则。
MapReduce
需要一种框架,使得普通工程师也可以很容易的完成并运行大规模的分布式运算。这就是MapReduce出现的背景。
MapReduce的思想是,应用程序设计人员和分布式运算的使用者,只需要写简单的Map函数和Reduce函数,而不需要知道任何有关分布式的事情,MapReduce框架会处理剩下的事情。
抽象来看,MapReduce假设启动的时候,有一些输入,这些输入被分割成大量的不同的文件或者数据块。
MapReduce启动时,会查找Map函数。之后,MapReduce框架会为每个输入文件运行Map函数。这里很明显有一些可以并行运算的地方,比如说可以并行运行多个只关注输入和输出的Map函数。
Map函数会输出key-value对,其中key是单词,而value是1。我们对所有的输入文件都运行了Map函数,并得到了论文中称之为中间输出(intermediate output),也就是每个Map函数输出的key-value对。
运算的第二阶段是运行Reduce函数。MapReduce框架会收集所有Map函数输出的每一个单词的统计。比如说,MapReduce框架会先收集每一个Map函数输出的key为a的key-value对。收集了之后,会将它们提交给Reduce函数。
所以第一个Reduce输出a=2,第二个Reduce输出b=2,第三个Reduce输出c=1。
这就是一个典型的MapReduce Job。从整体来看,为了保证完整性,有一些术语要介绍一下:
- Job。整个MapReduce计算称为Job。
- Task。每一次MapReduce调用称为Task。
所以,对于一个完整的MapReduce Job,它由一些Map Task和一些Reduce Task组成。所以这是一个单词计数器的例子,它解释了MapReduce的基本工作方式。
Map函数和Reduce函数
Map函数使用一个key和一个value作为参数。我们这里说的函数是由普通编程语言编写(c++,Java)
入参中,key是输入文件的名字,通常会被忽略,因为我们不太关心文件名是什么,value是输入文件的内容。所以,对于一个单词计数器来说,value包含了要统计的文本,我们会将这个文本拆分成单词。
之后对于每一个单词,我们都会调用emit。emit由MapReduce框架提供,并且这里的emit属于Map函数。emit会接收两个参数,其中一个是key,另一个是value。在单词计数器的例子中,emit入参的key是单词,value是字符串“1”。这就是一个Map函数。在一个单词计数器的MapReduce Job中,Map函数实际就可以这么简单。
Reduce函数的入参是某个特定key的所有实例(Map输出中的key-value对中,出现了一次特定的key就可以算作一个实例)。所以Reduce函数也是使用一个key和一个value作为参数,其中value是一个数组,里面每一个元素是Map函数输出的key的一个实例的value。对于单词计数器来说,key就是单词,value就是由字符串“1”组成的数组,所以,我们不需要关心value的内容是什么,我们只需要关心value数组的长度。
Reduce函数也有一个属于自己的emit函数。这里的emit函数只会接受一个参数value,这个value会作为Reduce函数入参的key的最终输出。所以,对于单词计数器,我们会给emit传入数组的长度。这就是一个最简单的Reduce函数
疑问:
### 调用emit时,数据会发生什么变化?emit函数在哪运行?
首先看,这些函数在哪运行。这里可以看MapReduce论文的图1。现实中,MapReduce运行在大量的服务器之上,我们称之为worker服务器或者worker。同时,也会有一个Master节点来组织整个计算过程。这里实际发生的是,Master服务器知道有多少输入文件,例如5000个输入文件,之后它将Map函数分发到不同的worker。所以,它会向worker服务器发送一条消息说,请对这个输入文件执行Map函数吧。之后,MapReduce框架中的worker进程会读取文件的内容,调用Map函数并将文件名和文件内容作为参数传给Map函数。worker进程还需要实现emit,这样,每次Map函数调用emit,worker进程就会将数据写入到本地磁盘的文件中。所以,Map函数中调用emit的效果是在worker的本地磁盘上创建文件,这些文件包含了当前worker的Map函数生成的所有的key和value。
所以,Map阶段结束时,我们看到的就是Map函数在worker上生成的一些文件。之后,MapReduce的worker会将这些数据移动到Reduce所需要的位置。对于一个典型的大型运算,Reduce的入参包含了所有Map函数对于特定key的输出。通常来说,每个Map函数都可能生成大量key。所以通常来说,在运行Reduce函数之前。运行在MapReduce的worker服务器上的进程需要与集群中每一个其他服务器交互来询问说,看,我需要对key=a运行Reduce,请看一下你本地磁盘中存储的Map函数的中间输出,找出所有key=a,并通过网络将它们发给我。所以,Reduce worker需要从每一个worker获取特定key的实例。这是通过由Master通知到Reduce worker的一条指令来触发。一旦worker收集完所有的数据,它会调用Reduce函数,Reduce函数运算完了会调用自己的emit,这个emit与Map函数中的emit不一样,它会将输出写入到一个Google使用的共享文件服务中。
有关输入和输出文件的存放位置,这是我之前没有提到的,它们都存放在文件中,但是因为我们想要灵活的在任意的worker上读取任意的数据,这意味着我们需要某种网络文件系统(network file system)来存放输入数据。
所以实际上,MapReduce论文谈到了GFS(Google File System)。GFS是一个共享文件服务,并且它也运行在MapReduce的worker集群的物理服务器上。GFS会自动拆分你存储的任何大文件,并且以64MB的块存储在多个服务器之上。所以,如果你有了10TB的网页数据,你只需要将它们写入到GFS,甚至你写入的时候是作为一个大文件写入的,GFS会自动将这个大文件拆分成64MB的块,并将这些块平均的分布在所有的GFS服务器之上,而这是极好的,这正是我们所需要的。如果我们接下来想要对刚刚那10TB的网页数据运行MapReduce Job,数据已经均匀的分割存储在所有的服务器上了。如果我们有1000台服务器,我们会启动1000个Map worker,每个Map worker会读取1/1000输入数据。这些Map worker可以并行的从1000个GFS文件服务器读取数据,并获取巨大的读取吞吐量,也就是1000台服务器能提供的吞吐量。