介绍
1.创建一个永久节点
2.竞争锁的时候同样的的key 所有线程都往永久节点插入指定key name的临时节点(节点不允许重复只会有一个插入成功)
3.插入失败的开启对永久节点的监听
4.当时获得锁的线程down机或者删除会触发监听。然后尝试获取CLH第一个线程节点 尝试重新获取锁
代码已上传github:https://github.com/aa310958153/zookeeper-lock
注:仅仅用于熟悉AQS如果要用到Zookeeper分布式锁直接使用Curator基于对Zookeeper锁的实现
InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
加锁
@Override protected boolean tryAcquire(int acquires) { //获取当前线程 final Thread current = Thread.currentThread(); //获取锁状态 int c = getState(); //等于0表示 当前空闲状态可以尝试获取 <1>zkLock加锁 if (c == 0) { if (zkLock()&&compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //可重入判断 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
<1>zkLock
/** * zookeeper保存临时节点实现加锁 * @return */ public boolean zkLock() { String path= getLockPath(); boolean haveLock=false; try { curatorFramework .create() .creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(path, syncValue.get().getValue().getBytes(StandardCharsets.UTF_8)); haveLock= true; } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {//重复的标识未获取到锁 haveLock=false; } catch (Exception e) { e.printStackTrace(); haveLock= false; } /** * 未获取到锁监听永久节点 */ if(!haveLock){ TreeCache treeCache = new TreeCache(CuratorClient.getCurator(), SYN_SWITCH_ZK_NODE); try { treeCache.start(); } catch (Exception e) { e.printStackTrace(); } if(!syncValue.get().isAddListener) { treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData eventData = event.getData(); switch (event.getType()) { case NODE_ADDED: //System.out.println(path + "节点添加:" + eventData.getPath() + "\t添加数据为:" + new String(eventData.getData())); break; case NODE_UPDATED: // System.out.println(eventData.getPath() + "节点数据更新\t更新数据为:" + new String(eventData.getData()) + "\t版本为:" + eventData.getStat().getVersion()); break; case NODE_REMOVED: //监听到节点删除。表示锁正常释放 或者持有锁的服务断开连接 //获得第一个阻塞线程 唤醒 尝试获取锁 Thread firstThread=getFirstQueuedThread(); if( firstThread!=null) { LockSupport.unpark(firstThread); } break; default: break; } } }); syncValue.get().setAddListener(true); } } return haveLock; }
释放锁
@Override protected boolean tryRelease(int releases) { //状态-1 大于0的数字表示可重入加了多少次锁 int c = getState() - releases; //如果加锁线程非当前线程抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //根据节点存储的值校验是否是当前线程加锁。如果不是抛出异常 String value; try { value= new String(curatorFramework.getData().forPath(getLockPath())); } catch (Exception e) { e.printStackTrace(); throw new IllegalMonitorStateException(); } if(value==null||!value.equals(syncValue.get().getValue())){ throw new IllegalMonitorStateException(); } boolean free = false; //当c等于0表示最后一次调用unlock 则进行锁的释放 if (c == 0) { free = true; //获得锁的线程设置为null setExclusiveOwnerThread(null); String path= getLockPath(); try { //删除节点 会触发节点监听 curatorFramework.delete().forPath(path); } catch (Exception e) { throw new IllegalMonitorStateException(); } syncValue.remove(); } //设置state setState(c); return free; }