Zookeeper 快速入门(上)

来源:holynull,

blog.leanote.com/post/holynull/Zookeeper

如有好文章投稿,请点击 → 这里了解详情

Zookeeper是Hadoop分布式调度服务,用来构建分布式应用系统。构建一个分布式应用是一个很复杂的事情,主要的原因是我们需要合理有效的处理分布式集群中的部分失败的问题。例如,集群中的节点在相互通信时,A节点向B节点发送消息。A节点如果想知道消息是否发送成功,只能由B节点告诉A节点。那么如果B节点关机或者由于其他的原因脱离集群网络,问题就出现了。A节点不断的向B发送消息,并且无法获得B的响应。B也没有办法通知A节点已经离线或者关机。集群中其他的节点完全不知道B发生了什么情况,还在不断的向B发送消息。这时,你的整个集群就发生了部分失败的故障。

Zookeeper不能让部分失败的问题彻底消失,但是它提供了一些工具能够让你的分布式应用安全合理的处理部分失败的问题。

安装和运行Zookeeper

我们采用standalone模式,安装运行一个单独的zookeeper服务。安装前请确认您已经安装了Java运行环境。

我们去Apache ZooKeeper releases page下载zookeeper安装包,并解压到本地:

% tar xzf zookeeper-x.y.z.tar.gz

ZooKeeper提供了一些可执行程序的工具,为了方便起见,我们将这些工具的路径加入到PATH环境变量中:

% export ZOOKEEPER_HOME=~/sw/zookeeper-x.y.z

% export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行ZooKeeper之前我们需要编写配置文件。配置文件一般在安装目录下的conf/zoo.cfg。我们可以把这个文件放在/etc/zookeeper下,或者放到其他目录下,并在环境变量设置ZOOCFGDIR指向这个个目录。下面是配置文件的内容:

tickTime=2000

dataDir=/Users/tom/zookeeper

clientPort=2181

tickTime是zookeeper中的基本时间单元,单位是毫秒。datadir是zookeeper持久化数据存放的目录。clientPort是zookeeper监听客户端连接的端口,默认是2181.

启动命令:

% zkServer.sh start

我们通过nc或者telnet命令访问2181端口,通过执行ruok(Are you OK?)命令来检查zookeeper是否启动成功:

% echo ruok | nc localhost 2181

imok

那么我看见zookeeper回答我们“I’m OK”。下表中是所有的zookeeper的命名,都是由4个字符组成。

Zookeeper 快速入门(上)

Zookeeper 快速入门(上)

3.5.0以上的版本会有一个内嵌的web服务,通过访问http://localhost:8080/commands来访问以上的命令列表。

Zookeeper开发实例

这一节我们将讲解如何编写Zookeeper客户端的程序,来控制zookeeper上的数据,以达到管理客户端所在集群的成员关系。

ZooKeeper中的组和成员

我们可以把Zookeeper理解为一个高可用的文件系统。但是它没有文件和文件夹的概念,只有一个叫做znode的节点概念。那么znode即是数据的容器,也是其他节点的容器。(其实znode就可以理解为文件或者是文件夹)我们用父节点和子节点的关系来表示组和成员的关系。那么一个节点代表一个组,组节点下的子节点代表组内的成员。如下图所示:

Zookeeper 快速入门(上)

创建组

我们使用zookeeper的Java API来创建一个/zoo的组节点:

public class CreateGroup implements Watcher {

private static final int SESSION_TIMEOUT = 5000;

private ZooKeeper zk;

private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {

zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

connectedSignal.await();

}

@Override

public void process(WatchedEvent event) { // Watcher interface

if (event.getState() == KeeperState.SyncConnected) {

connectedSignal.countDown();

}

}

public void create(String groupName) throws KeeperException,

InterruptedException {

String path = "/" + groupName;

String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("Created " + createdPath);

}

public void close() throws InterruptedException {

zk.close();

}

public static void main(String[] args) throws Exception {

CreateGroup createGroup = new CreateGroup();

createGroup.connect(args[0]);

createGroup.create(args[1]);

createGroup.close();

}

}

当main()执行时,首先创建了一个CreateGroup的对象,然后调用connect()方法,通过zookeeper的API与zookeeper服务器连接。创建连接我们需要3个参数:一是服务器端主机名称以及端口号,二是客户端连接服务器session的超时时间,三是Watcher接口的一个实例。Watcher实例负责接收Zookeeper数据变化时产生的事件回调。

