分布式架构-ZK分布式锁解决方案
一、解决思路
-
获取锁方法:
多个jvm同时在zk上创建一个临时节点,最终只能够有一个jvm创建临时节点成功,如果能够创建临时节点成功jvm 表示获取锁成功能够正常执行业务逻辑,如果没有创建临时节点成功的jvm,则表示获取锁失败。获取锁失败之后,可以采用不断重试策略,重试多次获取锁失败之后,当前的jvm就进入到阻塞状态,并监听节点事件。 -
释放锁方法:
直接调用.close()释放锁,因为采用临时节点,当我们调用close()方法的时候该临时节点会自动被删除。其他没有获取到锁的jvm会触发节点删除事件监听,然后唤起线程进入到获取锁的状态。 -
被唤醒的方法:
被阻塞的jvm(没有获取锁成功的jvm),采用事件监听的方式监听到节点已经被删除的情况下,则开始从新进入到获取锁的状态。 -
获取锁原理:
当多个jvm同时获取锁的时候,实际上底层在zk同时创建一个临时节点路径,但是最终只能有一个jvm能够创建该节点路径成功,如果成功的情况下,则认为获取锁成功,如果失败的情况下,则认为获取锁失败 可以采用重试机制,重试多次开始失败的情况下,当前jvm会进入到阻塞状态,等待获取锁的jvm释放锁唤醒。 -
释放锁原理:
主动关闭当前会话连接,由于使用临时节点,则该节点自动会被删除。 -
唤醒原理:
正在等待的jvm通过watch机制,收到该节点被释放之后,开始重新进入到获取锁流程。
二、zk实现分布式锁
- pom依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
- ZK工具类
public class ZkClientUtils{
private static ZkClient zkClient = null;
public static ZkClient getZkClient() {
return zkClient;
}
public static ZkClient newZkClient() {
if (zkClient != null) {
zkClient.close();
}
zkClient = new ZkClient("127.0.0.1:2181", 5000);
return zkClient;
}
}
- Lock接口定义
public interface Lock {
void getLock(); //获取锁
void unlock(); //释放锁
}
- 在此采用模板设计模式实现,定义模板
@Slf4j
public abstract class ZookeeperAbstractTemplateLock implements Lock {
protected abstract void waitLock();
protected abstract boolean tryLock();
@Override
public void unlock() {
ZkClientUtils.getZkClient().close();
log.info("zk-----> 释放锁");
}
@Override
public void getLock() {
for (int i = 0; i < 5; i++) {
if (tryLock()) {
log.info("zk-----> 获取锁");
return;
}
}
waitLock();
getLock();
}
}
- 定义具体逻辑
@Slf4j
@Component
public class ZookeeperTemplateLock extends ZookeeperAbstractTemplateLock {
private String lockPath = "/lockPath";
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
protected boolean tryLock() {
try {
ZkClientUtils.newZkClient().createEphemeral(lockPath);
return true;
} catch (Exception e) {
return false;
}
}
@Override
protected void waitLock() {
try {
boolean exists = ZkClientUtils.getZkClient().exists(lockPath);
if (exists) {
log.info("zk-----> 未获取到锁等待 ");
// 注册一个监听事件
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
log.info("zk-----> 临时节点删除:"+lockPath);
countDownLatch.countDown();
}
};
ZkClientUtils.getZkClient().subscribeDataChanges(lockPath, iZkDataListener);
countDownLatch.await();
// 唤醒成功之后,则移除该事件
ZkClientUtils.getZkClient().unsubscribeDataChanges(lockPath, iZkDataListener);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}