Java并发编程—JUC的Lock锁

 

一、Lock (JUC锁)

JUC 锁位于java.util.concurrent.locks包下,为锁和等待条件提供一个框架,它不同于内置同步和监视器。

Java并发编程—JUC的Lock锁

CountDownLatch,CyclicBarrier 和 Semaphore 不在包中属于并发编程中的工具类,但也是通过 AQS(后面会讲) 来实现的。因此,我也将它们归纳到 JUC 锁中进行介绍。

   
1、Lock  Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。
2、ReentrantLock 一个可重入的互斥锁,它具有与隐式锁synchronized相同的一些基本行为和语义,但功能更强大。
3、AQS类

AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer

AbstractQueuedSynchronizer 就是被称之为AQS的类,为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。 
AbstractQueuedLongSynchronizer以long形式维护同步状态的一个AbstractQueuedSynchronizer版本。 
AbstractQueuedSynchronizer与AbstractQueuedLongSynchronizer都继承了AbstractOwnableSynchronizer。

AbstractOwnableSynchronizer是可以由线程以独占方式拥有的同步器。

4、Condition 

Condition又称等待条件,它实现了对锁更精确的控制。

Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

不同的是,Object中的wait(),notify(),notifyAll()方法是和synchronized组合使用的;而Condition需要与Lock组合使用。

5、ReentrantReadWriteLock  ReentrantReadWriteLock维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。
6、LockSupport 用来创建锁和其他同步类的基本线程阻塞原语。
7、CountDownLatch  一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
8、CyclicBarrier  一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。
9、Semaphore  一个计数信号量。从概念上讲,信号量维护了一个许可集。Semaphore通常用于限制可以访问某些资源的线程数目。

二、Lock与ReentrantLock

1、概述

Java中的锁有两种,synchronized与Lock。因为使用synchronized并不需要显示地加锁与解锁,所以往往称synchronized为隐式锁,而使用Lock时则相反,所以一般称Lock为显示锁。synchronized修饰方法或语句块,所有锁的获取和释放都必须出现在一个块结构中。当需要灵活地获取或释放锁时,synchronized显然是不符合要求的。Lock接口的实现允许锁在不同的范围内获取和释放,并支持以任何顺序获取和释放多个锁。一句话,Lock实现比synchronized更灵活。但凡事有利就有弊,不使用块结构锁就失去了使用synchronized修饰方法或语句时会出现的锁自动释放功能,在大多数情况下,Lock实现需要手动释放锁。除了更灵活之外,Lock还有以下优点:

  • Lock 实现提供了使用 synchronized 方法和语句所没有的其他功能,包括提供了一个非块结构的获取锁尝试 tryLock()、一个获取可中断锁的尝试 lockInterruptibly() 和一个获取超时失效锁的尝试 tryLock(long, TimeUnit)
  • Lock 类还可以提供与隐式监视器锁完全不同的行为和语义,如保证排序、非重入用法或死锁检测。如果某个实现提供了这样特殊的语义,则该实现必须对这些语义加以记录。

ReentrantLock是一个可重入的互斥锁。顾名思义,“互斥锁”表示在某一时间点只能被同一线程所拥有。“可重入”表示锁可被某一线程多次获取。当然 synchronized 也是可重入的互斥锁。当锁没有被某一线程占有时,调用 lock() 方法的线程将成功获取锁。可以使用isHeldByCurrentThread()和 getHoldCount()方法来判断当前线程是否拥有该锁。

ReentrantLock既可以是公平锁又可以是非公平锁。当此类的构造方法 ReentrantLock(boolean fair) 接收true作为参数时,ReentrantLock就是公平锁,线程依次排队获取公平锁,即锁将被等待最长时间的线程占有。与默认情况(使用非公平锁)相比,使用公平锁的程序在多线程环境下效率比较低。而且公平锁不能保证线程调度的公平性,tryLock方法可在锁未被其他线程占用的情况下获得该锁。

2、API

1、构造方法

//创建一个 ReentrantLock 的实例。
ReentrantLock() 

//创建一个具有给定公平策略的 ReentrantLock。  
ReentrantLock(boolean fair) 

2、方法摘要

int getHoldCount() 
          //查询当前线程保持此锁的次数。
protected  Thread   getOwner() 
          //返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。
protected  Collection<Thread>   getQueuedThreads() 
          //返回一个 collection,它包含可能正等待获取此锁的线程。
 int    getQueueLength() 
          //返回正等待获取此锁的线程估计数。
