使用 ZooKeeper 实现分布式锁的最佳实践

package cn.juwatech.distributedlock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class DistributedLockExample { private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:2181"; private static final int SESSION_TIMEOUT = 5000; private static final String LOCK_PATH = "/distributed_lock"; private ZooKeeper zooKeeper; private String currentZnodeName; public DistributedLockExample() throws IOException { this.zooKeeper = new ZooKeeper(ZOOKEEPER_CONNECTION_STRING, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { // Watcher实现 } }); } public void acquireLock() throws KeeperException, InterruptedException { while (true) { createLockNode(); List<String> children = zooKeeper.getChildren(LOCK_PATH, false); Collections.sort(children); String smallestNode = children.get(0); if (currentZnodeName.equals(LOCK_PATH + "/" + smallestNode)) { return; } else { waitForLock(children.get(Collections.binarySearch(children, currentZnodeName.substring(LOCK_PATH.length() + 1)) - 1)); } } } private void createLockNode() throws KeeperException, InterruptedException { Stat stat = zooKeeper.exists(LOCK_PATH, false); if (stat == null) { zooKeeper.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } currentZnodeName = zooKeeper.create(LOCK_PATH + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } private void waitForLock(String prevZnodeName) throws KeeperException, InterruptedException { CountDownLatch latch = new CountDownLatch(1); Stat stat = zooKeeper.exists(LOCK_PATH + "/" + prevZnodeName, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { latch.countDown(); } } }); if (stat != null) { latch.await(); } } public void releaseLock() throws KeeperException, 
InterruptedException { zooKeeper.delete(currentZnodeName, -1); zooKeeper.close(); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { DistributedLockExample lockExample = new DistributedLockExample(); lockExample.acquireLock(); // 执行需要加锁的业务逻辑 lockExample.releaseLock(); } }
上一篇:在 PostgreSQL 里如何实现数据的自动清理和过期处理?


下一篇:阶段三:项目开发---大数据开发运行环境搭建:任务2:安装配置ZooKeeper