简介
Curator最初由Netflix的Jordan Zimmerman开发。它提供了一套Java类库, 可以让开发者更容易地使用ZooKeeper。
所谓ZooKeeper技巧(ZooKeeper Recipes),也可以称之为解决方案, 或者叫实现方案, 是指ZooKeeper的使用方法, 比如分布式的配置管理, Leader选举等
Curator是Apache ZooKeeper天生配套的组件, ZooKeeper的Java开发者自然而然地会选择在项目中使用它。
官网链接:http://curator.apache.org/
提供的功能组件
-
Framework 提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制
-
Client 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法
-
Recipes 实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
-
Utilities 各种工具类
-
Errors 异常处理, 连接, 恢复等.
-
Extensions curator-recipes包实现了通用的技巧,这些技巧在ZooKeeper文档中有介绍。为了避免使这个包(package)变得过于庞大, 其他的recipes/applications将会放入独立的extension包中, 并使用命名规则curator-x-name。
Curator 编译好的类库被发布到Maven Central(Maven中央仓库)中。Curator包含几个artifact, 你可以根据需要在项目中加入相应的依赖。对于大多数开发者来说, 引入curator-recipes这一个就足够了(它会传递依赖curator-framework和curator-client); 下面为了完整起见, 把三个依赖都列了出来。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.6.0</version>
</dependency>
代码示例
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
 * Small command-line demo that exercises {@link CuratorUtil} against a ZooKeeper ensemble:
 * it creates two nodes under {@code /zkroot}, updates one, lists the children (names only and
 * names with values), registers a child watch and then idles so watch events can be observed.
 *
 * DateTime: 2015-01-09 09:14:08
 */
public class CuratorTest {
    /** Comma-separated {@code host:port} connect string of the ZooKeeper ensemble used by the demo. */
    private static final String ZK_ADDRESS = "hadoop02:2181,hadoop03:2181,hadoop04:2181";

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if registering the watch fails or the idle sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorUtil curator = new CuratorUtil(ZK_ADDRESS);
        try {
            curator.createNode("/zkroot/test1", "你好abc11");
            curator.createNode("/zkroot/test2", "你好abc22");
            curator.updateNode("/zkroot/test2", "你好abc22");

            List<String> list = curator.listChildren("/zkroot");
            Map<String, String> map = curator.listChildrenDetail("/zkroot");
            // To remove the demo subtree afterwards, call curator.deleteNode("/zkroot") here.

            System.out.println("=========================================");
            for (String str : list) {
                System.out.println(str);
            }
            System.out.println("=========================================");
            for (Map.Entry<String, String> entry : map.entrySet()) {
                System.out.println(entry.getKey() + "=>" + entry.getValue());
            }

            // Watch the children of /zkroot, then keep the JVM alive long enough to see events.
            curator.addWatch("/zkroot", false);
            TimeUnit.SECONDS.sleep(600);
        }
        finally {
            // Always release the client, even when the demo fails or the sleep is interrupted;
            // previously the client was never closed.
            curator.destory();
        }
    }
}
/**
 * Convenience wrapper around a single {@link CuratorFramework} client offering best-effort
 * create / update / delete / list operations and helpers for registering watches.
 *
 * <p>The CRUD-style methods never throw: failures are reported on standard error and signalled
 * through the return value (or an empty/partial result). The {@code addWatch} overloads
 * propagate exceptions to the caller.
 */
class CuratorUtil {
    private final CuratorFramework client;

    /**
     * Builds and starts a client for the given ensemble. The retry policy is
     * {@code ExponentialBackoffRetry(1000, 3)} and a {@link NodeEventListener} is registered
     * on the client before it is started.
     *
     * @param zkAddress ZooKeeper connect string, e.g. {@code host1:2181,host2:2181}
     */
    public CuratorUtil(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
        client.getCuratorListenable().addListener(new NodeEventListener());
        client.start();
    }