protected  Collection<Thread>   getWaitingThreads(Condition condition) 
          //返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。
 int    getWaitQueueLength(Condition condition) 
          //返回等待与此锁相关的给定条件的线程估计数。
 boolean    hasQueuedThread(Thread thread) 
          //查询给定线程是否正在等待获取此锁。
 boolean    hasQueuedThreads() 
          //查询是否有些线程正在等待获取此锁。
 boolean    hasWaiters(Condition condition) 
          //查询是否有些线程正在等待与此锁有关的给定条件。
 boolean    isFair() 
          //如果此锁的公平设置为 true,则返回 true。
 boolean    isHeldByCurrentThread() 
          //查询当前线程是否保持此锁。
 boolean    isLocked() 
          //查询此锁是否由任意线程保持。
 void   lock() 
          //获取锁。
 void   lockInterruptibly() 
          //如果当前线程未被中断,则获取锁。
 Condition  newCondition() 
          //返回用来与此 Lock 实例一起使用的 Condition 实例。
 String toString() 
          //返回标识此锁及其锁定状态的字符串。
 boolean    tryLock() 
          //仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
 boolean    tryLock(long timeout, TimeUnit unit) 
          //如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。
 void   unlock() 
          //试图释放此锁。

3、代码

1、典型的代码

class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() { 
        lock.lock();  // block until condition holds
        try {
            // ... method body
        } finally {
            lock.unlock()
        }
    }
}

2、买票


public class SellTickets {

    public static void main(String[] args) {
        TicketsWindow tw1 = new TicketsWindow();
        Thread t1 = new Thread(tw1, "一号窗口");
        Thread t2 = new Thread(tw1, "二号窗口");
        t1.start();
        t2.start();
    }
}

class TicketsWindow implements Runnable {
    private int tickets = 1;
    private final ReentrantLock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                if (tickets > 0) {
                    System.out.println(Thread.currentThread().getName() 
                                       + "还剩余票:" + tickets + "张");
                    --tickets;
                    System.out.println(Thread.currentThread().getName() 
                                       + "卖出一张火车票,还剩" + tickets + "张");
                } else {
                    System.out.println(Thread.currentThread().getName() 
                                       + "余票不足,暂停出售!");
                    try {
                        Thread.sleep(1000 * 60);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

4、总结

  • 与synchronized 相比ReentrantLock的使用更灵活。Lock接口的实现允许锁在不同的范围内获取和释放,并支持以任何顺序获取和释放多个锁。
  • ReentrantLock具有与使用 synchronized 相同的一些基本行为和语义,但功能更强大。包括提供了一个非块结构的获取锁尝试 tryLock()、一个获取可中断锁的尝试 lockInterruptibly() 和一个获取超时失效锁的尝试 tryLock(long, TimeUnit)
  • ReentrantLock 具有 synchronized 所没有的许多特性,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者轮询锁。
  • ReentrantLock 可伸缩性强,应当在高度争用的情况下使用它。

三、AQS

1、概述

谈到 ReentrantLock,不得不谈 AbstractQueuedSynchronizer(AQS)!AQS,AbstractQueuedSynchronizer的缩写(抽象的队列式的同步器),是 JUC 的核心。AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLockSemaphoreCountDownLatch。

Java并发编程—JUC的Lock锁

它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)

这里 volatile 是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:Exclusive独占锁,只有一个线程能执行,如 ReentrantLock)和 Share共享锁,多个线程可同时执行,如Semaphore/CountDownLatch)。AQS的子类(锁或者同步器)实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会 unpark() 主线程,然后主线程就会从 await() 函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 获取-释放资源 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock

2、总结

在AQS的设计中,在父类AQS中实现了对 等待队列的默认实现,子类中几乎不用修改该部分功能。而state在子类中根据需要被赋予了不同的意义,子类通过对state的不同操作来提供不同的同步器功能,进而对封装的工具类提供不同的功能。AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用就将 暂时获取不到锁的线程加入到队列中(CLH队列 自旋 双向队列),AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

四、Condition

在任务协作中,关键问题是任务之间的通信,除了 同步监视器之外,Java 1.5 之后还提供了 Lock 跟 Condition 组合来实现线程之间的通信

1、与Object监视器监视器方法的比较

对比项 Condition Object监视器
使用条件 获取锁 获取锁,创建Condition对象
等待队列的个数 一个 多个
是否支持通知指定等待队列 支持 不支持
是否支持当前线程释放锁进入等待状态 支持 支持
是否支持当前线程释放锁并进入超时等待状态 支持 支持
是否支持当前线程释放锁并进入等待状态直到指定最后期限 支持 不支持
是否支持唤醒等待队列中的一个任务 支持 支持
是否支持唤醒等待队列中的全部任务 支持 支持

2、API

 void   await() 
          //造成当前线程在接到信号或被中断之前一直处于等待状态。
 boolean    await(long time, TimeUnit unit) 
          //造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
 long   awaitNanos(long nanosTimeout) 
          //造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
 void   awaitUninterruptibly() 
          //造成当前线程在接到信号之前一直处于等待状态。
 boolean    awaitUntil(Date deadline) 
          //造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
 void   signal() 
          //唤醒一个等待线程。
 void   signalAll() 
          //唤醒所有等待线程。

