JAVA多线程(八) Condition源码分析

Condition接口

Condition是一个接口,其提供的就两个核心方法,await和signal方法。分别对应着Object的wait和notify方法。调用Object对象的监视器方法的这两个方法,
需要在同步代码块里面,即必须先获取到锁才能执行这两个方法。同理,Condition调用这两个方法,也必须先获取到锁,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Object的监视器方法与Condition接口的对比

JAVA多线程(八) Condition源码分析

Condition简单用法

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说Condition是依赖Lock对象的。Condition的使用方式比较简单,需要注意在调用方法前获取锁.如下面代码所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

package com.brian.mutilthread.condition;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class ConditionDemo {
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(ConditionDemo::run);
        thread.start();
        try {
            Thread.sleep(1000);
        } catch (Exception e) {  }
        lock.lock();
        // 唤醒
        condition.signal();
        lock.unlock();
        log.info("    === {}     ===: {} 33333",Thread.currentThread().getName());
    }

    private static void run() {
        lock.lock();
        try {
            log.info("=== {} ===: {} 11111", Thread.currentThread().getName());
            // 等待
            condition.await();
            log.info("=== {} ===: {} 22222", Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Condition常用API

JAVA多线程(八) Condition源码分析

手写基于condition的队列

package com.brian.mutilthread.condition;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class BrianQueue<T> {
    private Object[] items;
    // 添加的下标,删除的下标和数组当前数量
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition emptyCondition = lock.newCondition();
    private Condition fullCondition = lock.newCondition();

    public BrianQueue(int size) {
        items = new Object[size];
    }

    // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                log.info("===== queue is full and be blocked ======");
                fullCondition.await();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            log.info("=== add() ===: {}", addIndex);
            ++count;
            emptyCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                log.info("===== queue is empty and be blocked ======");
                emptyCondition.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            log.info("=== remove() ===: {}", removeIndex);
            --count;
            fullCondition.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

测试类

package com.brian.mutilthread.condition;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class BrianQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BrianQueue<Integer> brianQueue = new BrianQueue<>(5);

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(()->{
            Integer num = 0;
            while (true){
                try {
                    brianQueue.add(++num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        executorService.execute(()->{
            while (true){
                try {
                    brianQueue.remove();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

 JAVA多线程(八) Condition源码分析

Condition await() 和 signal()源码解读

此处以Condition的实现类ConditionObject,ConditionObject是同步器AbstractQueuedSynchronizer的内部类来分析

public final void await() throws InterruptedException {

    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前节点添加到最后一个节点
    Node node = addConditionWaiter();
    //释放锁的状态
    long savedState = fullyRelease(node);
    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {

        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //重新获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
//获取单向链表,
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

 调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

上一篇:SQL之修改语句


下一篇:android ConditionVariable