前言
Zookeeper实现分布式锁。
文章目录
一、实现思路
- 为什么有分布式锁?
1)分布式情况下多个客户端请求处理同一个资源,如果不加锁可能会产生数据一致性等其他严重问题,但是单机情况下的一些锁(比如ReentrantLock、Synchorized)只能控制对当前机器请求的数据一致性,不能处理多台机器,因此产生了分布式锁。
2)每个客户在处理资源前先要获取到分布式锁,只有获取到才能操作资源,否则等待。 - Zookeeper怎么实现?
1)如图创建一个临时根节点/locks
,每当有一个请求发送过来就在该根节点基础上创建一个临时有序节点/locks/seq-xxxxx
,然后根据序号判断当前节点是不是当前情况下最小的节点,如果是的话就获取到锁,反之对前一个节点进行监听。
2)当获取到锁的前一个节点处理完数据后,就delete释放掉节点,然后后面的节点会收到通知,重复进行判断。
二、代码实现
/**
* 分布式锁的实现
*/
public class DistributeClock {
// Zookeeper客户端
private ZooKeeper zk = null;
// 等待连接处理
private CountDownLatch connectlatch = new CountDownLatch(1);
// 等待前一个节点删除执行完毕
private CountDownLatch preNodelatch = new CountDownLatch(1);
// 远程服务器地址
private String hostName = "192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181";
// 连接建立时间
private int connectTimeout = 60000;
// 前一个被监听的节点名称
private String waitPath;
// 当前节点的名称
private String currentPath;
public DistributeClock() throws Exception{
//1.获取连接
zk = new ZooKeeper(hostName, connectTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
// 状态为连接建立完毕
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
connectlatch.countDown();
}
// 监听前一个节点的处理完毕
if(watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
preNodelatch.countDown();
}
}
});
//2.等待连接建立完毕
connectlatch.await();
//3.连接建立完毕,判断根节点 /locks是否存在,不存在则创建该根节点
Stat stat = zk.exists("/locks", false);
if(stat == null){
zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 请求加锁
*/
public void zkLock(){
try {
// 创建带序号的临时节点,同时获取到该根路径下的子节点
currentPath = zk.create("/locks" + "/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> childNodes = zk.getChildren("/locks", true);
// 只有1个节点直接返回
if(childNodes.size() == 1){
return;
}
// 多个节点时,当前节点监听前一个节点
else{
// 节点名称:seq-00000000
String thisNode = currentPath.substring("/locks/".length());
Collections.sort(childNodes);
int index = childNodes.indexOf(thisNode);
if(index == -1){
throw new RuntimeException("数据异常");
}
else if(index == 0){
return;
}
else{
waitPath = "/locks/"+childNodes.get(index-1);
zk.getData(waitPath,true,new Stat());
preNodelatch.await();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 释放锁
*/
public void releaseLock(){
try {
zk.delete(currentPath,-1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
注意以下几点:
- 连接建立完毕后,需要检查
"/clocks"
根路径是否存在,不存需要提前创建好该路径; - 每当有一个客户端请求获取锁资源,就先执行创建临时带序号的子节点,然后比较当前节点是不是
/clocks
路径下序号最小的那个节点,如果是就创建该节点;否则当前节点需要去监听前一个节点。 - 整个加锁过程中必须等待连接建立完毕才能继续操作、必须等待前一个节点完全释放掉锁之后才能继续操作,因此需要用到
CountDownLatch
类变量。
java多线程系列:CountDownLatch
整齐划一-CountDownLatch如何协调多线程的开始和结束
三、成熟的框架——Curator
自定义的分布式锁存在很多问题,比如需要通过CountDownLatch
实现异步连接控制,还需要解决反复重连等问题,因此可以直接使用封装成熟的框架Curator
来实现分布式锁的调用。
详细学习Curator
public class CuratorTest {
private static String hostAddress = "192.168.1.6:2181,192.168.1.7:2181,192.168.1.8:2181";
public static void main(String[] args) {
// 创建分布式锁1
final InterProcessMutex clock1 = new InterProcessMutex(connectClient(), "/clocks");
// 创建分布式锁2
final InterProcessMutex clock2 = new InterProcessMutex(connectClient(), "/clocks");
// 尝试获取锁
new Thread(new Runnable() {
public void run() {
try {
clock1.acquire();
System.out.println("-----------客户端1成功获取到分布式锁------------");
clock1.release();
System.out.println("-----------客户端1成功释放分布式锁------------");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
clock2.acquire();
System.out.println("-----------客户端2成功获取到分布式锁------------");
clock2.release();
System.out.println("-----------客户端2成功释放分布式锁------------");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 建立客户端连接
private static CuratorFramework connectClient(){
/**
* 建立重试策略
* 间隔时间、最大重试次数
*/
RetryPolicy policy = new ExponentialBackoffRetry(3000,3);
/**
* 建立连接
*/
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(hostAddress)
.connectionTimeoutMs(60000)
.sessionTimeoutMs(60000)
.retryPolicy(policy).build();
// 启动Client实例
client.start();
return client;
}
}