使用zookeeper原生API实现一些复杂的东西比较麻烦。所以,出现了两款比较好的开源客户端,对zookeeper的原生API进行了包装:zkClient和curator。后者是Netflix出版的,必属精品,也是最好用的zk的开源客户端。
一 curator基本API使用
引入依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
该依赖引入后,默认引入的zookeeper版本是3.4.8。
注意:不要引入>=3.0.0的curator-framework,默认引入的zookeeper版本是3.5.x(该版本还不稳定),目前测试起来还是有点问题的。
完整代码:
package com.hulk.curator; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; import java.util.concurrent.Executors; public class CuratorTest {
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.211.55.4:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception {
/**
* 创建会话
*/
client.start(); /**
* 创建节点
* 注意:
* 1 除非指明创建节点的类型,默认是持久节点
* 2 ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
*/
client.create().forPath("/China");//创建一个初始内容为空的节点
client.create().forPath("/America", "zhangsan".getBytes());
client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点 /**
* 异步创建节点
* 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理
*/
client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
+ ",type:" + event.getType());
}
}, Executors.newFixedThreadPool(10)).forPath("/async-curator-my"); client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
+ ",type:" + event.getType());
}
}).forPath("/async-curator-zookeeper"); /**
* 获取节点内容
*/
byte[] data = client.getData().forPath("/America");
System.out.println(new String(data));
byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
System.out.println(new String(data2));
/**
* 更新数据
*/
Stat stat = client.setData().forPath("/America");
client.setData().withVersion(4).forPath("/America", "lisi".getBytes()); /**
* 删除节点
*/
client.delete().forPath("/China");//只能删除叶子节点
client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
client.delete().guaranteed().forPath("/America");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止 Thread.sleep(Integer.MAX_VALUE);
}
}
1 创建会话
curator创建会话有两种方式,推荐流式API。
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.211.55.4:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
参数:
- connectString:zk的server地址,多个server之间使用英文逗号分隔开
- connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
- sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
- retryPolicy:失败重试策略
- ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
- baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
- 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
- maxRetries:最大重试次数
- maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间
- baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
- 其他,查看org.apache.curator.RetryPolicy接口的实现类
- ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
此时会话还没创建,使用如下代码创建会话:
client.start();
start()会阻塞到会话创建成功为止。
2 创建节点
2.1 同步创建
client.create().forPath("/China");//创建一个初始内容为空的节点
client.create().forPath("/America", "zhangsan".getBytes());
client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点
注意:
- 除非指明创建节点的类型,默认是持久节点
- ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
- creatingParentsIfNeeded():可以实现递归创建
2.2 异步创建
client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
+ ",type:" + event.getType());
}
}, Executors.newFixedThreadPool(10)).forPath("/async-curator-my"); client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
+ ",type:" + event.getType());
}
}).forPath("/async-curator-zookeeper");
注意:
- 在curator中所有异步操作,都使用org.apache.curator.framework.api.BackgroundCallback接口的实现类完成
- 如果在BackgroundCallback中自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理,所以上述的两个输出分别如下:
当前线程:pool-3-thread-1,code:0,type:CREATE
当前线程:main-EventThread,code:0,type:CREATE
3 获取节点内容
byte[] data = client.getData().forPath("/America");
System.out.println(new String(data));
byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
System.out.println(new String(data2));
4 获取节点子节点列表
List<String> children = client.getChildren().forPath("/Russia");
5 更新数据
Stat stat = client.setData().forPath("/America");
client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
注意:
- version版本号还是为了实现CAS并发处理,也会强制某个线程必须更新相应的版本的数据
6 删除节点
client.delete().forPath("/China");//只能删除叶子节点
client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
client.delete().guaranteed().forPath("/America");
注意:
- deletingChildrenIfNeeded()实现级联删除
- guaranteed()由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
二 curator实现事件监听
引入两个依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
给出全部代码:
package com.hulk.curator; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry; /**
* 事件监听器
*/
public class CuratorWatcherTest {
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.211.55.4:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build(); public static void main(String[] args) throws Exception {
/**
* 创建会话
*/
client.start();
client.create().creatingParentsIfNeeded().forPath("/book/computer","java".getBytes());
/**
* 监听指定节点本身的变化,包括节点本身的创建和节点本身数据的变化
*/
NodeCache nodeCache = new NodeCache(client,"/book/computer");
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("新的节点数据:" + new String(nodeCache.getCurrentData().getData()));
}
});
nodeCache.start(true); client.setData().forPath("/book/computer","c++".getBytes());
/**
* 监听子节点变化情况
* 1 新增子节点
* 2 删除子节点
* 3 子节点数据变更
*/
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/book13",true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()){
case CHILD_ADDED:
System.out.println("新增子节点:" + event.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("子节点数据变化:" + event.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("删除子节点:" + event.getData().getPath());
break;
default:
break;
}
}
});
pathChildrenCache.start(); client.create().forPath("/book13"); client.create().forPath("/book13/car", "bmw".getBytes()); client.setData().forPath("/book13/car", "audi".getBytes()); client.delete().forPath("/book13/car");
}
}
curator的事件监听分为:
- NodeCache:对节点本身的监听
- 监听节点本身的创建
- 监听节点本身的数据的变化
- PathChildrenCache:对节点的子节点的监听
- 监听新增子节点
- 监听删除子节点
- 监听子节点数据变化
注意:
- PathChildrenCache只会监听指定节点的一级子节点,不会监听节点本身(例如:“/book13”),也不会监听子节点的子节点(例如,“/book13/car/color”)
三 zkui
zk的操作我们一般可以登上zk所在的机器,然后执行“sh zkCli.sh”,之后执行一些命令,但是由于这样始终效率低下,这里推荐一款比较好用的zk的ui界面:zkui。
假设我们要在10.211.55.5机器上安装该程序。
1 下载打包
git clone https://github.com/DeemOpen/zkui.git
cd zkui/
mvn clean install
通过上述的操作,在zkui/target目录下我们会生成一个fatjar:zkui-2.0-SNAPSHOT-jar-with-dependencies.jar,在启动这个jar之前先要进行相关配置。
2 配置zkui
cp config.cfg target/
vi config.cfg
修改内容如下,其他不变:
zkServer=10.211.55.5:2181
注意:需要将配置文件config.cfg与fatjar放在同一个目录下。
3 启动zkui
之后进入target/目录下,执行:
nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar &
4 浏览器访问
浏览器访问“http://10.211.55.5:9090”,之后在登录页面输入用户名密码:admin/manager进行登录。(可以去config.cfg进行配置)