学习AQS
一丶AQS
什么是AQS? Java并发包中的抽象队列同步器(AbstractQueuedSchronizer), 它是java中构建锁和其他同步组件的基础框架. 如常用的重入锁ReetrantLock, 都是基于该同步框架实现的.
二丶理解
整体思路, 维护一个锁状态state(int类型), 和一个先入先出(FIFO)队列.
多线程去竞争锁时, 会使用CAS操作去修改state值, 如果修改成功(获取锁成功), 则将该线程标识为获取锁的线程, 如果修改失败, 会将该线程放进队列尾部, 并加以阻塞, 直到该线程被唤醒. 当队列的头结点线程(即获取锁的线程) 释放锁时, 会唤醒后继节点的线程.
三丶源码解析
3.1) 获取锁
源码入口 ReentrantLock的lock()方法, 在使用非公平锁的情况, 调用内部类NonFairSync中的lock()方法.
final void lock() { if (compareAndSetState(0, 1)) //先是使用cas设置获取锁状态 setExclusiveOwnerThread(Thread.currentThread()); // 如果获取成功,则标识当前线程为获取锁线程 else acquire(1); // 再次尝试获取锁, 如果失败会加入等待队列 }
AQS中的acquire()方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && // 子类实现tryAcquire()方法, 如ReentrantLock中的公平锁,非公平锁实现 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 添加当前线程进等待队列, 如果是头结点的继任节点,则再次尝试获取锁 selfInterrupt(); // 自我中断 }
AQS中的addWaiter()方法
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 两种模式, 排他模式(独占模式), 共享模式 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { //首先尝试直接进队 pred.next = node; return node; } } enq(node); // 进队列 return node; }
AQS中的enq()方法
private Node enq(final Node node) { for (;;) { // 死循环,会使用cas一直尝试, 直到成功位置 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
AQS中的acquireQueued()方法
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 死循环 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 如果头结点的继任节点是当前节点,则尝试获取锁 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 阻塞当前线程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
AQS中shouldParkAfterFailedAcquire()方法, 用于判断是否应该阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // condition.signal() , 请求释放锁, 说明可以安全阻塞 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
3.2) 释放锁
源码入口 ReentrantLock的unlock()方法, 实际是调用AQS中的release()方法
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { // 在排他模式时使用释放 if (tryRelease(arg)) { // tryRelease() 方法由子类实现 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 唤醒后继节点 return true; } return false; }
在共享模式下, 也有对应的释放方法acquireShared()
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }View Code
AQS中的unparkSuccessor()方法
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
3.3) 几种等待状态
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; //取消 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; // 指出后继节点线程需要唤醒 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; // 指出线程在等待相应的条件 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; // 指出下一一个共享获取锁需要无条件传播
3.4) 两种获取锁模式
/** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); // 共享 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; // 排他
学习资料: