import java.nio.charset.StandardCharsets;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Test;
/**
 * Manual tests that exercise basic Apache Curator operations (create, delete, read, update and
 * watch) against a ZooKeeper ensemble.
 *
 * <p>A fresh client is built and started in the constructor and released again in
 * {@link #closeClient()} after each test, so no session outlives the test that opened it.
 * All node payloads are encoded and decoded as UTF-8 so the results do not depend on the
 * platform default charset.
 */
public class ZkTest {
    /** Comma-separated host:port list of the ZooKeeper servers to connect to. */
    private static final String CONNECT_STRING =
            "192.168.88.3:2181,192.168.88.4:2181,192.168.88.5:2181";

    /**
     * Session timeout in milliseconds. Per the original author's note, this must not be set
     * too short, otherwise an error is reported.
     */
    private static final int SESSION_TIMEOUT_MS = 50000;

    /** Connection timeout in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 1000;

    private final RetryPolicy retryPolicy;
    private final CuratorFramework client;

    /**
     * Builds the retry policy, creates the Curator client and starts it.
     */
    public ZkTest() {
        // Retry policy: exponential backoff, arguments are (1000, 3).
        this.retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // Create the connection.
        this.client = CuratorFrameworkFactory.newClient(
                CONNECT_STRING,
                SESSION_TIMEOUT_MS,
                CONNECTION_TIMEOUT_MS,
                this.retryPolicy);
        System.out.println("正在连接");
        // NOTE(review): start() presumably does not wait for the connection to be established,
        // so the message below may be printed before the session is actually up - confirm.
        this.client.start();
        System.out.println("连接成功");
    }

    /**
     * Closes the client after each test so the ZooKeeper session and the client's resources
     * are released instead of leaking.
     */
    @After
    public void closeClient() {
        this.client.close();
    }

    /**
     * Creates a node from a path plus content.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void createPointFun1() throws Exception {
        this.client.create().forPath("/ZkTest/fun1", "测试".getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Creates a node from a path only; per the original note, the initial content is empty.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void createPointFun2() throws Exception {
        this.client.create().forPath("/ZkTest/fun2");
    }

    /**
     * Creates a node, automatically creating missing parent nodes recursively.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void createPointFun3() throws Exception {
        this.client.create().creatingParentsIfNeeded().forPath("/ZkTest2/fun3");
    }

    /**
     * Creates an ephemeral node and then keeps the session open for a while.
     *
     * @throws Exception if the Curator operation fails or the sleep is interrupted
     */
    @Test
    public void createPointFun4() throws Exception {
        this.client.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/ZkTest/fun4", "测试创建临时节点".getBytes(StandardCharsets.UTF_8));
        // Wait 10 seconds so the result can be inspected while the session is still alive.
        Thread.sleep(10000);
    }

    /**
     * Deletes a single node.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void deletePointFun1() throws Exception {
        this.client.delete().forPath("/ZkTest2/fun3");
    }

    /**
     * Deletes a node and recursively deletes its children.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void deletePointFun2() throws Exception {
        this.client.delete().deletingChildrenIfNeeded().forPath("/ZkTest2");
    }

    /**
     * Deletes a node using Curator's guaranteed-delete option.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void deletePointFun3() throws Exception {
        this.client.delete().guaranteed().forPath("/ZkTest2");
    }

    /**
     * Reads a node's data and prints it as text.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void getDataFun1() throws Exception {
        byte[] result = this.client.getData().forPath("/ZkTest/fun1");
        // Decode with the same charset used when writing the node.
        String s = new String(result, StandardCharsets.UTF_8);
        System.out.println(s);
    }

    /**
     * Reads a node's data together with its status and prints both.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void getDataFun2() throws Exception {
        Stat stat = new Stat();
        byte[] result = this.client.getData().storingStatIn(stat).forPath("/ZkTest/fun1");
        System.out.println(new String(result, StandardCharsets.UTF_8));
        System.out.println(stat.toString());
    }

    /**
     * Updates a node's data.
     *
     * @throws Exception if the Curator operation fails
     */
    @Test
    public void setDataFun3() throws Exception {
        this.client.setData()
                .forPath("/ZkTest/fun1", "测试 更新内容".getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Watches a node for 60 seconds, printing a message whenever the listener fires.
     *
     * @throws Exception if starting or closing the cache fails or the sleep is interrupted
     */
    @Test
    public void watchFun1() throws Exception {
        // try-with-resources closes the cache once the observation window is over.
        try (NodeCache nodeCache = new NodeCache(this.client, "/ZkTest/fun1")) {
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    String path = nodeCache.getPath();
                    System.out.println("节点 " + path + "检测到操作");
                }
            });
            nodeCache.start(true);
            System.out.println("监听器开启");
            // Keep the test alive so changes made to the node during this window are reported.
            Thread.sleep(60000);
        }
    }
}