Java Api
ZK 的 Java API 基本对应 shell 客户端的各个命令,并且为这些操作同时提供了同步和异步两种调用方式。
ZK 的客户端既可以是观察者,也可以是被观察者。注册监听的 Watcher 对象就是观察者,能够接收事件回调的信息。我们也可以自己实现 Watcher 接口,重写 process 方法来自定义事件的处理和输出。
API 主要包括:建立连接、创建节点(可同步或异步,可递归)、删除节点(可同步或异步,可递归)、更新节点数据、查看节点信息、判断节点是否存在、获取子节点、关闭连接、实现监听等。
pom 依赖和 log4j 配置
<!-- Dependencies used by the ZooKeeper client examples: JUnit, log4j-core and the ZooKeeper client library. -->
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<!-- NOTE(review): RELEASE is a floating meta-version, so the resolved JUnit version can differ between builds; consider pinning an explicit version. -->
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<!-- NOTE(review): this artifact is Log4j 2, while the accompanying properties reference org.apache.log4j.* class names; confirm which logging backend is intended. -->
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
# Root logger: INFO level, routed to the "stdout" appender only.
log4j.rootLogger=INFO, stdout
# Console appender; each line is formatted as: date, level, [logger name] - message.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# File appender writing to target/spring.log with the same line format.
# NOTE(review): "logfile" is not listed on log4j.rootLogger above, so as configured
# here no logger is routed to it; confirm whether that is intended.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
创建连接抽象类
/**
 * Base class that opens ZooKeeper sessions for the client examples.
 *
 * <p>The ensemble address and the session timeout are fixed in this class.
 */
public abstract class ZkClientFactory {
    /** Comma-separated {@code host:port} list of the ZooKeeper ensemble. */
    private static String hosts="hadoop102:2181,hadoop103:2181,hadoop104:2181";
    /** Session timeout in milliseconds; also the upper bound for the initial connection wait. */
    private static int sessionTimeOut=6000;

    /**
     * Opens a ZooKeeper session and blocks until it reports {@code SyncConnected}.
     *
     * <p>Despite its name, this method creates a client connection, not a znode.
     * The wait is bounded by the session timeout: if the connection is not
     * established in time, the half-open client is closed and an
     * {@link IOException} is thrown instead of blocking forever.
     *
     * @return a connected ZooKeeper client
     * @throws IOException if the client cannot be constructed, or the connection is
     *         not established within {@code sessionTimeOut} milliseconds
     * @throws KeeperException kept in the signature for compatibility; this body does
     *         not throw it itself
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static ZooKeeper createNode() throws IOException, KeeperException, InterruptedException {
        // Released by the watcher below once the session is connected.
        final CountDownLatch connected = new CountDownLatch(1);

        // Arguments: ensemble address, session timeout, watcher receiving session events.
        ZooKeeper zooKeeper = new ZooKeeper(hosts, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // Connection established.
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });

        boolean established = false;
        try {
            established = connected.await(sessionTimeOut, java.util.concurrent.TimeUnit.MILLISECONDS);
        } finally {
            // Do not leak the client if we timed out or were interrupted.
            if (!established) {
                zooKeeper.close();
            }
        }
        if (!established) {
            throw new IOException("Could not connect to ZooKeeper at " + hosts
                    + " within " + sessionTimeOut + " ms");
        }
        System.out.println("over");
        return zooKeeper;
    }

    /** Intentionally empty hook: this class keeps no client instance, so there is nothing to close here. */
    public void closeClient(){}
}
继承创建实例
// Concrete client used by the examples; the rest of the class body is elided in these notes.
public class ZkClient extends ZkClientFactory{
// Session opened once per instance through the inherited static factory method.
// NOTE(review): createNode() declares checked exceptions, so a field initializer like this
// only compiles if every constructor of this class declares them too - confirm against the full class.
private ZooKeeper zooKeeper=createNode();
...}
四种同步创建节点
/**
 * Creates one znode per create mode (persistent, ephemeral, persistent-sequential,
 * ephemeral-sequential) using the blocking {@code create} API, then closes the client.
 *
 * <p>The synchronous calls return only after the server has answered, so no latch is
 * needed here. Awaiting a latch that nobody counts down would block this method forever
 * and keep both the final message and the {@code close()} call from ever running.
 */
