【Java并发编程实战14】构建自定义同步工具(Building-Custom-Synchronizers)(下)

5 AbstractQueuedSynchronizer (AQS)

基于AQS构建的同步器类中,最进步的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。


如果一个类想成为状态依赖的类,它必须拥有一些状态,AQS负责管理这些状态,通过getState,setState, compareAndSetState等protected类型方法进行操作。这是设计模式中的模板模式。


使用AQS的模板如下:


获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。


释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。

boolean acquire() throws InterruptedException {
 while (state does not permit acquire) {
 if (blocking acquisition requested) {
 enqueue current thread if not already queued
 block current thread
 }
 else
 return failure
 }
 possibly update synchronization state
 dequeue thread if it was queued
 return success
}
void release() {
 update synchronization state
 if (new state may permit a blocked thread to acquire)
 unblock one or more queued threads
}

要支持上面两个操作就必须有下面的条件:

  • 原子性操作同步器的状态位
  • 阻塞和唤醒线程
  • 一个有序的队列


1 状态位的原子操作

这里使用一个32位的整数来描述状态位,前面章节的原子操作的理论知识整好派上用场,在这里依然使用CAS操作来解决这个问题。事实上这里还有一个64位版本的同步器(AbstractQueuedLongSynchronizer),这里暂且不谈。

2 阻塞和唤醒线程

标准的JAVA API里面是无法挂起(阻塞)一个线程,然后在将来某个时刻再唤醒它的。JDK 1.0的API里面有Thread.suspend和Thread.resume,并且一直延续了下来。但是这些都是过时的API,而且也是不推荐的做法。


HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。


在JDK 5.0以后利用JNI在LockSupport类中实现了此特性。



LockSupport.park() LockSupport.park(Object) LockSupport.parkNanos(Object, long) LockSupport.parkNanos(long) LockSupport.parkUntil(Object, long) LockSupport.parkUntil(long) LockSupport.unpark(Thread)


上面的API中park()是在当前线程中调用,导致线程阻塞,带参数的Object是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。由于park()立即返回,所以通常情况下需要在循环中去检测竞争资源来决定是否进行下一次阻塞。park()返回的原因有三:


  • 其他某个线程调用将当前线程作为目标调用unpark
  • 其他某个线程中断当前线程;
  • 该调用不合逻辑地(即毫无理由地)返回。


其实第三条就决定了需要循环检测了,类似于通常写的while(checkCondition()){Thread.sleep(time);}类似的功能。

3 有序队列

在AQS中采用CHL列表来解决有序的队列的问题。

AQS采用的CHL模型采用下面的算法完成FIFO的入队列和出队列过程。该队列的操作均通过Lock-Free(CAS)操作.

自己实现的CLH SpinLock如下:

class ClhSpinLock {
    private final ThreadLocal<Node> prev;
    private final ThreadLocal<Node> node;
    private final AtomicReference<Node> tail = new AtomicReference<Node>(new Node());

    public ClhSpinLock() {
        this.node = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return new Node();
            }
        };

        this.prev = new ThreadLocal<Node>() {
            protected Node initialValue() {
                return null;
            }
        };
    }

    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        // 一个CAS操作即可将当前线程对应的节点加入到队列中,
        // 并且同时获得了前继节点的引用,然后就是等待前继释放锁
        Node pred = this.tail.getAndSet(node);
        this.prev.set(pred);
        while (pred.locked) {// 进入自旋
        }
    }

    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        this.node.set(this.prev.get());
    }

    private static class Node {
        private volatile boolean locked;
    }
}

对于入队列(*enqueue):*采用CAS操作,每次比较尾结点是否一致,然后插入的到尾结点中。

do {
        pred = tail;
}while ( !compareAndSet(pred,tail,node) );

对于出队列(dequeue):由于每一个节点也缓存了一个状态,决定是否出队列,因此当不满足条件时就需要自旋等待,一旦满足条件就将头结点设置为下一个节点。

AQS里面有三个核心字段:


private volatile int state;

private transient volatile Node head;

private transient volatile Node tail;


其中state描述的有多少个线程取得了锁,对于互斥锁来说state<=1。head/tail加上CAS操作就构成了一个CHL的FIFO队列。下面是Node节点的属性。


独占操作的API都是不带有shared,而共享的包括semaphore和countdownlatch都是使用带有shared字面的API。


一些有用的参考资料:


**java.util.concurrent.locks.AbstractQueuedSynchronizer - **AQS


http://gee.cs.oswego.edu/dl/papers/aqs.pdf论文


http://www.blogjava.net/xylz/archive/2010/07/08/325587.html 一个比较全面的另外一个人的解读


http://suo.iteye.com/blog/1329460


http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-clh-and-spin-lock.html


http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-locksupport-and-thread-interrupt.html

独占的就用TRyAcquire, TRyRelease, and isHeldExclusively,共享的就用 tryAcquireShared and TRyReleaseShared. 带有try前缀的方法都是模板方法,AQS用于判断是否可以继续,例如如果tryAcquireShared返回一个负值,那么表示获取锁失败,失败的就需要进入CLH队列,并且挂起线程。


举一个例子,一个简单的闭锁:

@ThreadSafe
public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            // Succeed if latch is open (state == 1), else fail
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored) {
            setState(1); // Latch is now open
            return true; // Other threads may now be able to acquire

        }
    }
}

下面是自己实现的一个Mutex。

/**
 * Lock free的互斥锁,简单实现,不可重入锁
 */
public class Mutex implements Lock {

    private static final int FREE = 0;
    private static final int BUSY = 1;

    private static class LockSync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 4689388770786922019L;

        protected boolean isHeldExclusively() {
            return getState() == BUSY;
        }

        public boolean tryAcquire(int acquires) {
            return compareAndSetState(FREE, BUSY);
        }

        protected boolean tryRelease(int releases) {
            if (getState() == FREE) {
                throw new IllegalMonitorStateException();
            }

            setState(FREE);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }

    }

    private final LockSync sync = new LockSync();

    public void lock() {
        sync.acquire(0);
    }

    public boolean tryLock() {
        return sync.tryAcquire(0);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(0);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(0);
    }

}

6 AQS实现类

ReentrantLock

protected boolean tryAcquire(int ignored) {
 final Thread current = Thread.currentThread();
 int c = getState();
 if (c == 0) {
 if (compareAndSetState(0, 1)) {
 owner = current;
 return true;
 }
 } else if (current == owner) {
 setState(c+1);
 return true;
 }
 return false;
} 

Semaphore和CountDownLatch

protected int tryAcquireShared(int acquires) {
 while (true) {
 int available = getState();
 int remaining = available - acquires;
 if (remaining < 0
 || compareAndSetState(available, remaining))
 return remaining;
 }
}
protected boolean tryReleaseShared(int releases) {
 while (true) {
 int p = getState();
 if (compareAndSetState(p, p + releases))
 return true;
 }
} 
上一篇:Sersync实时同步企业应用配置实战


下一篇:制作ubuntu安装u盘