Zookeeper 客户端 apache-curator

博文目录

文章目录


Apache Curator 客户端

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。

curator-framework 包,该包是对 ZooKeeper 底层 API 的一些封装。
curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

Curator应用场景(一)-分布式计数器DistributedAtomicInteger

好的项目源码让人从包名就能看出其功能
Zookeeper 客户端 apache-curator
atomic: 分布式计数器(DistributedAtomicLong),能在分布式环境下实现原子自增
barriers: 分布式屏障(DistributedBarrier),使用屏障来阻塞分布式环境中进程的运行,直到满足特定的条件
cache: 监听机制,分为NodeCache(监听节点数据变化),PathChildrenCache(监听节点的子节点数据变化),TreeCache(既能监听自身节点数据变化也能监听子节点数据变化), 目前已经由CuratorCache替代了这3个Cache
leader: leader选举, 从集群应用中选出来一台处理任务
locks: 分布式锁
nodes: 提供持久化节点(PersistentNode)服务,即使客户端与zk服务的连接或者会话断开
queue: 分布式队列(包括优先级队列DistributedPriorityQueue,延迟队列DistributedDelayQueue等)
shared: 分布式计数器SharedCount

会话创建

// 创建连接
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.128.129:2181")
                .sessionTimeoutMs(5000)  // 会话超时时间
                .connectionTimeoutMs(5000) // 连接超时时间
                .retryPolicy(retryPolicy)
                .namespace("curator") // 包含隔离名称(初始目录节点?)
                .build();
client.start();
  • connectionString: 服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3,port3 。
  • retryPolicy: 重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
  • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

重试策略

  • ExponentialBackoffRetry: 重试一组次数,重试之间的睡眠时间增加
  • RetryNTimes: 重试最大次数
  • RetryOneTime: RetryOneTime
  • RetryUntilElapsed: 在给定的时间结束之前重试

代码演示

@Slf4j
public class CuratorStandalone {

	private static final String ADDRESS = "116.62.162.47:2181";
	private static CuratorFramework curator;
	private static final String NODE = "/node";

	@Test
	public void before2() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		CuratorFramework curator = CuratorFrameworkFactory.newClient(ADDRESS, retryPolicy);
		curator.start();
	}

	@Before
	public void before() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		curator = CuratorFrameworkFactory.builder()
				.connectString(ADDRESS)
				.sessionTimeoutMs(60 * 1000)  // 会话超时时间
				.connectionTimeoutMs(60 * 1000) // 连接超时时间
				.retryPolicy(retryPolicy)
				.namespace("curator") // 包含隔离名称(初始目录节点?)
				.build();
		curator.start();
	}

	@Test
	public void createTest() {
		try {
			String path = curator.create().forPath(NODE);
			log.info(path);
			// 默认值是 local address, 默认是永久节点
			log.info(new String(curator.getData().forPath(NODE)));
			// 在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。
			curator.create().withMode(CreateMode.EPHEMERAL).forPath("/test", "data".getBytes());
			log.info(new String(curator.getData().forPath("/test")));
		} catch (Throwable cause) {
			cause.printStackTrace();
		}
	}

	@Test
	public void createWithHierarchyPath() {
		try {
			// 层级节点创建
			String path = curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/one/two/three");
			log.info(path);
			log.info(new String(curator.getData().forPath("/one/two/three")));
		} catch (Throwable cause) {
			cause.printStackTrace();
		}
	}

	@Test
	public void getSetDataTest() {
		try {
			Stat stat = curator.setData().forPath(NODE, "test".getBytes());
			log.info("{}", stat);
			log.info(new String(curator.getData().forPath(NODE)));
			curator.setData().forPath(NODE);
			log.info(new String(curator.getData().forPath(NODE)));
		} catch (Throwable cause) {
			cause.printStackTrace();
		}
	}

	@Test
	public void deleteTest() {
		try {
			curator.create().creatingParentsIfNeeded().forPath("/one/two/three");
			log.info(new String(curator.getData().forPath("/one/two/three")));
			// guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
			// deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。
			curator.delete().guaranteed().deletingChildrenIfNeeded().forPath("/one/two/three");
			log.info(new String(curator.getData().forPath("/one/two/three")));
			// 断言线程中的代码会抛出 KeeperException.NoNodeException
			Assert.assertThrows(KeeperException.NoNodeException.class, () -> {
				log.info(new String(curator.getData().forPath(NODE)));
			});
		} catch (Throwable cause) {
			cause.printStackTrace();
		}
	}

}

异步接口

Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

