【8】Zookeeper脚本及API

一、客户端脚本

1.1、客户端连接

cd /usr/local/services/zookeeper/zookeeper-3.4.13/bin
##连接本地Zookeeper服务器
sh zkCli.sh
##连接主机server1上的zookeeper服务
sh zkCli.sh -server server1:2181

1.2、CRUD操作

创建

##语法:create [-s] [-e] path data acl
create /zk-temp "tmp"

【8】Zookeeper脚本及API

说明:-s / -e分别制定节点特性:顺序或临时节点。默认情况下,创建的是持久节点。

读取

##获取节点子节点列表,语法:ls path [watch]
ls /zk-temp
##获取节点数据,语法:get path [watch]
get /zk-temp

【8】Zookeeper脚本及API

更新

##语法:set path data [version]
set /zk-temp "zk"

【8】Zookeeper脚本及API

删除

##语法:delete path [version]
delete /zk-temp

【8】Zookeeper脚本及API

二、Java客户端API

2.1、创建会话

创建会话Java代码
  
package zookeeper.javaapi;

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class CreateSession implements Watcher {
    
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event:" + event);
        if (Event.KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
    
    public static void main(String[] args) throws Exception{    
        ZooKeeper zooKeeper = new ZooKeeper("server1:2181",1000,new CreateSession());
        System.out.println(zooKeeper.getState());
        try {
            connectedSemaphore.await();         
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Zookeeper session established.");
    }
}
    

【8】Zookeeper脚本及API

2.2、创建节点

2.2.1、同步创建节点

方法:String create(final String path, byte data[], List acl, CreateMode createMode)
参数说明:
【8】Zookeeper脚本及API

同步创建节点Java代码
  
package zookeeper.javaapi;

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * ZooKeeper API创建节点,使用同步(sync)接口。
 */
public class Sync_Create_Usage implements Watcher {

    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        
        ZooKeeper zookeeper = new ZooKeeper("server1:2181", 5000, new Sync_Create_Usage());
        connectedSemaphore.await();
                
        //创建临时节点
        String path1 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("Success create znode: " + path1);

        //临时顺序节点
        String path2 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Success create znode: " + path2);
        
        //阻塞线程,查看临时节点创建结果
        Thread.sleep( Integer.MAX_VALUE );
    }
    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

    

【8】Zookeeper脚本及API

2.2.2、异步创建节点

方法:void create(final String path, byte data[], List acl, CreateMode createMode, StringCallback cb, Object ctx)
参数说明:
【8】Zookeeper脚本及API

异步创建节点Java代码
  
package zookeeper.javaapi;

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

/**
 * ZooKeeper API创建节点,使用异步(Async)接口。
 */
public class Async_Create_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zookeeper = new ZooKeeper("server1:2181", 5000, new Async_Create_Usage());
        System.out.println(zookeeper.getState());
        connectedSemaphore.await();

        //创建临时节点
        zookeeper.create("/zk-test-demo-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context. ");
        //创建临时节点
        zookeeper.create("/zk-test-demo-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context. ");
        //阻塞线程,查看临时节点创建结果
        zookeeper.create("/zk-test-demo-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallback(), "I am context. ");
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }
}

class IStringCallback implements AsyncCallback.StringCallback {
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name);
    }
}
    

【8】Zookeeper脚本及API
【8】Zookeeper脚本及API

2.2.3、回调函数方法说明

方法:void processResult(int rc, String path, Object ctx, String name)
参数说明:
【8】Zookeeper脚本及API

2.3、删除节点

2.3.1、同步删除节点

方法:public void delete(final String path, int version)
参数说明:
【8】Zookeeper脚本及API

同步删除节点Java代码
 
package zookeeper.javaapi;

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class Sync_Delete_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zookeeper;

    public static void main(String[] args) throws Exception {
        String path = "/zk-delete-demo";
        zookeeper = new ZooKeeper("server1:2181", 5000, new Sync_Delete_Usage());
        connectedSemaphore.await(); 

        //同步创建持久节点 /zk-delete-demo
        zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        //同步创建持久节点 /zk-delete-demo/test
        zookeeper.create(path + "/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/test");
        
        //删除非叶子节点将报错
        try {
            zookeeper.delete(path, -1);
        } catch (Exception e) {
            System.out.println("fail to delete znode: " + path);
        }
        
        //删除叶子节点 /zk-delete-demo/test
        zookeeper.delete(path + "/test", -1);
        System.out.println("success delete znode: " + path + "/c1");
        //删除叶子节点 /zk-delete-demo
        zookeeper.delete(path, -1);        
        System.out.println("success delete znode: " + path);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }
        }
    }
}
    

