通过ZooKeeper的有序节点、节点路径不回重复、还有节点删除会触发Wathcer事件的这些特性,我们可以实现分布式锁。
一、思路
- zookeeper中创建一个根节点Locks,用于后续各个客户端的锁操作。
- 当要获取锁的时候,在Locks节点下创建“Lock_序号”的零时有序节点(临时节点为了客户端突发断开连接,则此节点消失)。
- 如果没有得到锁,就监控排在自己前面的序号节点,等待它的释放。
- 当前面的锁被释放后,触发Process方法,然后继续获取当前子节点,判断当前节点是不是第一个,是 返回锁,否 获取锁失败。
二、实现
在实现是要了解一个类 AutoResetEvent。AutoResetEvent 常常被用来在两个线程之间进行信号发送。它有两个重要的方法:
Set() :发送信号到等待线程以继续其工作。
bool WaitOne():等待另一个线程发信号,只有收到信号,线程才继续往下执行 ,会一直等待下去,返回值表示是否收到信号。
bool WaitOne(int millisecondsTimeout):等待指定时间,如果没有收到信号继续执行,返回值表示是否收到信号。
下面为具体实现方法:
public class ZooKeeperLock { private MyWatcher myWatcher; private string lockNode; private org.apache.zookeeper.ZooKeeper zooKeeper; public ZooKeeperLock() { myWatcher = new MyWatcher(); } /// <summary> /// 获取锁 /// </summary> /// <param name="millisecondsTimeout">等待时间</param> /// <returns></returns> public async Task<bool> TryLock(int millisecondsTimeout = 0) { try { zooKeeper = new org.apache.zookeeper.ZooKeeper("127.0.0.1", 50000, new MyWatcher()); //创建锁节点 if (await zooKeeper.existsAsync("/Locks") == null) await zooKeeper.createAsync("/Locks", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //新建一个临时锁节点 lockNode = await zooKeeper.createAsync("/Locks/Lock_", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获取锁下所有节点 var lockNodes = await zooKeeper.getChildrenAsync("/Locks"); lockNodes.Children.Sort(); //判断如果创建的节点就是最小节点 返回锁 if (lockNode.Split("/").Last() == lockNodes.Children[0]) return true; else { //当前节点的位置 var location = lockNodes.Children.FindIndex(n => n == lockNode.Split("/").Last()); //获取当前节点 前面一个节点的路径 var frontNodePath = lockNodes.Children[location - 1]; //在前面一个节点上加上Watcher ,当前面那个节点删除时,会触发Process方法 await zooKeeper.getDataAsync("/Locks/" + frontNodePath, myWatcher); //如果时间为0 一直等待下去 if (millisecondsTimeout == 0) myWatcher.AutoResetEvent.WaitOne(); else //如果时间不为0 等待指定时间后,返回结果 { var result = myWatcher.AutoResetEvent.WaitOne(millisecondsTimeout); if (result)//如果返回True,说明在指定时间内,前面的节点释放了锁(但是可能是中间节点主机宕机 导致,所以不一定触发了Process方法就是得到了锁。需要重新判断是不是第一个节点) { //获取锁下所有节点 lockNodes = await zooKeeper.getChildrenAsync("/Locks"); //判断如果创建的节点就是最小节点 返回锁 if (lockNode.Split("/").Last() == lockNodes.Children[0]) return true; else return false; } else return false; } } } catch (KeeperException e) { await UnLock(); throw e; } return false; } /// <summary> /// 释放锁 /// </summary> /// <returns></returns> public async Task UnLock() { try { myWatcher.AutoResetEvent.Dispose();await zooKeeper.deleteAsync(lockNode); } catch (KeeperException e) { throw e; } } }
Process方法实现:
public class MyWatcher : Watcher { public AutoResetEvent AutoResetEvent; public MyWatcher() { this.AutoResetEvent = new AutoResetEvent(false); } public override Task process(WatchedEvent @event) { if (@event.get_Type() == EventType.NodeDeleted) { AutoResetEvent.Set(); } return null; } }