1 常用的shell命令
1.1 创建节点
create [-s] [-e] path data
-s 有序节点
-e 临时节点
1.2 更新节点
set data path [v]
v 版本,乐观锁机制,如果版本号不匹配则拒绝修改
1.3 删除节点
delete path [v]
1.4 查看节点
1.4.1 查看节点信息
get path
1.4.2 查看节点状态
stat path
1.4.3 查看节点列表
ls/ls2 path
后者是前者的增强,不仅可以查看指定路径下的节点,也可以查看当前节点信息
1.5 监听器
1.5.1 节点信息监听器
get path watch
路径节点内容改变的时候会向客户端发送通知。监听器触发一次即失效
1.5.2 节点状态监听器
stat path watch
节点状态发生改变的时候向客户端发送通知
1.5.3 子节点监听器
ls/ls2 path watch
该节点增加/删除子节点后向客户端发送通知
2 java客户端
2.1 获取连接
public void init() throws Exception {
zooKeeper = new ZooKeeper(Constant.ZK_HOST, Constant.SESSION_TIMEOUT, event -> {
//获取事件的状态
Watcher.Event.KeeperState keeperState = event.getState();
Watcher.Event.EventType eventType = event.getType();
//如果是建立连接
if(Watcher.Event.KeeperState.SyncConnected == keeperState){
if(Watcher.Event.EventType.None == eventType){
//如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
connectedSemaphore.countDown();
log.info("zk connect success");
}
}
});
//进行阻塞
connectedSemaphore.await();
}
2.2 新增节点
/**
* 同步方式
*/
create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
/**
* 异步方式
*/
create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
测试代码
@Test
public void testCreate() throws KeeperException, InterruptedException {
String result = zooKeeper.create(Constant.ROOT_PATH, Constant.ROOT_PATH.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("create node end, result:" + result);
}
@Test
public void testCreateAsync() throws KeeperException, InterruptedException {
zooKeeper.create(Constant.ROOT_PATH, Constant.ROOT_PATH.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
// rc: 创建结果,0表示成功,ctx: 上下文参数
System.out.println("create node end, rc:" + rc + ", path:" + path + ", name:" + name + ", ctx:" + ctx);
}, "param");
Thread.sleep(1000);
}
2.3 修改节点
/**
* 同步方式
*/
Stat setData(final String path, byte data[], int version)
/**
* 异步方式
*/
void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
测试代码
@Test
public void testUpdate() throws KeeperException, InterruptedException {
// 修改节点的值,version=-1表示不作版本控制
Stat stat = zooKeeper.setData(Constant.ROOT_PATH, "modify data root".getBytes(), -1);
System.out.println("update data end, version:" + stat.getVersion());
}
@Test
public void testUpdateAsync() throws KeeperException, InterruptedException {
// 修改节点的值,version=-1表示不作版本控制
zooKeeper.setData(Constant.ROOT_PATH, "modify data async root".getBytes(), -1, (rc, path, ctx, stat) ->
System.out.println("update data end, version:" + stat.getVersion()), "param");
Thread.sleep(1000);
}
2.4 删除节点
/*
* 同步删除
*/
void delete(final String path, int version);
/*
* 异步删除
*/
void delete(final String path, int version, VoidCallback cb, Object ctx);
测试代码
@Test
public void testDelete() throws KeeperException, InterruptedException {
// 删除节点(不能删除父节点),第二个参数是版本号,表示删除那个版本的数据。-1表示全部删除
zooKeeper.delete(Constant.ROOT_PATH, -1);
}
@Test
public void testDeleteAsync() throws KeeperException, InterruptedException {
// 删除节点(不能删除父节点),第二个参数是版本号,表示删除那个版本的数据。-1表示全部删除
zooKeeper.delete(Constant.ROOT_PATH, -1, (rc, path, ctx) -> System.out.println("delete node end, rc:" + rc), "param");
Thread.sleep(1000);
}
2.5 查看节点
/**
* 同步方式
*/
byte[] getData(String path, boolean watch, Stat stat);
/**
* 异步方式
*/
void getData(String path, boolean watch, DataCallback cb, Object ctx);
测试代码
@Test
public void testGet() throws KeeperException, InterruptedException {
// 获取节点信息
byte[] data = zooKeeper.getData(Constant.ROOT_PATH, false, null);
System.out.println("get data:" + new String(data));
}
@Test
public void testGetAsync() throws KeeperException, InterruptedException {
// 获取节点信息
zooKeeper.getData(Constant.ROOT_PATH, false, (rc, path, ctx, data, stat) ->
System.out.println("get data:" + new String(data)), "param");
Thread.sleep(1000);
}
2.6 查看子节点
/**
* 同步方式
*/
List<String> getChildren(String path, boolean watch);
/**
* 异步方式
*/
void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx);
2.7 查看节点是否存在
/**
* 同步方式
*/
Stat exists(String path, boolean watch);
/**
* 异步方式
*/
void exists(final String path, Watcher watcher, StatCallback cb, Object ctx);
2.8 节点监控
创建监听器
public class ZKWatcher implements Watcher {
private CountDownLatch connectedSemaphore;
public ZKWatcher(CountDownLatch connectedSemaphore) {
this.connectedSemaphore = connectedSemaphore;
}
@Override
public void process(WatchedEvent event) {
if (event == null) {
return;
}
// 连接状态
KeeperState keeperState = event.getState();
// 事件类型
EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
System.out.println("【zk event】>>>" + event);
if (KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (EventType.None == eventType) {
System.out.println("connect server");
connectedSemaphore.countDown();
} else if (EventType.NodeCreated == eventType) {
System.out.println("create node:" + path);
} else if (EventType.NodeDataChanged == eventType) {
System.out.println("update node:" + path);
} else if (EventType.NodeChildrenChanged == eventType) {
System.out.println("update node children:" + path);
} else if (EventType.NodeDeleted == eventType) {
System.out.println("delete node:" + path);
}
} else if (KeeperState.Disconnected == keeperState) {
System.out.println("disconnected server");
} else if (KeeperState.AuthFailed == keeperState) {
System.out.println("auth failed");
}
else if (KeeperState.Expired == keeperState) {
System.out.println("session expired");
}
}
}
监听器测试
@Test
public void testWatch() throws KeeperException, InterruptedException {
testDelete();
// 触发监控
zooKeeper.exists(Constant.ROOT_PATH, true);
zooKeeper.create(Constant.ROOT_PATH, Constant.ROOT_PATH.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 触发监控
zooKeeper.exists(Constant.ROOT_PATH, true);
testUpdate();
// 触发监控
zooKeeper.exists(Constant.ROOT_PATH, true);
testDelete();
}
输出
【zk event】>>>WatchedEvent state:SyncConnected type:None path:null
connect server
【zk event】>>>WatchedEvent state:SyncConnected type:NodeCreated path:/testRoot
create node:/testRoot
update data end, version:1
【zk event】>>>WatchedEvent state:SyncConnected type:NodeDataChanged path:/testRoot
update node:/testRoot
【zk event】>>>WatchedEvent state:SyncConnected type:NodeDeleted path:/testRoot
delete node:/testRoot