java线程并发工具类

  本次内容主要讲Fork-Join、CountDownLatch、CyclicBarrier以及Callable、Future和FutureTask,最后再手写一个自己的FutureTask,绝对干货满满!

 

1、Fork-Join

1.1 什么是Fork-Join

  Java多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin。forkjoin可以让我们不去了解诸如Thread、Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序。

   forkjoin采用的是分而治之。分而治之思想是:将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。分而治之的策略是:对于一个规模为n的问题,若该问题可以容易地解决(比如说规模n较小)则直接解决,否则将其分解为m个规模较小的子问题,这些子问题互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法),递归地解这些子问题,然后将各子问题的解合并得到原问题的解,这种算法设计策略叫做分治法。用一张图来表示forkjoin原理。

java线程并发工具类

  我们可以了解一下计算机的十大经典算法:快速排序、堆排序、归并排序 、二分查找、BFPRT(线性查找)、DFS(深度优先搜索)、BFS(广度优先搜索)、Dijkstra、动态规划、朴素贝叶斯分类。其中有哪一些用到的是分而治之呢?有3个,分别是快速排序、归并排序和二分查找。

  归并排序是建立在归并操作上的一种有效的排序算法。该算法是采用分治法的一个非常典型的应用。将已有序的子序列合并,得到完全有序的序列;即先使每个子序列有序,再使子序列段间有序。若将两个有序表合并成一个有序表,称为2路归并,与之对应的还有多路归并。对于给定的一组数据,利用递归与分治技术将数据序列划分成为越来越小的半子表,在对半子表排序后,再用递归方法将排好序的半子表合并成为越来越大的有序序列。为了提升性能,有时我们在半子表的个数小于某个数(比如15)的情况下,对半子表的排序采用其他排序算法,比如插入排序。下面演示一下归并排序的过程。

1.2 归并排序(升序)示例

java线程并发工具类

 先将数组划分为左右2个子表:

java线程并发工具类

 然后继续对左右2个子表进行拆分:

java线程并发工具类

对拆分好的4个子表进行排序:

java线程并发工具类

 对有序子表进行比较合并:

java线程并发工具类

 对合并后的子表继续比较合并:

java线程并发工具类

 第二次合并后,数组呈有序排列。

 1.3 Fork-Join工作窃取

  工作窃取是指当前线程的Task已经全被执行完毕,则自动取到其他线程的Task队列中取出Task继续执行。ForkJoinPool中维护着多个线程在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。用一张图进行说明。

java线程并发工具类

1.3 Fork-Join使用

   Fork-Join使用两个类来完成以上两件事情:ForkJoinTask和ForkJoinPool。我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork和join的操作机制,通常我们不直接继承ForkjoinTask类,只需要直接继承其子类。

(1)RecursiveAction,用于没有返回结果的任务

(2)RecursiveTask,用于有返回值的任务

 task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行。join()和get方法当任务完成的时候返回计算结果。调用get/join方法的时候会阻塞。还是用一个图来说明forkjoin的工作流程。

java线程并发工具类

  在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

 1.4 Fork-Join VS 单线程

  假设有一个业务场景,数据库中有编号为0到1千万的会员信息,要统计所有会员的余额总和。为了对比结果的一致性,用户的余额不用随机数表示,就用编号代表用户的余额。现在的做法是每次从数据库查询出5000条数据进行统计,直到所有数据统计完成,进行汇总。对比看看单线程和Fork-Join的差异。

先看单线程场景:

public class SingleThreadSumNumber {
    /**
     * 每次查询5000条进行统计
     */
    private static final int THRESHOLD = 5000;

    /**
     * 最小值
     */
    private static final int MIN = 0;

    /**
     * 最大值
     */
    private static final int MAX = 10000000;

    public void sumNumber() {
        long sum = 0;
        long startTime = System.currentTimeMillis();
        int start = MIN;
        int end = MIN + THRESHOLD;

        boolean isFirstTime = true;
        while (end <= MAX) {
            sum = sum + batchSum(start, end);
            if (isFirstTime) {
                start = start + THRESHOLD + 1;
                isFirstTime = false;
            } else {
                start = start + THRESHOLD;
            }
            end = end + THRESHOLD;
        }
        System.out.println("The result is " + sum
                + " spend time:" + (System.currentTimeMillis() - startTime) + "ms");
    }

