分布式架构-ZK分布式锁解决方案

分布式架构-ZK分布式锁解决方案

一、解决思路

  1. 获取锁方法:
    多个jvm同时在zk上创建一个临时节点,最终只能够有一个jvm创建临时节点成功,如果能够创建临时节点成功jvm 表示获取锁成功能够正常执行业务逻辑,如果没有创建临时节点成功的jvm,则表示获取锁失败。获取锁失败之后,可以采用不断重试策略,重试多次获取锁失败之后,当前的jvm就进入到阻塞状态,并监听节点事件。

  2. 释放锁方法:
    直接调用.close()释放锁,因为采用临时节点,当我们调用close()方法的时候该临时节点会自动被删除。其他没有获取到锁的jvm会触发节点删除事件监听,然后唤起线程进入到获取锁的状态。

  3. 被唤醒的方法:
    被阻塞的jvm(没有获取锁成功的jvm),采用事件监听的方式监听到节点已经被删除的情况下,则开始从新进入到获取锁的状态。

  4. 获取锁原理:
    当多个jvm同时获取锁的时候,实际上底层在zk同时创建一个临时节点路径,但是最终只能有一个jvm能够创建该节点路径成功,如果成功的情况下,则认为获取锁成功,如果失败的情况下,则认为获取锁失败 可以采用重试机制,重试多次开始失败的情况下,当前jvm会进入到阻塞状态,等待获取锁的jvm释放锁唤醒。

  5. 释放锁原理:
    主动关闭当前会话连接,由于使用临时节点,则该节点自动会被删除。

  6. 唤醒原理:
    正在等待的jvm通过watch机制,收到该节点被释放之后,开始重新进入到获取锁流程。

二、zk实现分布式锁

  1. pom依赖
<dependency>
     <groupId>com.101tec</groupId>
     <artifactId>zkclient</artifactId>
     <version>0.8</version>
 </dependency>
  1. ZK工具类
public class ZkClientUtils{

    private static ZkClient zkClient = null;

    public static ZkClient getZkClient() {
        return zkClient;
    }

    public static ZkClient newZkClient() {
        if (zkClient != null) {
            zkClient.close();
        }
        zkClient = new ZkClient("127.0.0.1:2181", 5000);
        return zkClient;
    }
}
  1. Lock接口定义
public interface Lock {
    void getLock(); //获取锁
    void unlock(); //释放锁
}
  1. 在此采用模板设计模式实现,定义模板
@Slf4j
public abstract class ZookeeperAbstractTemplateLock implements Lock {

    protected abstract void waitLock();
    protected abstract boolean tryLock();

    @Override
    public void unlock() {
        ZkClientUtils.getZkClient().close();
        log.info("zk-----> 释放锁");
    }

    @Override
    public void getLock() {
        for (int i = 0; i < 5; i++) {
            if (tryLock()) {
                log.info("zk-----> 获取锁");
                return;
            }
        }
        waitLock();
        getLock();
    }
}
  1. 定义具体逻辑
@Slf4j
@Component
public class ZookeeperTemplateLock extends ZookeeperAbstractTemplateLock {
    private String lockPath = "/lockPath";
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    @Override
    protected boolean tryLock() {
        try {
            ZkClientUtils.newZkClient().createEphemeral(lockPath);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    protected void waitLock() {
        try {
            boolean exists = ZkClientUtils.getZkClient().exists(lockPath);
            if (exists) {
                log.info("zk-----> 未获取到锁等待 ");
                // 注册一个监听事件
                IZkDataListener iZkDataListener = new IZkDataListener() {
                    @Override
                    public void handleDataChange(String s, Object o) throws Exception {
                    }
                    @Override
                    public void handleDataDeleted(String s) throws Exception {
                        log.info("zk-----> 临时节点删除:"+lockPath);
                        countDownLatch.countDown();
                    }
                };
                ZkClientUtils.getZkClient().subscribeDataChanges(lockPath, iZkDataListener);
                countDownLatch.await();
                // 唤醒成功之后,则移除该事件
                ZkClientUtils.getZkClient().unsubscribeDataChanges(lockPath, iZkDataListener);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
上一篇:网络编程系列---【文件上传(B/S)】


下一篇:jmeter插件开发及zookeeper压测示例插件