1. 简介
curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。
提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、 共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。
1.1 原生zookeeperAPI的不足
连接对象异步创建,需要开发人员自行编码等待
连接没有自动重连超时机制
watcher一次注册生效一次
不支持递归创建树形节点
1.2 curator特点
解决session会话超时重连
watcher反复注册
简化开发api
遵循Fluent风格的API
提供了分布式锁服务、共享计数器、缓存机制等机制
1.3 依赖
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<type>jar</type>
</dependency>
</dependencies>
2. 连接与关闭
采用了工厂设计模式和建造者设计模式。通过输入一些连接信息,可以获取到一个连接Zookeeper服务器的客户端。 public static void main(String[] args) {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // 表示间隔1s,最多尝试重连3次
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181")
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("create")
.build();
client.start(); // 开启客户端
log.info(client.isStarted());
client.close(); // 关闭客户端
}
connectString
:用于设置地址及端口号;sessionTimeoutMs
:用于设置超时时间;retryPolicy
:用于设置重连策略namespace
:表示根节点路径,可以没有
2.1 测试模版
因此,可以写一个测试模板,在开始之前打开客户端,在结束之后关闭客户端。public class CreateTest {
private final static Logger log = Logger.getLogger(ConnectTest.class);
private String connectString = "192.168.233.133:2181,192.168.233.131:2181,192.168.233.132:2181";
CuratorFramework client;
Integer sessionTimeoutMs = 5000;
Integer baseSleepTimeMs = 1000;
Integer maxRetries = 3;
String namespace = "create";
@Before
public void before() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
client = CuratorFrameworkFactory
.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.retryPolicy(retryPolicy)
.namespace(namespace)
.build();
client.start();
log.info("客户端已开启");
}
@After
public void after() {
client.close();
log.info("客户端已关闭");
}
}
3. 新增节点
3.1 案例一:简单创建
@Test
public void testCreate() throws Exception {
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node", "data".getBytes());
log.info("create结束");
}
3.2 案例二:自定义权限创建
@Test
public void testCreate2() throws Exception {
Id ip = new Id("ip", "192.168.233.133");
List<ACL> acl = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, ip));
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(acl)
.forPath("/node1", "data".getBytes());
log.info("create结束");
}
3.3 案例三:递归创建节点
3.4 案例四:异步方法创建节点
在此说明一下,方法接收到的第一个参数curatorFramework
实际上就是客户端;curatorFramework
保存了一些查询的结果。 @Test
public void testCreate4() throws Exception {
// 异步方式创建节点
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorFramework == client); // true
log.info("getResultCode(): " + curatorEvent.getResultCode()); // 0表示创建成功
log.info("getType(): " + curatorEvent.getType().toString()); // 获取操作类型 CREATE
log.info("getPath(): " + curatorEvent.getPath()); // 获取节点路径
}
})
.forPath("/node2/node38", "data".getBytes());
log.info("create结束");
}
4. 更新节点
4.1 案例一:更新一个节点
@Test
public void testSet() throws Exception {
client.setData()
.forPath("/node", "set".getBytes());
log.info("设置完成");
}
4.2 案例二:带版本更新一个节点
@Test
public void testSet2() throws Exception {
client.setData()
.withVersion(1) // 带有版本号
.forPath("/node", "12".getBytes());
log.info("设置完成");
}
4.3 案例三:带回调方法更新一个节点
@Test
public void testSet3() throws Exception {
client.setData()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getResultCode()); // 0
log.info(curatorEvent.getType()); // SET_DATA
log.info(curatorEvent.getPath()); // /node
log.info(curatorEvent.getStat().toString()); // 21474836489,21474836542,1620040487612,1620042328488,4,0,0,0,3,0,21474836489
}
})
.forPath("/node", "432".getBytes());
log.info("设置完成");
}
5. 删除节点
5.1 案例一:删除一个节点
@Test
public void testDelete() throws Exception {
client.delete()
.forPath("/node");
log.info("删除结束");
}
5.2 案例二:递归删除节点
@Test
public void testDelete1() throws Exception {
client.delete()
.deletingChildrenIfNeeded()
.forPath("/node2");
log.info("删除结束");
}
5.3 案例三:带回调方法删除一个节点
@Test
public void testDelete3() throws Exception {
client.delete()
.deletingChildrenIfNeeded()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getType()); // DELETE
log.info(curatorEvent.getPath()); // /node1
}
})
.forPath("/node1");
log.info("删除结束");
}
6. 查看节点
6.1 案例一:查看一个节点
@Test
public void testGet() throws Exception {
byte[] data = client.getData()
.forPath("/node2");
log.info(new String(data));
}
6.2 案例二:查看节点的值和状态
@Test
public void testGet2() throws Exception {
Stat stat = new Stat();
byte[] data = client.getData()
.storingStatIn(stat)
.forPath("/node2");
log.info(new String(data));
log.info(stat.getVersion());
}
6.3 案例三:带回调方法查看一个节点
@Test
public void testGet3() throws Exception {
client.getData()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(new String(curatorEvent.getData())); // 4134134
log.info(curatorEvent.getStat().toString()); // 21474836566,21474836566,1620042863998,1620042863998,0,0,0,0,7,0,21474836566
log.info(curatorEvent.getType().toString()); // GET_DATA
}
})
.forPath("/node2");
}
7. 查看子节点
7.1 案例一:查看一个节点的所有子节点
@Test
public void testChildren() throws Exception {
List<String> children = client.getChildren()
.forPath("/");
log.info(children.toString());
}
7.2 案例二:带回调方法查看一个节点的所有子节点
@Test
public void testChildren2() throws Exception {
client.getChildren()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getPath()); // /
log.info(curatorEvent.getType().toString()); // CHILDREN
log.info(curatorEvent.getChildren().toString()); // [node, node2, node3]
}
})
.forPath("/");
}
8. 检查节点是否存在
8.1 案例一:检查一个节点是否存在
@Test
public void testExists() throws Exception {
Stat stat = client.checkExists()
.forPath("/node");
if (stat != null)
log.info(stat.toString());
else
log.info("节点不存在");
}
8.2 案例二:带回调方法检查一个节点是否存在
@Test
public void testExists1() throws Exception {
client.checkExists()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
log.info(curatorEvent.getType().toString()); // EXISTS
Stat stat = curatorEvent.getStat();
if (stat != null)
log.info(stat.toString()); // 21474836548,21474836548,1620042534164,1620042534164,0,0,0,0,0,0,21474836548
else
log.info("节点不存在");
}
})
.forPath("/node");
}
9. Watcher
curator提供了两种Watcher(Cache)来监听结点的变化
NodeCache
: 只是监听某一个特定的节点,监听节点的新增、修改数据、删除 。(子节点的新增、删除、修改均不会管)
PathChildrenCache
: 监控一个ZNode的子节点. 当一个子节点增加、修改数据、删除 时, PathCache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
这个监视器可以多次使用
9.1 案例一:NodeCache
@Test
public void testWatch() throws Exception {
// 观察节点的变化
NodeCache nodeCache = new NodeCache(client, "/node22");
nodeCache.start();
nodeCache.getListenable()
.addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
log.info(currentData.getPath());
log.info(new String(currentData.getData()));
} else {
log.info("删除了某个节点");
}
}
});
Thread.sleep(60000); //睡30s
nodeCache.close();
}
9.2 案例二:PathChildrenCache
@Test
public void testWatch2() throws Exception {
// 观察节点的变化
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node22", true);
pathChildrenCache.start();
pathChildrenCache.getListenable()
.addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
log.info(pathChildrenCacheEvent.getType()); // CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED
log.info(pathChildrenCacheEvent.getData().toString()); // ChildData{path='/node22/child', stat=21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630, data=[50, 50]}
log.info(new String(pathChildrenCacheEvent.getData().getData()));
log.info(pathChildrenCacheEvent.getData().getPath()); // ChildData{path='/node22/child'
log.info(pathChildrenCacheEvent.getData().getStat().toString()); // 21474836630,21474836630,1620044984259,1620044984259,0,0,0,0,2,0,21474836630
}
});
Thread.sleep(60000); //睡30s
pathChildrenCache.close();
}
10. 事务
10.1 案例一:使用事务创建两个节点
@Test
public void testTransaction() throws Exception {
client.inTransaction()
.create().forPath("/node100", "100".getBytes())
.and() // 桥
.create().forPath("/node101", "101".getBytes())
.and() // 桥
.commit(); // 提交
log.info("提交成功");
}
11. 分布式锁
11.1 使用分布式可重入排它锁
11.2 使用读写锁
读锁和写锁是两种类型的锁,但是如果两者争抢同一个锁节点的时候,也会发生一些有趣的事情。
当读锁进入之后,其他的读锁也可以进入;但是写锁只能在外面等;
当写锁进入之后,读写锁都不能进入。 /**
* 读锁在运行的时候,写锁不允许工作,在阻塞。
* 读锁运行的时候,允许另一个读锁也进入读数据
* 写锁运行时,其他读写锁都不能进入
* @throws Exception
*/
@Test
public void testReadLock() throws Exception {
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
InterProcessLock readLock = interProcessReadWriteLock.readLock();
log.info("等待获取读锁对象");
readLock.acquire();
for (int i = 0; i < 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
readLock.release();
log.info("释放锁");
}
@Test
public void testWriteLock() throws Exception {
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock");
InterProcessLock writeLock = interProcessReadWriteLock.writeLock();
log.info("等待获取写锁对象");
writeLock.acquire();
for (int i = 0; i < 10; i++) {
Thread.sleep(3000);
System.out.println(i);
}
writeLock.release();
log.info("释放锁");
}