    /**
     * 统计每次查询出来的余额总和
     * @param start
     * @param end
     * @return
     */
    public long batchSum(int start, int end) {
        long sum = 0;
        try {
            Thread.sleep(15);//休眠15毫秒模拟查询业务
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = start; i <= end; i++) {
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) {
        SingleThreadSumNumber thread = new SingleThreadSumNumber();
        thread.sumNumber();
    }
}

运行程序输出以下结果:

java线程并发工具类

余额总和为50000005000000,花费了30119毫秒,下面使用forkjoin来进行统计:

 1 import java.util.concurrent.ForkJoinPool;
 2 import java.util.concurrent.RecursiveTask;
 3 
 4 public class ForkJoinDemo {
 5     /**
 6      * 门限值,如果任务门限低于此值,则进行计算
 7      */
 8     private static final int THRESHOLD = 5000;
 9 
10     /**
11      * 最小值
12      */
13     private static final int MIN = 0;
14 
15     /**
16      * 最大值
17      */
18     private static final int MAX = 10000000;
19 
20     private static class SumNumberTask extends RecursiveTask<Long> {
21         private int start;
22         private int end;
23 
24         public SumNumberTask(int start, int end) {
25             this.start = start;
26             this.end = end;
27         }
28 
29         @Override
30         protected Long compute() {
31             if (end - start < THRESHOLD) {
32                 return sumBatch(start, end);
33             } else {
34                 int mid = (start + end) / 2;
35                 SumNumberTask left = new SumNumberTask(start, mid);
36                 SumNumberTask right = new SumNumberTask(mid + 1, end);
37                 invokeAll(left, right);
38                 long leftResult = left.join();
39                 long rightResult = right.join();
40                 return leftResult + rightResult;
41             }
42         }
43     }
44 
45     public void sumNumber() {
46         ForkJoinPool pool = new ForkJoinPool();
47         long start = System.currentTimeMillis();
48         int recordMin = MIN;
49         int recordMax = MAX;
50         SumNumberTask sumTask = new SumNumberTask(recordMin, recordMax);
51         pool.invoke(sumTask);
52         System.out.println("Task is Running.....");
53         Long result = sumTask.join();
54         System.out.println("The result is " + result + " spend time:"
55                 + (System.currentTimeMillis() - start) + "ms");
56     }
57 
58     /**
59      * 统计每次任务的总和
60      * @param fromId
61      * @param toId
62      * @return
63      */
64     public static long sumBatch(int fromId, int toId) {
65         long sum = 0;
66         try {
67             Thread.sleep(15);//休眠15毫秒模拟查询业务
68         } catch (InterruptedException e) {
69             e.printStackTrace();
70         }
71         for (int i = fromId; i <= toId; i++) {
72             sum += i;
73         }
74         return sum;
75     }
76 
77     public static void main(String[] args) {
78         ForkJoinDemo forkJoinDemo = new ForkJoinDemo();
79         forkJoinDemo.sumNumber();
80     }
81 }

输出结果:

java线程并发工具类

   余额总和为50000005000000,和使用单线程统计时一致,使用forkjoin达到了同样的目的,但是只花费了4078毫秒,性能提升了7倍多。为了使性能有进一步提升,我们可以在第44行指定并发数量。不传参情况下,默认并发量是当前服务器的逻辑CPU个数。我们把并发量调整成64,即ForkJoinPool pool = new ForkJoinPool(16 * 4),执行程序,输出结果为:

java线程并发工具类

 统计结果一致,花费了567毫秒,比起单线程统计,性能提升了53倍之多,由此可见forkjoin的并发威力。

2、CountDownLatch

 2.1 什么是CountDownLatch

  JDK对CountDownLatch的解释是:一种同步辅助器,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。举个例子来理解CountDownLatch:隔壁寝室的老王今天要参加学校运动会的400米决赛,跟小王一起争夺冠军的还有另外5个人,不管这6位选手的内心多激动多澎湃,也要等裁判的发令枪响了之后才能起跑,裁判不发出指令,选手就只能在起跑线等待,这就是CountDownLatch的作用。但是实际场景并不只有一个发令裁判,参加过学校运动会的同学都知道,还可能需要若干个裁判进行手动计时,要等所有的裁判都就位后,发令枪一响,运动员才能起跑。假设有3个计时裁判,一个发令裁判,用一个图来说明。

java线程并发工具类

