异步排队导出csv文件

一、简介。

多人同时导出时进行排队等候,可以自定义排队人数,在大数据情况下可以分批处理数据、导出为异步,导出过程中可获取导出进度,导出完成之后调用下载接口。

可以自定义导出的列。

二、代码实现

1、创建队列,队列是使用的java的Queue

首先先创建队列的类

@Slf4j
@Component
public class Queue {

    /**
     * 缓存导出的队列。数量就是你最大几个请求排队
     */
    public static BlockingQueue<ColumnForm> queue = new ArrayBlockingQueue<ColumnForm>(5);
    /**
     * 导出的线程状态
     */
    private static boolean isRunning = false;
    /**
     * 超时时间间隔
     */
    public static long TIMEOUT_INTERVAL = 3 * 60 * 1000;


    public static boolean export(CmsNewsColumnForm cmsNewsColumnForm) {
        boolean rsp = false;
        try {
            //阻塞3秒
            rsp = queue.offer(cmsNewsColumnForm, 3, TimeUnit.SECONDS);

            if (!rsp) {
                throw new Exception(异常信息);
            }
            if (!isRunning) {
                startup(cmsNewsColumnForm);
                isRunning = true;
            }

        } catch (Exception e) {
            throw new Exception(异常信息);
        }
        return rsp;
    }

    public static void startup(CmsNewsColumnForm cmsNewsColumnForm) {
        if (isRunning) {
            return;
        }

        Thread th = new Thread(() -> {
            long timestamp = System.currentTimeMillis();
            //获取service
            Service service = SpringContextUtils.getBean(Service.class);
            while (isRunning) {
                long timeout = timestamp + TIMEOUT_INTERVAL;
                if (System.currentTimeMillis() > timeout) {
                    //空跑一段时间(3分钟)后线程退出
                    break;
                }
                try {
                    if (queue.size() == 0) {
                        Thread.sleep(1000);
                        continue;
                    }
                    //这里是你导出业务的方法
                    Service.export(columnForm);
                    queue.poll();
                } catch (Exception e) {
                    log.error("导出出错", e);
                }
            }
            isRunning = false;
            log.debug("导出线程停止。");
        });;
        th.start();
    }
}

