准备工作
搭建一个zk集群,参考ZooKeeper学习笔记一:集群搭建。
确保项目可以访问集群的每个节点
新建一个基于jdk1.8的maven项目。
配置依赖
<!-- ZooKeeper Java client library (provides the org.apache.zookeeper classes imported by the examples). -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
注:zookeeper的依赖版本要和集群安装的zookeeper版本一致。
zk配置类
新建ZookeeperConfig.java,作为一个工具类,获取zk客户端实例,具体代码如下:
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Utility holder that creates and closes the ZooKeeper client used by the examples.
 *
 * <p>{@link #create()} blocks until the client reports {@code SyncConnected};
 * {@link #close()} closes the most recently created client.
 */
public class ZookeeperConfig {
    /** Comma-separated {@code host:port} connect string handed to the ZooKeeper constructor. */
    private static final String ADDRESS = "192.168.205.145:2181,192.168.205.146:2181,192.168.205.147:2181,192.168.205.148:2181";

    /** Most recently created client; stays {@code null} until a constructor call succeeds. */
    private static ZooKeeper zk;

    /**
     * Latch belonging to the most recent {@link #create()} call. Kept package-visible
     * for backward compatibility; the watcher itself no longer reads this field.
     */
    static CountDownLatch latch;

    /**
     * Creates a new client and waits until it is connected.
     *
     * @return the client held in the static field; this is {@code null} (or a previously
     *         created client) when the constructor threw {@link IOException}, and may be a
     *         not-yet-connected client when the waiting thread was interrupted
     */
    public static ZooKeeper create() {
        // Each call gets its own latch so that an event delivered to an older
        // client's watcher cannot release a newer create() call prematurely.
        CountDownLatch connected = new CountDownLatch(1);
        latch = connected;
        try {
            // 3000 is passed as the session timeout argument.
            zk = new ZooKeeper(ADDRESS, 3000, new DefaultWatch(connected));
            connected.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers instead of silently dropping it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return zk;
    }

    /**
     * Closes the most recently created client, if any.
     */
    public static void close() {
        if (zk == null) {
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Watcher passed to the ZooKeeper constructor; releases the latch it was
     * given once a {@code SyncConnected} state is reported.
     */
    private static class DefaultWatch implements Watcher {
        private final CountDownLatch connected;

        DefaultWatch(CountDownLatch connected) {
            this.connected = connected;
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getState() == Event.KeeperState.SyncConnected) {
                // countDown() on an already-released latch is a no-op, so repeated
                // SyncConnected events (e.g. after a reconnect) are harmless.
                connected.countDown();
            }
        }
    }
}
调用
// The client type is ZooKeeper (capital K), as imported from org.apache.zookeeper.
ZooKeeper zookeeper = ZookeeperConfig.create();
即可获取一个zk客户端。
简单使用
对于常用的 set/get 操作,我做了一些简单的封装,包括三种方式:直接 getData;getData 的同时注册 watcher;通过回调函数处理 getData 的后续逻辑。代码很简单,如下:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.zookeeper.CreateMode.EPHEMERAL;
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
/**
* Reactive方式实现的zk客户端
*/
public class ReactiveClient {
public static final String ADDRESS = "192.168.205.145:2181,192.168.205.146:2181,192.168.205.147:2181,192.168.205.148:2181";
private static final ZooKeeper CLIENT = ZookeeperConfig.create();
public static void main(String[] args) {
getData();
getDataWithWatcher();
getDataAndCallback();
pending(10000);
}
private static void getDataAndCallback() {
System.out.println("get data and callback");
String path = "/abc";
String data = "Hello";
createOrUpdate(path, data);
CLIENT.getData(path, false, (rc, path1, ctx, data1, stat) -> {
//System.out.println(rc);
//System.out.println(ctx);
System.out.println("call back get data : " + new String(data1));
//System.out.println(stat);
}, "abc");
}
private static void getDataWithWatcher() {
System.out.println("---create and get data with watcher---");
String path = "/abc";
String data = "Hello";
createOrUpdate(path, data);
Stat stat = new Stat();
try {
Stat finalStat = stat;
byte[] data1 = CLIENT.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
// System.out.println("get data event: " + event);
try {
byte[] data2 = CLIENT.getData(path, this, finalStat);
System.out.println("get data from event : " + new String(data2));
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}, stat);
System.out.println(new String(data1));
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
String newData = "World";
try {
// 触发回调
stat = CLIENT.setData(path, newData.getBytes(UTF_8), stat.getVersion());
stat = CLIENT.setData(path, newData.getBytes(UTF_8), stat.getVersion());
stat = CLIENT.setData(path, newData.getBytes(UTF_8), stat.getVersion());
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
private static void getData() {
String path = "/abc";
String data = "Hello";
createOrUpdate(path, data);
String result = getData(path);
System.out.println(result);
createOrUpdate(path, "world");
result = getData(path);
System.out.println(result);
}
public static void pending(long sec) {
try {
Thread.sleep(sec);
ZookeeperConfig.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static String getData(String path) {
try {
return new String(CLIENT.getData(path, false, new Stat()));
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
return null;
}
public static void createOrUpdate(String path, String data) {
try {
Stat exists = CLIENT.exists(path, false);
if (null != exists) {
CLIENT.setData(path, data.getBytes(UTF_8), exists.getVersion());
return;
}
// 创建一个节点
CLIENT.create(path, data.getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}