并发容器 & 并发工具

一、并发容器

CopyOnWriteArrayList

1.读写分离

写操作在一个复制的数组上进行,读操作还是在原数组中进行,读写分离,互不影响。

写操作需要加锁,防止并发写入时导致写入数据丢失。

写操作结束之后需要把原数组指向新的复制数组。

//写操作:
//通过过创建底层数组的新副本来实现的。
//当 List 需要被修改的时候,并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写入副本。
//写完之后,把原数组指向新的复制数组。
//这样可以保证写操作实在一个复制的数组上进行,而读操作还是在原数组中进行,不会影响读操作。
public boolean add(E e) {
   //加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // newElements 是一个复制的数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        // 写操作在一个复制的数组上进行
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

final void setArray(Object[] a) {
    array = a;
}
//读操作
//读操作没有任何同步控制和锁操作, 
//因为内部数组 array 不会被修改。
private transient volatile Object[] array;

public E get(int index) {
    return get(getArray(), index);
}

@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}

final Object[] getArray() {
    return array;
}

2. 适用场景

CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,很适合读多写少的应用场景。

CopyOnWriteArrayList 有其缺陷:

  • 内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右;
  • 数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。

所以 CopyOnWriteArrayList 不适合内存敏感以及对实时性要求很高的场景。

ConcurrentHashMap

1. 储存结构

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶, 从而使其并发度更高(并发度就是 Segment 的个数)。

//Segment 继承自 ReentrantLock。
static final class Segment<K,V> extends ReentrantLock implements Serializable {

    private static final long serialVersionUID = 2249069246763182397L;

    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    transient volatile HashEntry<K,V>[] table;

    transient int count;

    transient int modCount;

    transient int threshold;

    final float loadFactor;
}
final Segment<K,V>[] segments; 

//默认的并发级别为 16,也就是说默认创建 16 个 Segment。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

并发容器 & 并发工具

2. size 操作

每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。

/**
 * The number of elements. Accessed only either within locks
 * or among other volatile reads that maintain visibility.
 */
transient int count;Copy to clipboardErrorCopied

在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。

ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的

尝试次数使用 RETRIES_BEFORE_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。

如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。

/**
 * Number of unsynchronized retries in size and containsValue
 * methods before resorting to locking. This is used to avoid
 * unbounded retries if tables undergo continuous modification
 * which would make it impossible to obtain an accurate result.
 */
static final int RETRIES_BEFORE_LOCK = 2;

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // 超过尝试次数,则对每个 Segment 加锁
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // 连续两次得到的结果一致,则认为这个结果是正确的
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}Copy to clipboardErrorCopied

3. JDK 1.8 的改动

ConcurrentHashMap 取消了 Segment 分段锁。

JDK 1.8 使用 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized

数据结构与HashMap 1.8 的结构类似,数组+链表 / 红黑二叉树(链表长度 > 8 时,转换为红黑树 )。synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 Hash 值不冲突,就不会产生并发。

4. JDK 1.8 中的 put 方法

(1)hash 算法

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

(2)定位索引位置

i = (n - 1) & hash

(3)获取 table 中对应索引的元素 f

f = tabAt(tab, i = (n - 1) & hash
// Unsafe.getObjectVolatile 获取 f
// 因为可以直接指定内存中的数据,保证了每次拿到的数据都是新的
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

(4)如果 f 是 null,说明 table 中是第一次插入数据,利用

  • 如果 CAS 成功,说明 Node 节点插入成功
  • 如果 CAS 失败,说明有其他线程提前插入了节点,自旋重新尝试在该位置插入 Node

(5)其余情况把新的 Node 节点按链表或红黑树的方式插入到合适位置,这个过程采用内置锁实现并发。

5. 和Hashtable

底层数据结构:

  • JDK1.7 的ConcurrentHashMap底层采用分段的数组+链表实现, JDK1.8 的ConcurrentHashMap底层采用的数据结构与JDK1.8 的HashMap的结构一样,数组+链表/红黑二叉树
  • Hashtable和JDK1.8 之前的HashMap的底层数据结构类似都是采用数组+链表的形式, 数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的。

实现线程安全的方式

  • JDK1.7的ConcurrentHashMap(分段锁)对整个桶数组进行了分割分段(Segment), 每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问度。 JDK 1.8 采用数组+链表/红黑二叉树的数据结构来实现,并发控制使用synchronized和CAS来操作。
  • Hashtable:使用 synchronized 来保证线程安全,效率非常低下。 当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态, 如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈。

二、并发工具

J.U.C -AQS

java.util.concurrent(J.U.C)大大提高了并发性能,AQS 被认为是 J.U.C 的核心。

CountDownLatch

用来控制一个或者多个线程等待多个线程。

维护了一个计数器 cnt,每次调用 countDown() 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await() 方法而在等待的线程就会被唤醒。

并发容器 & 并发工具

public class CountdownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 3;
        CountDownLatch countDownLatch = new CountDownLatch(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("run..");
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
}
run..run..run..end

CyclicBarrier

用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。

和 CountdownLatch 相似,都是通过维护计数器来实现的。线程执行 await() 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await() 方法而在等待的线程才能继续执行。

CyclicBarrier 和 CountdownLatch 的一个区别是,CyclicBarrier 的计数器通过调用 reset() 方法可以循环使用,所以它才叫做循环屏障。

CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会执行一次。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

并发容器 & 并发工具

public class CyclicBarrierExample {

    public static void main(String[] args) {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("before..");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.print("after..");
            });
        }
        executorService.shutdown();
    }
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

