ReentrantLock

要讲AbstractQueuedSynchronizer,我觉得最好还是实际使用一个重入锁,来看内部实现,这里使用ReentrantLock

    ReentrantLock NonFairLock = new ReentrantLock();
    ReentrantLock fairLock = new ReentrantLock(true);

可以看到ReentrantLock构造函数有两种实现,一个是默认的非公平锁,另一个是非公平锁

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

看一下ReentrantLock内部构造

//首先实际lock,调用的是一个Sync类,Sync继承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer {}
//具体的非公平锁,继承了Sync类
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}

非公平锁

加锁

以下是nonFairLock.lock()方法的大致流程

ReentrantLock

ReentrantLock.lock()

    public void lock() {
        sync.lock();
    }

这里使用了ReentrantLock的类常量,private final Sync sync,sync调用它子类NonfairSync.lock()方法

NonfairSync.lock()

       final void lock() {
           //初次获取锁
           if (compareAndSetState(0, 1))
               setExclusiveOwnerThread(Thread.currentThread());
           else
               //判断是重入锁还是去竞争别人的锁
               acquire(1);
       }

重点关注下,当使用CAS更改state变量失败后(也即是没有初次获取到锁,如果是重入锁,也会失败),AQS.acuire(1)

AQS.acquire(int arg)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            //addWaiter(独占锁模式)
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这个方法首先会去调用子类实现的tryAcquire(int arg)方法,在非公平锁中会调用Sync的nonfairTryAcquire(int acquires)方法

ReentrantLock.Sync.nonfairTryAcquire(int acquires)

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //获取当前的state
            int c = getState();
            //如果当前state == 0,那么使用CAS修改state,即获取锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //当前线程已经获取了锁
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //获取锁失败,返回false,调用后续的函数
            return false;
        }

获取锁成功,!tryacquire(arg)相当于是false,直接返回

如果获取锁失败,!tryacquire(arg)相当于是true,需要执行if判断中的acquireQueued(final Node node, int arg)函数


这里看一看获取锁失败后的流程

首先将当前线程封装成Node节点,进入队列(实际是链表)

AQS.addWaiter(Node mode)

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //直接获取连边末尾的节点,尝试进行快速插入
        //这里的条件是链表中已经有了等待节点,也即是链表初始化了
        //当采用一次CAS设置末尾节点的时候,还是会调用enq方法,进行for死循环插入队列
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //看,这里又用到了CAS
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果CAS设置失败
        //或者链表还没有初始化,则进行初始化
        enq(node);
        return node;
    }

看一下enq(final Node node) 方法

AQS.enq(final Node node)

    private Node enq(final Node node) {
        //这里采用了for(;;)循环的方式
        for (;;) {
            Node t = tail;
            //链表没有初始化,此时会生成一个dummy虚头节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //链表已经初始化了
                //将新的node节点放入末尾
                node.prev = t;
                //再次采用CAS设置,如果失败,则会反复for循环
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

将等待线程封装成节点后,调用acquireQueued(final Node node, int arg)方法

AQS.acquireQueued(final Node node, int arg)

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //再次for(;;)循环
            for (;;) {
                //获取到当前节点的前继
                final Node p = node.predecessor();
                //当前继为头结点的时候,可以尝试使用tryAcuqire(arg)方法获取锁
                //在非公平锁中会调用Sync的nonfairTryAcquire(int acquires)方法
                if (p == head && tryAcquire(arg)) {
                    //获取锁成功,将当前节点设置为头结点
                    setHead(node);
                    /**
                    AQS.setHead(node)方法
                        private void setHead(Node node) {
                        head = node;
                        //将头结点中的线程属性设位null
                        node.thread = null;
                        node.prev = null;
                    }
                    */
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //当该节点的前继不是head节点,或者获取锁失败
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //如果挂起线程,检测到被中断,那么将中断标识设置为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在for循环中,如果节点的前继不是head,或者前继是head,但是获取锁失败(前继没有完全释放,state != 0),会在if()判断中调用shouldParkAfterFailedAcquire(p, node) 和 parkAndCheckInterrupt()方法

注意,如果should()方法返回是false,不会执行park()方法

AQS.shouldParkAfterFailedAcquire(p, node)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            //前继的等待状态已经设置为SIGNAL,该节点可以调用LockSupport.park()方法挂起
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            //前继节点已经取消了,此时需要找新的前继节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //前继节点为0或者PROPAGARE,采用CAS的方式更改为SIGNAL,并返回false
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

如果should()返回true,说明前继的等待状态已经是SIGNAL了,则执行parkAndCheckInterrupt()挂起当前线程

AQS.parkAndCheckInterrupt()

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //判断当前线程是否被中断
        return Thread.interrupted();
    }

解锁

因为解锁的线程首先要求获取到了锁,所以不存在锁竞争,调用链相对简单

ReentrantLock

ReentrantLock.unlock()

    public void unlock() {
        sync.release(1);
    }

调用了sync.release(int arg)方法,也即是父类AQS的release(int arg)方法

AQS.release(int arg)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先会执行tryRelease(1)方法

ReentrantLock.Sync.tryRelease(int releases)

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                //如果当前线程没有获得锁,抛出异常
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                //如果state被释放为0,则标记free,锁完全释放
                free = true;
                //设置当前锁没有被任何线程获取
                setExclusiveOwnerThread(null);
            }
            //更新state
            setState(c);
            //返回free标记,用于AQS.release()流程判断
            return free;
        }

如果Sync.tryRelease()方法完全释放了锁,则AQS.release()方法中会执行unparkSuccessor(head)方法,去唤醒后继节点

AQS.unparkSuccessor(Node node)

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        //如果node的后继节点为空或者被取消,则从后往前找下一个可以唤醒的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒后继节点(线程)
            LockSupport.unpark(s.thread);
    }

LockSupport.unpark()

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

至此,ReentrantLock的非公平加锁和解锁,流程看完了

上一篇:aop 对 controller请求参数等信息 输出日志


下一篇:C和指针 学习(二)第七章习题第五题、第六题