1.介绍
What is Curator?
Curator n ˈkyoor͝ˌātər: a keeper or custodian of a museum or other collection - A ZooKeeper Keeper.
Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
2.连接对象创建
maven依赖:
<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.6.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.7</version> <scope>test</scope> </dependency> </dependencies>
连接对象创建的代码
public static void main(String[] args) { CuratorFramework ct = CuratorFrameworkFactory.builder() //ip:端口 .connectString("192.168.10.132:2181,192.168.10.133:2181,192.168.10.135:2181")//连接集群 //超时时间 .sessionTimeoutMs(5000) //连接断开5秒后,会进行一次重连 .retryPolicy(new RetryOneTime(5000)) //命名空间,该命名空间作为父节点 .namespace("ct").build(); //打开连接 ct.start(); //是否连接成功 System.out.println(ct.isStarted()); //关闭连接 ct.close(); }
重连策略:
RetryOneTime(int sleepMsBetweenRetry):只重连一次
RetryNTimes(int n, int sleepMsBetweenRetries):重连n次
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries):每sleepMsBetweenRetries毫秒重连一次,总等待时间超过maxElapsedTimeMs毫秒后停止重连
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries):重连maxRetries次,重连间隔基于baseSleepTimeMs计算
计算公式如下:baseSleepTimeMs*Math.max(1,random.nextInt(1<<(retryCount+1)))。
3.创建节点
ct.create() //节点类型 .withMode(CreateMode.PERSISTENT) //节点权限 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //节点路径和数据 .forPath("/node1","node1".getBytes());
支持递归创建节点
ct.create() //父节点不存在则创建 .creatingParentsIfNeeded() //节点类型 .withMode(CreateMode.PERSISTENT) //节点权限 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //节点路径和数据 .forPath("/node2/node2","node2".getBytes());
异步方式创建(删除、更新、查询省略):
ct.create() //父节点不存在则创建 .creatingParentsIfNeeded() //节点类型 .withMode(CreateMode.PERSISTENT) //节点权限 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //异步回调 .inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event.getPath()+":"+event.getType()); } }) //节点路径和数据 .forPath("node3","node3".getBytes());
4.更新节点
ct.setData().withVersion(-1).forPath("/node3","node33".getBytes());
5.删除节点
ct.delete().withVersion(-1).forPath("/node3");
删除包含子节点的节点
//递归删除 ct.delete() .deletingChildrenIfNeeded() .withVersion(-1).forPath("/node2");
6.查询节点
//查询 byte[] bytes = ct.getData().forPath("/node1"); System.out.println(new String(bytes));
读取属性:
Stat stat = new Stat(); byte[] bytes = ct.getData() .storingStatIn(stat).forPath("/node1"); System.out.println(new String(bytes)); System.out.println(stat);
7.查询子节点数据
// /ct/node1/node2 List<String> list = ct.getChildren().forPath("/node1"); for (String s : list) { System.out.println(s); }
8.判断节点是否存在
//如果节点不存在,则返回值为null Stat stat = ct.checkExists().forPath("/node11");