一.需求分析
最近接到一个需求,导入十万级,甚至可能百万数据量的记录了车辆黑名单的Excel文件,借此机会分析下编码过程;
首先将这个需求拆解,发现有三个比较复杂的问题:
问题一:Excel文件导入后首先要被解析为存放对象的列表,数据量大的情况下可能会导致内存溢出,解析时间过长;
问题二:插入数据库的时候,数据量大,写入的时间长
问题三:要对数据库中的现有数据进项判断,不仅仅要做插入动作,还要将数据库的数据与导入的数据对比,判断是否做更新操作
其中:
问题一和问题三,可以看做同一类,因为主要涉及内存计算导致的性能问题,以及内存占用过大的溢出问题,
关于这两个问题,现在线上的机器基本上是4核8G的配置集群部署,内存并不是关键,我会在另一篇文章中给出我的方案,
今天主要针对问题二,写入的数据库的问题给出我的方案,
问题二主要是多次写入数据库的问题,显然,如果有几十万条数据,那么是不可能连续写几十万次的,不然要写到后年马月才能全部入库,
解决方案:
这里我主要采用了多线程的写入方式,十万条数据,2000条写一次(可以自己定义),用线程池提交多个线程任务同时写入,提高性能
二.代码环境
Springboot2.1.3+POI+PGSQL
controller层代码
@PostMapping("/upload")
public void upload1(MultipartFile file, @Validated UploadReq req) throws Exception {
//从数据库查询出现有的数据,根据去重的字段分组去构建成一个HashMap,通过containsKey()判断
//将需要更新的数据放到updateList中
List<User> updateList=new ArrayList<>(); //已取值的行数
int rowNum = 0;
//列号
int colNum = 0;
//真正有数据的行数
int realRowCount = 0;
//得到工作空间
Workbook workbook = null; try {
workbook = ExcelUtil.getWorkbookByInputStream(file.getInputStream(), file.getOriginalFilename());
} catch (IOException e) {
e.printStackTrace();
}
//得到工作表
int numberOfSheets = workbook.getNumberOfSheets();
for (int i = 0; i < numberOfSheets; i++) {
Sheet sheet = ExcelUtil.getSheetByWorkbook(workbook, i)
realRowCount = sheet.getPhysicalNumberOfRows();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
List<User> list = new ArrayList<>();
User user = null; for(Row row:sheet) {
if(realRowCount == rowNum) {
break;
}
//空行跳过
if(ExcelUtil.isBlankRow(row)) {
continue;
}
if(row.getRowNum() == -1) {
continue;
}else {
//第一行表头跳过
if(row.getRowNum() == 0) {
continue;
}
}
rowNum ++;
colNum = 1;
user = new User();
ExcelUtil.validCellValue(sheet, row, colNum, "id");
user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
ExcelUtil.validCellValue(sheet, row, ++ colNum, "name");
user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
//判断是否是已存在的数据,如果是就更新,不是就新增
//updateList.add(user);
list.add(user); } //新增的逻辑
userService.saveBatch(list);
System.out.println(list);
}
}
service层代码
@Service
public class UserServiceImpl implements IUserService { @Autowired
private UserMapper userMapper; @Override
public void saveBatch(List<User> list) throws Exception {
//一个线程处理200条数据
int count = 200;
//数据集合大小
int listSize = list.size();
//开启的线程数
int runSize = (listSize / count) + 1;
//存放每个线程的执行数据
List<User> newlist = null; //创建一个线程池,数量和开启线程的数量一样
//Executors 的写法
// ExecutorService executor = Executors.newFixedThreadPool(runSize); //ThreadPoolExecutor的写法
ThreadPoolExecutor executor = new ThreadPoolExecutor(runSize, runSize, 1,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
new ThreadPoolExecutor.DiscardOldestPolicy()); //创建两个个计数器
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(runSize);
//循环创建线程
for (int i = 0; i < runSize; i++) {
//计算每个线程执行的数据
if ((i + 1) == runSize) {
int startIndex = (i * count);
int endIndex = list.size();
newlist = list.subList(startIndex, endIndex);
} else {
int startIndex = (i * count);
int endIndex = (i + 1) * count;
newlist = list.subList(startIndex, endIndex);
}
//线程类
ImportThread mythead = new ImportThread(newlist, begin, end,userMapper);
//这里执行线程的方式是调用线程池里的executor.execute(mythead)方法。
executor.execute(mythead);
}
begin.countDown();
end.await();
//执行完关闭线程池
executor.shutdown();
}
线程类
public class ImportThread implements Runnable { public ImportThread() {
} UserMapper userMapper;
private List<User> list;
private CountDownLatch begin;
private CountDownLatch end; /**
* 方法名: ImportThread
* 方法描述: 创建个构造函数初始化 list,和其他用到的参数
* @throws
*/
public ImportThread(List<User> list, CountDownLatch begin, CountDownLatch end,UserMapper userMapper) {
this.list = list;
this.begin = begin;
this.end = end;
this.userMapper=userMapper;
} @Override
public void run() {
try {
//执行完让线程直接进入等待
userMapper.saveBatch(list);
begin.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//这里要主要了,当一个线程执行完 了计数要减一不然这个线程会被一直挂起
//这个方法就是直接把计数器减一的
end.countDown();
}
} }