ForkJoin
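Fork/join (split and merge)
ForkJoin was introduced in JDK 1.7. It runs tasks in parallel to improve efficiency on large data volumes.
It splits a large task into smaller subtasks and merges their results.
A defining feature of fork/join is work stealing: an idle worker thread takes queued tasks from another worker's queue.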
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask<Long> {
    private long start = 0L;
    private long end = 0L;
    private long temp = 10000L; // threshold: ranges shorter than this are summed directly

    public ForkJoinDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) {
    }

    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            // small enough: sum the closed range [start, end] sequentially
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            long middle = (start + end) / 2; // midpoint
            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // split: push the subtask onto the worker thread's queue
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork(); // split: push the subtask onto the worker thread's queue
            return task1.join() + task2.join(); // wait for both halves and merge the results
        }
    }
}
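The work stealing mentioned in the notes can be observed, roughly, through ForkJoinPool.getStealCount(). The following is a minimal sketch, not the author's code: the class StealCountSketch, the nested RangeSum task, and the helper sumWithPool are hypothetical names, and the threshold is an arbitrary assumption. Unlike ForkJoinDemo, it forks only one half and computes the other half in the current thread, which is the pattern shown in the RecursiveTask Javadoc.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; names and threshold are illustrative assumptions.
class StealCountSketch {

    // Sums the closed range [lo, hi] by recursive splitting.
    static class RangeSum extends RecursiveTask<Long> {
        private static final long THRESHOLD = 10_000L;
        private final long lo;
        private final long hi;

        RangeSum(long lo, long hi) {
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected Long compute() {
            if (hi - lo < THRESHOLD) {
                long sum = 0L;
                for (long i = lo; i <= hi; i++) {
                    sum += i;
                }
                return sum;
            }
            long mid = lo + (hi - lo) / 2;
            RangeSum left = new RangeSum(lo, mid);
            RangeSum right = new RangeSum(mid + 1, hi);
            left.fork();                     // queue the left half; an idle worker may steal it
            long rightSum = right.compute(); // work on the right half in this thread
            return left.join() + rightSum;
        }
    }

    // Runs the task on the given pool and blocks until the result is ready.
    static long sumWithPool(ForkJoinPool pool, long lo, long hi) {
        return pool.invoke(new RangeSum(lo, hi));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long sum = sumWithPool(pool, 1L, 10_000_000L);
        System.out.println("sum = " + sum);
        System.out.println("parallelism = " + pool.getParallelism());
        // getStealCount() is only an estimate and varies from run to run.
        System.out.println("steals = " + pool.getStealCount());
        pool.shutdown();
    }
}
```

The steal count depends on the number of cores and on scheduling, so it is useful for observation only, not as something to assert on.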
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Recorded timings in milliseconds
        // test1(); // Long: 6434, long: 318
        test2(); // Long: 5232, long: 292
        // test3(); // 219
    }

    // Plain sequential loop
    public static void test1() {
        long start = System.currentTimeMillis();
        long sum = 0L;
        for (long i = 1L; i <= 1_000_000_000L; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println(end - start);
    }

    // ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task1 = new ForkJoinDemo(0L, 1_000_000_000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task1); // submit the task to the pool
        long sum = submit.get(); // block until the result is available
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println(end - start);
    }

    // Parallel LongStream
    public static void test3() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 1_000_000_000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println(sum);
        System.out.println(end - start);
    }
}
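All three tests add up the same range, so they should print the same sum, 500000000500000000. As a quick sanity check, the result can be compared with the closed form n(n + 1) / 2. The sketch below is an illustration only; SumCheckSketch, closedForm, and streamSum are hypothetical names, and it assumes n is small enough that n * (n + 1) fits in a long.

```java
import java.util.stream.LongStream;

// Hypothetical helper class for checking the sum; not part of the original notes.
class SumCheckSketch {

    // Closed form for 0 + 1 + ... + n; assumes n * (n + 1) does not overflow a long.
    static long closedForm(long n) {
        return n * (n + 1) / 2;
    }

    // Same sum computed with a parallel LongStream, as in test3.
    static long streamSum(long n) {
        return LongStream.rangeClosed(0L, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000_000L;
        long expected = closedForm(n);
        long actual = streamSum(n);
        System.out.println("closed form = " + expected);
        System.out.println("stream sum  = " + actual);
        System.out.println("match       = " + (expected == actual));
    }
}
```

A check like this says nothing about speed. The single-run System.currentTimeMillis() timings are rough indications at best, since they include JIT warm-up and pool start-up.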
Asynchronous callbacks
Future was designed to model a result that becomes available at some point in the future.
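As a minimal sketch of that idea with the plain Future interface: a task is submitted to an executor, a Future comes back immediately, and the value is read later. PlainFutureSketch and computeLater are hypothetical names, and the single-thread executor is an arbitrary choice for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original notes.
class PlainFutureSketch {

    // Submits a job and waits for its result; checked exceptions are wrapped for brevity.
    static int computeLater() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> job = () -> 1024;            // the work to run on another thread
            Future<Integer> future = executor.submit(job); // returns immediately
            return future.get();                           // blocks until the result is ready
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeLater()); // prints 1024
    }
}
```

With a plain Future the caller has to block in get() or poll isDone(). CompletableFuture adds callbacks that run when the result or an exception arrives.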
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Issue a request
        // Asynchronous callback with no return value
        // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
        //     try {
        //         TimeUnit.SECONDS.sleep(2);
        //         System.out.println(Thread.currentThread().getName() + "runAsync");
        //
        //     } catch (InterruptedException e) {
        //         e.printStackTrace();
        //     }
        // });
        System.out.println("-1-1-11--1");
        // completableFuture.get(); // wait for and fetch the result
        // Asynchronous callback with a return value
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync");
            int x = 1 / 0; // deliberately throws ArithmeticException to exercise the failure path
            return 1024;
        });
        Integer integer = completableFuture.whenComplete((t, u) -> {
            // t is the result (null on failure); u is the exception (null on success)
            System.out.println("t===" + t + "u====" + u);
        }).exceptionally((e) -> {
            // e.printStackTrace();
            return 500; // fallback value returned when the task failed
        }).get();
        System.out.println(integer);
    }
}
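Demo01 only exercises the failure path, because 1 / 0 always throws. The sketch below runs the same whenComplete / exceptionally chain on both paths. It is an illustration under assumptions: CallbackPathsSketch and divideAsync are hypothetical names, and the divisor parameter exists only to trigger the exception on demand.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of the original notes.
class CallbackPathsSketch {

    // Returns 1024 / divisor computed asynchronously, or 500 if the computation throws.
    static int divideAsync(int divisor) {
        return CompletableFuture
                .supplyAsync(() -> 1024 / divisor)
                // Runs on both paths: exactly one of result and error is non-null here.
                .whenComplete((result, error) ->
                        System.out.println("result=" + result + ", error=" + error))
                // Runs only on failure and substitutes a fallback value.
                .exceptionally(error -> 500)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divideAsync(1)); // success path: prints 1024
        System.out.println(divideAsync(0)); // failure path: prints 500
    }
}
```

The sketch uses join() rather than get() so that no checked exceptions need to be declared. Because exceptionally supplies a fallback, neither call throws here.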