zookeeper学习二-ZK客户端

1 常用的shell命令

1.1 创建节点

create [-s] [-e] path data

-s 有序节点

-e 临时节点

1.2 更新节点

set data path [v]

v 版本,乐观锁机制,如果版本号不匹配则拒绝修改

1.3 删除节点

delete path [v]

1.4 查看节点

1.4.1 查看节点信息

get path

1.4.2 查看节点状态

stat path

1.4.3 查看节点列表

ls/ls2 path 

后者是前者的增强,不仅可以查看指定路径下的节点,也可以查看当前节点信息

1.5 监听器

1.5.1 节点信息监听器

get path watch

路径节点内容改变的时候会向客户端发送通知。监听器触发一次即失效

1.5.2 节点状态监听器

stat path watch

节点状态发生改变的时候向客户端发送通知

1.5.3 子节点监听器

ls/ls2 path watch

该节点增加/删除子节点后向客户端发送通知

2 java客户端

2.1 获取连接

	public void init() throws Exception {
		zooKeeper = new ZooKeeper(Constant.ZK_HOST, Constant.SESSION_TIMEOUT, event -> {
			//获取事件的状态
			Watcher.Event.KeeperState keeperState = event.getState();
			Watcher.Event.EventType eventType = event.getType();
			//如果是建立连接
			if(Watcher.Event.KeeperState.SyncConnected == keeperState){
				if(Watcher.Event.EventType.None == eventType){
					//如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
					connectedSemaphore.countDown();
					log.info("zk connect success");
				}
			}
		});

		//进行阻塞
		connectedSemaphore.await();
	}

2.2 新增节点

/**
 * 同步方式
 */
create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

/**
 * 异步方式
 */
create(final String path, byte data[], List<ACL> acl, CreateMode createMode,  StringCallback cb, Object ctx)

测试代码

	@Test
	public void testCreate() throws KeeperException, InterruptedException {
		String result = zooKeeper.create(Constant.ROOT_PATH, Constant.ROOT_PATH.getBytes(),
				ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		System.out.println("create node end, result:" + result);
	}

	@Test
	public void testCreateAsync() throws KeeperException, InterruptedException {
		zooKeeper.create(Constant.ROOT_PATH, Constant.ROOT_PATH.getBytes(),
				ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
					// rc: 创建结果,0表示成功,ctx: 上下文参数
					System.out.println("create node end, rc:" + rc + ", path:" + path + ", name:" + name + ", ctx:" + ctx);
				}, "param");
		Thread.sleep(1000);
	}

2.3 修改节点

/**
 * 同步方式
 */
Stat setData(final String path, byte data[], int version)

/**
 * 异步方式
 */
void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)

测试代码

    @Test
    public void testUpdate() throws KeeperException, InterruptedException {
        // 修改节点的值,version=-1表示不作版本控制
        Stat stat = zooKeeper.setData(Constant.ROOT_PATH, "modify data root".getBytes(), -1);
        System.out.println("update data end, version:" + stat.getVersion());
    }

    @Test
    public void testUpdateAsync() throws KeeperException, InterruptedException {
        // 修改节点的值,version=-1表示不作版本控制
        zooKeeper.setData(Constant.ROOT_PATH, "modify data async root".getBytes(), -1, (rc, path, ctx, stat) ->
                System.out.println("update data end, version:" + stat.getVersion()), "param");
        Thread.sleep(1000);
    }

2.4 删除节点

/*
 * 同步删除
 */
void delete(final String path, int version);

/*
 * 异步删除
 */
void delete(final String path, int version, VoidCallback cb, Object ctx);

测试代码

    @Test
    public void testDelete() throws KeeperException, InterruptedException {
        // 删除节点(不能删除父节点),第二个参数是版本号,表示删除那个版本的数据。-1表示全部删除
        zooKeeper.delete(Constant.ROOT_PATH, -1);
    }

    @Test
    public void testDeleteAsync() throws KeeperException, InterruptedException {
        // 删除节点(不能删除父节点),第二个参数是版本号,表示删除那个版本的数据。-1表示全部删除
        zooKeeper.delete(Constant.ROOT_PATH, -1, (rc, path, ctx) -> System.out.println("delete node end, rc:" + rc), "param");
        Thread.sleep(1000);
    }

