文章目录
Apache Curator 客户端
Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。
curator-framework 包,该包是对 ZooKeeper 底层 API 的一些封装。
curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
好的项目源码让人从包名就能看出其功能
atomic: 分布式计数器(DistributedAtomicLong),能在分布式环境下实现原子自增
barriers: 分布式屏障(DistributedBarrier),使用屏障来阻塞分布式环境中进程的运行,直到满足特定的条件
cache: 监听机制,分为NodeCache(监听节点数据变化),PathChildrenCache(监听节点的子节点数据变化),TreeCache(既能监听自身节点数据变化也能监听子节点数据变化), 目前已经由CuratorCache替代了这3个Cache
leader: leader选举, 从集群应用中选出来一台处理任务
locks: 分布式锁
nodes: 提供持久化节点(PersistentNode)服务,即使客户端与zk服务的连接或者会话断开
queue: 分布式队列(包括优先级队列DistributedPriorityQueue,延迟队列DistributedDelayQueue等)
shared: 分布式计数器SharedCount
会话创建
// 创建连接
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.128.129:2181")
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("curator") // 包含隔离名称(初始目录节点?)
.build();
client.start();
- connectionString: 服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3,port3 。
- retryPolicy: 重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
- 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
重试策略
- ExponentialBackoffRetry: 重试一组次数,重试之间的睡眠时间增加
- RetryNTimes: 重试最大次数
- RetryOneTime: RetryOneTime
- RetryUntilElapsed: 在给定的时间结束之前重试
代码演示
@Slf4j
public class CuratorStandalone {
private static final String ADDRESS = "116.62.162.47:2181";
private static CuratorFramework curator;
private static final String NODE = "/node";
@Test
public void before2() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator = CuratorFrameworkFactory.newClient(ADDRESS, retryPolicy);
curator.start();
}
@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
curator = CuratorFrameworkFactory.builder()
.connectString(ADDRESS)
.sessionTimeoutMs(60 * 1000) // 会话超时时间
.connectionTimeoutMs(60 * 1000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("curator") // 包含隔离名称(初始目录节点?)
.build();
curator.start();
}
@Test
public void createTest() {
try {
String path = curator.create().forPath(NODE);
log.info(path);
// 默认值是 local address, 默认是永久节点
log.info(new String(curator.getData().forPath(NODE)));
// 在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。
curator.create().withMode(CreateMode.EPHEMERAL).forPath("/test", "data".getBytes());
log.info(new String(curator.getData().forPath("/test")));
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void createWithHierarchyPath() {
try {
// 层级节点创建
String path = curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/one/two/three");
log.info(path);
log.info(new String(curator.getData().forPath("/one/two/three")));
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void getSetDataTest() {
try {
Stat stat = curator.setData().forPath(NODE, "test".getBytes());
log.info("{}", stat);
log.info(new String(curator.getData().forPath(NODE)));
curator.setData().forPath(NODE);
log.info(new String(curator.getData().forPath(NODE)));
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void deleteTest() {
try {
curator.create().creatingParentsIfNeeded().forPath("/one/two/three");
log.info(new String(curator.getData().forPath("/one/two/three")));
// guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
// deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
curator.delete().guaranteed().deletingChildrenIfNeeded().forPath("/one/two/three");
log.info(new String(curator.getData().forPath("/one/two/three")));
// 断言线程中的代码会抛出 KeeperException.NoNodeException
Assert.assertThrows(KeeperException.NoNodeException.class, () -> {
log.info(new String(curator.getData().forPath(NODE)));
});
} catch (Throwable cause) {
cause.printStackTrace();
}
}
}
异步接口
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
public interface BackgroundCallback
{
/**
* Called when the async background operation completes
*
* @param client the client
* @param event operation result details
* @throws Exception errors
*/
// 如上接口,主要参数为 client 客户端, 和 服务端事件 event, inBackground 异步处理默认在EventThread中执行
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
@Test
public void backgroundTest() {
try {
curator.getData().inBackground((client, event) -> {
log.info("event: {}", event);
log.info(new String(event.getData()));
log.info("{}", event.getStat());
}).forPath(NODE);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void backgroundWithExecutorServiceTest() {
try {
// 指定线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();
curator.getData().inBackground((client, event) -> {
log.info("event: {}", event);
log.info(new String(event.getData()));
log.info("{}", event.getStat());
}, executorService).forPath(NODE);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
监听
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener
{
/**
* Called when a background task has completed or a watch has triggered
*
* @param client client
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
@Test
public void listenerTest() {
try {
// 针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听 // todo, 说的什么玩意儿
// 貌似是一次监听
CuratorListener listener = (client, event) -> {
log.info("listener event: {}", event);
if (Watcher.Event.KeeperState.SyncConnected.getIntValue() == event.getWatchedEvent().getState().getIntValue()) {
log.info("连接成功");
}
};
curator.getCuratorListenable().addListener(listener);
curator.getData().inBackground((client, event) -> {
log.info("event: {}", event);
log.info(new String(event.getData()));
log.info("{}", event.getStat());
}).forPath(NODE);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
Curator Caches
原生监听机制只能生效一次, 需要不停的设置监听才能一直生效, 不是很好用
curator.getData().usingWatcher(new org.apache.zookeeper.Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听到节点事件:" + JSON.toJSONString(watchedEvent));
}
}).forPath(path);
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。
新版本中使用 CuratorCache 替代了 NodeCache, PathChildrenCache, TreeCache 的功能
node cache
可以监听当前节点的节点事件(修改数据+节点变化)
path cache
可以监听当前节点的一级子节点的节点事件(修改数据+节点变化)
tree cache
可以监听当前节点及其下所有节点的节点事件(修改数据+节点变化)
代码演示
@Test
public void nodeCacheTest() {
try {
// NodeCache: 可以监听当前节点的节点事件(修改数据+节点变化)
// NodeCache: @deprecated replace by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
NodeCache nodeCache = new NodeCache(curator, NODE + "/a");
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
log.info("{} path nodeChanged: ", NODE);
log.info(new String(curator.getData().forPath(NODE)));
}
});
nodeCache.start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void pathCacheTest() {
try {
// PathChildrenCache: 可以监听当前节点的一级子节点的节点事件(修改数据+节点变化)
// PathChildrenCache: @deprecated replace by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
PathChildrenCache pathChildrenCache = new PathChildrenCache(curator, NODE, true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData childData = event.getData();
if (null == childData) {
log.info("event:{}, type:{}", event, event.getType());
} else {
byte[] data = childData.getData();
log.info("event:{}, type:{}, path:{}, data:{}", event, event.getType(), childData.getPath(), null == data ? null : new String(childData.getData()));
}
}
});
// 如果设置为true则在首次启动时就会缓存节点内容到Cache中
// start(): @deprecated use {@link #start(StartMode)}
pathChildrenCache.start(true);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void treeCacheTest() {
try {
// TreeCache: 可以监听当前节点及其下所有节点的事件(修改数据+节点变化)
// TreeCache: @deprecated replace by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
TreeCache treeCache = new TreeCache(curator, NODE);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
log.info(" tree cache: {}",event);
}
});
treeCache.start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void curatorCacheTest() {
try {
// CuratorCache: 可以监听当前节点及其下所有节点的事件(修改数据+节点变化)
// 用于替代 NodeCache, PathChildrenCache, TreeCache
// ----------
// Persistent Watchers are not available in the version of the ZooKeeper library being used
// Could not reset persistent watch at path: /node
// 可以使用 CuratorCache curatorCache = CuratorCache.bridgeBuilder(curator, NODE).build(); 就不会报错了
CuratorCache curatorCache = CuratorCache.builder(curator, NODE).build();
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData data) {
log.info("type:{}, oldData:{}, newData:{}", type, oldData, data);
}
});
curatorCache.start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
@Test
public void curatorCacheBridgeTest() {
try {
// CuratorCache: 可以监听当前节点及其下所有节点的节点事件(修改数据+节点变化)
// 用于替代 NodeCache, PathChildrenCache, TreeCache
CuratorCache curatorCache = CuratorCache.bridgeBuilder(curator, NODE).build();
curatorCache.listenable().addListener(new CuratorCacheListener() {
@Override
public void event(Type type, ChildData oldData, ChildData data) {
Optional<ChildData> oldOptional = Optional.ofNullable(oldData);
Optional<ChildData> newOptional = Optional.ofNullable(data);
log.info("type:{}, oldData:{}:{}:{}, newData:{}:{}:{}", type,
oldData, oldOptional.map(ChildData::getPath).orElse(null), oldOptional.map(item-> new String(item.getData())).orElse(null),
data, newOptional.map(ChildData::getPath).orElse(null), newOptional.map(item-> new String(item.getData())).orElse(null));
}
});
curatorCache.start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (Throwable cause) {
cause.printStackTrace();
}
}
集群模式下的使用
bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3
../bin/zkCli.sh -server 172.16.138.202:2281,172.16.138.202:2282,172.16.138.202:2283,172.16.138.202:2284
private static final String ADDRESS = "116.62.162.48:2281,116.62.162.48:2282,116.62.162.48:2283,116.62.162.48:2284";
使用单独的ip和端口也可以连接, 但是多ip和端口的方式在服务宕机的时候能自动切换到服务正常的机器上