/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
为了表述清楚假设当前请求获取锁的线程名称为“线程x”
(1)非公平锁的第一步:当“线程x”请求获取到锁的时候,直接调用compareAndSetState(0, 1),这个方法会将判断state=0?如果等于0,说明这个锁没有被占领,那么“线程x”直接获取,这就是非公平锁,竞争锁成功之后,就将属性Thread exclusiveOwnerThread设置为“线程x”。
(2)如果没有获取到,那么就调用 acquire(1);
一: acquire(1);
这个方法首先调用tryAcquire方法获取锁,如果获取不到就要把“线程x”加入到队列中,并且阻塞“线程x”。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
各个方法定义如下:
1、tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由NonfairSync自己实现。
2、addWaiter:如果tryAcquire返回FALSE(获取锁失败),则调用该方法将“线程x”加入到CLH同步队列尾部。
3、acquireQueued:“线程x”加入到队列之后就会调用这个方法将“线程x”挂起,直到获取到(许可)permit之后,如果线程被打断了,则将“线程x”从队列中移除。
4、selfInterrupt:如果是产生了中断,则返回true,否则返回false。
二、公平锁和非公平锁都对tryAcquire(arg)方法有各自的实现。查看非公平锁的实现。
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {(1)
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {(2)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这个方法有三个逻辑判断:
(1)如果没有线程占领锁,即c==0,那么竞争锁。
(2)如果锁被占领,判断锁是否被“线程x”占领,是就计算重入一次,并且返回true
(3)当锁被非“线程x”占领,就返回false.
三、 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
“线程x”竞争锁失败之后来到这个方法,这个方法首先执行的是addWaiter,将“线程x”加入到队列的尾端。
3.1、addWaiter(Node.EXCLUSIVE)
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这个方法包含两个部分,enq(node)以上是一个部分,enq(node)是一个部分。第一部分是一个快速入队列的方法,当只有一个线程来请求锁的时候或者请求的方式是异步方式的时候,可以实现线程节点一个一个进入队列。enq(node)是给多个线程竞争入队列的。这部分在我的下载文件中,有详细的画图说明,看不明白可以下载看看。
3.2、enq(node)
这个方法是将节点入队,是一个死循环,只有把节点添加到队列中之后才会返回,返回的是添加节点的前置节点。在这个方法中包括了一个节点初始化的部分,就是创建头节点和尾节点。
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
3.3、acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();(1)
if (p == head && tryAcquire(arg)) {(2)
setHead(node);(3)
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&(4)
parkAndCheckInterrupt())(5)
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);(6)
}
}
根据标号分析每个部分的作用:
(1)“线程x”添加到队列之后,返回的是“线程x”节点。(1)的作用是取到“线程x”节点的前一个节点,这里假设是“线程y”
(2)如果“线程y”是头节点,并且“线程x”获取锁成功,则执行(3),把这个“线程x”节点设置成头节点,并且更改当前线程节点的thread=null,prev=null,这里其实就是把当前节点更改成了头节点的状态了。
(4)如果“线程y”节点不是头节点(就是说前面已经有其他线程节点来请求锁,并且加入到队列中,又阻塞了)或者“线程x”没有获取锁成功,那么执行(4)shouldParkAfterFailedAcquire的源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// 获取前置节点“线程y”的waitStatus
if (ws == Node.SIGNAL) // 如果ws ==-1
/*
* Node.SIGNAL的作用就是,当前的线程节点释放锁之后,可以顺利的通知它的继任线程节点(这个会在释放锁的时候说明),让他取获取锁
*这里问题就是,如果继任节点还在请求锁的话可以正常获取到,一旦继任节点被中断或者抛出异常了,怎么把这个节点移除队列?
*
*/
return true;
if (ws > 0) {
/*
*如果ws>0,目前来看只有CANCELLED。那么就会就要往“线程y”节点的前面去找一个节点,
*直到这个节点的pred.waitStatus <=0,
*然后 pred.next = node;把当前的节点赋值给找到的这个节点的next
*如果执行了这个方法,那么“线程x”的前置节点就会改变,不会再是“线程y”节点
*这里假设“线程y”节点ws = -1
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*ws必须等于0或者-3.表示我们需要一个信号,但是不是停止。调用者必须再次确认当前的节点在停止前必须时没有获取到锁的。
*这里就是又要跳出当前的方法去执行死循环了,下次还会再进来,知道返回true,那个时候才会去执行parkAndCheckInterrupt
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
(5) parkAndCheckInterrupt(),当确定“线程x”节点需要挂起的时候,那么就会执行这个方法去挂起“线程x”节点。
(6)cancelAcquire(node);这个方法的作用就是当“线程x”节点出现异常获取被终端之后把它从队列中移除。
总结:
第一次看源码,建议先使用一下重入锁(ReentrantLock),然后带着下面几个问题看源码,并且从源码中找答案:
1、使用一个自己可以调试源码的IDE工具,我使用的intellij,写测试类调试源码很方便?
2、ReentrantLock是重入锁,那么实现的原理是什么?
3、重入锁是什么意思,应用场景是什么,怎么实现的?
4、公平锁与非公平锁是什么意思?为什么公平与不公平?
5、线程是怎么竞争获取到锁的?CAS指的是什么?
6、锁的排他模式与共享模式?
7、线程怎么被封装成node,并且通过什么方式加入到队列中?(这个部分最好画图,而且要考虑到各个线程之间的竞争入队)?
8、线程使用完锁之后,怎么传递给下一个线程使用?