3、演示下Condition是如何更精细地控制线程的休眠与唤醒的。

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();//锁
    final Condition notFull = lock.newCondition();//写条件
    final Condition notEmpty = lock.newCondition();//读条件

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    //存数据
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)//如果队列已满
                notFull.await();//阻塞写线程
            items[putptr] = x;
            if (++putptr == items.length)
                putptr = 0;
            ++count;
            notEmpty.signal();//唤醒读线程
        } finally {
            lock.unlock();
        }
    }

    //写数据
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)//如果队列已空
                notEmpty.await();//阻塞读线程
            Object x = items[takeptr];
            if (++takeptr == items.length)
                takeptr = 0;
            --count;
            notFull.signal();//唤醒写线程
            return x;
        } finally {
            lock.unlock();
        }
    }
}

这是一个有界的缓冲区,支持put(Object)与take()方法。put(Object)负责向缓冲区中存数据,take负责从缓冲区中读数据。在多线程环境下,调用put(Object)方法,当缓冲区已满时,会阻塞写线程,如果缓冲区不满,则写入数据,并唤醒读线程。调用take()方法时,当缓冲区为空,会阻塞读线程,如果缓冲区不空,则读取数据,并唤醒写线程。

这就是多个Condition的强大之处,假设缓存队列已满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程。如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒读线程时,通过notify()或notifyAll()无法明确的指定唤醒读线程,而只能通过notifyAll唤醒所有线程,但notifyAll无法区分唤醒的线程是读线程,还是写线程。 如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这样就降低了效率。

五、ReentrantReadWriteLock

1、概述

ReentrantLock是互斥锁。与互斥锁相对应的是共享锁。ReadWriteLock就是一种共享锁 ,ReentrantReadWriteLock是支持与 ReentrantLock 类似语义的 ReadWriteLock 实现。ReadWriteLock 维护了两个锁,读锁和写锁,所以一般称其为读写锁。写锁是独占的。读锁是共享的,如果没有写锁,读锁可以由多个线程共享。与互斥锁相比,虽然一次只能有一个写线程可以修改共享数据,但大量读线程可以同时读取共享数据,所以,在共享数据很大,且读操作远多于写操作的情况下,读写锁值得一试。

ReadWriteLock源码如下:

public interface ReadWriteLock {
    //返回用于读取操作的锁。
    Lock readLock();
    //返回用于写入操作的锁。
    Lock writeLock();
}

从源码中可以看到,ReadWriteLock并不是Lock的子接口。所以ReadWriteLock并没有Lock的那些特性。

2、使用场景

在使用某些种类的Collection时,可以使用ReentrantReadWriteLock来提高并发性。通常,在预期collection很大,读取者线程访问它的次数多于写入者线程,并且entail操作的开销高于同步开销时,这很值得一试。例如,以下是一个使用 TreeMap的类,预期它很大,并且能被同时访问。

public class RWDictionary {
    // TeepMap就是读的多,插入的少的场景
    private final Map<String, Data> m = new TreeMap<String, Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    //读锁
    private final Lock r = rwl.readLock();
    //写锁
    private final Lock w = rwl.writeLock();

    public Data get(String key) {
        r.lock();
        try {
            return m.get(key);
        } finally {
            r.unlock();
        }
    }

    public String[] allKeys() {
        r.lock();
        try {
            return (String[])m.keySet().toArray();
        } finally {
            r.unlock();
        }
    }

    public Data put(String key, Data value) {
        w.lock();
        try {
            return m.put(key, value);
        } finally {
            w.unlock();
        }
    }

    public void clear() {
        w.lock();
        try {
            m.clear();
        } finally {
            w.unlock();
        }
    }
}

3、特性

ReentrantReadWriteLock 具有以下特性:(有待详细介绍)

  • 公平性
  • 重入性
  • 锁降级
  • 锁获取中断
  • 支持Condition
  • 检测系统状态

优点 

与互斥锁相比,虽然一次只能有一个写线程可以修改共享数据,但大量读线程可以同时读取共享数据。在共享数据很大,且读操作远多于写操作的情况下,ReentrantReadWriteLock值得一试。

缺点 

只有当前没有线程持有读锁或者写锁时才能获取到写锁,这可能会导致写线程发生饥饿现象,即读线程太多导致写线程迟迟竞争不到锁而一直处于等待状态。StampedLock 可以解决这个问题,解决方法是如果在读的过程中发生了写操作,应该重新读而不是直接阻塞写线程。

六、StampedLock

1、概述