   在比赛开始前,发令裁判会用洪荒之力吼一声,各~就~各~位,此时发令裁判会用炯炯有神的目光和3位计时裁判交流,3位裁判分别点头示意已经准备好了,此时发令裁判会再次大吼一声,预备~~~跑!!!此时憋了许久的6位运动员飞奔出去,当然老王遥遥领先,毕竟女神给他说了跑第一名的话晚上有奖励。发令裁判的任务完成,不用继续执行下去,而3个计时裁判继续工作,对6位选手的成绩进行一个记录。

 2.1 CountDownLatch实战

用一段程序来模拟老王参加运动会400米决赛的场景。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    /**
     * 运动员在计时裁判和发令裁判就位后才能起跑
     */
    static CountDownLatch sportsManLatch = new CountDownLatch(4);

    /**
     * 发令裁判在3个计时裁判准备好之后才能发令
     */
    static CountDownLatch orderRefereeLatch = new CountDownLatch(3);

    /**
     * 计时裁判
     */
    static class TimeReferee implements Runnable {
        private int no;

        public TimeReferee(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            System.out.println(no + "号计时裁判就位");
            orderRefereeLatch.countDown();
            sportsManLatch.countDown();
        }
    }

    /**
     * 发令裁判
     */
    static class OrderReferee implements Runnable {
        @Override
        public void run() {
            try {
                orderRefereeLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("发令裁判发出指令~~~~~~");
            sportsManLatch.countDown();
        }
    }

    /***
     * 运动员
     */
    static class SportsMan implements Runnable {
        private int no;

        public SportsMan(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            try {
                System.out.println(no + "号运动员已经就位");
                sportsManLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(no + "号选手说,我要跑第一");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //6个运动员就位
        for (int i = 0; i < 6; i++) {
            new Thread(new SportsMan(i)).start();
        }

        //发令裁判和计时裁判眼神确认,等计时裁判都准备好之后发令
        new Thread(new OrderReferee()).start();

        //3个计时裁判就位
        for (int i = 0; i < 3; i++) {
            new Thread(new TimeReferee(i)).start();
        }
    }
}

程序输出:

java线程并发工具类

3、CyclicBarrier

 3.1 什么是CyclicBarrier

  JDK对CyclicBarrier的解释是:一种同步辅助工具,它允许一组线程全部互相等待以到达一个公共的障碍点。我们可以从字面意思理解它,可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才能继续运行。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),parties表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在parties个线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。还用一张图来说明。

java线程并发工具类

 3.2 CyclicBarrier实战

  CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。我们模拟3个子线程向一个map中添加数据,它们添加数据完成后,到一个屏障点进行等待,由统计线程对数据进行打印,统计线程工作结束后,3个子线程再被统一释放去干其他工作。我们设置2个屏障点来演示,,体现其可循环使用的特征。

public class CyclicBarrierDemo {
    private static CyclicBarrier barrier = new CyclicBarrier(3, new CollectThread());

    /**存放子线程产生数据的容器*/
    private static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(new WorkThread());
            thread.start();
        }
        Thread.sleep(5);
    }

