Java并行之parallelStream与CompletableFuture比较

1.测试代码

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList; public class CompletableFutureTest {
static final UserFeatureable[] UserFeatures = {new GetCareerUserFeature(), new GetTradeUserFeature(), new
GetVipLevelUserFeature()}; public static void main(String[] args) {
int id = 10001;
List<UserFeatureable> userFeatures = Arrays.asList(UserFeatures);
long startTime = System.currentTimeMillis();
String result = userFeatures.parallelStream().map(p -> p.getKey() + ":" + p.getValue(id)).collect(joining(","));
long endTime = System.currentTimeMillis();
System.out.println(String.format("parallelStream消耗时间:%d,返回结果:%s", (endTime - startTime), result)); startTime = System.currentTimeMillis();
List<Future<String>> futureList = userFeatures.stream().map(
p -> CompletableFuture.supplyAsync(() -> p.getKey() + ":" + p.getValue(id)))
.collect(toList());
result = futureList.stream().map(p->getVal(p,"")).collect(joining(","));
endTime = System.currentTimeMillis();
System.out.println(String.format("CompletableFuture的默认的ForkJoin线程池消耗时间:%d,返回结果:%s", (endTime - startTime),
result)); //当userFeature越多,使用自定义线程池更有利
startTime = System.currentTimeMillis();
futureList = userFeatures.stream().map(
p -> CompletableFuture.supplyAsync(() -> p.getKey() + ":" + p.getValue(id),CustomThreadPool.INSTANCE))
.collect(toList());
result = futureList.stream().map(p->getVal(p,"")).collect(joining(","));
endTime = System.currentTimeMillis();
System.out.println(String.format("CompletableFuture的自定义线程池消耗时间:%d,返回结果:%s", (endTime - startTime), result));
} private static <T>T getVal(Future<T> future,T defaultV){
try {
return future.get(2,TimeUnit.SECONDS);
}catch (Exception ex){
return defaultV;
}
}
} interface UserFeatureable {
String getKey(); String getValue(int id);
} class GetCareerUserFeature implements UserFeatureable { @Override
public String getKey() {
return "career";
} @Override
public String getValue(int id) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) { }
return "10";
}
} class GetTradeUserFeature implements UserFeatureable { @Override
public String getKey() {
return "trade";
} @Override
public String getValue(int id) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) { }
return "5";
}
} class GetVipLevelUserFeature implements UserFeatureable { @Override
public String getKey() {
return "vip";
} @Override
public String getValue(int id) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) { }
return "v1";
}
}

2.自定义线程池配置

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Holds the shared {@link ThreadPoolExecutor} used for blocking, I/O-style tasks.
 * Callers are responsible for shutting {@link #INSTANCE} down when the application ends.
 */
public class CustomThreadPool {
    /** Default core pool size: one thread per available processor. */
    private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();

    /**
     * Maximum pool size.
     * Optimal thread count = (thread wait time / thread CPU time + 1) * number of CPUs
     * Optimal thread count = (1s / 0.1s + 1) * number of CPUs
     */
    private static final int DEFAULT_MAXIMUM_POOL_SIZE = 11 * DEFAULT_CORE_POOL_SIZE;

    /** Seconds an idle thread beyond the core size waits before it is reclaimed. */
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 10;

    /**
     * Bounded queue of tasks waiting to be executed.
     * With a bounded queue, ThreadPoolExecutor only creates threads beyond the core size
     * once this queue is full.
     */
    private static final BlockingQueue<Runnable> WORK_QUEUE =
        new LinkedBlockingDeque<>(DEFAULT_MAXIMUM_POOL_SIZE * 2);

    /** The shared executor; its threads are named {@code task-pool-thread-N}. */
    public static final ThreadPoolExecutor INSTANCE = new ThreadPoolExecutor(
        DEFAULT_CORE_POOL_SIZE,
        DEFAULT_MAXIMUM_POOL_SIZE,
        DEFAULT_KEEP_ALIVE_SECONDS,
        TimeUnit.SECONDS,
        WORK_QUEUE,
        new ThreadFactoryBuilder().setNameFormat("task-pool-thread-%d").build());
}

3.结果

parallelStream消耗时间:2889,返回结果:career:10,trade:5,vip:v1
CompletableFuture的默认的ForkJoin线程池消耗时间:1010,返回结果:career:10,trade:5,vip:v1
CompletableFuture的自定义线程池消耗时间:1011,返回结果:career:10,trade:5,vip:v1
上一篇:tzcacm去年训练的好题的AC代码及题解


下一篇:C语言入门(7)——自定义函数