2、编写实现类service。

 @Autowired
 private ExportThread exportThread;

 public void export(ColumnForm ColumnForm) {
        List<Future> futures = new ArrayList<>();
         //定义文件存放的地址
        File file = FileUtil.mkdir(System.getProperty("user.dir") + File.separator + "upload" + File.separator + ColumnForm.getUserId());
        String filePath = file.getPath() + File.separator;
        
        //这里是你预先设置可能会需要导出的列
        LinkedHashMap<String, String> linkedHashMap = ExportDict.newsMap;
        //把map变成线程安全
        Map<String, String> newsMap = Collections.synchronizedMap(new LinkedHashMap<>());
       
        //需要导出的列,前端传过来的列是驼峰的,这里我们做一个转换,把他转为下划线,方便数据库查询使用。
        Map<String, String> map = new HashMap<>();

        ColumnForm.getColumn().forEach(news -> {
            //驼峰转下划线命名
            String chang = StringUtils.toUnderScoreCase(news);
            map.put(news, chang);
            //构建excel表头
            linkedHashMap.forEach((k, v) -> {
                if (k.equals(news)) {
                    newsMap.put(k, v);
                }
            });
        });
        把转换好的列放进map里面
        ColumnForm.setChang(map);

        //下面是分批次导出

        //查询总共需要导出的总数
        int totalCount = this.count();
        //单个excel最大行数
        int pageSize = CommonConstants.POINTS_DATA_LIMIT;
        //总页数
        int pageCount = (totalCount + pageSize - 1) / pageSize;
        int pageBegin = 0;

        //开始和结束条数
        int pageEnd = pageSize;
         
        int percentage = pageCount;

        //把开始的坐标放入redis,用于获取导出进度使用。这里我用userId作为每个人进度的唯一标识,避免进度串。
        redisTemplate.opsForValue().set(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + cmsNewsColumnForm.getUserId(), Double.valueOf(pageCount));

        //这里把总页数放进redis,用于后面计算百分比时使用。
        redisTemplate.opsForValue().set(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + cmsNewsColumnForm.getUserId(), percentage);

        //线程池
        ThreadPoolTaskExecutor threadPoolTaskExecutor = threadPoolConfig.threadPoolTaskExecutor();

        //分批多线程查询数据库数据并且写入到本地路径
        for (int i = 0; i < pageCount; i++) {
            final int begin = pageBegin;
            //设置文件名称(name+i下标防止文件名重复)
            final String fileUrl = filePath + "导出信息" + (i + 1) + ".csv";
 
            //开启线程
            Future<?> submit = threadPoolTaskExecutor.submit(new Runnable() {
                @Override
                public void run() {
             
                List<exportVO> exportVOS = Collections.synchronizedList(new ArrayList<>());
                exportVOS =Mapper.export(cmsNewsColumnForm, begin, pageEnd);
         
      
                exportThread.export(exportVOS, newsMap, fileUrl);

                //每次完成之后递减次数,用于计算进度。
                redisTemplate.getConnectionFactory().getConnection().decr(                            redisTemplate.getKeySerializer().serialize(redisKeyConfig.getKey() + "percentage" + cmsNewsColumnForm.getUserId())
                    );
                }
            });
            //返回的结果集存储
            futures.add(submit);

            //分批的页数
            pageBegin = pageEnd + pageBegin;
        }

        //这里必须要阻塞,否则的话当前主线程不会等待所有子线程执行完后在执行,会直接完成,导致获取不到进度。
        futures.forEach(f -> {
            try {
                f.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new ServiceException(ResponseCode.THREAD_INTERRUPTION_ERROR.msg(), ResponseCode.THREAD_INTERRUPTION_ERROR.code());
            } catch (ExecutionException e) {
                e.printStackTrace();
                throw new ServiceException();
            }
        });
    }

这是sql、这里使用的是mybatis-plus

    <select id="export" resultType="com.puyiyun.cms.entity.vo.ExportVO">
        SELECT
        <foreach collection="ns.chang.entrySet()" index="key" item="value" open="" separator="," close="">
         ${value} as ${key}
        </foreach>
        FROM users
        LIMIT ${pageBegin},${pageEnd}
    </select>

3、写入的工具类


@Component
public class ExportThread {

    /**
     * 新闻导出异步线程
     *
     * @param newsMap           (需要导出的列)
     * @param fileUrl
     */
    public <T> String exportNews(List<T> data, Map<String, String> newsMap, String fileUrl) {
        try {
            ExcelWriter writer = new ExcelWriter();
            //创建一个不重名文件
            writer = ExcelUtil.getWriter(fileUrl);
            //设置列名
            for (Map.Entry<String, String> entry : newsMap.entrySet()) {
                writer.addHeaderAlias(entry.getKey(), entry.getValue());
            }
            //设置是否只显示设置了别名的字段
            writer.setOnlyAlias(true);
            writer.write(data, true);
            writer.close();
            return fileUrl;
        } catch (Exception e) {
            e.printStackTrace();
            throw new ServiceException("导出异常");
        }
    }
}

4、预设置的列。

/**
 * 导出对应字典类
 */
public class ExportDict {
    public static final LinkedHashMap<String, String> newsMap = new LinkedHashMap<>();

    static{
        newsMap.put("userId","用户id");
        newsMap.put("name","用户名称");
        newsMap.put("address","地址");
   
    }
}

