Apache Curator操作zookeeper实现基本增删改查、事务、分布式锁等

        Apache Curator封装了一套高级API 简化zookeeper的操作,提供了对zookeeper基本操作,watch,分布式锁等场景封装

引入Curator包

        需要注意不同Curator兼容不同zookeeper版本,可以去查看下发行版本说明https://cwiki.apache.org/confluence/display/CURATOR/Releases

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.0.0</version>
</dependency>

建立连接

    // 会话超时时间
	private static final int SESSION_TIMEOUT = 10 * 1000;

	// 连接超时时间
	private static final int CONNECTION_TIMEOUT = 3 * 1000;

	// ZooKeeper服务地址
	private static final String CONNECT_ADDR = "ip:2181";

	/**
	 * 建立连接
	 * 
	 * @return
	 */
	public static CuratorFramework createConnection() {
		// 重试策略
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
		// 工厂创建连接
		final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
				.connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)
				.build();
		// 开启连接
		client.start();
		return client;
	}

基本增删改查

/**
	 * 基本增删改查
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void basicOperation(CuratorFramework client) throws Exception {
		// 创建内容为空的永久节点

		client.create().forPath("/nonData");

		// 创建有内容的永久节点
		client.create().forPath("/permanent", "/data".getBytes());
		client.create().forPath("/permanent/one", "/data".getBytes());
		// 创建永久有序节点
		client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/permanentOrder", "/data".getBytes());

		// 创建临时节点,session过期节点失效
		client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "data".getBytes());

		// 创建临时有序节点
		client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tempOrder", "data".getBytes());
		// 判断节点是否存在
		Stat tempStat = client.checkExists().forPath("/temp");

		System.out.println("临时节点temp是否存在:" + tempStat != null);

		// 获取节点数据
		System.out.println("permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 修改节点数据
		client.setData().forPath("/permanent", "updateData".getBytes());
		System.out.println("修改后permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 删除节点
		client.delete().forPath("/nonData");
		// 删除节点及子节点
		client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/permanent");

	}

事务

/**
	 * 事务
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void transaction(CuratorFramework client) throws Exception {

		CuratorOp create = client.transactionOp().create().forPath("/test", "data".getBytes());

		CuratorOp delete = client.transactionOp().delete().forPath("/test1");
		// 添加节点与删除不存在节点一个事务提交,发生异常都会回滚
		List<CuratorTransactionResult> results = client.transaction().forOperations(create, delete);

	}

watch监听

        分为三种监听NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化。

/**
	 * 三种监听机制NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,
	 * TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void treeCacheWatch(CuratorFramework client) throws Exception {
		TreeCache treeCache = new TreeCache(client, "/test");
		treeCache.start();
		treeCache.getListenable().addListener(new TreeCacheListener() {
			public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
				switch (treeCacheEvent.getType()) {
				case CONNECTION_RECONNECTED:
					System.out.println("重新建立连接");
					break;
				case CONNECTION_SUSPENDED:
					System.out.println("连接超时");
					break;
				case CONNECTION_LOST:
					System.out.println("会话已过期");
					break;
				case INITIALIZED:
					System.out.println("初始化");
					break;
				case NODE_ADDED:
					System.out.println("添加节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_UPDATED:
					System.out.println("更新节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_REMOVED:
					System.out.println("删除节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				}
			}
		});
		while (true) {

		}
	}

分布式锁

        zookeeper分布式锁主要依赖于临时顺序节点,当A,B两个线程获取锁时,假如A线程先发起加锁请求,zookeeper会在加锁的node下创建一个临时顺序节点node-001,然后查询该节点下所有临时顺序节点,看自己是否排在第一位,如果排在第一位可以获取锁,A获取到锁后B线程请求加锁,创建临时顺序节点node-002,然后查询该节点下所有临时顺序节点,看自己是否排在第一位,发现此时排在第二位,此时线程B会对前一个节点加一个监听器,监听节点是否被删除,当A执行完删除节点后,B监听到A节点删除,判断自己是否是第一个节点,如果是会获取到锁。

/**
	 * 分布式锁,使用Curator两步acquire获取锁(),release()释放锁
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void lock(CuratorFramework client, InterProcessMutex interProcessMutex) {
		try {
			String name = Thread.currentThread().getName();
			if (interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
				System.out.println(name + "获取锁成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				interProcessMutex.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

完整demo

package com.shaofei;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class demo {
	
	
	// 会话超时时间
	private static final int SESSION_TIMEOUT = 10 * 1000;

	// 连接超时时间
	private static final int CONNECTION_TIMEOUT = 3 * 1000;

	// ZooKeeper服务地址
	private static final String CONNECT_ADDR = "ip:2181";

	public static void main(String[] args) throws Exception {
		final CuratorFramework client = createConnection();
		basicOperation(client);
		transaction(client);
		treeCacheWatch(client);
		final InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/lock/test"); // 互斥锁
		final InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock/test");// 读写锁
		final InterProcessSemaphoreMutex interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(client,
				"/lock/test"); // 不可重入互斥锁
		List<String> list = new ArrayList<String>();
		list.add("/lock/test");
		list.add("/lock/test1");
		final InterProcessMultiLock interProcessMultiLock = new InterProcessMultiLock(client, list); // 集合锁
		new Thread("thread-1") {
			@Override
			public void run() {
				lock(client, interProcessMutex);
			}
		}.start();
		new Thread("thread-2") {
			@Override
			public void run() {
				lock(client, interProcessMutex);
			}
		}.start();

	}

	/**
	 * 建立连接
	 * 
	 * @return
	 */
	public static CuratorFramework createConnection() {
		// 重试策略
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
		// 工厂创建连接
		final CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
				.connectionTimeoutMs(CONNECTION_TIMEOUT).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy)
				.build();
		// 开启连接
		client.start();
		return client;
	}

	
	
	/**
	 * 分布式锁,使用Curator两步acquire获取锁(),release()释放锁
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void lock(CuratorFramework client, InterProcessMutex interProcessMutex) {
		try {
			String name = Thread.currentThread().getName();
			if (interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
				System.out.println(name + "获取锁成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				interProcessMutex.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

	}

	/**
	 * 三种监听机制NodeCache监听指定节点数据变化,PathChildrenCache监听指定节点的下一级子节点变化情况,
	 * TreeCache功能比较全,监听自身节点数据变化,并且能向下监听Integer.MAX_VALUE级子节点变化
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void treeCacheWatch(CuratorFramework client) throws Exception {
		TreeCache treeCache = new TreeCache(client, "/test");
		treeCache.start();
		treeCache.getListenable().addListener(new TreeCacheListener() {
			public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
				switch (treeCacheEvent.getType()) {
				case CONNECTION_RECONNECTED:
					System.out.println("重新建立连接");
					break;
				case CONNECTION_SUSPENDED:
					System.out.println("连接超时");
					break;
				case CONNECTION_LOST:
					System.out.println("会话已过期");
					break;
				case INITIALIZED:
					System.out.println("初始化");
					break;
				case NODE_ADDED:
					System.out.println("添加节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_UPDATED:
					System.out.println("更新节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				case NODE_REMOVED:
					System.out.println("删除节点,路径为" + treeCacheEvent.getData().getPath() + ",数据为"
							+ new String(treeCacheEvent.getData().getData()));
					break;
				}
			}
		});
		while (true) {

		}
	}

	/**
	 * 事务
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void transaction(CuratorFramework client) throws Exception {

		CuratorOp create = client.transactionOp().create().forPath("/test", "data".getBytes());

		CuratorOp delete = client.transactionOp().delete().forPath("/test1");
		// 添加节点与删除不存在节点一个事务提交,发生异常都会回滚
		List<CuratorTransactionResult> results = client.transaction().forOperations(create, delete);

	}

	/**
	 * 基本增删改查
	 * 
	 * @param client
	 * @throws Exception
	 */
	public static void basicOperation(CuratorFramework client) throws Exception {
		// 创建内容为空的永久节点

		client.create().forPath("/nonData");

		// 创建有内容的永久节点
		client.create().forPath("/permanent", "/data".getBytes());
		client.create().forPath("/permanent/one", "/data".getBytes());
		// 创建永久有序节点
		client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/permanentOrder", "/data".getBytes());

		// 创建临时节点,session过期节点失效
		client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "data".getBytes());

		// 创建临时有序节点
		client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/tempOrder", "data".getBytes());
		// 判断节点是否存在
		Stat tempStat = client.checkExists().forPath("/temp");

		System.out.println("临时节点temp是否存在:" + tempStat != null);

		// 获取节点数据
		System.out.println("permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 修改节点数据
		client.setData().forPath("/permanent", "updateData".getBytes());
		System.out.println("修改后permanent节点数据:" + new String(client.getData().forPath("/permanent")));
		// 删除节点
		client.delete().forPath("/nonData");
		// 删除节点及子节点
		client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/permanent");

	}
}

上一篇:Zookeeper客户端Curator使用详解


下一篇:分布式协调-Zookeeper使用(Watcher、Curator、Session、Acl)