Semaphore

Semaphore 类似于操作系统中的信号量,可以控制对互斥资源的访问线程数。

以下代码模拟了对某个服务的并发请求,每次只能有 3 个客户端同时访问,请求总数为 10。

public class SemaphoreExample {
    public static void main(String[] args) {
        final int threadNum=1;
        final int totalThread=10;

        Semaphore semaphore=new Semaphore(threadNum);

        ExecutorService service= Executors.newCachedThreadPool();

        for(int i=0;i<totalThread;i++){
            final int num=i;
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        test(num);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();
                    }
                }
            });
        }
        service.shutdown();
    }

    private static void test(int i) throws InterruptedException {
        System.out.println("Thread: "+i);
        Thread.sleep(1000);
    }
}
// 每隔一秒钟出现一次
Thread: 0
Thread: 1
Thread: 2
Thread: 3
Thread: 4
Thread: 5
Thread: 7
Thread: 6
Thread: 8
Thread: 9

CountDownLatch 和 CyclicBarrier 比较

  • 循环使用

    CountDownLatch 只能用一次;

    CyclicBarrier 通过 reset() 可以循环使用

  • 计数方式

    CountDownLatch 是减计数方式,计数为 0 时释放所有等待的线程;

    CyclicBarrier 是加计数方式,计数达到构造方法中参数指定的值释放所有等待的线程

  • 应用场景

    CountDownLatch 主要应用于主/从任务模式。一个任务(主任务)等待多个任务(从任务)执行完后才能执行;

    CyclicBarrier 主要应用于队友模式。一组 N 个线程(N 个队友)相互等待,任意个线程(某个队友)没有完成任务,所有线程都等着,直到这一组所有线程的任务完成,这组中每个线程才能继续往下运行。

  • 底层原理

    CountDownLatch 底层是共享锁;

    CyclicBarrier 底层是独占锁。

J.U.C - 其他组件

FutureTask

在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>Copy to clipboardErrorCopied
public interface RunnableFuture<V> extends Runnable, Future<V>Copy to clipboardErrorCopied

FutureTask 可用于异步获取执行结果或取消执行任务的场景。当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,主线程在完成自己的任务之后再去获取结果。

public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(
            new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(10);
                    result += i;
                }
                return result;
            }
        });

        Thread computeThread = new Thread(futureTask);
        computeThread.start();

        Thread otherThread = new Thread(() -> {
            System.out.println("other task is running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        otherThread.start();
        System.out.println(futureTask.get());
    }
}
other task is running...
4950

BlockingQueue

java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

  • FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
  • 优先级队列 :PriorityBlockingQueue

阻塞队列是一个自持两个附加操作的队列:

  • 支持阻塞的插入方法(put):当队列满时,会阻塞插入元素的线程,直到队列中的元素不满为止。
  • 支持阻塞的移除方法(take):队列为空时,会阻塞获取元素的线程,直到队列中的元素不空为止。

使用 BlockingQueue 实现生产者消费者问题

public class ProducerConsumer {
    private static BlockingQueue<String> queue=new LinkedBlockingQueue<>();

    private static class Producer extends Thread{
        @Override
        public void run() {
            try {
                queue.put("product");
                System.out.println("produce...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class Consumer extends Thread{
        @Override
        public void run() {
            try {
                queue.take();
                System.out.println("consume...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for(int i=0;i<2;i++){
            Producer p=new Producer();
            p.start();
        }

        for(int i=0;i<5;i++){
            Consumer c=new Consumer();
            c.start();
        }

        for(int i=0;i<3;i++){
            Producer p=new Producer();
            p.start();
        }

    }
}
produce...
produce...
consume...
consume...
produce...
produce...
consume...
consume...
produce...
consume...

ForkJoin

主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5;
    private int first;
    private int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // 任务足够小则直接计算
            for (int i = first; i <= last; i++) {
                result += i;
            }
        } else {
            // 拆分成小任务
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }
}Copy to clipboardErrorCopied
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinExample example = new ForkJoinExample(1, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future result = forkJoinPool.submit(example);
    System.out.println(result.get());
}Copy to clipboardErrorCopied

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

public class ForkJoinPool extends AbstractExecutorServiceCopy to clipboardErrorCopied

ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。

每个线程都维护了一个双端队列,用来存储需要执行的任务。

工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。

例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。并发容器 & 并发工具

 

上一篇:CyberTeam宣称已攻破视频聊天软件Skype,下一个是游戏平台Steam


下一篇:与众不同 windows phone (38) - 8.0 关联启动: 使用外部程序打开一个文件或URI, 关联指定的文件类型或协议