ReentrantLock 的公平锁源码分析

ReentrantLock 源码分析   以公平锁源码解析为例:

1:数据结构:

维护Sync 对象的引用:   private final Sync sync;

Sync对象继承 AQS,  Sync  分为两个类:处理公平锁锁和非公平锁:

FairSync   NonfairSync

 具体的类图如下:

  

2:接下来重点分析AQS这个类:AbstractQueuedSynchronizer:

AQS中的成员变量:

private transient volatile Node head;   //AQS维护队列的头结点

private transient volatile Node tail;     // AQS维护队列的尾结点

private volatile int state;                            // AQS 锁的状态  数量标识锁被获取的次数

下面看看NODE 结点的成员变量:

volatile int waitStatus;   //等待状态

volatile Node prev;      //前继节点

volatile Node next;      //后继节点

volatile Thread thread;   //线程对象

Node nextWaiter;       //下一个等待节点

从NODE的数据结构可以看出来,AQS里面维护的队列的数据结构是双链表的形式;

 
   

 

 

 

 

 

 

 

 

3:接下来分析 ReentrantLock  的构造方法:

ReentrantLock lock = new ReentrantLock(true);   //传入true,说明是构造公平锁

具体的构造方法如下,返回FairSync 对象:

public ReentrantLock(boolean fair) {

        sync = fair ? new FairSync() : new NonfairSync();

    }

4:lock方法的分析:因为是公平锁,所以调用 FairSync 下的lock方法:

final void lock() {

            acquire(1);

        }

   Acquire 的方法如下:

public final void acquire(int arg) {    // arg=1

        if (!tryAcquire(arg) &&    

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

}

接下来分两种场景来分析tryAcquire(arg)  1:同一线程第一次或N次获取锁(其他线程没有获取到锁)   2:其他线程已经获取到锁,当前线程尝试去获取锁;

//场景 1:

protected final boolean tryAcquire(int acquires) {   // acquires=1

            final Thread current = Thread.currentThread();   //当前线程:main-thread

            int c = getState();    //如果为第一次获取锁c=0  如果main线程已经获取过锁,则c为加锁的次数

            if (c == 0) {     //当前线程第一次获取锁

                if (!hasQueuedPredecessors() &&  // hasQueuedPredecessor的分析如下单独分析:

                    compareAndSetState(0, acquires)) {  //cas原子操作 state=1

                    setExclusiveOwnerThread(current);  //独占线程设置为当前线程

                    return true;  //返回true表明加锁成功

                }

            }

            else if (current == getExclusiveOwnerThread()) {  // c=n 的情况下

                int nextc = c + acquires;    // nextc=n+1

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);    //设置 state=n+1

                return true;  //获取到锁,返回true

            }

            return false;

        }

说明:hasQueuedPredecessors主要是判断当前线程所在的节点是不是CLH队列的首个位置,这个判断的目的是公平锁的公平获取锁的机制

hasQueuedPredecessors的源码如下:以该线程是第一次获取锁为例分析:

public final boolean hasQueuedPredecessors() {

        Node t = tail;      // tail=null

        Node h = head;    // head=null

        Node s;

        return h != t &&    //返回false

            ((s = h.next) == null || s.thread != Thread.currentThread());

    }

 

//场景2 分析:有其他线程未释放锁(main-thread 持有锁,thread-1尝试去获取锁)

