ForkJoin
ForkJoin是由JDK1.7后提供多线并发处理框架,ForkJoin的框架的基本思想是分而治之。使用ForkJoin将相同的计算任务通过多线程的进行执行。从而能提高数据的计算速度。
分而治之
分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。
使用
使用ForkJoin框架,需要创建一个ForkJoin的任务。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。
我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。
RecursiveTask在进行exec之后会使用一个result的变量进行接受返回的结果。而RecursiveAction在exec后是不会保存返回结果。
任务分割
ForkJoinTask
: 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类
-
RecursiveAction
: 无结果返回的任务 -
RecursiveTask
: 有返回结果的任务
说明:
-
fork
: 让task异步执行 -
join
: 让task同步执行,可以获取返回值 - ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行
结果合并
ForkJoinPool
执行 ForkJoinTask
,
- 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
- 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务
三中提交方式:
-
execute
异步,无返回结果 -
submit
异步,有返回结果 (返回Future<T>
) -
invoke
同步,有返回结果 (会阻塞)
package com.bxwell.hj360.device.module.devicedatamanagement.service.impl;
import com.bxwell.dc.service.api.data.vo.*;
import com.bxwell.hj360.common.base.common.until.StringUtil;
import com.bxwell.hj360.common.model.device.BxWellDeviceFactor;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class TimeForkJoinPoolCalcRate {
class CalcTask extends RecursiveAction{
private final Integer MAX = 100;
private String deviceId;
String factorId;
private Map<String, BigDecimal> unitRateMap;
private Map<String, BxWellDeviceFactor> cunit;
private List<MinDataShowVO> listMinDataShowVO;
private List<HourDataShowVO> listHourDataShowVO;
private List<DayDataShowVO> listDayDataShowVO;
private BigDecimal rateBig;
private BigDecimal totalUnitIdRateBig;
public CalcTask(String deviceId,String factorId ,Map<String, BigDecimal> unitRateMap,Map<String, BxWellDeviceFactor> cunit,
List<MinDataShowVO> listMinDataShowVO, List<HourDataShowVO> listHourDataShowVO, List<DayDataShowVO> listDayDataShowVO,BigDecimal rateBig,BigDecimal totalUnitIdRateBig){
this.deviceId = deviceId;
this.factorId = factorId;
this.unitRateMap = unitRateMap;
this.cunit = cunit;
this.listMinDataShowVO = listMinDataShowVO;
this.listHourDataShowVO = listHourDataShowVO;
this.listDayDataShowVO = listDayDataShowVO;
this.rateBig = rateBig;
this.totalUnitIdRateBig = totalUnitIdRateBig;
}
@Override
protected void compute() {
if(listMinDataShowVO != null){
if(listMinDataShowVO.size() <= MAX){
handleMinData(listMinDataShowVO);
}else {
handleThreadNum(listMinDataShowVO);
}
}else if(listHourDataShowVO != null){
if(listHourDataShowVO.size() <= MAX){
handleHourData(listHourDataShowVO);
}else {
handleThreadNum(listHourDataShowVO);
}
}else if(listDayDataShowVO != null){
if(listDayDataShowVO.size() <= MAX){
handleDayData(listDayDataShowVO);
}else {
handleThreadNum(listDayDataShowVO);
}
}
}
private void handleThreadNum(List<?> list){
int res = list.size() <= 1000 ? 1 : list.size() <= 10000 ? 2 : list.size() <= 50000 ? 3 : list.size() <= 100000 ? 4 : 5;
switch (res){
case 1 :
CreateCalcTask(list,res + 5);
break;
case 2 :
CreateCalcTask(list,res + 10);
break;
case 3 :
CreateCalcTask(list,res + 15);
break;
case 4 :
CreateCalcTask(list,res + 20);
break;
case 5 :
CreateCalcTask(list,res + 30);
break;
}
}
private void handleMinData(List<MinDataShowVO> list){
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for(MinDataShowVO vo : list){
if(!StringUtil.isEmpty(vo.getEndTime())){
vo.setTimeStr(format.format(vo.getTime())+"-"+vo.getEndTime());
}
if(null != rateBig){
if(null != vo.getMin()){
vo.setMin(new BigDecimal(SampleForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig))));
}
if(null != vo.getMax()){
vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig))));
}
if(null != vo.getAvg()){
vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig))));
}
}else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点
if(null != vo.getMin()){
vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin())));
}
if(null != vo.getMax()){
vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax())));
}
if(null != vo.getAvg()){
vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg())));
}
}
if(null != totalUnitIdRateBig && null != vo.getCou()){
vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig))));
}
if(null == totalUnitIdRateBig || null == vo.getCou()){
//前端需处理成斜杠
vo.setCou(null);
}
}
}
private void handleHourData(List<HourDataShowVO> list){
for(HourDataShowVO vo : list){
if(null != rateBig){
if(null != vo.getMin()){
vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig))));
}
if(null != vo.getMax()){
vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig))));
}
if(null != vo.getAvg()){
vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig))));
}
}else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点
if(null != vo.getMin()){
vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin())));
}
if(null != vo.getMax()){
vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax())));
}
if(null != vo.getAvg()){
vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg())));
}
}
if(null != totalUnitIdRateBig){
if(null != vo.getCou()){
vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig))));
}
}else{
//前端需处理成斜杠
vo.setCou(null);
}
}
}
private void handleDayData(List<DayDataShowVO> list){
for(DayDataShowVO vo : list){
if(null != rateBig){
if(null != vo.getMin()){
vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig))));
}
if(null != vo.getMax()){
vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig))));
}
if(null != vo.getAvg()){
vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig))));
}
}else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点
if(null != vo.getMin()){
vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin())));
}
if(null != vo.getMax()){
vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax())));
}
if(null != vo.getAvg()){
vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg())));
}
}
if(null != totalUnitIdRateBig){
if(null != vo.getCou()){
vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig))));
}
}else{
//前端需处理成斜杠
vo.setCou(null);
}
}
}
private void CreateCalcTask(List<?> list,int num){
int cou = 0;
if(list != null){
List<? extends List<?>> lists = rangeList(list, list.size() / (num - 1));
while (cou < num){
CalcTask calcTask = new CalcTask(deviceId,factorId, unitRateMap, cunit,
listMinDataShowVO != null ? (List<MinDataShowVO>)lists.get(cou) : null,
listHourDataShowVO != null ? (List<HourDataShowVO>)lists.get(cou) : null,
listDayDataShowVO != null ? (List<DayDataShowVO>)lists.get(cou) : null,
rateBig,totalUnitIdRateBig);
cou++;
calcTask.fork();
calcTask.join();
}
}
}
/**
* 平均分割集合 最后一个角标数据取余
* @param list 数据集合
* @param size []
* @param <T>
* @return
*/
private <T> List<List<T>> rangeList(List<T>list , int size){
if(ObjectUtils.isEmpty(list)){
return Collections.emptyList();
}
int block = (list.size() + size -1) / size;
return IntStream.range(0,block).
boxed().map(i->{
int start = i*size;
int end = Math.min(start + size,list.size());
return list.subList(start,end);
}).collect(Collectors.toList());
}
}
/**
* 返回保留几位小数
* @param str
* @return
*/
private static int countNum(String str){
int d = str.indexOf(".");
String str2 = str.substring(d+1,str.length());
String[] split = str2.split("");
int num = 2;
for (int a = 0;a < split.length;a++) {
if(!split[a].equals("0")){
if(a > 1){
num = a+2;
break;
}
}
}
return num;
}
/**
* 根据规则保留有效数字
* @param val
* @return
*/
public static String handleRateNum(BigDecimal val){
if(val == null){
return null;
}
String strVal = val.toPlainString();
BigDecimal vall = new BigDecimal(strVal);
BigDecimal bigdecimal = null;
//小于0.01跟-0.01需要保留有效位数两位
if(vall.compareTo(new BigDecimal(0.01)) == -1){
bigdecimal = vall.setScale(countNum(strVal),BigDecimal.ROUND_HALF_UP);
}else{
//大于0.01需要保留两位小数
bigdecimal = vall.setScale(2,BigDecimal.ROUND_HALF_UP);
}
//如果保留的两位小数是00的话直接取整
if(bigdecimal.intValue() > 0 && String.valueOf(bigdecimal).contains("00")){
return String.valueOf(bigdecimal.intValue());
}else{
return bigdecimal.toPlainString();
}
}
/**
*
* @param deviceId
* @param factorId
* @param unitRateMap 单位转换map
* @param cunit 自定义单位 基准单位map 累计值单位
* @param listMinDataShowVO
* @param listHourDataShowVO
* @param listDayDataShowVO
* @throws InterruptedException
*/
public static void CalcRate(String deviceId,String factorId, Map<String, BigDecimal> unitRateMap, Map<String, BxWellDeviceFactor> cunit,
List<MinDataShowVO> listMinDataShowVO, List<HourDataShowVO> listHourDataShowVO, List<DayDataShowVO> listDayDataShowVO) throws InterruptedException {
//自定义单位
BigDecimal rateBig = unitRateMap.get(cunit.get(deviceId +"_"+ factorId));
//自定义累计单位
String factorTotalUnitId = cunit.get(deviceId + "_" + factorId).getFactorTotalUnitId();
String totalUnitId = cunit.get(deviceId + "_" + factorId).getTotalUnitId();
BigDecimal totalUnitIdRateBig = unitRateMap.get(factorTotalUnitId + "_" + totalUnitId);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new TimeForkJoinPoolCalcRate().new CalcTask(deviceId,factorId, unitRateMap, cunit,
listMinDataShowVO,listHourDataShowVO,listDayDataShowVO,
rateBig,totalUnitIdRateBig));
pool.shutdown();
}
}