【Java并发编程实战14】构建自定义同步工具(Building-Custom-Synchronizers)(中)

2.3 丢失的信号

保证notify一定在wait之后

2.4 通知

调用notify和notifyAll也得持有与条件队列对象相关联的锁。调用notify,JVM Thread Scheduler在这个条件队列上等待的多个线程中选择一个唤醒,而notifyAll则会唤醒所有线程。因此一旦notify了那么就需要尽快的释放锁,否则别人都竞争等着拿锁,都会进行blocked的状态,而不是线程挂起waiting状态,竞争都了不是好事,但是这是你考了性能因素和安全性因素的一个矛盾,具体问题要具体分析。


下面的方法可以进来减少竞争,但实现有些难写,所以还得折中考虑:


/

public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }

使用notify容易丢失信号,所以大多数情况下用notifyAll,比如take notify,却通知了另外一个take,没有通知put,那么这就是信号丢失,是一种“被劫持的”信号。


因此只有满足下面两个条件,才能用notify,而不是notifyAll:


  • 所有等待线程的类型都相同
  • 单进单出


2.5 示例:阀门类A Gate Class

和第5章的那个TestHarness中使用CountDownLatch类似,完全可以使用wait/notifyAll做阀门。

@ThreadSafe
public class ThreadGate {
    // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
    @GuardedBy("this") private boolean isOpen;
    @GuardedBy("this") private int generation;

    public synchronized void close() {
        isOpen = false;
    }

    public synchronized void open() {
        ++generation;
        isOpen = true;
        notifyAll();
    }

    // BLOCKS-UNTIL: opened-since(generation on entry)
    public synchronized void await() throws InterruptedException {
        int arrivalGeneration = generation;
        while (!isOpen && arrivalGeneration == generation)
            wait();
    }
}

3 Explicit Condition Objects

Lock是一个内置锁的替代,而Condition也是一种广义的内置条件队列

Condition的API:

public interface Condition {
 void await() throws InterruptedException;
 boolean await(long time, TimeUnit unit)throws InterruptedException;
 long awaitNanos(long nanosTimeout) throws InterruptedException;
 void awaitUninterruptibly();
 boolean awaitUntil(Date deadline) throws InterruptedException;
 void signal();
 void signalAll();
}

内置条件队列存在一些缺陷,每个内置锁都只能有一个相关联的条件队列。所以在BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,所以notifyAll经常通知不是同一个类型的需求。如果想编写一个带有多个条件谓词的并发对象,或想获得除了条件队列可见性之外的更多的控制权,可以使用Lock和Condition,而非内置锁和条件队列,这更加灵活。


一个Condition和一个lock关联,想象一个条件队列和内置锁关联一样。在Lock上调用newCondition就可以新建无数个条件谓词,这些condition是可中断的、可有时间限制的,公平的或者非公平的队列操作。


The equivalents of wait, notify, and notifyAll for Condition objects are await, signal, and signalAll。


下面的例子就是改造后的BoundedBuffer:


/

@ThreadSafe
public class ConditionBoundedBuffer <T> {
    protected final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // CONDITION PREDICATE: notEmpty (count > 0)
    private final Condition notEmpty = lock.newCondition();
    private static final int BUFFER_SIZE = 100;
    @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
    @GuardedBy("lock") private int tail, head, count;

    // BLOCKS-UNTIL: notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

注意这里使用了signal而不是signalll,能极大减少每次缓存操作中发生的上下文切换和锁请求次数。


使用condition和内置锁和条件队列一样,必须保卫在lock里面。

4 Synchronizer剖析

看似ReentrantLock和Semaphore功能很类似,每次只允许一定的数量线程通过,到达阀门时:

  • 可以通过 lock或者acquire
  • 等待,阻塞住了
  • 取消tryLock,tryAcquire
  • 可中断的,限时的
  • 公平等待和非公平等待


下面的程序是使用Lock做一个Mutex也就是持有一个许可的Semaphore。

@ThreadSafe
public class SemaphoreOnLock {
    private final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: permitsAvailable (permits > 0)
    private final Condition permitsAvailable = lock.newCondition();
    @GuardedBy("lock") private int permits;

    SemaphoreOnLock(int initialPermits) {
        lock.lock();
        try {
            permits = initialPermits;
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: permitsAvailable
    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (permits <= 0)
                permitsAvailable.await();
            --permits;
        } finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            ++permits;
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

实际上很多J.U.C下面的类都是基于AbstractQueuedSynchronizer (AQS)构建的,例如CountDownLatch, ReentrantReadWriteLock, SynchronousQueue和FutureTask(java7之后不是了)。AQS解决了实现同步器时设计的大量细节问题,例如等待线程采用FIFO队列操作顺序。AQS不仅能极大极少实现同步器的工作量,并且也不必处理竞争问题,基于AQS构建只可能在一个时刻发生阻塞,从而降低上下文切换的开销,提高吞吐量。在设计AQS时,充分考虑了可伸缩性。


上一篇:制作ubuntu安装u盘


下一篇:一张图片能导致数百万Android手机被黑?