zookeeper03-java的Api操作(1)

Java Api

ZK java的API主要复现shell的方法,增加了同步和异步的实现方法。

ZK的客户端可以是观察者也可以是被观察者。注册监听的对象Watcher就是观察者,能接受事件回调的信息。同时我们也可以自己实现Watcher接口,重写process方法自定义事件的输出

Api主要有连接(是否异步or递归)创建节点,(是否异步or递归)删除节点更新节点数据查看节点信息判断节点存在获取子节点,关闭连接实现监听等

pom和log依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
    </dependencies>
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

创建连接抽象类

public abstract class ZkClientFactory {
    private static String hosts="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    private static int sessionTimeOut=6000;
    /**
     * 同步创建节点方法
     * @return
     * @throws IOException
     * @throws KeeperException
     * @throws InterruptedException
     */
    public static ZooKeeper createNode() throws IOException, KeeperException, InterruptedException {
        /**
         * 构造一个 同步计数器 初始化的CountDownLatch 。
         * 参数:
         * count –在线程可以通过await之前必须调用countDown的次数
         * 这里填1表示在之后程序调用一次
         */
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        /**
         * hosts:  连接对象
         * sessionTimeOut: 会话超时时间
         * Watcher:  观察者对象,接收响应事件
         */
        ZooKeeper zooKeeper = new ZooKeeper(hosts, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //已建立连接
                if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
                    countDownLatch.countDown();
                }
            }
        });
        countDownLatch.await();
        System.out.println("over");
        return zooKeeper;
    }
    public void closeClient(){}
}

继承创建实例

public class ZkClient extends ZkClientFactory{
    private ZooKeeper zooKeeper=createNode();
...}

四种同步创建节点

/**
* 测试四种同步节点创建方式
*/
@Test
public void createSyncNodes() {
    try {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        //持久节点
        String app1 = zooKeeper.create("/app1", "app1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //临时节点
        String app2 = zooKeeper.create("/app2", "app2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        //持久顺序编号节点
        String app3 = zooKeeper.create("/app3", "app3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        //临时顺序编号节点
        String app4 = zooKeeper.create("/app4", "app4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        countDownLatch.await();
        System.out.println("sync over");
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

自定义异步CallBack实现类

class IStringCallBack implements AsyncCallback.StringCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        switch (KeeperException.Code.get(rc)){
            case CONNECTIONLOSS:
                System.out.println("无连接传输模式");
                break;
            case OK:
                System.out.println("{"+"OK"+path+ ", " + name + ", " + ctx + "}");
                CountDownLatch countDownLatch1 = (CountDownLatch) ctx;
                countDownLatch1.countDown();
                break;
            case NODEEXISTS:
                System.out.println(path+"exists");
                break;
            default:
                System.out.println("default");
                break;
        }
    }
}

四种异步创建节点

/**
* 测试四种异步节点创建方式
*/
@Test
public void createAsyncNodes() throws InterruptedException {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    zooKeeper.create("/Async",
                     "hello Async".getBytes(),
                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT,
                     (AsyncCallback.StringCallback) new IStringCallBack(),
                     countDownLatch);
    zooKeeper.create("/Async",
                     "hello Async".getBytes(),
                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.EPHEMERAL,
                     (AsyncCallback.StringCallback) new IStringCallBack(),
                     countDownLatch);
    zooKeeper.create("/Async",
                     "hello Async".getBytes(),
                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT_SEQUENTIAL,
                     (AsyncCallback.StringCallback) new IStringCallBack(),
                     countDownLatch);
    zooKeeper.create("/Async",
                     "hello Async".getBytes(),
                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.PERSISTENT_SEQUENTIAL,
                     (AsyncCallback.StringCallback) new IStringCallBack(),
                     countDownLatch);
    countDownLatch.await();
    System.out.println("Async over");
}

删除节点

    /**
     * 同步异步删除节点数据
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void delete() throws KeeperException, InterruptedException {
        //SyncDelete
        zooKeeper.delete("/Async",-1);
        //AsyncDelete
        //        zooKeeper.delete();
    }

递归删除

    /**
     * 递归删除多层节点
     * @param path 多层节点路径
     * @throws KeeperException
     * @throws InterruptedException
     */
    private void delete(String path) throws KeeperException, InterruptedException {
        List<String> children = zooKeeper.getChildren(path, false);

        for (String child : children) {
            delete(path + "/" + child);
        }
        zooKeeper.delete(path, -1);
    }

递归创建

    /**
     * 递归创建多层节点
     * @param path 多层节点路径
     * @param data 多层节点数据
     * @throws KeeperException
     * @throws InterruptedException
     */
    private void create(String path, String data) throws KeeperException, InterruptedException {
        String[] split = path.split("/");

        StringBuilder p = new StringBuilder();
        for (int i = 1; i < split.length - 1; i++) {
            p.append("/").append(split[i]);
            if(null == zooKeeper.exists(p.toString(), null)) {
                zooKeeper.create(p.toString(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }
        zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

获取节点数据

    /**
     * 同步异步获取节点数据
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void getData() throws KeeperException, InterruptedException {
        //sync
        byte[] data1 = zooKeeper.getData("/Async", null, null);
        System.out.println(new String(data1));
        //async
        zooKeeper.getData("/Async", null, (rc, path, ctx, data, stat) -> {
            System.out.printf("/Async data:{}", new String(data));
        }, null);
    }

更新节点数据

    /**
     * 更新节点数据
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void updateNode() throws KeeperException, InterruptedException {
        zooKeeper.setData("/Async", "Async yes or no?".getBytes(), -1);
        getData();
    }

判断节点是否存在

    /**
     * 判断节点是否存在,获取节点信息
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void isExists() throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists("/app1", false);
    }
上一篇:多线程同步器之CountDownLatch


下一篇:Java 多线程(sleep)