在连接函数中创建了zookeeper的实例,然后建立与服务器的连接。建立连接函数会立即返回,所以我们需要等待连接建立成功后再进行其他的操作。我们使用CountDownLatch来阻塞当前线程,直到zookeeper准备就绪。这时,我们就看到Watcher的作用了。我们实现了Watcher接口的一个方法:

public void process(WatchedEvent event);

当客户端连接上了zookeeper服务器,Watcher将由process()函数接收一个连接成功的事件。我们接下来调用CountDownLatch,释放之前的阻塞。

连接成功后,我们调用create()方法。我们在这个方法中调用zookeeper实例的create()方法来创建一个znode。参数包括:一是znode的path;二是znode的内容(一个二进制数组),三是一个access control list(ACL,访问控制列表,这里使用完全开放模式),最后是znode的性质。

znode的性质分为ephemeral和persistent两种。ephemeral性质的znode在创建他的客户端的会话结束,或者客户端以其他原因断开与服务器的连接时,会被自动删除。而persistent性质的znode就不会被自动删除,除非客户端主动删除,而且不一定是创建它的客户端可以删除它,其他客户端也可以删除它。这里我们创建一个persistent的znode。

create()将返回znode的path。我们讲新建znode的path打印出来。

我们执行如上程序:

% export CLASSPATH=ch21-zk/target/classes/:$ZOOKEEPER_HOME/*:\

$ZOOKEEPER_HOME/lib/*:$ZOOKEEPER_HOME/conf

% java CreateGroup localhost zoo

Created /zoo

加入组

接下来我们实现如何在一个组中注册成员。我们将使用ephemeral znode来创建这些成员节点。那么当客户端程序退出时,这些成员将被删除。

我们创建一个ConnetionWatcher类,然后继承实现一个JoinGroup类:

public class ConnectionWatcher implements Watcher {

private static final int SESSION_TIMEOUT = 5000;

protected ZooKeeper zk;

private CountDownLatch connectedSignal = new CountDownLatch(1);

public void connect(String hosts) throws IOException, InterruptedException {

zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);

connectedSignal.await();

}

@Override

public void process(WatchedEvent event) {

if (event.getState() == KeeperState.SyncConnected) {

connectedSignal.countDown();

}

}

public void close() throws InterruptedException {

zk.close();

}

}

public class JoinGroup extends ConnectionWatcher {

public void join(String groupName, String memberName) throws KeeperException,

InterruptedException {

String path = "/" + groupName + "/" + memberName;

String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL);

System.out.println("Created " + createdPath);

}

public static void main(String[] args) throws Exception {

JoinGroup joinGroup = new JoinGroup();

joinGroup.connect(args[0]);

joinGroup.join(args[1], args[2]);

// stay alive until process is killed or thread is interrupted

Thread.sleep(Long.MAX_VALUE);

}

}

加入组与创建组非常相似。我们加入了一个ephemeral znode后,让线程阻塞住。然后我们可以使用命令行查看zookeeper中我们创建的znode。当我们将阻塞的程序强行关闭后,我们会发现我们创建的znode会自动消失。

成员列表

下面我们实现一个程序来列出一个组中的所有成员。

public class ListGroup extends ConnectionWatcher {

public void list(String groupName) throws KeeperException,

InterruptedException {

String path = "/" + groupName;

try {

List<String> children = zk.getChildren(path, false);

if (children.isEmpty()) {

System.out.printf("No members in group %s\n", groupName);

System.exit(1);

}

for (String child : children) {

System.out.println(child);

}

} catch (KeeperException.NoNodeException e) {

System.out.printf("Group %s does not exist\n", groupName);

System.exit(1);

}

}

public static void main(String[] args) throws Exception {

ListGroup listGroup = new ListGroup();

listGroup.connect(args[0]);

listGroup.list(args[1]);

listGroup.close();

}

}

我们在list()方法中通过调用getChildren()方法来获得某一个path下的子节点,然后打印出来。我们这里会试着捕获KeeperException.NoNodeException,当znode不存在时会抛出这个异常。我们运行程序,会看见如下结果,说明我们还没在zoo组中添加任何成员几点:

% java ListGroup localhost zoo

No members in group zoo

我们可以运行之前的JoinGroup来添加成员。在后台运行一些JoinGroup程序,这些程序添加节点后都处于sleep状态:

% java JoinGroup localhost zoo duck &

% java JoinGroup localhost zoo cow &

% java JoinGroup localhost zoo goat &

% goat_pid=$!

最后一行命令的作用是将最后一个启动的java程序的pid记录下来,我们好在列出zoo下面的成员后,将该进程kill掉。

下面我们将zoo下的成员打印出来:

% java ListGroup localhost zoo

goat

duck

cow

然后我们将kill掉最后启动的JoinGroup客户端:

% kill $goat_pid

过几秒后,我们发现goat节点不见了。因为之前我们创建的goat节点是一个ephemeral节点,而创建这个节点的客户端在ZooKeeper上的会话已经被终结了,因为这个回话在5秒后失效了(我们设置了会话的超时时间为5秒):

% java ListGroup localhost zoo

duck

cow

让我们回过头来看看,我们到底都做了一些什么?我们首先创建了一个节点组,这些节点的创建者都在同一个分布式系统中。这些节点的创建者之间互相都不知情。一个创建者想使用这些节点数据进行一些工作,例如通过znode节点是否存在来判断节点的创建者是否存在。

最后一点,我们不能只依靠组成员关系来完全解决在与节点通信时的网络错误。当与一个集群组成员节点进行通信时,发生了通信失败,我们需要使用重试或者试验与组中其他的节点通信,来解决这次通信失败。

Zookeeper的命令行工具

Zookeeper有一套命令行工具。我们可以像如下使用,来查找zoo下的成员节点:

% zkCli.sh -server localhost ls /zoo

[cow, duck]

你可以不加参数运行这个工具,来获得帮助。

删除分组

下面让我们来看一下如何删除一个分组?

ZooKeeper的API提供一个delete()方法来删除一个znode。我们通过输入znode的path和版本号(version number)来删除想要删除的znode。我们除了使用path来定位我们要删除的znode,还需要一个参数是版本号。只有当我们指定要删除的本版号,与znode当前的版本号一致时,ZooKeeper才允许我们将znode删除掉。这是一种optimistic locking机制,用来处理znode的读写冲突。我们也可以忽略版本号一致检查,做法就是版本号赋值为-1。

删除一个znode之前,我们需要先删除它的子节点,就下如下代码中实现的那样:

public class DeleteGroup extends ConnectionWatcher {

public void delete(String groupName) throws KeeperException,

InterruptedException {

String path = "/" + groupName;

try {

List<String> children = zk.getChildren(path, false);

for (String child : children) {

zk.delete(path + "/" + child, -1);

}

zk.delete(path, -1);

} catch (KeeperException.NoNodeException e) {

System.out.printf("Group %s does not exist\n", groupName);

System.exit(1);

}

}

public static void main(String[] args) throws Exception {

DeleteGroup deleteGroup = new DeleteGroup();

deleteGroup.connect(args[0]);

deleteGroup.delete(args[1]);

deleteGroup.close();

}

}

最后我们执行如下操作来删除zoo group:

% java DeleteGroup localhost zoo

% java ListGroup localhost zoo

Group zoo does not exist

Zookeeper 服务

ZooKeeper 是一个高可用的高性能调度服务。这一节我们将讲述他的模型、操作和接口。

数据模型 Data Model

ZooKeeper包含一个树形的数据模型,我们叫做znode。一个znode中包含了存储的数据和ACL(Access Control List)。ZooKeeper的设计适合存储少量的数据,并不适合存储大量数据,所以znode的存储限制最大不超过1M。

数据的访问被定义成原子性的。什么是原子性呢?一个客户端访问一个znode时,不会只得到一部分数据;客户端访问数据要么获得全部数据,要么读取失败,什么也得不到。相似的,写操作时,要么写入全部数据,要么写入失败,什么也写不进去。ZooKeeper能够保证写操作只有两个结果,成功和失败。绝对不会出现只写入了一部分数据的情况。与HDFS不同,ZooKeeper不支持字符的append(连接)操作。原因是HDFS是被设计成支持数据流访问(streaming data access)的大数据存储,而ZooKeeper则不是。

我们可以通过path来定位znode,就像Unix系统定位文件一样,使用斜杠来表示路径。但是,znode的路径只能使用绝对路径,而不能想Unix系统一样使用相对路径,即Zookeeper不能识别../和./这样的路径。

节点的名称是由Unicode字符组成的,除了zookeeper这个字符串,我们可以任意命名节点。为什么不能使用zookeeper命名节点呢?因为ZooKeeper已经默认使用zookeeper来命名了一个根节点,用来存储一些管理数据。

请注意,这里的path并不是URIs,在Java API中是一个String类型的变量。

Ephemeral znodes

我们已经知道,znode有两种类型:ephemeral和persistent。在创建znode时,我们指定znode的类型,并且在之后不会再被修改。当创建znode的客户端的session结束后,ephemeral类型的znode将被删除。persistent类型的znode在创建以后,就与客户端没什么联系了,除非主动去删除它,否则他会一直存在。Ephemeral znode没有任何子节点。

虽然Ephemeral znode没有绑定到客户端的session,但是任何一个客户端都可以访问它,当然是在他们的ACL策略下允许访问的情况下。我们在创建分布式系统时,需要知道分布式资源是否可用。Ephemeral znode就是为这种场景应运而生的。正如我们之前讲述的例子中,使用Ephemeral znode来实现一个成员关系管理,任何一个客户端进程任何时候都可以知道其他成员是否可用。

Znode的序号

如果在创建znode时,我们使用排序标志的话,ZooKeeper会在我们指定的znode名字后面增加一个数字。我们继续加入相同名字的znode时,这个数字会不断增加。这个序号的计数器是由这些排序znode的父节点来维护的。

如果我们请求创建一个znode,指定命名为/a/b-,那么ZooKeeper会为我们创建一个名字为/a/b-3的znode。我们再请求创建一个名字为/a/b-的znode,ZooKeeper会为我们创建一个名字/a/b-5的znode。ZooKeeper给我们指定的序号是不断增长的。Java API中的create()的返回结果就是znode的实际名字。

那么序号用来干什么呢?当然是用来排序用的!后面《A Lock Service》中我们将讲述如何使用znode的序号来构建一个share lock。

观察模式 Watches

观察模式可以使客户端在某一个znode发生变化时得到通知。观察模式有ZooKeeper服务的某些操作启动,并由其他的一些操作来触发。例如,一个客户端对一个znode进行了exists操作,来判断目标znode是否存在,同时在znode上开启了观察模式。如果znode不存在,这exists将返回false。如果稍后,另外一个客户端创建了这个znode,观察模式将被触发,将znode的创建事件通知之前开启观察模式的客户端。我们将在以后详细介绍其他的操作和触发。

观察模式只能被触发一次。如果要一直获得znode的创建和删除的通知,那么就需要不断的在znode上开启观察模式。在上面的例子中,如果客户端还继续需要获得znode被删除的通知,那么在获得创建通知后,客户端还需要继续对这个znode进行exists操作,再开启一次观察模式。

在《A Configuration Service》中,有一个例子将讲述如何使用观察模式在集群中更新配置。

操作 Operations

下面的表格中列出了9种ZooKeeper的操作。

Zookeeper 快速入门(上)

调用delete和setData操作时,我们必须指定一个znode版本号(version number),即我们必须指定我们要删除或者更新znode数据的哪个版本。如果版本号不匹配,操作将会失败。失败的原因可能是在我们提交之前,该znode已经被修改过了,版本号发生了增量变化。那么我们该怎么办呢?我可以考虑重试,或者调用其他的操作。例如,我们提交更新失败后,可以重新获取znode当前的数据,看看当前的版本号是什么,再做更新操作。

ZooKeeper虽然可以被看作是一个文件系统,但是由于ZooKeeper文件很小,所以没有提供像一般文件系统所提供的open、close或者seek操作。

Zookeeper 快速入门(上)

批量更新 Multiupdate

ZooKeeper支持将一些原始的操作组合成一个操作单元,然后执行这些操作。那么这种批量操作也是具有原子性的,只可能有两种执行结果,成功和失败。批量操作单元中的操作,不会出现一些操作执行成功,一些操作执行失败的情况,即要么都成功,要么都失败。

Multiupdate对于绑定一些结构化的全局变量很有用处。例如绑定一个无向图(undirected graph)。无向图的顶点(vertex)由znode来表示。添加和删除边(edge)的操作,由修改边的两个关联znode来实现。如果我们使用ZooKeeper的原始的操作来实现对边(edge)的操作,那么就有可能产生两个znode修改不一致的情况(一个修改成功,一个修改失败)。那么我们将修改两个znode的操作放入到一个Multi修改单元中,就能够保证两个znode,要么都修改成功,要么都修改失败。这样就能够避免修改无向图的边时产生修改不一致的现象。

APIs

ZooKeeper客户端使用的核心编程语言有JAVA和C;同时也支持Perl、Python和REST。执行操作的方式呢,分为同步执行和异步执行。我们之前已经见识过了同步的Java API中的exists。

public Stat exists(String path, Watcher watcher) throws KeeperException,

InterruptedException

下面代码则是异步方式的exists:

public void exists(String path, Watcher watcher, StatCallback cb, Object ctx)

Java API中,异步的方法的返回类型都是void,而操作的返回的结果将传递到回调对象的回调函数中。回调对象将实现StatCallback接口中的一个回调函数,来接收操作返回的结果。函数接口如下:

public void processResult(int rc, String path, Object ctx, Stat stat);

参数rc表示返回码,请参考KeeperException中的定义。在stat参数为null的情况下,非0的值表示一种异常。参数path和ctx与客户端调用的exists方法中的参数相等,这两个参数通常用来确定回调中获得的响应是来至于哪个请求的。参数ctx可以是任意对象,只有当path参数不能消灭请求的歧义时才会用到。如果不需要参数ctx,可以设置为null。

Zookeeper 快速入门(上)

观察模式触发器 Watch triggers

读操作,例如:exists、getChildren、getData会在znode上开启观察模式,并且写操作会触发观察模式事件,例如:create、delete和setData。ACL(Access Control List)操作不会启动观察模式。观察模式被触发时,会生成一个事件,这个事件的类型取决于触发他的操作:

  • exists启动的观察模式,由创建znode,删除znode和更新znode操作来触发。

  • getData启动的观察模式,由删除znode和更新znode操作触发。创建znode不会触发,是因为getData操作成功的前提是znode必须已经存在。

  • getChildren启动的观察模式,由子节点创建和删除,或者本节点被删除时才会被触发。我们可以通过事件的类型来判断是本节点被删除还是子节点被删除:NodeChildrenChanged表示子节点被删除,而NodeDeleted表示本节点删除。

Zookeeper 快速入门(上)

事件包含了触发事件的znode的path,所以我们通过NodeCreated和NodeDeleted事件就可以知道哪个znode被创建了或者删除了。如果我们需要在NodeChildrenChanged事件发生后知道哪个子节点被改变了,我们就需要再调用一次getChildren来获得一个新的子节点列表。与之类似,在NodeDataChanged事件发生后,我们需要调用getData来获得新的数据。我们在编写程序时,会在接收到事件通知后改变znode的状态,所以我们一定要清楚的记住znode的状态变化。

ACLs 访问控制操作

znode的创建时,我们会给他一个ACL(Access Control List),来决定谁可以对znode做哪些操作。

ZooKeeper通过鉴权来获得客户端的身份,然后通过ACL来控制客户端的访问。鉴权方式有如下几种:

  • digest

    使用用户名和密码方式

  • sasl

    使用Kerberos鉴权

  • ip

    使用客户端的IP来鉴权

客户端可以在与ZooKeeper建立会话连接后,自己给自己授权。授权是并不是必须的,虽然znode的ACL要求客户端必须是身份合法的,在这种情况下,客户端可以自己授权来访问znode。下面的例子,客户端使用用户名和密码为自己授权:

zk.addAuthInfo("digest", "tom:secret".getBytes());

ACL是由鉴权方式、鉴权方式的ID和一个许可(permession)的集合组成。例如,我们想通过一个ip地址为10.0.0.1的客户端访问一个znode。那么,我们需要为znode设置一个ACL,鉴权方式使用IP鉴权方式,鉴权方式的ID为10.0.0.1,只允许读权限。使用JAVA我们将像如下方式创建一个ACL对象:

new ACL(Perms.READ,new Id("ip", "10.0.0.1"));

所有的许可权限将在下表中列出。请注意,exists操作不受ACL的控制,所以任何一个客户端都可以通过exists操作来获得任何znode的状态,从而得知znode是否真的存在。

在ZooDefs.Ids类中,有一些ACL的预定义变量,包括OPEN_ACL_UNSAFE,这个设置表示将赋予所有的许可给客户端(除了ADMIN的许可)。

另外,我们可以使用ZooKeeper鉴权的插件机制,来整合第三方的鉴权系统。

上一篇:[机器学习]回归--Support Vector Regression(SVR)


下一篇:HDU 1102 Constructing Roads, Prim+优先队列