    /**
     * 负责对子线程的结果进行其他处理
     */
    private static class CollectThread implements Runnable {
        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : map.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println("the result = " + result);
            System.out.println("CollectThread do other things");
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("CollectThread end........");
        }
    }

    /**
     * 实际工作的子线程
     */
    private static class WorkThread implements Runnable {
        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            map.put(id + "", id);
            Random r = new Random();
            try {
                Thread.sleep(r.nextInt(1000));
                System.out.println("Thread_" + id + " first do something ");
                //第一次到达屏障
                barrier.await();
                System.out.println("Thread_" + id + " first do other things");

                Thread.sleep(r.nextInt(500));
                map.put(id * 2 + "", id * 2);
                System.out.println("Thread_" + id + " second do something ");
                //第二次到达屏障
                barrier.await();
                System.out.println("Thread_" + id + " second other things ");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

程序输出:

java线程并发工具类

3.3 CountDownLatch和CyclicBarrier对比

  CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以反复使用。CountDownLatch.await()一般阻塞工作线程,所有的进行预备工作的线程执行countDown(),而CyclicBarrier通过工作线程调用await()从而自行阻塞,直到所有工作线程达到指定屏障,再大家一起往下走。在控制多个线程同时运行上,CountDownLatch可以不限线程数量,而CyclicBarrier是固定线程数。同时,CyclicBarrier还可以提供一个barrierAction,合并多线程计算结果。

4、Callable、Future和FutureTask

4.1 Runnable、Callable、Future和FutureTask之间的关系

  Runnable是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。要获取返回结果时可以调用get方,该方法会阻塞直到任务返回结果。因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了FutureTask。FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。用一个图来说明。

java线程并发工具类

  因此当我们想通过一个线程运行Callable,但是Thread不支持构造方法中传递Callable的实例,我们需要通过FutureTask把一个Callable包装成Runnable,然后再通过这个FutureTask拿到Callable运行后的返回值。要想new出一个FutureTask的实例,有2种方式,直接贴出代码。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }


    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

4.2 Callable和FutureTask实战

  这个例子比较简单,在一个主线程中创建一个callable来对1到10000进行累加,再休眠3秒,然后把这个callable封装成一个futureTask,交给一个线程去运行,最终查看callable的返回结果和阻塞效果。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Long> callable = new Callable<Long>() {
            long sum = 0;

            @Override
            public Long call() throws Exception {
                for (int i = 0; i <= 10000; i++) {
                    sum += i;
                }
                Thread.sleep(3000);//主要是为了演示get()时候的阻塞效果
                return sum;
            }
        };
        FutureTask<Long> futureTask = new FutureTask<>(callable);
        new Thread(futureTask).start();
        Thread.sleep(10);
        System.out.println("main线程继续执行");
        System.out.println("获取callable计算结果 = " + futureTask.get());
        System.out.println("main线程继续执行 ");
    }
}

程序输出:

java线程并发工具类

 可以看到当futureTask.get()没有获取到返回结果时,主线程是处于阻塞状态。

4.3 手写一个FutureTask

   要实现一个简易的FutureTask,通过上面对几个接口之间关系的介绍,以及阅读FutureTask代码可以看出,只需定义一个类,实现Runnable和Future接口,并实现run()方法和get()方法就可以了,核心思想就是上一篇文章中提到的通知/等待机制。直接上代码:

import java.util.concurrent.*;

public class MyFutureTask<V> implements Runnable, Future<V> {
    private Callable<V> callable;

    private V result = null;

    public MyFutureTask(Callable<V> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        V temp = null;
        try {
            temp = callable.call();
        } catch (Exception e) {
            e.printStackTrace();
        }
        synchronized (this) {
            result = temp;
            this.notifyAll();
        }
    }

    @Override
    public V get() throws InterruptedException {
        if (result != null) {
            return result;
        }
        System.out.println("等待结果执行完成。。。。。");
        synchronized (this) {
            this.wait();
        }
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }


    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}

 为了验证效果,把上一段代码中的FutureTask改成MyFutureTask,其余代码统统不变。

import java.util.concurrent.Callable;

public class FutureTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        Callable<Long> callable = new Callable<Long>() {
            long sum = 0;

            @Override
            public Long call() throws Exception {
                for (int i = 0; i <= 10000; i++) {
                    sum += i;
                }
                Thread.sleep(3000);//主要是为了演示get()时候的阻塞效果
                return sum;
            }
        };
        MyFutureTask<Long> futureTask = new MyFutureTask<>(callable);
        new Thread(futureTask).start();
        Thread.sleep(10);
        System.out.println("main线程继续执行");
        System.out.println("获取callable计算结果 = " + futureTask.get());
        System.out.println("main线程继续执行 ");
    }
}

运行程序,可以看到输出结果和阻塞现象与使用FutureTask一致:

java线程并发工具类

 

 

  这篇随笔就介绍这么多内容,希望大家看了有收获。原子操作CAS在下一篇文章中介绍,阅读过程中如发现描述有误,请指出,谢谢。

 

上一篇:除了Thread和Runnable,你还知道第三种创建线程的方式Callable吗


下一篇:说一下 runnable 和 callable 有什么区别?