2.5 查看节点

/**
 * 同步方式
 */
byte[] getData(String path, boolean watch, Stat stat);

/**
 * 异步方式
 */
void getData(String path, boolean watch, DataCallback cb, Object ctx);

测试代码

    @Test
    public void testGet() throws KeeperException, InterruptedException {
        // 获取节点信息
        byte[] data = zooKeeper.getData(Constant.ROOT_PATH, false, null);
        System.out.println("get data:" + new String(data));
    }

    @Test
    public void testGetAsync() throws KeeperException, InterruptedException {
        // 获取节点信息
        zooKeeper.getData(Constant.ROOT_PATH, false, (rc, path, ctx, data, stat) ->
                System.out.println("get data:" + new String(data)), "param");
        Thread.sleep(1000);
    }

2.6 查看子节点

/**
 * 同步方式
 */
List<String> getChildren(String path, boolean watch);

/**
 * 异步方式
 */
void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx);

2.7 查看节点是否存在

/**
 * 同步方式
 */
Stat exists(String path, boolean watch);

/**
 * 异步方式
 */
void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);

2.8 节点监控

创建监听器

public class ZKWatcher implements Watcher {

	private CountDownLatch connectedSemaphore;

	public ZKWatcher(CountDownLatch connectedSemaphore) {
		this.connectedSemaphore = connectedSemaphore;
	}

	@Override
	public void process(WatchedEvent event) {
		if (event == null) {
			return;
		}
		// 连接状态
		KeeperState keeperState = event.getState();
		// 事件类型
		EventType eventType = event.getType();
		// 受影响的path
		String path = event.getPath();
		System.out.println("【zk event】>>>" + event);
		if (KeeperState.SyncConnected == keeperState) {
			// 成功连接上ZK服务器
			if (EventType.None == eventType) {
				System.out.println("connect server");
				connectedSemaphore.countDown();
			} else if (EventType.NodeCreated == eventType) {
				System.out.println("create node:" + path);
			} else if (EventType.NodeDataChanged == eventType) {
				System.out.println("update node:" + path);
			} else if (EventType.NodeChildrenChanged == eventType) {
				System.out.println("update node children:" + path);
			} else if (EventType.NodeDeleted == eventType) {
				System.out.println("delete node:" + path);
			}
		} else if (KeeperState.Disconnected == keeperState) {
			System.out.println("disconnected server");
		} else if (KeeperState.AuthFailed == keeperState) {
			System.out.println("auth failed");
		} 
		else if (KeeperState.Expired == keeperState) {
			System.out.println("session expired");
		}
	}
}

监听器测试

    @Test
    public void testWatch() throws KeeperException, InterruptedException {
        testDelete();
        // 触发监控
        zooKeeper.exists(Constant.ROOT_PATH, true);
        zooKeeper.create(Constant.ROOT_PATH, Constant.ROOT_PATH.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // 触发监控
        zooKeeper.exists(Constant.ROOT_PATH, true);
        testUpdate();
        // 触发监控
        zooKeeper.exists(Constant.ROOT_PATH, true);
        testDelete();
    }

输出

【zk event】>>>WatchedEvent state:SyncConnected type:None path:null
connect server
【zk event】>>>WatchedEvent state:SyncConnected type:NodeCreated path:/testRoot
create node:/testRoot
update data end, version:1
【zk event】>>>WatchedEvent state:SyncConnected type:NodeDataChanged path:/testRoot
update node:/testRoot
【zk event】>>>WatchedEvent state:SyncConnected type:NodeDeleted path:/testRoot
delete node:/testRoot

上一篇:ElasticSearch使用篇 - 复合查询


下一篇:使用灵动微MM32F013x函数库,出现编译警告1296-D:extended constant initialiser used case u32