CompletableFuture 多线程阻塞获取结果

voidCompletableFuture.get();
voidCompletableFuture.join();

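这两种写法一样会阻塞当前线程,直到所有子任务都完成,然后再一起打印结果。区别在于:get() 会抛出受检异常(InterruptedException、ExecutionException),join() 抛出的是非受检的 CompletionException。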

package com.async;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
 * Demo: fan a list of jobs out to a fixed-size thread pool with
 * {@link CompletableFuture#supplyAsync(java.util.function.Supplier, Executor)} and block
 * until every job has finished before printing the results together.
 *
 * @author Allen
 * @version 1.0
 * @date 2021/8/12 20:23
 */
public class AsyncTest {

    /** Number of worker threads used by the demo. */
    private static final int N_THREADS = 3;

    /** Simulated duration of a single job, in milliseconds. */
    private static final long JOB_MILLIS = 1000L;

    /**
     * Runs the demo: prints a blank line, then one result line per job (in job order),
     * then the elapsed wall-clock time in milliseconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println();
        long startMillis = System.currentTimeMillis();
        // Job list
        List<Integer> jobList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
        try {
            for (String result : runJobs(jobList, N_THREADS, JOB_MILLIS)) {
                System.out.println(result);
            }
        } finally {
            // Always report the elapsed time, even if a job failed.
            System.out.println(System.currentTimeMillis() - startMillis);
        }
    }

    /**
     * Dispatches every job to a dedicated pool of {@code nThreads} workers, waits for all of
     * them to complete and returns their results in the same order as {@code jobList}.
     *
     * @param jobList   ids of the jobs to run; may be empty
     * @param nThreads  size of the worker pool; must be positive
     * @param jobMillis how long each job sleeps to simulate work, in milliseconds
     * @return one result string per job, in {@code jobList} order
     * @throws IllegalArgumentException if {@code nThreads} is not positive
     * @throws CompletionException      if any job completes exceptionally
     */
    static List<String> runJobs(List<Integer> jobList, int nThreads, long jobMillis) {
        // Worker pool
        ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        try {
            // One future per job, kept in dispatch order
            List<CompletableFuture<String>> futures = new ArrayList<>(jobList.size());
            for (Integer jobId : jobList) {
                futures.add(CompletableFuture.supplyAsync(() -> runJob(jobId, jobMillis), executorService));
            }

            // Block until every job has finished
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();

            List<String> results = new ArrayList<>(futures.size());
            for (CompletableFuture<String> future : futures) {
                // Already complete at this point, so join() returns immediately.
                results.add(future.join());
            }
            return results;
        } finally {
            // Release the worker threads even when dispatching or waiting fails.
            executorService.shutdown();
        }
    }

    /**
     * Simulates one unit of work by sleeping, then builds the job's result text.
     *
     * @param jobId     id of the job
     * @param jobMillis sleep duration in milliseconds
     * @return the job id followed by the fixed result suffix
     */
    private static String runJob(Integer jobId, long jobMillis) {
        try {
            TimeUnit.MILLISECONDS.sleep(jobMillis);
        } catch (InterruptedException e) {
            // Restore the flag so the pool can observe the interruption.
            Thread.currentThread().interrupt();
        }
        return jobId + "任务执行结果";
    }
}

异步多线程配置

配置类中还可以通过 @Bean 注解额外声明其他线程池;@Async 没有指定线程池名称时,默认使用 getAsyncExecutor 方法返回的线程池。

/**
 * Turns on {@code @Async} processing and provides, through {@link AsyncConfigurer},
 * the executor and the uncaught-exception handler used for asynchronous methods.
 */
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

    /**
     * Supplies the executor for asynchronous method calls.
     *
     * @return an initialized thread-pool executor
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor pool = buildPool();
        // Not managed as a bean here, so it has to be initialized by hand.
        pool.initialize();
        return pool;
    }

    /**
     * Handler for exceptions thrown by asynchronous methods; a custom implementation
     * could be returned here instead of the simple built-in one.
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

    /** Creates the executor and applies its sizing, naming and rejection settings. */
    private ThreadPoolTaskExecutor buildPool() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        // Sizing: core threads, maximum threads, queue capacity
        pool.setCorePoolSize(10);
        pool.setMaxPoolSize(20);
        pool.setQueueCapacity(1000);
        // Maximum idle time of a thread, in seconds
        pool.setKeepAliveSeconds(300);
        // Prefix for the names of newly created threads
        pool.setThreadNamePrefix("Anno-Executor");
        // Rejection policy (one of the four built-in ones)
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return pool;
    }
}

例如下面的配置:如果需要使用指定的线程池,可以写成 @Async("taskExecutor"),并配合自定义的异常处理方法。

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {
private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);
 
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
    log.debug("Creating Async Task Executor");
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10); //核心线程数
    executor.setMaxPoolSize(20);  //最大线程数
    executor.setQueueCapacity(1000); //队列大小
    executor.setKeepAliveSeconds(300); //线程最大空闲时间 
    executor.setThreadNamePrefix("ics-Executor-"); 指定用于新创建的线程名称的前缀。
    executor.setRejectedExecutionHandler(
	new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
    return new ExceptionHandlingAsyncTaskExecutor(executor);
}
 
@Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncExceptionHandler();
    }
 
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
 
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
            log.info("Exception message - " + throwable.getMessage());
            log.info("Method name - " + method.getName());
            for (Object param : objects) {
                log.info("Parameter value - " + param);
            }
        }
    }
}

开发代码例子


/**
 * Processes the files in the directory {@code fileAddressPrefix + date} in batches of four:
 * each batch is handed to {@code addFileBatchService.insertBatch} from a pool thread, and
 * the text returned by every batch is concatenated and printed once all are done.
 *
 * <p>Original note: "runs once every night at 10 PM". NOTE(review): no schedule is visible
 * on this method, which is exposed as a request mapping — confirm how it is triggered.
 *
 * <p>The latch only waits until each pool task has obtained its {@code Future}; the actual
 * completion of a batch is awaited by {@code Future.get()} afterwards.
 *
 * @param date name of the dated sub-directory to process; rejected when {@code null} or
 *             when it contains {@code ".."}
 */
@RequestMapping("/getUrlsByDate")
public void getUrlsByDate(String date) {
    log.info("指定日期获取文件");
    // The value comes from the request and is concatenated into a filesystem path,
    // so refuse anything that could climb out of the configured directory.
    if (date == null || date.contains("..")) {
        log.info("Rejected date parameter: " + date);
        return;
    }
    String toDay = date;
    File file = new File(fileAddressPrefix + toDay);
    log.info("文件转换:" + (fileAddressPrefix + toDay));
    File[] tempList = file.listFiles();
    if (tempList == null) {
        // Not a directory, or it could not be read: nothing to do.
        return;
    }
    log.info("文件转换文件个数:" + (tempList.length));

    final int batchSize = 4;
    // Number of batches, rounding up
    final int batchCount = (tempList.length + batchSize - 1) / batchSize;
    final CountDownLatch countDownLatch = new CountDownLatch(batchCount);
    // Appended to from several pool threads at once, so it has to be thread-safe.
    final java.util.List<Future<String>> futures =
            java.util.Collections.synchronizedList(new ArrayList<>());
    for (int batch = 0; batch < batchCount; batch++) {
        final int start = batch * batchSize;
        final int end = Math.min(start + batchSize, tempList.length);
        threadPoolTaskExecutor.submit(() -> {
            try {
                futures.add(addFileBatchService.insertBatch(tempList, start, end, toDay));
            } catch (Exception e) {
                log.error("insertBatch failed for files [" + start + ", " + end + ") of " + toDay, e);
            } finally {
                countDownLatch.countDown();
            }
        });
    }

    StringBuilder summary = new StringBuilder();
    try {
        countDownLatch.await();
        System.out.println("----------------全部完成");
        for (Future<String> future : futures) {
            summary.append(future.get());
        }
    } catch (InterruptedException e) {
        // Keep the interruption visible to the caller's thread.
        Thread.currentThread().interrupt();
        log.error("Interrupted while waiting for the batches of " + toDay, e);
    } catch (Exception e) {
        log.error("A batch of " + toDay + " failed", e);
    }
    System.out.println(summary);
}

这里的 countDownLatch 起不到等待作用:insertBatch 是 @Async 异步方法,调用后立即返回 Future,随即就走到 finally 执行 countDown,而此时任务实际上还没有执行完。

可以改用下面的写法:用 CompletableFuture.supplyAsync 指定对应的线程池,再通过 CompletableFuture.allOf 加上阻塞调用来等待并获取全部结果:

voidCompletableFuture.get();/ voidCompletableFuture.join();

        // Run the job on the given pool; the future completes with the job's result text.
        CompletableFuture<String> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                // Sleep for a while to simulate work
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Restore the flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
            return jobId + "任务执行结果";
        }, executorService);
/**
 * Converts the regular files {@code tempList[start]} .. {@code tempList[end - 1]} into the
 * {@code trans/<toDay>/} directory under {@code fileAddressPrefix} and saves one
 * {@code TRuVoiceFile} record per converted file in a single batch.
 *
 * @param tempList files of the day's directory
 * @param start    index of the first file to handle (inclusive)
 * @param end      index after the last file to handle (exclusive)
 * @param toDay    name of the dated directory, used in the target path and in the URL
 * @return a completed future holding a summary with the executing thread and the elapsed
 *         milliseconds
 * @throws Exception propagated from the conversion or from the batch save
 */
@Override
@Async
@Transactional(rollbackFor = Exception.class) // transactional
public CompletableFuture<String> insertBatch(File[] tempList, int start, int end, String toDay) throws Exception {
    Date now = new Date();
    long startTime = System.currentTimeMillis();
    ArrayList<TRuVoiceFile> tRuVoiceFiles = new ArrayList<>();
    // Copy/convert the files
    for (int i = start; i < end; i++) {
        File source = tempList[i];
        if (!source.isFile()) {
            continue;
        }
        // The converted name is used both for the target file and for the stored file name.
        String mp3Name = toMp3Name(source.getName());
        try {
            // Convert the current file to 8 bit and back it up into the new directory
            transFile(source, fileAddressPrefix + "trans" + File.separator + toDay + File.separator + mp3Name);
            TRuVoiceFile entity = new TRuVoiceFile();
            // sid,file_name,url,create_time,update_time,version,status
            entity.setSid(SnowFlakeUtil.getId());
            // fileName addresses the converted file:
            // /opt/ucc/data/imageserver/voiceFile/ trans/ yyyy-MM-dd / fileName.mp3
            entity.setFileName(mp3Name);
            // NOTE(review): File.separator inside a URL yields '\' on Windows — confirm the
            // deployment platform or switch to "/".
            entity.setUrl(httpFileAddress + toDay + File.separator + source.getName());
            entity.setCreateTime(now);
            entity.setUpdateTime(now);
            entity.setVersion(1);
            entity.setStatus("initial");
            tRuVoiceFiles.add(entity);
        } catch (FileAlreadyExistsException ignored) {
            // Presumably the target was already converted earlier; skip this file.
            continue;
        }
    }
    // Batch insert; skipped when every file was filtered out so that no empty batch is sent.
    if (!tRuVoiceFiles.isEmpty()) {
        tRuVoiceFileService.addUpateBatch(tRuVoiceFiles, now);
    }
    long endTime = System.currentTimeMillis();
    return CompletableFuture.completedFuture("完成任务" + Thread.currentThread() + "用时:" + (endTime - startTime) + "ms");
}

/** Swaps a trailing {@code .wav} extension for {@code .mp3}; other names are returned unchanged. */
private static String toMp3Name(String fileName) {
    final String wavSuffix = ".wav";
    if (fileName.endsWith(wavSuffix)) {
        return fileName.substring(0, fileName.length() - wavSuffix.length()) + ".mp3";
    }
    return fileName;
}

on duplicate key update 的 MyBatis 写法:file_name 设置为唯一索引,如果插入时 file_name 冲突,则改为更新已有的那一行。

version = version+1 表示在数据库中已有的值上 +1;如果写成 version = values(version)+1,则是在本次 insert 传入的值上 +1。

<!--
  Batch upsert into t_ru_voice_file: one value tuple is generated per element of the list
  parameter. When a row hits a duplicate key, the existing row's version is incremented by
  one and its update_time is set to now() instead of inserting.
  update_time is not part of the insert column list, so it is only written by the
  duplicate-key branch of this statement.
  NOTE(review): the duplicate key is presumably the unique index on file_name; confirm
  against the table definition.
  NOTE(review): an empty list would leave the VALUES clause without any tuple; callers
  should presumably not invoke this statement with an empty list.
-->
<insert id="insertBatch" parameterType="java.util.List">
        insert into t_ru_voice_file
        (sid,file_name,url,create_time,version,status)
        values
        <foreach collection="list" item="item" separator=",">
           (#{item.sid},#{item.fileName},#{item.url},#{item.createTime},#{item.version},#{item.status})
        </foreach>
        on duplicate key update
        version = version+1,
        update_time = now()
    </insert>
上一篇:把之前CompletableFuture留下的坑给填上。


下一篇:07 异步回调