如果拒绝策略是callerrunspolicy时,如果队列满了,线程已达到最大线程数,那么,如果在继续提交任务,就会使用拒绝策略,将新提交的任务,交给调用者线程或者说上层线程(没关闭的话)去执行,
所以,如果还有新任务提交,此时调用者线程会阻塞(不是真正的阻塞,就是卡在这里)新任务的提交,阻塞在这里(因为没有资源可以处理去新提交的任务)。知道所有任务提交完成,才继续往下执行!
另外 completablefuture 提交移步任务时也是如此。此时,调用者线程会阻塞在这行代码上。尽管阻塞在这行代码上(也就是调用者线程),调用者线程还是可以消费分过来的新任务。等到调用者线程全部把任务提交出去了,才继续往下执行
注意:get结果时,才是真正的阻塞,占有线程资源,不释放。直到返回结果或者超时,线程回到可消费,其他任务才可以再利用
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建线程池,并获得线程池管理对象
// ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
// TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy());
ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
FutureTask<?> producerTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + "子线程执行了");
//线程内部的线程池
// ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 60,
// TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
List<Integer> processTaskList = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
processTaskList.add(i);
}
List<CompletableFuture<String>> futureTaskList = processTaskList.stream().map(v ->
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "执行了" + "[" + v + "]");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return String.valueOf(v);
}, threadPoolExecutor1)).collect(Collectors.toList());
System.out.println("这里没有执行");
CompletableFuture<?>[] futureTaskArr = new CompletableFuture[futureTaskList.size()];
CompletableFuture<?>[] completableFutures = futureTaskList.toArray(futureTaskArr);
CompletableFuture<Void> allFuture = CompletableFuture.allOf(completableFutures);
CompletableFuture<List<String>> resFuture = allFuture.thenApply(v ->
futureTaskList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
return resFuture.get(20000, TimeUnit.MILLISECONDS).size();
});
threadPoolExecutor1.submit(producerTask);
System.out.println("process:"+ producerTask.get());
}