基本介绍
java.util.concurrent.locks是java1.5之后出现的一种锁实现方式,是一个接口。但是在这之前已经有一个同步机制的实现就是synchronized关键字,那为什么还要再出现一个LOCK接口呢?最主要的原因就是为了弥补synchronized使用中的不足。
synchronized优缺点
优点
1.synchronized所不用手动释放锁,即便抛出异常jvm也是让线程自动释放锁
2.当 JVM 用 synchronized 管理锁定请求和释放时,JVM 在生成线程转储时能够包括锁定信息。这些对调试非常有价值,因为它们能标识死锁或者其他异常行为的来源
缺点
1.使用synchronized如果其中一个线程不释放锁,那么其他需要获取锁的线程会一直等待下去,等待的线程不能中途中断,直到使用完释放或者出现异常jvm会让线程自动释放锁
2.也无法通过投票得到锁,如果不想等下去,也就没法得到锁
3.同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行,多数情况下,这没问题(而且与异常处理交互得很好),但是,确实存在一些非块结构的锁定更合适的情况
4.在激烈争用情况下更佳的性能,也即是如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作,性能比较低。因为当 JVM 用 synchronized 管理锁定请求和释放时,JVM 在生成线程转储时能够包括锁定信息,这些虽然对调试非常有价值,因为它们能标识死锁或者其他异常行为的来源,但是这势必会增大资源的消耗和耗时增加。
Lock的优缺点
优点
1.Lock可以让等待锁的线程响应中断
2.通过Lock可以知道有没有成功获取锁,而synchronized却无法办到
3.Lock可以提高多个线程进行读操作的效率
4.Lockhe和 Condition类结合可以实现特定条件的等待中断
5.当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。
6.使用synchronized关键字时,锁的控制和释放是在synchronized同步代码块的开始和结束位置。而在使用Lock实现同步时,锁的获取和释放可以在不同的代码块、不同的方法中。这一点是基于使用者手动获取和释放锁的特性
缺点
1.使用lock必须手动释放锁,并且一般要放到finally里保证一定释放
2. Lock 类只是普通的类,JVM 不知道具体哪个线程拥有 Lock 对象
什么是AQS
AQS是AbustactQueuedSynchronizer的简称,它是一个Java提高的底层同步工具类,用一个int类型的变量表示同步状态,并提供了一系列的CAS操作来管理这个同步状态。AQS的主要作用是为Java中的并发同步组件提供统一的底层支持,例如ReentrantLock,CountdowLatch就是基于AQS实现的,用法是通过继承AQS实现其模版方法,然后将子类作为同步组件的内部类。
同步队列
同步队列是AQS很重要的组成部分,它是一个双端队列,遵循FIFO原则,主要作用是用来存放在锁上阻塞的线程,当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个Node节点假如到同步队列的尾部,队列的头节点是成功获取锁的节点,当头节点线程是否锁时,会唤醒后面的节点并释放当前头节点的引用。
- LOCK接口
Lock是一个接口,里面有6个方法我们介绍一下每个使用方法。
lock():lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
Lock lock = ...;
lock.lock();
try{
//处理任务
}catch(Exception ex){ }finally{
lock.unlock(); //释放锁
}
tryLock():有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
Lock lock = ...;
if(lock.tryLock()) {
try{
//处理任务
}catch(Exception ex){ }finally{
lock.unlock(); //释放锁
}
}else {
//如果不能获取锁,则直接做其他事情
}
tryLock(long time, TimeUnit unit):和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
lockInterruptibly():当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。
注意:当一个线程获取了锁之后,是不会被interrupt()方法中断的。单独调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。
public void method() throws InterruptedException {
lock.lockInterruptibly();
try {
//.....
}
finally {
lock.unlock();
}
}
unlock():释放锁,一般需要加锁的资源(临界区)需要放到try中,而unlock()要放到finally中
newCondition():
isFair() //判断锁是否是公平锁
isLocked() //判断锁是否被任何线程获取了
isHeldByCurrentThread() //判断锁是否被当前线程获取了
hasQueuedThreads() //判断是否有线程在等待该锁
ReentrantLock实现类
ReenTrantLock顾名思义是可重入锁,实现接口Lock,有些地方也说ReenTrantLock是Lock接口的唯一实现类,其实WriteLock和ReadLock也是实现了Lock接口,但WriteLock和ReadLock是ReentrantReadWriteLock的静态内部实现类。和synchronized不同的是ReenTrantLock增加了类似锁投票的的机制
ReentrantLock它默认情况下是非公平锁,但是可以设置为公平锁,通过构造函数来决定创建的是公平锁还是非公平锁
注意:要想让创建的锁生效,这个锁一定是类的成员变量,而不能再方法中创建.
经过观察ReentrantLock把所有Lock接口的操作都委派到一个Sync类上,该类继承了AbstractQueuedSynchronizer
static abstract class Sync extends AbstractQueuedSynchronizer
Sync又有两个子类:
final static class NonfairSync extends Sync final static class FairSync extends Sync
显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁,先理一下Reentrant.lock()方法的调用过程(默认非公平锁):
AbstractQueuedSynchronizer中抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中实现。tryAcquire方法的语义在于用具体子类判断请求线程是否可以获得锁,无论成功与否AbstractQueuedSynchronizer都将处理后面的流程.AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态,非公平锁模式时刚才获取锁的线程会优先获取锁的执行权(通过代码可以看出这点)。经过调查线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。当有线程竞争锁时,该线程会首先尝试获得锁,这对于那些已经在队列中排队的线程来说显得不公平,这也是非公平锁的一个由来。与synchronized相同的是,这也是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。原生的CLH队列是用于自旋锁,但Doug Lea把其改造为阻塞锁。 如果已经存在Running线程,则新的竞争线程会被追加到队尾,具体是采用基于CAS的Lock-Free算法,因为线程并发对Tail调用CAS可能会导致其他线程CAS失败,解决办法是循环CAS直至成功。另外,nonfairTryAcquire方法将是lock方法间接调用的第一个方法,每次请求锁时都会首先调用该方法。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法会首先判断当前状态,如果c==0说明没有线程正在竞争该锁,如果不c !=0 说明有线程正拥有了该锁。 如果发现c==0,则通过CAS设置该状态值为acquires,acquires的初始调用值为1,每次线程重入该锁都会+1,每次unlock都会-1,但为0时释放锁。如果CAS设置成功,则可以预计其他任何线程调用CAS都不会再成功,也就认为当前线程得到了该锁,也作为Running线程,很显然这个Running线程并未进入等待队列。 如果c !=0 但发现自己已经拥有锁,只是简单地++acquires,并修改status值,但因为没有竞争,所以通过setStatus修改,而非CAS,也就是说这段代码实现了偏向锁的功能,并且实现的非常漂亮。
AbstractQueuedSynchronizer.addWaiter方法负责把当前无法获得锁的线程包装为一个Node添加到队尾:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其中参数mode是独占锁还是共享锁,默认为null,独占锁。追加到队尾的动作分两步: 1.如果当前队尾已经存在(tail!=null),则使用CAS把当前线程更新为Tail 2.如果当前Tail为null或则线程调用CAS设置队尾失败,则通过enq方法继续设置Tail 。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前线程追加到队尾,并返回包装后的Node实例。
把线程要包装为Node对象的主要原因,除了用Node构造供虚拟队列外,还用Node包装了各种线程状态,这些状态被精心设计为一些数字值:
- SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)
- CANCELLED(1):因为超时或中断,该线程已经被取消
- CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
- PROPAGATE(-3):传播共享锁
- 0:0代表无状态
AbstractQueuedSynchronizer.acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回:
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第12行的parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
检查原则在于:
规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞
规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同
总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。
至此,锁住线程的逻辑已经完成,下面讨论解锁的过程。
- ReadWriteLock接口
ReadWriteLock接口中只有两个方法:
Lock readLock();
Lock writeLock();
ReentrantReadWriteLock实现类
它是一个实现了ReadWriteLock接口的可重入的读写锁,同样可以通过构造函数来控制创建的是公平所还是非公平锁。其中读锁是共享锁,写锁是独享锁。也就是说ReentrantReadWriteLock读写锁对同一资源的访问分成两个锁。ReentrantReadWriteLock是基于AQS实现的
,该读写锁的实现原理是:将同步变量state按照高16位和低16位进行拆分,高16位表示读锁,低16位表示写锁。
ReentrantReadWriteLock可以构造为公平的或者非公平的两种类型。如果在构造时不显式指定则会默认的创建非公平锁。在非公平锁的模式下,线程访问的顺序是不确定的,就是可以闯入;可以由写者降级为读者,但是读者不能升级为写者。
可以看到实际上读/写锁在构造时都是引用的ReentrantReadWriteLock的sync锁对象。而这个Sync类是ReentrantReadWriteLock的一个内部类。总之读/写锁都是通过Sync来完成的。它是如何来协作这两者关系呢?区别主要是读锁获得的是共享锁,而写锁获取的是独占锁。这里有个点可以提一下,就是ReentrantReadWriteLock为了保证可重入性,共享锁和独占锁都必须支持持有计数和重入数。而ReentrantLock是使用state来存储的,而state只能存一个整形值,为了兼容两个锁的问题,所以将其划分了高16位和低16位分别存共享锁的线程数量或独占锁的线程数量或者重入计数。
不过要注意的是,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级称为读锁
示例:缓存数据
class dataCatch{
Object data; //缓存的数据
public volatile Boolean isCatch = false; //是否有缓存
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //生成读写锁 public void process(){
lock.readLock().lock(); //先加读锁,此时数据不会被修改
//数据没有缓存
if(!isCatch){
lock.readLock().unlock(); //解读锁
lock.writeLock().lock(); //加写锁,此时数据不会被读到
/********
*
* 执行数据查询操作并赋值给data
*
********/
isCatch = true;
lock.readLock().lock(); //先加读锁后解写锁
lock.writeLock().unlock();
}
/********
*
* 放回data数据给用户
*
********/
lock.readLock().unlock(); //解读锁
}
}
Condition接口介绍
我们通过之前的学习知道了:synchronized关键字与wait()和notify/notifyAll()方法相结合可以实现等待/通知机制,ReentrantLock类当然也可以实现,但是需要借助于Condition接口与newCondition() 方法。Condition是JDK1.5之后才有的,它具有很好的灵活性,比如可以实现多路通知功能也就是在一个Lock对象中可以创建多个Condition实例(即对象监视器),线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。
在使用notify/notifyAll()方法进行通知时,被通知的线程是有JVM选择的,使用ReentrantLock类结合Condition实例可以实现“选择性通知”,这个功能非常重要,而且是Condition接口默认提供的。
而synchronized关键字就相当于整个Lock对象中只有一个Condition实例,所有的线程都注册在它一个身上。如果执行notifyAll()方法的话就会通知所有处于等待状态的线程这样会造成很大的效率问题,而Condition实例的signalAll()方法 只会唤醒注册在该Condition实例中所有等待线程
方法:
void await() | 相当于Object类的wait方法 |
boolean await(long time, TimeUnit unit) | 相当于Object类的wait(long timeout)方法 |
signal() | 相当于Object类的notify方法 |
signalAll() | 相当于Object类的notifyAll方法 |
conditon使用示例:
在使用wait/notify实现等待通知机制的时候我们知道必须执行完notify()方法所在的synchronized代码块后才释放锁。在这里也差不多,必须执行完signal所在的try语句块之后才释放锁,condition.await()后的语句才能被执行。
注意: 必须在condition.await()方法调用之前调用lock.lock()代码获得同步监视器,不然会报错。
UseMoreConditionWaitNotify.java:
public class UseMoreConditionWaitNotify {
public static void main(String[] args) throws InterruptedException { MyserviceMoreCondition service = new MyserviceMoreCondition(); ThreadA a = new ThreadA(service);
a.setName("A");
a.start(); ThreadB b = new ThreadB(service);
b.setName("B");
b.start(); Thread.sleep(3000); service.signalAll_A(); }
static public class ThreadA extends Thread { private MyserviceMoreCondition service; public ThreadA(MyserviceMoreCondition service) {
super();
this.service = service;
} @Override
public void run() {
service.awaitA();
}
}
static public class ThreadB extends Thread { private MyserviceMoreCondition service; public ThreadB(MyserviceMoreCondition service) {
super();
this.service = service;
} @Override
public void run() {
service.awaitB();
}
} }
public class MyserviceMoreCondition { private Lock lock = new ReentrantLock();
public Condition conditionA = lock.newCondition();
public Condition conditionB = lock.newCondition(); public void awaitA() {
lock.lock();
try {
System.out.println("begin awaitA时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionA.await();
System.out.println(" end awaitA时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} public void awaitB() {
lock.lock();
try {
System.out.println("begin awaitB时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionB.await();
System.out.println(" end awaitB时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} public void signalAll_A() {
lock.lock();
try {
System.out.println(" signalAll_A时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionA.signalAll();
} finally {
lock.unlock();
}
} public void signalAll_B() {
lock.lock();
try {
System.out.println(" signalAll_B时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionB.signalAll();
} finally {
lock.unlock();
}
}
}
运行结果:
只有A线程被唤醒
示例,对文件的访问:
LOCK和synchronized如何选择
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
java.util.concurrent.lock
中的锁定类是用于高级用户和高级情况的工具 。一般来说,除非您对 Lock
的某个高级特性有明确的需要,或者有明确的证据(而不是仅仅是怀疑)表明在特定情况下,同步已经成为可伸缩性的瓶颈,否则还是应当继续使用 synchronized。
既然如此,我们什么时候才应该使用 ReentrantLock
呢?答案非常简单 —— 在确实需要一些 synchronized 所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票。 ReentrantLock
还具有可伸缩性的好处,应当在高度争用的情况下使用它,但是请记住,大多数 synchronized 块几乎从来没有出现过争用,所以可以把高度争用放在一边。我建议用 synchronized 开发,直到确实证明 synchronized 不合适,而不要仅仅是假设如果使用 ReentrantLock
“性能会更好”。请记住,这些是供高级用户使用的高级工具。(而且,真正的高级用户喜欢选择能够找到的最简单工具,直到他们认为简单的工具不适用为止。)。一如既往,首先要把事情做好,然后再考虑是不是有必要做得更快。
ReentrantLock的实现在争用下提供了更好的性能。但是,这些明显存在的好处,还不足以成为用 ReentrantLock
代替 synchronized
的理由。相反,应当根据您是否 需要 ReentrantLock
的能力来作出选择。大多数情况下,您不应当选择它 —— synchronized 工作得很好,可以在所有 JVM 上工作,更多的开发人员了解它,而且不太容易出错。只有在真正需要 Lock
的时候才用它。
ReentrantLock公平锁和非公平锁的实现原理
公平锁实现
1.公平锁在调用lock方法时,lock()中只有一个acquire()
2.而acquire()中调用两个方法tryAcquire()和acquireQueued(addWaiter(),arg)前面一个方式是试图获取锁的使用权限而后面那个方式是试图将当前线程作为一个节点维护到QPS队列中,如果第一个方法尝试获取失败锁权限失败并且尝试加入到QPS队列失败那么此线程就将被挂起
2.1:tryAcquire()中首先判断当前锁的状态是否是0也就是无所状态,
如果是,判断QPS队列中是否有其他线程,如果没有其他线程,并且通过cas设置同步状态为1成功,则将当前线程设置为拥有锁的线程,就是当前线程获取锁;否者返回false也就是尝试获取锁资源失败;
如果否,说明有当前线程占用着锁资源,判断占有锁资源的线程和当前线程是否相同,如果相同则再次获取锁,同时将同步状态status加1(所以同步锁状态是个可以大于1的值),这也就是锁重入的实现;否则尝试获取锁失败
2.2:这里是讲解addWaiter()的内容前面说过判断当前线程是否需要被挂起需要两个条件同时满足,如果走到2.2这一步说明tryAcquire方法返回的是false,这样就会进行将当前线程作为一个节点加入到QPS队列尾部的操作。首先会判断QPS队列的尾结点是否为空,
如果尾节点不为空,则将当前线程节点指向尾节点,同时通过cas的操作尝试将尾节点的下一个节点设置为当前节点,如果cas成功,则说明将当前线程节点设置为尾节点成功,直接返回ture,如果cas失败则进行一直的循环尝试直到将当前线程设置到尾节点。
如果尾节点为空,则创建一个新节点,并通过cas尝试将新创建的节点设置为QPS的头结点,如果设置成功则将QPS的尾节点和头结点相等,但此时并没有跳出循环,所以会再次执行循环中的代码,然后就会粥尾节点不空的设置,进而将当前线程节点设置为尾节点,同样如果一次设置失败,会无限循环设置
2.3:acquireQueued方法这是主要是判断是否终端当前线程
学习链接