一、客户端脚本
1.1、客户端连接
cd /usr/local/services/zookeeper/zookeeper-3.4.13/bin
##连接本地Zookeeper服务器
sh zkCli.sh
##连接主机server1上的zookeeper服务
sh zkCli.sh -server server1:2181
1.2、CRUD操作
创建
##语法:create [-s] [-e] path data acl
create /zk-temp "tmp"
说明:-s / -e分别制定节点特性:顺序或临时节点。默认情况下,创建的是持久节点。
读取
##获取节点子节点列表,语法:ls path [watch]
ls /zk-temp
##获取节点数据,语法:get path [watch]
get /zk-temp
更新
##语法:set path data [version]
set /zk-temp "zk"
删除
##语法:delete path [version]
delete /zk-temp
二、Java客户端API
2.1、创建会话
创建会话Java代码
package zookeeper.javaapi;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class CreateSession implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public void process(WatchedEvent event) {
System.out.println("Receive watched event:" + event);
if (Event.KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
public static void main(String[] args) throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("server1:2181",1000,new CreateSession());
System.out.println(zooKeeper.getState());
try {
connectedSemaphore.await();
} catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("Zookeeper session established.");
}
}
2.2、创建节点
2.2.1、同步创建节点
方法:String create(final String path, byte data[], List
参数说明:
同步创建节点Java代码
package zookeeper.javaapi;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper API创建节点,使用同步(sync)接口。
*/
public class Sync_Create_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zookeeper = new ZooKeeper("server1:2181", 5000, new Sync_Create_Usage());
connectedSemaphore.await();
//创建临时节点
String path1 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("Success create znode: " + path1);
//临时顺序节点
String path2 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode: " + path2);
//阻塞线程,查看临时节点创建结果
Thread.sleep( Integer.MAX_VALUE );
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
2.2.2、异步创建节点
方法:void create(final String path, byte data[], List
参数说明:
异步创建节点Java代码
package zookeeper.javaapi;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
/**
* ZooKeeper API创建节点,使用异步(Async)接口。
*/
public class Async_Create_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZooKeeper zookeeper = new ZooKeeper("server1:2181", 5000, new Async_Create_Usage());
System.out.println(zookeeper.getState());
connectedSemaphore.await();
//创建临时节点
zookeeper.create("/zk-test-demo-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context. ");
//创建临时节点
zookeeper.create("/zk-test-demo-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallback(), "I am context. ");
//阻塞线程,查看临时节点创建结果
zookeeper.create("/zk-test-demo-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new IStringCallback(), "I am context. ");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
connectedSemaphore.countDown();
}
}
}
class IStringCallback implements AsyncCallback.StringCallback {
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
}
}
2.2.3、回调函数方法说明
方法:void processResult(int rc, String path, Object ctx, String name)
参数说明:
2.3、删除节点
2.3.1、同步删除节点
方法:public void delete(final String path, int version)
参数说明:
同步删除节点Java代码
package zookeeper.javaapi;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Sync_Delete_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zookeeper;
public static void main(String[] args) throws Exception {
String path = "/zk-delete-demo";
zookeeper = new ZooKeeper("server1:2181", 5000, new Sync_Delete_Usage());
connectedSemaphore.await();
//同步创建持久节点 /zk-delete-demo
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
//同步创建持久节点 /zk-delete-demo/test
zookeeper.create(path + "/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/test");
//删除非叶子节点将报错
try {
zookeeper.delete(path, -1);
} catch (Exception e) {
System.out.println("fail to delete znode: " + path);
}
//删除叶子节点 /zk-delete-demo/test
zookeeper.delete(path + "/test", -1);
System.out.println("success delete znode: " + path + "/c1");
//删除叶子节点 /zk-delete-demo
zookeeper.delete(path, -1);
System.out.println("success delete znode: " + path);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
2.3.2、异步删除节点
方法:public void delete(final String path, int version, VoidCallback cb, Object ctx)
参数说明:
异步删除节点Java代码
package zookeeper.javaapi;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class Async_Delete_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zookeeper;
public static void main(String[] args) throws Exception {
String path = "/zk-delete-demo";
zookeeper = new ZooKeeper("server1:2181", 5000, new Async_Delete_Usage());
connectedSemaphore.await();
//同步创建持久节点 /zk-delete-demo
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
//同步创建持久节点 /zk-delete-demo/test
zookeeper.create(path + "/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path + "/test");
//删除非叶子节点将报错
zookeeper.delete(path, -1, new IVoidCallback(), null);
//删除叶子节点 /zk-delete-demo/test
zookeeper.delete(path + "/test", -1, new IVoidCallback(), null);
//删除叶子节点 /zk-delete-demo
zookeeper.delete(path, -1, new IVoidCallback(), null);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
}
}
}
}
class IVoidCallback implements AsyncCallback.VoidCallback {
public void processResult(int rc, String path, Object ctx) {
System.out.println(rc + ", " + path + ", " + ctx);
}
}
2.4、获取节点
2.4.1、同步获取节点列表
方法:
? List
? List
? List
? List
参数说明:
同步获取节点列表Java代码
package zookeeper.javaapi;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
public class Sync_Update_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zookeeper = null;
public static void main(String[] args) throws Exception {
String path = "/zk-update-demo";
zookeeper = new ZooKeeper("server1:2181", 5000, new Sync_Update_Usage());
connectedSemaphore.await();
//同步创建持久节点 /zk-delete-demo
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
//同步创建持久节点 /zk-delete-demo/test_1
zookeeper.create(path + "/test_1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/test_1");
//同步获取节点,同时使用默认Watcher
List childrenList = zookeeper.getChildren(path, true);
System.out.println(childrenList);
//同步创建持久节点 /zk-delete-demo/test_2
zookeeper.create(path + "/test_2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/test_2");
Thread.sleep(3000);
//同步创建持久节点 /zk-delete-demo/test_3
zookeeper.create(path + "/test_3", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/test_3");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == EventType.NodeChildrenChanged) {
try {
//Watcher通知是一次性的,一旦触发一次通知后,该Watcher就失效了
//因此客户端需要反复注册Watcher,即程序中在process里面再次注册了Watcher,否则将无法获取之后该节点的变动通知
System.out.println("ReGet Child:" + zookeeper.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
}
2.4.2、异步获取节点列表
方法:
? void getChildren(final String path, Watch watcher, ChildrenCallback cb, Object ctx)
? void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
? void getChildren(final String path, Watch watcher, ChildrenCallback cb, Object ctx, Stat stat)
? void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx, Stat stat)
参数说明:
异步获取节点列表Java代码
package zookeeper.javaapi;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Async_Update_Usage implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zookeeper = null;
public static void main(String[] args) throws Exception {
String path = "/zk-get-demo";
zookeeper = new ZooKeeper("server1:2181", 5000, new Async_Update_Usage());
connectedSemaphore.await();
zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode: " + path);
zookeeper.create(path + "/test_1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/test_1");
//异步获取节点,使用默认Watcher
zookeeper.getChildren(path, true, new IChildren2Callback(), null);
zookeeper.create(path + "/test_2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("success create znode: " + path + "/test_2");
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (KeeperState.SyncConnected == event.getState()) {
if (EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == EventType.NodeChildrenChanged) {
try {
System.out.println("ReGet Child:" + zookeeper.getChildren(event.getPath(), true));
} catch (Exception e) {
}
}
}
}
}
class IChildren2Callback implements AsyncCallback.Children2Callback {
public void processResult(int rc, String path, Object ctx, List children, Stat stat) {
System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: "
+ ctx + ", children list: " + children + ", stat: " + stat);
}
}
说明:stat对象中记录了一个节点的基本属性信息,例如节点创建时的事务ID、最后一次修改的事务ID和节点数据内容的长度等。有时候,我们不仅需要获取节点最新的子节点列表,还要获取这个节点最新的节点状态信息。对于这种情况,我们可以将一个旧的stat变量传入API接口,该stat变量会在方法执行过程中,被来自服务器响应的新stat对象替换;
妈呀!写着好累,剩下的介绍下方法,代码就不贴了,大同小异。
2.4.3、同步获取节点数据
同步方法:
? byte[] getData(final String path, Watcher watcher, Stat stat);
? byte[] getData(String path, boolean watch, Stat stat);
异步方法:
? void getData(String path, boolean watch, DataCallback cb, Object ctx);
? void getData(String path, boolean watch, DataCallback cb, Object ctx);
参数说明:
说明:客户端在获取一个节点的数据内容时,可进行watcher注册,一旦该节点的状态发生变更,那么zookeeper服务端就会向客户端发送一NodeDataChanged(EventType.NodeDataChanged)的事件通知。注意,节点数据内容或者节点版本的变化都被看做是zookeeper节点的变化,都会触发NodeDataChanged通知。
2.5、更新节点
同步方法:
? Stat setData(final String path, byte data[], int version);
异步方法:
? void setData(final String path, byte data[], int version, StatCallback cb, Object ctx);
参数说明:
说明:在调用更新操作的时候,就可以添加version这个参数,该参数可以对应于CAS原理中的“预期值”,表明是针对该数据版本进行更新的;在zookeeper中,数据版本都是从0开始计数的,所以严格的讲,“-1”并不是一个合法的数据版本,它仅仅是一个标识符,如果客户端传入的版本参数是“-1”,就是告诉zookeeper服务器,客户端需要基于数据的最新版本进行更新操作。如果对zookeeper数据节点的更新操作没有原子性要求,那么就可以使用“-1”。
2.6、检测节点是否存在
同步方法:
? Stat exists(final String path, Watcher watcher);
? Stat exists(String path, boolean watch);
异步方法:
? void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);
? void exists(String path, boolean watch, StatCallback cb, Object ctx);
参数说明:
说明:无论指定节点是否存在,通过调用exists()接口都可以注册watcher;exists()接口中注册的watcher,能够对节点创建、节点删除和节点数据更新事件进行监听;对于指定节点的子节点的各种变化,都不会通知客户端。
2.7、权限控制
方法:
void addAuthInfo(String scheme, byte auth[]);
参数说明:
说明:对于节点的权限,当一个客户端为一个节点添加权限信息的时候,该权限信息是添加到了该节点的叶子节点上,操作这些授权节点需要权限信息;但如果操作该父节点,是不需要权限的。
参考资料
参考书籍:从Paxos到Zookeeper:分布式一致性原理与实践
出处: https://www.cnblogs.com/DeepInThought
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。