protected final boolean tryAcquire(int acquires) {   // acquires=1

            final Thread current = Thread.currentThread();   //当前线程:thread-1

            int c = getState();    // 由于其他持有锁 state 至少为1        

if (c == 0) {    

                if(!hasQueuedPredecessors()&&  compareAndSetState(0, acquires)) {                    

setExclusiveOwnerThread(current); 

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {  // current=thread1  // getExclusiveOwnerThread()=main

                int nextc = c + acquires;                   

if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);   

                return true; 

            }

            return false;   //此时返回false表明尝试获取锁失败

        }

 

5:上面的场景1 获取到锁后返回true,则lock 方法执行结束。下面分析场景2:

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&     //尝试获取锁失败,返回false

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

!tryAcquire(arg)  返回为true;接下来进入

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  这个逻辑;

这个方法有两层处理:

1:addWaiter(Node.EXCLUSIVE)   将当前线程调价到CLH队列中

2:acquireQueued();逐步执行CLH队列中的线程,如果当前线程获取到锁则返回,否则,当前线程进行休眠,直到唤醒并重新获取到锁才返回;

以下分析这两个方法:场景:main线程获取到锁但未释放,这是线程 thread-1去获取锁:(假设此时没有其他线程在CLH队列中,即CLH队列为null)

addWaiter(Node.EXCLUSIVE)方法:入参 Node.EXCLUSIVE 为null;

 

   private Node addWaiter(Node mode) {   //mode=null  EXCLUSIVE 标识节点为独占锁模型

        Node node = new Node(Thread.currentThread(), mode); //创建新节点:新节点中线程为当前线程,节点模型为独占锁:thread= thread-1   nextWaiter= mode

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;   // 此时tail=null

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        enq(node);  //进入enq方法

        return node;

    }

  Enq() 方法如下:

private Node enq(final Node node) {   //入参为上一步新建的节点

        for (;;) {

            Node t = tail;    // 第一次遍历逻辑 tail=null

            if (t == null) { // Must initialize

                if (compareAndSetHead(new Node()))  //CAS 创建表头 Head

                    tail = head;

            } else {

                node.prev = t;  //第二次遍历逻辑:node为上面新建的节点:thread= thread-1   nextWaiter= mode, node.prev指向表头 t 为表头

                if (compareAndSetTail(t, node)) {  //CAS设置队列尾节点为当前node

                    t.next = node;   // 表头后继节点指向当前节点

                    return t;

                }

            }

        }

}

该场景经过上面的处理之后 CLH队列的数据结构如下:

第一次遍历:

 Head,tail节点

 

 

 

 

 

 

 

 

 

 

 

第二次遍历:

   Head               当前线程node:设置为tail

 
   

 

 

 

 

 

 

 

 

接下来分析

acquireQueued这个方法

final boolean acquireQueued(final Node node, int arg) {  //node为当前线程节点 arg=1

        boolean failed = true;

        try {

            boolean interrupted = false; //当前线程在休眠时,有没有被中断过

            for (;;) {

                final Node p = node.predecessor(); //获取前继节点,这里为head节点

                if (p == head && tryAcquire(arg)) { //这里p== head为true,接下来进入

// tryAcquire(arg)这个方法,tryAcquire(arg)方法前面分析过了,这里返回false;

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

//   接下来会进入以下的逻辑:下面会分析这两个方法

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

 

这里再次回顾下tryAcquire(arg) 方法:返回fasle

protected final boolean tryAcquire(int acquires) {    // acquires=1

            final Thread current = Thread.currentThread(); //当前线程 thread-1

            int c = getState();   //state=1

            if (c == 0) {

                if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {  // getExclusiveOwnerThread

获取到的线程是tryAcquire(int acquires)中设置的值 这里是 main; current=thread-1

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

    }

 

1): acquireQueued(final Node node, int arg) 方法中for循环第一次执行shouldParkAfterFailedAcquire(p, node) 方法分析:源码如下:

入参:pred为前继节点,这里是head  node为当前节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;  // ws=0

        if (ws == Node.SIGNAL) 

                     return true;

        if (ws > 0) {

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

        }

        return false;  //返回false;

    }

   For循环第二次执行 shouldParkAfterFailedAcquire(p, node)方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;  // ws=-1   在第一次已经设置为-1

        if (ws == Node.SIGNAL)   //返回true

                     return true;

        if (ws > 0) {

            do {

                node.prev = pred = pred.prev;

            } while (pred.waitStatus > 0);

            pred.next = node;

        } else {

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

        }

        return false;  //返回false;

    }

 

返回true后,接下进入parkAndCheckInterrupt()这个方法:

源码分析如下:

private final boolean parkAndCheckInterrupt() {

        LockSupport.park(this);  //阻塞当前线程

        return Thread.interrupted(); // 返回线程的中断状态

    }

LockSupport.park(this); //作用:前继线程节点的状态是 Node.SIGNAL;挂起当前线程;

Thread.interrupted(); //当前被挂起的线程被前继线程中断,返回线程的中断状态;

下面解释一个线程的行为:LockSupport.park(this)  线程被挂起:

当线程被挂起的时候唤醒的方式有两种:

1:unpark 的方式唤醒,前继节点线程使用完锁后,通过unpark方式唤醒当前线程

2:中断唤醒,其他线程通过 interrupt 中断当前线程

 

接下来继续分析:acquire(int arg)

public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

这个时候的重点放在分析 selfInterrupt(); 这个方法上;

进入这个方法的条件是 当前线程被中断过,并且获取锁成功了;

static void selfInterrupt() {

        Thread.currentThread().interrupt();  //当前线程产生一个中断,真正被唤醒

    }

 

到此为止,ReentrantLock 的公平锁源码分析结束。

上一篇:吴裕雄--天生自然 python数据分析:基于Keras使用CNN神经网络处理手写数据集


下一篇:Sklearn K均值聚类