@Test
public void createSyncNodes() {
    try {
        // Persistent node
        zooKeeper.create("/app1", "app1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Ephemeral node
        zooKeeper.create("/app2", "app2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // Persistent node with a sequence number
        zooKeeper.create("/app3", "app3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        // Ephemeral node with a sequence number
        zooKeeper.create("/app4", "app4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("sync over");
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // Always release the session, whether or not the creates succeeded.
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
自定义异步CallBack实现类
/**
 * Callback for asynchronous {@code create} calls: prints the outcome and then signals
 * completion to a waiting caller.
 *
 * <p>If the context object is a {@link CountDownLatch} it is counted down once per
 * callback, whatever the result code, so that a caller awaiting the latch is not left
 * blocked when a create fails. The latch therefore signals completion, not success.
 * Any other context object (including {@code null}) is ignored.
 */
class IStringCallBack implements AsyncCallback.StringCallback {
    /**
     * Reports the result of one asynchronous create.
     *
     * @param rc   numeric result code of the operation
     * @param path path that was passed to the create call
     * @param ctx  caller-supplied context; counted down when it is a {@link CountDownLatch}
     * @param name name reported for the created node
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        KeeperException.Code code = KeeperException.Code.get(rc);
        if (code == null) {
            // Unrecognized result code: switching on null would throw, so report it as the default case.
            System.out.println("default");
        } else {
            switch (code) {
                case CONNECTIONLOSS:
                    System.out.println("无连接传输模式");
                    break;
                case OK:
                    System.out.println("{"+"OK"+path+ ", " + name + ", " + ctx + "}");
                    break;
                case NODEEXISTS:
                    System.out.println(path+"exists");
                    break;
                default:
                    System.out.println("default");
                    break;
            }
        }
        // Release the waiter on every outcome; an unchecked cast here could throw for other context types.
        if (ctx instanceof CountDownLatch) {
            ((CountDownLatch) ctx).countDown();
        }
    }
}
四种异步创建节点
/**
 * Issues one asynchronous create per create mode: persistent, ephemeral,
 * persistent-sequential and ephemeral-sequential.
 *
 * <p>The ephemeral node gets its own path because {@code /Async} is taken by the
 * persistent node created just before it. The two sequential nodes use {@code /Async}
 * as a prefix. The latch has a count of one, so {@code await()} returns as soon as the
 * callback counts it down for the first time; later callbacks may still arrive after
 * this method has printed its final message.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for the callback
 */
@Test
public void createAsyncNodes() throws InterruptedException {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    // One callback instance serves all requests; the latch travels as the context object.
    final AsyncCallback.StringCallback callback = new IStringCallBack();

    // Persistent node
    zooKeeper.create("/Async",
            "hello Async".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT,
            callback,
            countDownLatch);
    // Ephemeral node (distinct path, otherwise it always collides with the node above)
    zooKeeper.create("/AsyncEphemeral",
            "hello Async".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL,
            callback,
            countDownLatch);
    // Persistent node with a sequence number
    zooKeeper.create("/Async",
            "hello Async".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT_SEQUENTIAL,
            callback,
            countDownLatch);
    // Ephemeral node with a sequence number
    zooKeeper.create("/Async",
            "hello Async".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL,
            callback,
            countDownLatch);

    countDownLatch.await();
    System.out.println("Async over");
}
删除节点
/**
 * Deletes the {@code /Async} node with the blocking API.
 *
 * <p>Version {@code -1} is passed as the expected version. Only the synchronous
 * variant is exercised here; no asynchronous delete is issued.
 *
 * @throws KeeperException if the server rejects the delete
 * @throws InterruptedException if the calling thread is interrupted
 */
@Test
public void delete() throws KeeperException, InterruptedException {
    final String target = "/Async";
    final int expectedVersion = -1;
    // Synchronous delete
    zooKeeper.delete(target, expectedVersion);
}
递归删除
/**
 * Removes a node together with everything below it.
 *
 * <p>Children are deleted first (depth-first), because the node itself is only
 * deleted after the loop over its children has finished.
 *
 * @param path path of the subtree root to remove
 * @throws KeeperException if listing children or deleting a node fails
 * @throws InterruptedException if the calling thread is interrupted
 */
private void delete(String path) throws KeeperException, InterruptedException {
    final List<String> childNames = zooKeeper.getChildren(path, false);
    for (int i = 0; i < childNames.size(); i++) {
        final String childPath = path + "/" + childNames.get(i);
        delete(childPath);
    }
    // All descendants are gone at this point, so the node itself can go.
    zooKeeper.delete(path, -1);
}
递归创建
/**
 * Creates a persistent node at {@code path}, first creating any missing ancestors.
 *
 * <p>Ancestors are created as persistent nodes without data. If another client creates
 * an ancestor between the existence check and the create call, the resulting
 * {@code NodeExistsException} is ignored, since the ancestor being present is the
 * desired state. The final node is still created strictly: if it already exists the
 * exception is propagated.
 *
 * @param path absolute path of the node to create, e.g. {@code /a/b/c}
 * @param data payload for the final node; {@code null} creates the node without data
 * @throws KeeperException if a create fails for a reason other than an ancestor already existing
 * @throws InterruptedException if the calling thread is interrupted
 */
private void create(String path, String data) throws KeeperException, InterruptedException {
    String[] segments = path.split("/");
    StringBuilder ancestor = new StringBuilder();
    // segments[0] is the empty string before the leading '/'; the last segment is the target node itself.
    for (int i = 1; i < segments.length - 1; i++) {
        ancestor.append("/").append(segments[i]);
        String ancestorPath = ancestor.toString();
        if (null == zooKeeper.exists(ancestorPath, null)) {
            try {
                zooKeeper.create(ancestorPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Created concurrently after the exists() check; that is fine for an ancestor.
            }
        }
    }
    byte[] payload = (data == null) ? null : data.getBytes();
    zooKeeper.create(path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
获取节点数据
/**
 * Reads the data of {@code /Async} twice: once with the blocking API and once with
 * the callback API.
 *
 * <p>The asynchronous result is printed with a {@code printf}-style {@code %s}
 * placeholder, because {@code {}} is not a {@code printf} format specifier and would be
 * printed literally. The callback checks the result code before touching the data, and
 * this method waits for the callback so the asynchronous output is produced before it
 * returns.
 *
 * @throws KeeperException if the synchronous read fails
 * @throws InterruptedException if the calling thread is interrupted
 */
@Test
public void getData() throws KeeperException, InterruptedException {
    // Synchronous read
    byte[] data1 = zooKeeper.getData("/Async", null, null);
    System.out.println(new String(data1));

    // Asynchronous read
    final CountDownLatch done = new CountDownLatch(1);
    zooKeeper.getData("/Async", null, (rc, path, ctx, data, stat) -> {
        try {
            if (rc == KeeperException.Code.OK.intValue() && data != null) {
                System.out.printf("/Async data:%s%n", new String(data));
            } else {
                // On failure there is no payload to decode; report the result code instead.
                System.out.printf("/Async data unavailable, rc=%d%n", rc);
            }
        } finally {
            // Release the waiting test thread whatever happened above.
            done.countDown();
        }
    }, null);
    done.await();
}
更新节点数据
/**
 * Overwrites the data of {@code /Async} and then reads it back via {@code getData()}.
 *
 * <p>Version {@code -1} is passed as the expected version.
 *
 * @throws KeeperException if the update or the subsequent read fails
 * @throws InterruptedException if the calling thread is interrupted
 */
@Test
public void updateNode() throws KeeperException, InterruptedException {
    final byte[] replacement = "Async yes or no?".getBytes();
    zooKeeper.setData("/Async", replacement, -1);
    // Print the node content again to show the new value.
    getData();
}
判断节点是否存在
/**
 * Checks whether {@code /app1} exists and prints the outcome.
 *
 * <p>{@code exists} returns {@code null} for a missing node and the node's
 * {@code Stat} otherwise. The result is printed so that the check has an
 * observable outcome instead of being discarded.
 *
 * @throws KeeperException if the server reports an error
 * @throws InterruptedException if the calling thread is interrupted
 */
@Test
public void isExists() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.exists("/app1", false);
    if (stat == null) {
        System.out.println("/app1 does not exist");
    } else {
        System.out.println("/app1 exists: " + stat);
    }
}