StampedLock是JDK1.8新增的一个锁,是对读写锁ReentrantReadWriteLock的改进。前面已经学习了ReentrantReadWriteLock,我们了解到,在共享数据很大,且读操作远多于写操作的情况下,ReentrantReadWriteLock 值得一试。但要注意的是,只有当前没有线程持有读锁或者写锁时才能获取到写锁,这可能会导致写线程发生饥饿现象,即读线程太多导致写线程迟迟竞争不到锁而一直处于等待状态。StampedLock可以解决这个问题,解决方法是如果在读的过程中发生了写操作,应该重新读而不是直接阻塞写线程。

StampedLock有三种读/写模式:写、读、乐观读。

  • 写。独占锁,只有当前没有线程持有读锁或者写锁时才能获取到该锁。方法writeLock()返回一个可用于unlockWrite(long)释放锁的方法的戳记。tryWriteLock()提供不计时和定时的版本。
  • 读。共享锁,如果当前没有线程持有写锁即可获取该锁,可以由多个线程获取到该锁。方法readLock()返回可用于unlockRead(long)释放锁的方法的戳记。tryReadLock()也提供不计时和定时的版本。
  • 乐观读。方法tryOptimisticRead()仅当锁定当前未处于写入模式时,方法才会返回非零戳记。返回戳记后,需要调用validate(long stamp)方法验证戳记是否可用。也就是看当调用tryOptimisticRead返回戳记后到到当前时间是否有其他线程持有了写锁,如果有,返回false,否则返回true,这时就可以使用该锁了。

2、代码

StampedLock 则提供了一种乐观的读策略,这种乐观策略的锁非常类似于无锁的操作,使得乐观锁完全不会阻塞写线程

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();
    /**
     * 改变当前坐标。
     * 先获取写锁,然后对point坐标进行修改,最后释放锁。
     * 该锁是排它锁,这保证了其他线程调用move函数时候会被阻塞,直到当前线程显示释放了该锁。
     */
    void move(double deltaX, double deltaY) { // an exclusively locked method
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }
    /**
     * 计算当前坐标到原点的距离
     * 
     * @return
     */
    double distanceFromOrigin() { 
        //1.尝试获取乐观读锁,返回stamp
        long stamp = sl.tryOptimisticRead();
        //2.拷贝参数到本地方法栈中
        double currentX = x, currentY = y;
        //3.验证stamp是否有效
        if (!sl.validate(stamp)) {
            //4.如果stamp无效,说明得到stamp后,又有其他线程获得了写锁
            //5.获取读锁
            stamp = sl.readLock();
            try {
                //6.其他线程修改了x,y的值,为了数据的一致性,需要再次再次拷贝参数到本地方法栈中
                currentX = x;
                currentY = y;
            } finally {
                //7.释放读锁
                sl.unlockRead(stamp);
            }
        }
        //8.使用参数的拷贝来计算当前坐标到原点的距离。无论步骤3中stamp有没有验证成功
        //,参数的拷贝都是当前坐标的值
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
    /**
     * 如果当前坐标为原点则移动到指定的位置
     */
    void moveIfAtOrigin(double newX, double newY) { // upgrade
        // 获取读锁,保证其他线程不能获取到写锁
        long stamp = sl.readLock();
        try {
            //如果当前坐标为原点
            while (x == 0.0 && y == 0.0) {
                //尝试升级成写锁
                long ws = sl.tryConvertToWriteLock(stamp);
                //如果升级成功,更新坐标值
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {//如果升级成功
                    sl.unlockRead(stamp);//先释放读锁
                    stamp = sl.writeLock();//再获取写锁
                    //循环while中的操作,直到成功更新坐标值
                }
            }
        } finally {
            //最后释放写锁
            sl.unlock(stamp);
        }
    }
}

3、StampedLock 原理

StampedLock的内部实现是基于CLH锁的,CLH锁是一种自旋锁,它保证没有饥饿的发生,并且可以保证FIFO(先进先出)的服务顺序.CLH锁的基本思想如下:锁维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中,每一个节点代表一个线程,保存一个标记位(locked).用与判断当前线程是否已经释放锁;locked=true 没有获取到锁,false 已经成功释放了锁

 当一个线程视图获得锁时,取得等待队列的尾部节点作为其前序节点.并使用类似如下代码判断前序节点是否已经成功释放锁:

只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行,因此会自旋等待,

 反之,如果前序线程已经释放锁,则当前线程可以继续执行.

 释放锁时,也遵循这个逻辑,线程会将自身节点的locked位置标记位false,那么后续等待的线程就能继续执行了

七、CountDownLatch、Semaphore、Exchanger、CyclicBarrier

博客地址

上一篇:Dubbo源码分析十一、服务路由


下一篇:BZOJ1415[Noi2005]聪聪和可可——记忆化搜索+期望dp