Forkjoin架构 归并算法

ForkJoin

ForkJoin是由JDK1.7后提供多线并发处理框架,ForkJoin的框架的基本思想是分而治之。使用ForkJoin将相同的计算任务通过多线程的进行执行。从而能提高数据的计算速度。

分而治之

分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。

使用

使用ForkJoin框架,需要创建一个ForkJoin的任务。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。

我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。

RecursiveTask在进行exec之后会使用一个result的变量进行接受返回的结果。而RecursiveAction在exec后是不会保存返回结果。

任务分割

ForkJoinTask : 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务

说明:

  1. fork : 让task异步执行
  2. join : 让task同步执行,可以获取返回值
  3. ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

结果合并

ForkJoinPool 执行 ForkJoinTask

  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

三中提交方式:

  1. execute 异步,无返回结果
  2. submit 异步,有返回结果 (返回Future<T>
  3. invoke 同步,有返回结果 (会阻塞)
package com.bxwell.hj360.device.module.devicedatamanagement.service.impl;

import com.bxwell.dc.service.api.data.vo.*;
import com.bxwell.hj360.common.base.common.until.StringUtil;
import com.bxwell.hj360.common.model.device.BxWellDeviceFactor;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TimeForkJoinPoolCalcRate {


    class CalcTask extends RecursiveAction{


        private final Integer MAX = 100;

        private String deviceId;

        String factorId;

        private Map<String, BigDecimal> unitRateMap;

        private Map<String, BxWellDeviceFactor> cunit;

        private List<MinDataShowVO> listMinDataShowVO;

        private List<HourDataShowVO> listHourDataShowVO;

        private List<DayDataShowVO> listDayDataShowVO;

        private BigDecimal rateBig;

        private BigDecimal totalUnitIdRateBig;

        public CalcTask(String deviceId,String factorId ,Map<String, BigDecimal> unitRateMap,Map<String, BxWellDeviceFactor> cunit,
                        List<MinDataShowVO> listMinDataShowVO, List<HourDataShowVO> listHourDataShowVO, List<DayDataShowVO> listDayDataShowVO,BigDecimal rateBig,BigDecimal totalUnitIdRateBig){

            this.deviceId = deviceId;

            this.factorId = factorId;

            this.unitRateMap = unitRateMap;

            this.cunit = cunit;

            this.listMinDataShowVO = listMinDataShowVO;

            this.listHourDataShowVO = listHourDataShowVO;

            this.listDayDataShowVO = listDayDataShowVO;

            this.rateBig = rateBig;

            this.totalUnitIdRateBig = totalUnitIdRateBig;

        }


        @Override
        protected void compute() {

            if(listMinDataShowVO != null){
                if(listMinDataShowVO.size() <= MAX){
                    handleMinData(listMinDataShowVO);
                }else {
                    handleThreadNum(listMinDataShowVO);
                }
            }else if(listHourDataShowVO != null){
                if(listHourDataShowVO.size() <= MAX){
                    handleHourData(listHourDataShowVO);
                }else {
                    handleThreadNum(listHourDataShowVO);
                }
            }else if(listDayDataShowVO != null){
                if(listDayDataShowVO.size() <= MAX){
                    handleDayData(listDayDataShowVO);
                }else {
                    handleThreadNum(listDayDataShowVO);
                }
            }
        }

        private void handleThreadNum(List<?> list){
            int res = list.size() <= 1000 ? 1 : list.size() <= 10000 ? 2 : list.size() <= 50000 ? 3 : list.size() <= 100000 ? 4 : 5;
            switch (res){
                case 1 :
                    CreateCalcTask(list,res + 5);
                    break;
                case 2 :
                    CreateCalcTask(list,res + 10);
                    break;
                case 3 :
                    CreateCalcTask(list,res + 15);
                    break;
                case 4 :
                    CreateCalcTask(list,res + 20);
                    break;
                case 5 :
                    CreateCalcTask(list,res + 30);
                    break;
            }

        }

        private void handleMinData(List<MinDataShowVO> list){

            SimpleDateFormat format =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            for(MinDataShowVO vo : list){
                if(!StringUtil.isEmpty(vo.getEndTime())){
                    vo.setTimeStr(format.format(vo.getTime())+"-"+vo.getEndTime());
                }
                if(null != rateBig){
                    if(null != vo.getMin()){
                        vo.setMin(new BigDecimal(SampleForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig))));
                    }
                    if(null != vo.getMax()){
                        vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig))));
                    }
                    if(null != vo.getAvg()){
                        vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig))));
                    }
                }else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点
                    if(null != vo.getMin()){
                        vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin())));
                    }
                    if(null != vo.getMax()){
                        vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax())));
                    }
                    if(null != vo.getAvg()){
                        vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg())));
                    }
                }
                if(null != totalUnitIdRateBig && null != vo.getCou()){
                    vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig))));
                }
                if(null == totalUnitIdRateBig || null == vo.getCou()){
                    //前端需处理成斜杠
                    vo.setCou(null);
                }
            }
        }

        private void handleHourData(List<HourDataShowVO> list){

            for(HourDataShowVO vo : list){
                if(null != rateBig){
                    if(null != vo.getMin()){
                        vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig))));
                    }
                    if(null != vo.getMax()){
                        vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig))));
                    }
                    if(null != vo.getAvg()){
                        vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig))));
                    }
                }else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点
                    if(null != vo.getMin()){
                        vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin())));
                    }
                    if(null != vo.getMax()){
                        vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax())));
                    }
                    if(null != vo.getAvg()){
                        vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg())));
                    }
                }
                if(null != totalUnitIdRateBig){
                    if(null != vo.getCou()){
                        vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig))));
                    }
                }else{
                    //前端需处理成斜杠
                    vo.setCou(null);
                }
            }
        }

        private void handleDayData(List<DayDataShowVO> list){

            for(DayDataShowVO vo : list){
                if(null != rateBig){
                    if(null != vo.getMin()){
                        vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin().multiply(rateBig))));
                    }
                    if(null != vo.getMax()){
                        vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax().multiply(rateBig))));
                    }
                    if(null != vo.getAvg()){
                        vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg().multiply(rateBig))));
                    }
                }else{//如果转换率的两个单位一致 导致取不到转换率 也需要保留有效小数点
                    if(null != vo.getMin()){
                        vo.setMin(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMin())));
                    }
                    if(null != vo.getMax()){
                        vo.setMax(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getMax())));
                    }
                    if(null != vo.getAvg()){
                        vo.setAvg(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getAvg())));
                    }
                }
                if(null != totalUnitIdRateBig){
                    if(null != vo.getCou()){
                        vo.setCou(new BigDecimal(TimeForkJoinPoolCalcRate.handleRateNum(vo.getCou().multiply(totalUnitIdRateBig))));
                    }
                }else{
                    //前端需处理成斜杠
                    vo.setCou(null);
                }
            }
        }

        private void CreateCalcTask(List<?> list,int num){
            int cou = 0;
            if(list != null){
                List<? extends List<?>> lists = rangeList(list, list.size() / (num - 1));
                while (cou < num){
                    CalcTask calcTask = new CalcTask(deviceId,factorId, unitRateMap, cunit,
                            listMinDataShowVO != null ? (List<MinDataShowVO>)lists.get(cou) : null,
                            listHourDataShowVO != null ? (List<HourDataShowVO>)lists.get(cou) : null,
                            listDayDataShowVO != null ? (List<DayDataShowVO>)lists.get(cou) : null,
                            rateBig,totalUnitIdRateBig);
                    cou++;
                    calcTask.fork();
                    calcTask.join();
                }
            }
        }

        /**
         * 平均分割集合 最后一个角标数据取余
         * @param list 数据集合
         * @param size  []
         * @param <T>
         * @return
         */
        private <T> List<List<T>> rangeList(List<T>list , int size){
            if(ObjectUtils.isEmpty(list)){
                return Collections.emptyList();
            }
            int block = (list.size() + size -1) / size;
            return IntStream.range(0,block).
                    boxed().map(i->{
                int start = i*size;
                int end = Math.min(start + size,list.size());
                return list.subList(start,end);
            }).collect(Collectors.toList());
        }

    }

    /**
     *  返回保留几位小数
     * @param str
     * @return
     */
    private static int countNum(String str){
        int d = str.indexOf(".");
        String str2 = str.substring(d+1,str.length());
        String[] split = str2.split("");
        int num = 2;
        for (int a = 0;a < split.length;a++) {
            if(!split[a].equals("0")){
                if(a > 1){
                    num = a+2;
                    break;
                }
            }
        }
        return num;
    }

    /**
     * 根据规则保留有效数字
     * @param val
     * @return
     */
    public static String handleRateNum(BigDecimal val){
        if(val == null){
            return null;
        }
        String strVal = val.toPlainString();

        BigDecimal vall = new BigDecimal(strVal);

        BigDecimal bigdecimal = null;

        //小于0.01跟-0.01需要保留有效位数两位
        if(vall.compareTo(new BigDecimal(0.01)) == -1){
            bigdecimal = vall.setScale(countNum(strVal),BigDecimal.ROUND_HALF_UP);
        }else{
            //大于0.01需要保留两位小数
            bigdecimal = vall.setScale(2,BigDecimal.ROUND_HALF_UP);
        }
        //如果保留的两位小数是00的话直接取整
        if(bigdecimal.intValue() > 0 && String.valueOf(bigdecimal).contains("00")){
            return String.valueOf(bigdecimal.intValue());
        }else{
            return bigdecimal.toPlainString();
        }
    }

    /**
     *
     * @param deviceId
     * @param factorId
     * @param unitRateMap 单位转换map
     * @param cunit 自定义单位 基准单位map 累计值单位
     * @param listMinDataShowVO
     * @param listHourDataShowVO
     * @param listDayDataShowVO
     * @throws InterruptedException
     */
    public static void CalcRate(String deviceId,String factorId, Map<String, BigDecimal> unitRateMap, Map<String, BxWellDeviceFactor> cunit,
                                List<MinDataShowVO> listMinDataShowVO, List<HourDataShowVO> listHourDataShowVO, List<DayDataShowVO> listDayDataShowVO) throws InterruptedException {

        //自定义单位
        BigDecimal rateBig = unitRateMap.get(cunit.get(deviceId +"_"+ factorId));
        //自定义累计单位
        String factorTotalUnitId = cunit.get(deviceId + "_" + factorId).getFactorTotalUnitId();
        String totalUnitId = cunit.get(deviceId + "_" + factorId).getTotalUnitId();
        BigDecimal totalUnitIdRateBig = unitRateMap.get(factorTotalUnitId + "_" + totalUnitId);

        ForkJoinPool pool = new ForkJoinPool();

        pool.invoke(new TimeForkJoinPoolCalcRate().new CalcTask(deviceId,factorId, unitRateMap, cunit,
                listMinDataShowVO,listHourDataShowVO,listDayDataShowVO,
                rateBig,totalUnitIdRateBig));
        pool.shutdown();
    }

}

上一篇:java 常用类BigDecimal


下一篇:关于java生成科学计数法的数字