5、获取导出进度的方法(这里是每请求一次获取一次最新进度,需要轮询调用,这里不友好,建议改成用socket推送的方式推送给前端)

    public String exportSchedule() {
        //上下文获取userId
        Long userId = SecurityUtils.getUserId();
        //返回保留两位小数
        NumberFormat nf = new DecimalFormat("0.00 ");
        String outcome = "";
        Double onePercentLimit = 0.0;
        //总次数
        Object exportNewsCount = redisTemplate.opsForValue().get(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + userId);
        //当前第几次
        Object percentage = redisTemplate.opsForValue().get(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + userId);
        if (ObjectUtil.isNotNull(exportCount) && ObjectUtil.isNotNull(percentage)) {
            int perChang = (Integer) percentage;
            double arraySize = (Double) exportNewsCount;
            double Count = Double.valueOf(perChang);
            //计算进度百分比
            onePercentLimit = (1.00 - Count / arraySize) * 100.00;
            //保留两位小数输出
            outcome = nf.format(onePercentLimit);
        }
        return outcome;
    }

6、导出完成后点击下载

   public void downloadNews(HttpServletResponse response) {
        try {
            Long userId = SecurityUtils.getUserId();
            File file = FileUtil.mkdir(System.getProperty("user.dir") + File.separator + "upload" + File.separator + userId);
            String filePath = file.getPath() + File.separator;
            File[] fileUrl = FileUtil.ls(filePath);
            response.setContentType("application/zip");
            response.reset();
            response.setCharacterEncoding("utf-8");
            String fileNameCode = URLEncoder.encode("名称", "UTF-8");
            OutputStream outputStream = response.getOutputStream();
            //文件路径集合
            List<File> fileList = new ArrayList<>();
            //多个文件打成压缩包
            if (!ArrayUtil.isEmpty(fileUrl)) {
                if (fileUrl.length > 1) {
                    response.setHeader("Content-disposition", "attachment;filename=" + fileNameCode + ".zip");
                    for (int i = 0; i < fileUrl.length; i++) {
                        String path = fileUrl[i].getPath();
                        fileList.add(new File(path));
                    }
                    ZipUtils.toZip(fileList, outputStream);
                } else {
                    response.setHeader("Content-disposition", "attachment;filename=" + fileNameCode + ".csv");
                    //单个文件不打包
                    FileInputStream fileInputStream = new FileInputStream(fileUrl[0].getPath());
                    byte[] buff = new byte[1024];
                    int i;
                    while ((i = fileInputStream.read(buff)) != -1) {
                        outputStream.write(buff, 0, i);
                        outputStream.flush();
                    }
                    fileInputStream.close();
                }

                //完成后删除进度
                redisTemplate.delete(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + userId);
                redisTemplate.delete(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + userId);

                //完成后删除文件
                String parent = fileUrl[0].getParent();
                FileUtil.del(parent);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
            throw new ServiceException("导出失败,请联系管理员", ResponseCode.UNKNOWN.code());
        }
    }

压缩zip的工具类

public class ZipUtils {

    public static void compressionFile(String path, String zipFilePath) {
        File file = new File(path);
        if (file == null || !file.exists() || !file.isDirectory()) {
            return;
        }
        File zipFile = new File(zipFilePath);
        File[] srcFile = file.listFiles();
        byte[] buffer = new byte[1024];
        try {
            ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
            for (int i = 0; i < srcFile.length; i++) {
                FileInputStream fileInputStream = new FileInputStream(srcFile[i]);
                out.putNextEntry(new ZipEntry(srcFile[i].getName()));
                int length;
                while ((length = fileInputStream.read(buffer)) > 0) {
                    out.write(buffer, 0, length);
                }
                out.closeEntry();
                fileInputStream.close();
            }
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final int BUFFER_SIZE = 2 * 1024;

    /**
     * 压缩成ZIP 方法1
     *
     * @param srcDir           压缩文件夹路径
     * @param out              压缩文件输出流
     * @param KeepDirStructure 是否保留原来的目录结构,true:保留目录结构;
     *                         false:所有文件跑到压缩包根目录下(注意:不保留目录结构可能会出现同名文件,会压缩失败)
     * @throws RuntimeException 压缩失败会抛出运行时异常
     */
    public static void toZip(String srcDir, OutputStream out, boolean KeepDirStructure)
            throws RuntimeException {

        long start = System.currentTimeMillis();
        ZipOutputStream zos = null;
        try {
            zos = new ZipOutputStream(out);
            File sourceFile = new File(srcDir);
            compress(sourceFile, zos, sourceFile.getName(), KeepDirStructure);
            long end = System.currentTimeMillis();
            System.out.println("压缩完成,耗时:" + (end - start) + " ms");
        } catch (Exception e) {
            throw new RuntimeException("zip error from ZipUtils", e);
        } finally {
            if (zos != null) {
                try {
                    zos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * 压缩成ZIP 方法2
     *
     * @param srcFiles 需要压缩的文件列表
     * @param out      压缩文件输出流
     * @throws RuntimeException 压缩失败会抛出运行时异常
     */
    public static void toZip(List<File> srcFiles, OutputStream out) throws RuntimeException {
        long start = System.currentTimeMillis();
        ZipOutputStream zos = null;
        try {
            zos = new ZipOutputStream(out);
            for (File srcFile : srcFiles) {
                byte[] buf = new byte[BUFFER_SIZE];
                zos.putNextEntry(new ZipEntry(srcFile.getName()));
                int len;
                FileInputStream in = new FileInputStream(srcFile);
                while ((len = in.read(buf)) != -1) {
                    zos.write(buf, 0, len);
                }
                zos.closeEntry();
                in.close();
            }
            long end = System.currentTimeMillis();
            System.out.println("压缩完成,耗时:" + (end - start) + " ms");
        } catch (Exception e) {
            throw new RuntimeException("zip error from ZipUtils", e);
        } finally {
            if (zos != null) {
                try {
                    zos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * 递归压缩方法
     *
     * @param sourceFile       源文件
     * @param zos              zip输出流
     * @param name             压缩后的名称
     * @param KeepDirStructure 是否保留原来的目录结构,true:保留目录结构;
     *                         false:所有文件跑到压缩包根目录下(注意:不保留目录结构可能会出现同名文件,会压缩失败)
     * @throws Exception
     */
    private static void compress(File sourceFile, ZipOutputStream zos, String name,
                                 boolean KeepDirStructure) throws Exception {
        byte[] buf = new byte[BUFFER_SIZE];
        if (sourceFile.isFile()) {
            // 向zip输出流中添加一个zip实体,构造器中name为zip实体的文件的名字
            zos.putNextEntry(new ZipEntry(name));
            // copy文件到zip输出流中
            int len;
            FileInputStream in = new FileInputStream(sourceFile);
            while ((len = in.read(buf)) != -1) {
                zos.write(buf, 0, len);
            }
            // Complete the entry
            zos.closeEntry();
            in.close();
        } else {
            File[] listFiles = sourceFile.listFiles();
            if (listFiles == null || listFiles.length == 0) {
                // 需要保留原来的文件结构时,需要对空文件夹进行处理
                if (KeepDirStructure) {
                    // 空文件夹的处理
                    zos.putNextEntry(new ZipEntry(name + "/"));
                    // 没有文件,不需要文件的copy
                    zos.closeEntry();
                }

            } else {
                for (File file : listFiles) {
                    // 判断是否需要保留原来的文件结构
                    if (KeepDirStructure) {
                        // 注意:file.getName()前面需要带上父文件夹的名字加一斜杠,
                        // 不然最后压缩包中就不能保留原来的文件结构,即:所有文件都跑到压缩包根目录下了
                        compress(file, zos, name + "/" + file.getName(), KeepDirStructure);
                    } else {
                        compress(file, zos, file.getName(), KeepDirStructure);
                    }

                }
            }
        }
    }



三、总结

总的步骤来说就是

1、创建队列

2、导出(导出是先导出到本地磁盘)

3、获取导出进度

4、导出完成后进行下载,下载完成后把进度和文件全部删除

这是第一次写博客,写的不好的地方大家见谅。

上一篇:zip工具类(JAVA)


下一篇:以流的方式进行压缩文件