JUC锁框架源码阅读-AQS+Zookeeper实现分布式锁

介绍

1.创建一个永久节点

2.竞争锁的时候同样的的key 所有线程都往永久节点插入指定key name的临时节点(节点不允许重复只会有一个插入成功)

3.插入失败的开启对永久节点的监听

4.当时获得锁的线程down机或者删除会触发监听。然后尝试获取CLH第一个线程节点 尝试重新获取锁

代码已上传github:https://github.com/aa310958153/zookeeper-lock

注:仅仅用于熟悉AQS如果要用到Zookeeper分布式锁直接使用Curator基于对Zookeeper锁的实现

InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器

加锁

 @Override
        protected boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取锁状态
            int c = getState();
            //等于0表示 当前空闲状态可以尝试获取 <1>zkLock加锁
            if (c == 0) {
                if (zkLock()&&compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //可重入判断
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

<1>zkLock

 /**
         * zookeeper保存临时节点实现加锁
         * @return
         */
        public boolean zkLock() {
            String path= getLockPath();
            boolean haveLock=false;
            try {
                curatorFramework
                        .create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(path, syncValue.get().getValue().getBytes(StandardCharsets.UTF_8));

                haveLock= true;
            } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {//重复的标识未获取到锁
                haveLock=false;
            } catch (Exception e) {
                e.printStackTrace();
                haveLock= false;
            }
            /**
             * 未获取到锁监听永久节点
             */
            if(!haveLock){
                TreeCache treeCache = new TreeCache(CuratorClient.getCurator(), SYN_SWITCH_ZK_NODE);
                try {
                    treeCache.start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if(!syncValue.get().isAddListener) {
                    treeCache.getListenable().addListener(new TreeCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                            ChildData eventData = event.getData();
                            switch (event.getType()) {
                                case NODE_ADDED:
                                    //System.out.println(path + "节点添加:" + eventData.getPath() + "\t添加数据为:" + new String(eventData.getData()));
                                    break;
                                case NODE_UPDATED:
                                  //  System.out.println(eventData.getPath() + "节点数据更新\t更新数据为:" + new String(eventData.getData()) + "\t版本为:" + eventData.getStat().getVersion());
                                    break;
                                case NODE_REMOVED:
                                    //监听到节点删除。表示锁正常释放 或者持有锁的服务断开连接
                                    //获得第一个阻塞线程 唤醒 尝试获取锁
                                    Thread firstThread=getFirstQueuedThread();
                                    if( firstThread!=null) {
                                        LockSupport.unpark(firstThread);
                                    }
                                    break;
                                default:
                                    break;
                            }
                        }
                    });
                    syncValue.get().setAddListener(true);
                }
            }
            return haveLock;
        }

释放锁

  @Override
        protected boolean tryRelease(int releases) {
            //状态-1 大于0的数字表示可重入加了多少次锁
            int c = getState() - releases;
            //如果加锁线程非当前线程抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            //根据节点存储的值校验是否是当前线程加锁。如果不是抛出异常
            String value;
            try {
                value= new String(curatorFramework.getData().forPath(getLockPath()));
            } catch (Exception e) {
                e.printStackTrace();
                throw new IllegalMonitorStateException();
            }
            if(value==null||!value.equals(syncValue.get().getValue())){
                throw new IllegalMonitorStateException();
            }
            boolean free = false;
            //当c等于0表示最后一次调用unlock 则进行锁的释放
            if (c == 0) {
                free = true;
                //获得锁的线程设置为null
                setExclusiveOwnerThread(null);
                String path= getLockPath();
                try {
                    //删除节点 会触发节点监听
                    curatorFramework.delete().forPath(path);
                } catch (Exception e) {
                    throw new IllegalMonitorStateException();
                }
                syncValue.remove();
            }
            //设置state
            setState(c);

            return free;
        }

 

JUC锁框架源码阅读-AQS+Zookeeper实现分布式锁

上一篇:C# 关于画图Graphics Bitmap image


下一篇:Linux下有关进程的相关命令