【8】Zookeeper脚本及API

2.3.2、异步删除节点

方法:public void delete(final String path, int version, VoidCallback cb, Object ctx)
参数说明:
【8】Zookeeper脚本及API

异步删除节点Java代码
 
package zookeeper.javaapi;

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

public class Async_Delete_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zookeeper;

    public static void main(String[] args) throws Exception {
        String path = "/zk-delete-demo";
        zookeeper = new ZooKeeper("server1:2181", 5000, new Async_Delete_Usage());
        connectedSemaphore.await();

        //同步创建持久节点 /zk-delete-demo
        zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        
        //同步创建持久节点 /zk-delete-demo/test
        zookeeper.create(path + "/test", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/test");
            
        //删除非叶子节点将报错
        zookeeper.delete(path, -1, new IVoidCallback(), null);      
        //删除叶子节点 /zk-delete-demo/test
        zookeeper.delete(path + "/test", -1, new IVoidCallback(), null);
        //删除叶子节点 /zk-delete-demo
        zookeeper.delete(path, -1, new IVoidCallback(), null);        

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            }
        }
    }
}

class IVoidCallback implements AsyncCallback.VoidCallback {
    public void processResult(int rc, String path, Object ctx) {
        System.out.println(rc + ", " + path + ", " + ctx);
    }
}
    

【8】Zookeeper脚本及API

2.4、获取节点

2.4.1、同步获取节点列表

方法:
? List getChildren(final String path, Watch watcher)
? List getChildren(String path, boolean watch)
? List getChildren(final String path, Watch watcher, Stat stat)
? List getChildren(String path, boolean watch, Stat stat)

参数说明:
【8】Zookeeper脚本及API
【8】Zookeeper脚本及API

同步获取节点列表Java代码
 
package zookeeper.javaapi;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;

public class Sync_Update_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zookeeper = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-update-demo";
        zookeeper = new ZooKeeper("server1:2181", 5000, new Sync_Update_Usage());
        connectedSemaphore.await();

        //同步创建持久节点 /zk-delete-demo
        zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        
        //同步创建持久节点 /zk-delete-demo/test_1
        zookeeper.create(path + "/test_1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/test_1");
        
        //同步获取节点,同时使用默认Watcher
        List childrenList = zookeeper.getChildren(path, true);
        System.out.println(childrenList);

        //同步创建持久节点 /zk-delete-demo/test_2
        zookeeper.create(path + "/test_2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/test_2");
        
        Thread.sleep(3000);
        
        //同步创建持久节点 /zk-delete-demo/test_3
        zookeeper.create(path + "/test_3", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/test_3");
        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    //Watcher通知是一次性的,一旦触发一次通知后,该Watcher就失效了
                    //因此客户端需要反复注册Watcher,即程序中在process里面再次注册了Watcher,否则将无法获取之后该节点的变动通知
                    System.out.println("ReGet Child:" + zookeeper.getChildren(event.getPath(), true));
                } catch (Exception e) {
                }
            }
        }
    }
}
    

【8】Zookeeper脚本及API

2.4.2、异步获取节点列表

方法:
? void getChildren(final String path, Watch watcher, ChildrenCallback cb, Object ctx)
? void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
? void getChildren(final String path, Watch watcher, ChildrenCallback cb, Object ctx, Stat stat)
? void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx, Stat stat)
参数说明:
【8】Zookeeper脚本及API

异步获取节点列表Java代码
 
