JUC学习-7-ForkJoin-异步回调-Future

ForkJoin

分支合并

ForkJoin在JDK1.7出现的 并行执行任务 提高效率 大数据量

把大任务分成小任务

fork join 特点 工作窃取

public class ForkJoinDemo extends RecursiveTask<Long> {

    private long start = 0L;
    private long end = 0L;
    private long temp = 10000L;

    public ForkJoinDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) {

    }

    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            long middle = (start + end) / 2; // 中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // 拆分任务,把任务压入线程队列
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork(); // 拆分任务,把任务压入线程队列

            return task1.join() + task2.join();
        }
    }


}
public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        test1(); // Long 6434 long 318
        test2(); // Long 5232  long 292
//        test3(); // 219
    }

    public static void test1() {
        long start = System.currentTimeMillis();
        long sum = 0L;
        for (long i = 1L; i <= 10_0000_0000L; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println(end - start);
    }

    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task1 = new ForkJoinDemo(0L, 10_0000_0000L);

        ForkJoinTask<Long> submit = forkJoinPool.submit(task1);
        long sum = submit.get();


        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println(end - start);
    }

    public static void test3() {
        // LongStream流
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println(end - start);
    }
}

异步回调

Future 设计的初衷: 对将来某个时间的结果进行建模

public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 发起一个请求
        // 没有返回值的异步回调
//        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(2);
//                System.out.println(Thread.currentThread().getName() + "runAsync");
//
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        });
        System.out.println("-1-1-11--1");
//        completableFuture.get();// 获取执行结果

        // 有返回值的异步回调
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync");
            int x = 1 / 0;
            return 1024;
        });
        Integer integer = completableFuture.whenComplete((t, u) -> {
            System.out.println("t===" + t + "u====" + u);
        }).exceptionally((e) -> {
//            e.printStackTrace();
            return 500;
        }).get();
        System.out.println(integer);
    }
}
上一篇:id_rsa 与 id_rsa.pub 文件详解


下一篇:使用 Rust 实现并查集