public interface BackgroundCallback
{
    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    // 如上接口,主要参数为 client 客户端, 和 服务端事件 event, inBackground 异步处理默认在EventThread中执行
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
@Test
public void backgroundTest() {
	try {
		curator.getData().inBackground((client, event) -> {
			log.info("event: {}", event);
			log.info(new String(event.getData()));
			log.info("{}", event.getStat());
		}).forPath(NODE);
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

@Test
public void backgroundWithExecutorServiceTest() {
	try {
		// 指定线程池
		ExecutorService executorService = Executors.newSingleThreadExecutor();
		curator.getData().inBackground((client, event) -> {
			log.info("event: {}", event);
			log.info(new String(event.getData()));
			log.info("{}", event.getStat());
		}, executorService).forPath(NODE);
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

监听

/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener
{
    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
@Test
public void listenerTest() {
	try {
		// 针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听 // todo, 说的什么玩意儿
		// 貌似是一次监听
		CuratorListener listener = (client, event) -> {
			log.info("listener event: {}", event);
			if (Watcher.Event.KeeperState.SyncConnected.getIntValue() == event.getWatchedEvent().getState().getIntValue()) {
				log.info("连接成功");
			}
		};
		curator.getCuratorListenable().addListener(listener);
		curator.getData().inBackground((client, event) -> {
			log.info("event: {}", event);
			log.info(new String(event.getData()));
			log.info("{}", event.getStat());
		}).forPath(NODE);
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

Curator Caches

原生监听机制只能生效一次, 需要不停的设置监听才能一直生效, 不是很好用

curator.getData().usingWatcher(new org.apache.zookeeper.Watcher() {
   @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("监听到节点事件:" + JSON.toJSONString(watchedEvent));
    }
}).forPath(path);

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。

新版本中使用 CuratorCache 替代了 NodeCache, PathChildrenCache, TreeCache 的功能

node cache

可以监听当前节点的节点事件(修改数据+节点变化)

path cache

可以监听当前节点的一级子节点的节点事件(修改数据+节点变化)

tree cache

可以监听当前节点及其下所有节点的节点事件(修改数据+节点变化)

代码演示

@Test
public void nodeCacheTest() {
	try {
		// NodeCache: 可以监听当前节点的节点事件(修改数据+节点变化)
		// NodeCache: @deprecated replace by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
		NodeCache nodeCache = new NodeCache(curator, NODE + "/a");
		nodeCache.getListenable().addListener(new NodeCacheListener() {
			@Override
			public void nodeChanged() throws Exception {
				log.info("{} path nodeChanged: ", NODE);
				log.info(new String(curator.getData().forPath(NODE)));
			}
		});
		nodeCache.start();
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

@Test
public void pathCacheTest() {
	try {
		// PathChildrenCache: 可以监听当前节点的一级子节点的节点事件(修改数据+节点变化)
		// PathChildrenCache: @deprecated replace by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
		PathChildrenCache pathChildrenCache = new PathChildrenCache(curator, NODE, true);
		pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				ChildData childData = event.getData();
				if (null == childData) {
					log.info("event:{}, type:{}", event, event.getType());
				} else {
					byte[] data = childData.getData();
					log.info("event:{}, type:{}, path:{}, data:{}", event, event.getType(), childData.getPath(), null == data ? null : new String(childData.getData()));
				}
			}
		});
		// 如果设置为true则在首次启动时就会缓存节点内容到Cache中
		// start(): @deprecated use {@link #start(StartMode)}
		pathChildrenCache.start(true);
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

@Test
public void treeCacheTest() {
	try {
		// TreeCache: 可以监听当前节点及其下所有节点的事件(修改数据+节点变化)
		// TreeCache: @deprecated replace by {@link org.apache.curator.framework.recipes.cache.CuratorCache}
		TreeCache treeCache = new TreeCache(curator, NODE);
		treeCache.getListenable().addListener(new TreeCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
				log.info(" tree cache: {}",event);
			}
		});
		treeCache.start();
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

@Test
public void curatorCacheTest() {
	try {
		// CuratorCache: 可以监听当前节点及其下所有节点的事件(修改数据+节点变化)
		// 用于替代 NodeCache, PathChildrenCache, TreeCache
		// ----------
		// Persistent Watchers are not available in the version of the ZooKeeper library being used
		// Could not reset persistent watch at path: /node
		// 可以使用 CuratorCache curatorCache = CuratorCache.bridgeBuilder(curator, NODE).build(); 就不会报错了
		CuratorCache curatorCache = CuratorCache.builder(curator, NODE).build();
		curatorCache.listenable().addListener(new CuratorCacheListener() {
			@Override
			public void event(Type type, ChildData oldData, ChildData data) {
				log.info("type:{}, oldData:{}, newData:{}", type, oldData, data);
			}
		});
		curatorCache.start();
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

@Test
public void curatorCacheBridgeTest() {
	try {
		// CuratorCache: 可以监听当前节点及其下所有节点的节点事件(修改数据+节点变化)
		// 用于替代 NodeCache, PathChildrenCache, TreeCache
		CuratorCache curatorCache = CuratorCache.bridgeBuilder(curator, NODE).build();
		curatorCache.listenable().addListener(new CuratorCacheListener() {
			@Override
			public void event(Type type, ChildData oldData, ChildData data) {
				Optional<ChildData> oldOptional = Optional.ofNullable(oldData);
				Optional<ChildData> newOptional = Optional.ofNullable(data);
				log.info("type:{}, oldData:{}:{}:{}, newData:{}:{}:{}", type,
						oldData, oldOptional.map(ChildData::getPath).orElse(null), oldOptional.map(item-> new String(item.getData())).orElse(null),
						data, newOptional.map(ChildData::getPath).orElse(null), newOptional.map(item-> new String(item.getData())).orElse(null));
			}
		});
		curatorCache.start();
		TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
	} catch (Throwable cause) {
		cause.printStackTrace();
	}
}

集群模式下的使用

bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3
../bin/zkCli.sh -server 172.16.138.202:2281,172.16.138.202:2282,172.16.138.202:2283,172.16.138.202:2284
private static final String ADDRESS = "116.62.162.48:2281,116.62.162.48:2282,116.62.162.48:2283,116.62.162.48:2284";

使用单独的ip和端口也可以连接, 但是多ip和端口的方式在服务宕机的时候能自动切换到服务正常的机器上

上一篇:基于Zookeeper+Curator实现分布式锁


下一篇:【zookeeper】 curator介绍