package zookeeper.javaapi;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class Async_Update_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zookeeper = null;

    public static void main(String[] args) throws Exception {
        String path = "/zk-get-demo";
        zookeeper = new ZooKeeper("server1:2181", 5000, new Async_Update_Usage());
        connectedSemaphore.await();
        
        zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path);
        
        zookeeper.create(path + "/test_1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/test_1");
        
        //异步获取节点,使用默认Watcher
        zookeeper.getChildren(path, true, new IChildren2Callback(), null);

        zookeeper.create(path + "/test_2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success create znode: " + path + "/test_2");

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        if (KeeperState.SyncConnected == event.getState()) {
            if (EventType.None == event.getType() && null == event.getPath()) {
                connectedSemaphore.countDown();
            } else if (event.getType() == EventType.NodeChildrenChanged) {
                try {
                    System.out.println("ReGet Child:" + zookeeper.getChildren(event.getPath(), true));
                } catch (Exception e) {
                }
            }
        }
    }
}

class IChildren2Callback implements AsyncCallback.Children2Callback {
    public void processResult(int rc, String path, Object ctx, List children, Stat stat) {
        System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: "
                + ctx + ", children list: " + children + ", stat: " + stat);
    }
}
    

【8】Zookeeper脚本及API

说明:stat对象中记录了一个节点的基本属性信息,例如节点创建时的事务ID、最后一次修改的事务ID和节点数据内容的长度等。有时候,我们不仅需要获取节点最新的子节点列表,还要获取这个节点最新的节点状态信息。对于这种情况,我们可以将一个旧的stat变量传入API接口,该stat变量会在方法执行过程中,被来自服务器响应的新stat对象替换;

妈呀!写着好累,剩下的介绍下方法,代码就不贴了,大同小异。

2.4.3、同步获取节点数据

同步方法:
? byte[] getData(final String path, Watcher watcher, Stat stat);
? byte[] getData(String path, boolean watch, Stat stat);
异步方法:
? void getData(String path, boolean watch, DataCallback cb, Object ctx);
? void getData(String path, boolean watch, DataCallback cb, Object ctx);

参数说明:
【8】Zookeeper脚本及API

说明:客户端在获取一个节点的数据内容时,可进行watcher注册,一旦该节点的状态发生变更,那么zookeeper服务端就会向客户端发送一NodeDataChanged(EventType.NodeDataChanged)的事件通知。注意,节点数据内容或者节点版本的变化都被看做是zookeeper节点的变化,都会触发NodeDataChanged通知。

2.5、更新节点

同步方法:
? Stat setData(final String path, byte data[], int version);
异步方法:
? void setData(final String path, byte data[], int version, StatCallback cb, Object ctx);

参数说明:
【8】Zookeeper脚本及API

说明:在调用更新操作的时候,就可以添加version这个参数,该参数可以对应于CAS原理中的“预期值”,表明是针对该数据版本进行更新的;在zookeeper中,数据版本都是从0开始计数的,所以严格的讲,“-1”并不是一个合法的数据版本,它仅仅是一个标识符,如果客户端传入的版本参数是“-1”,就是告诉zookeeper服务器,客户端需要基于数据的最新版本进行更新操作。如果对zookeeper数据节点的更新操作没有原子性要求,那么就可以使用“-1”。

2.6、检测节点是否存在

同步方法:
? Stat exists(final String path, Watcher watcher);
? Stat exists(String path, boolean watch);
异步方法:
? void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);
? void exists(String path, boolean watch, StatCallback cb, Object ctx);

参数说明:
【8】Zookeeper脚本及API

说明:无论指定节点是否存在,通过调用exists()接口都可以注册watcher;exists()接口中注册的watcher,能够对节点创建、节点删除和节点数据更新事件进行监听;对于指定节点的子节点的各种变化,都不会通知客户端。

2.7、权限控制

方法:
void addAuthInfo(String scheme, byte auth[]);

参数说明:
【8】Zookeeper脚本及API

说明:对于节点的权限,当一个客户端为一个节点添加权限信息的时候,该权限信息是添加到了该节点的叶子节点上,操作这些授权节点需要权限信息;但如果操作该父节点,是不需要权限的。

参考资料

参考书籍:从Paxos到Zookeeper:分布式一致性原理与实践

  • 作者: DeepInThought
    出处: https://www.cnblogs.com/DeepInThought
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
  • 【8】Zookeeper脚本及API

    上一篇:C#采用双重循环实现A矩阵与B矩阵相加赋给C矩阵


    下一篇:对 android apk 进行重新签名操作