Apache Curator封装了一套高级API 简化zookeeper的操作,提供了对zookeeper基本操作,watch,分布式锁等场景封装
引入Curator包
需要注意不同Curator兼容不同zookeeper版本,可以去查看下发行版本说明https://cwiki.apache.org/confluence/display/CURATOR/Releases
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.0.0</version>
</dependency>
建立连接
// 会话超时时间
private static final int SESSION_TIMEOUT = 10 * 1000;
// 连接超时时间
private static final int CONNECTION_TIMEOUT = 3 * 1000;
// ZooKeeper服务地址
private static final String CONNECT_ADDR = "ip:2181";
/**
* 建立连接
*
* @return
*/
public static CuratorFramework createConnection() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
// 工厂创建连接
final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
.connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)
.build();
// 开启连接
client.start();
return client;
}
基本增删改查
/**
* 基本增删改查
*
* @param client
* @throws Exception
*/
public static void basicOperation(CuratorFramework client) throws Exception {
// 创建内容为空的永久节点
client.create().forPath("/nonData");
// 创建有内容的永久节点
client.create().forPath("/permanent", "/data".getBytes());
client.create().forPath("/permanent/one", "/data".getBytes());
// 创建永久有序节点
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/permanentOrder", "/data".getBytes());
// 创建临时节点,session过期节点失效
client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "data".getBytes());
// 创建临时有序节点
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tempOrder", "data".getBytes());
// 判断节点是否存在
Stat tempStat = client.checkExists().forPath("/temp");
System.out.println("临时节点temp是否存在:" + tempStat != null);
// 获取节点数据
System.out.println("permanent节点数据:" + new String(client.getData().forPath("/permanent")));
// 修改节点数据
client.setData().forPath("/permanent", "updateData".getBytes());
System.out.println("修改后permanent节点数据:" + new String(client.getData().forPath("/permanent")));
// 删除节点
client.delete().forPath("/nonData");
// 删除节点及子节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/permanent");
}
事务
/**
* 事务
*
* @param client
* @throws Exception
*/
public static void transaction(CuratorFramework client) throws Exception {
CuratorOp create = client.transactionOp().create().forPath("/test", "data".getBytes());
CuratorOp delete = client.transactionOp().delete().forPath("/test1");
// 添加节点与删除不存在节点一个事务提交,发生异常都会回滚
List<CuratorTransactionResult> results = client.transaction().forOperations(create, delete);
}
watch监听
分为三种监听NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化。
/**
* 三种监听机制NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,
* TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化
*
* @param client
* @throws Exception
*/
public static void treeCacheWatch(CuratorFramework client) throws Exception {
TreeCache treeCache = new TreeCache(client, "/test");
treeCache.start();
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
switch (treeCacheEvent.getType()) {
case CONNECTION_RECONNECTED:
System.out.println("重新建立连接");
break;
case CONNECTION_SUSPENDED:
System.out.println("连接超时");
break;
case CONNECTION_LOST:
System.out.println("会话已过期");
break;
case INITIALIZED:
System.out.println("初始化");
break;
case NODE_ADDED:
System.out.println("添加节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
+ new String(treeCacheEvent.getData().getData()));
break;
case NODE_UPDATED:
System.out.println("更新节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
+ new String(treeCacheEvent.getData().getData()));
break;
case NODE_REMOVED:
System.out.println("删除节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
+ new String(treeCacheEvent.getData().getData()));
break;
}
}
});
while (true) {
}
}
分布式锁
zookeeper分布式锁主要依赖于临时顺序节点,当A,B两个线程获取锁时,假如A线程先发起加锁请求,zookeeper会在加锁的node下创建一个临时顺序节点node-001,然后查询该节点下所有临时顺序节点,看自己是否排在第一位,如果排在第一位可以获取锁,A获取到锁后B线程请求加锁,创建临时顺序节点node-002,然后查询该节点下所有临时顺序节点,看自己是否排在第一位,发现此时排在第二位,此时线程B会对前一个节点加一个监听器,监听节点是否被删除,当A执行完删除节点后,B监听到A节点删除,判断自己是否是第一个节点,如果是会获取到锁。
/**
* 分布式锁,使用Curator两步acquire获取锁(),release()释放锁
*
* @param client
* @throws Exception
*/
public static void lock(CuratorFramework client, InterProcessMutex interProcessMutex) {
try {
String name = Thread.currentThread().getName();
if (interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
System.out.println(name + "获取锁成功");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
interProcessMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
完整demo
package com.shaofei;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class demo {
// 会话超时时间
private static final int SESSION_TIMEOUT = 10 * 1000;
// 连接超时时间
private static final int CONNECTION_TIMEOUT = 3 * 1000;
// ZooKeeper服务地址
private static final String CONNECT_ADDR = "ip:2181";
public static void main(String[] args) throws Exception {
final CuratorFramework client = createConnection();
basicOperation(client);
transaction(client);
treeCacheWatch(client);
final InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock/test"); // 互斥锁
final InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock/test");// 读写锁
final InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(client,
"/lock/test"); // 不可重入互斥锁
List<String> list = new ArrayList<String>();
list.add("/lock/test");
list.add("/lock/test1");
final InterProcessMultiLock interProcessMultiLock = new InterProcessMultiLock(client, list); // 集合锁
new Thread("thread-1") {
@Override
public void run() {
lock(client, interProcessMutex);
}
}.start();
new Thread("thread-2") {
@Override
public void run() {
lock(client, interProcessMutex);
}
}.start();
}
/**
* 建立连接
*
* @return
*/
public static CuratorFramework createConnection() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
// 工厂创建连接
final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
.connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)
.build();
// 开启连接
client.start();
return client;
}
/**
* 分布式锁,使用Curator两步acquire获取锁(),release()释放锁
*
* @param client
* @throws Exception
*/
public static void lock(CuratorFramework client, InterProcessMutex interProcessMutex) {
try {
String name = Thread.currentThread().getName();
if (interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
System.out.println(name + "获取锁成功");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
interProcessMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 三种监听机制NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,
* TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化
*
* @param client
* @throws Exception
*/
public static void treeCacheWatch(CuratorFramework client) throws Exception {
TreeCache treeCache = new TreeCache(client, "/test");
treeCache.start();
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
switch (treeCacheEvent.getType()) {
case CONNECTION_RECONNECTED:
System.out.println("重新建立连接");
break;
case CONNECTION_SUSPENDED:
System.out.println("连接超时");
break;
case CONNECTION_LOST:
System.out.println("会话已过期");
break;
case INITIALIZED:
System.out.println("初始化");
break;
case NODE_ADDED:
System.out.println("添加节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
+ new String(treeCacheEvent.getData().getData()));
break;
case NODE_UPDATED:
System.out.println("更新节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
+ new String(treeCacheEvent.getData().getData()));
break;
case NODE_REMOVED:
System.out.println("删除节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
+ new String(treeCacheEvent.getData().getData()));
break;
}
}
});
while (true) {
}
}
/**
* 事务
*
* @param client
* @throws Exception
*/
public static void transaction(CuratorFramework client) throws Exception {
CuratorOp create = client.transactionOp().create().forPath("/test", "data".getBytes());
CuratorOp delete = client.transactionOp().delete().forPath("/test1");
// 添加节点与删除不存在节点一个事务提交,发生异常都会回滚
List<CuratorTransactionResult> results = client.transaction().forOperations(create, delete);
}
/**
* 基本增删改查
*
* @param client
* @throws Exception
*/
public static void basicOperation(CuratorFramework client) throws Exception {
// 创建内容为空的永久节点
client.create().forPath("/nonData");
// 创建有内容的永久节点
client.create().forPath("/permanent", "/data".getBytes());
client.create().forPath("/permanent/one", "/data".getBytes());
// 创建永久有序节点
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/permanentOrder", "/data".getBytes());
// 创建临时节点,session过期节点失效
client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "data".getBytes());
// 创建临时有序节点
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tempOrder", "data".getBytes());
// 判断节点是否存在
Stat tempStat = client.checkExists().forPath("/temp");
System.out.println("临时节点temp是否存在:" + tempStat != null);
// 获取节点数据
System.out.println("permanent节点数据:" + new String(client.getData().forPath("/permanent")));
// 修改节点数据
client.setData().forPath("/permanent", "updateData".getBytes());
System.out.println("修改后permanent节点数据:" + new String(client.getData().forPath("/permanent")));
// 删除节点
client.delete().forPath("/nonData");
// 删除节点及子节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/permanent");
}
}