    /**
     * Creates a node (and any missing parents) if it does not exist yet.
     *
     * @param nodeName absolute path of the node to create
     * @param value    payload stored as UTF-8; when {@code null} or empty the node is created
     *                 without an explicit payload
     * @return {@code true} only if the node was created by this call and the path returned by
     *         Curator equals {@code nodeName}; {@code false} if the node already existed or
     *         the operation failed
     */
    public boolean createNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat == null) {
                String opResult;
                if (Strings.isNullOrEmpty(value)) {
                    opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
                }
                else {
                    opResult = getClient().create().creatingParentsIfNeeded()
                            .forPath(nodeName, value.getBytes(Charsets.UTF_8));
                }
                suc = Objects.equal(nodeName, opResult);
            }
        }
        catch (Exception e) {
            // Also reached when the create itself fails after the existence check passed.
            report(e);
        }
        return suc;
    }

    /**
     * Overwrites the payload of an existing node.
     *
     * @param nodeName absolute path of the node to update
     * @param value    new payload, stored as UTF-8; {@code null} is rejected
     * @return {@code true} if the node existed and the data was written; {@code false} if
     *         {@code value} is {@code null}, the node does not exist, or the operation failed
     */
    public boolean updateNode(String nodeName, String value) {
        if (value == null) {
            // Explicit check instead of relying on a caught NullPointerException.
            return false;
        }
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat != null) {
                Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
                suc = opResult != null;
            }
        }
        catch (Exception e) {
            report(e);
        }
        return suc;
    }

    /**
     * Deletes a node together with all of its children. Failures are reported on standard
     * error and otherwise ignored.
     *
     * @param nodeName absolute path of the node to delete
     */
    public void deleteNode(String nodeName) {
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
        }
        catch (Exception e) {
            report(e);
        }
    }

    /**
     * Reads the names and UTF-8 decoded values of all direct children of a node.
     *
     * @param node absolute path of the parent node
     * @return a mutable map from child name to child value; a child that carries no data maps
     *         to the empty string. If an operation fails part-way, the entries collected so
     *         far are returned (possibly none).
     */
    public Map<String, String> listChildrenDetail(String node) {
        Map<String, String> map = Maps.newHashMap();
        try {
            List<String> children = getClient().getChildren().forPath(node);
            if (children != null) {
                for (String child : children) {
                    String propPath = ZKPaths.makePath(node, child);
                    // Use a fresh builder per read rather than sharing one across iterations.
                    byte[] data = getClient().getData().forPath(propPath);
                    // A node may hold no data at all; decoding null would throw and abort the
                    // whole listing, so represent it as an empty value instead.
                    map.put(child, data == null ? "" : new String(data, Charsets.UTF_8));
                }
            }
        }
        catch (Exception e) {
            report(e);
        }
        return map;
    }

    /**
     * Lists the names of all direct children of a node.
     *
     * @param node absolute path of the parent node
     * @return the child names, or an empty list if the lookup failed
     */
    public List<String> listChildren(String node) {
        List<String> children = Lists.newArrayList();
        try {
            List<String> found = getClient().getChildren().forPath(node);
            if (found != null) {
                children = found;
            }
        }
        catch (Exception e) {
            report(e);
        }
        return children;
    }

    /**
     * Registers a watch using Curator's {@code watched()} option, i.e. without supplying a
     * dedicated watcher object.
     * NOTE(review): events presumably surface through the listener registered in the
     * constructor, and the watch is presumably one-shot as is usual for ZooKeeper - confirm.
     *
     * @param node   absolute path of the node
     * @param isSelf {@code true} to watch the data of {@code node} itself, {@code false} to
     *               watch the list of its children
     * @throws Exception if the underlying read that sets the watch fails
     */
    public void addWatch(String node, boolean isSelf) throws Exception {
        if (isSelf) {
            getClient().getData().watched().forPath(node);
        }
        else {
            getClient().getChildren().watched().forPath(node);
        }
    }

    /**
     * Registers a watch that notifies the given ZooKeeper {@link Watcher}.
     *
     * @param node    absolute path of the node
     * @param isSelf  {@code true} to watch the data of {@code node} itself, {@code false} to
     *                watch the list of its children
     * @param watcher receiver of the watch event
     * @throws Exception if the underlying read that sets the watch fails
     */
    public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }

    /**
     * Registers a watch that notifies the given {@link CuratorWatcher}.
     *
     * @param node    absolute path of the node
     * @param isSelf  {@code true} to watch the data of {@code node} itself, {@code false} to
     *                watch the list of its children
     * @param watcher receiver of the watch event
     * @throws Exception if the underlying read that sets the watch fails
     */
    public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }

    /**
     * Releases the underlying client. (The misspelled name is kept so existing callers
     * continue to compile.)
     */
    public void destory() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Returns the wrapped client.
     *
     * @return the {@link CuratorFramework} created by the constructor
     */
    public CuratorFramework getClient() {
        return client;
    }

    /**
     * Reports a failed operation on standard error. If the failure was an interruption, the
     * thread's interrupt flag is restored so callers further up can still observe it.
     */
    private static void report(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        e.printStackTrace();
    }
}
// 监听器
final class NodeEventListener implements CuratorListener {
/**
 * Logs every event delivered by the client. When the event carries a ZooKeeper
 * {@link WatchedEvent}, its state and type are logged as well; child-change and data-change
 * events received in the {@code SyncConnected} state are dispatched to (currently empty)
 * handlers.
 *
 * @param client the client that delivered the event
 * @param event  the delivered event
 * @throws Exception declared by the listener interface; nothing is thrown explicitly here
 */
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
    System.out.println(event.toString() + ".......................");

    final WatchedEvent zkEvent = event.getWatchedEvent();
    if (zkEvent == null) {
        // Not a watch notification: nothing more to report.
        return;
    }

    System.out.println(zkEvent.getState() + "=======================" + zkEvent.getType());
    if (zkEvent.getState() != KeeperState.SyncConnected) {
        return;
    }

    switch (zkEvent.getType()) {
        case NodeChildrenChanged:
            // TODO
            break;
        case NodeDataChanged:
            // TODO
            break;
        default:
            break;
    }
}