业务中需要实现数据日终同步：将同步文件中的数据封装成 List 集合，采用分批处理加多线程的方式，根据数据量动态设置线程数，同时控制最大并发数量（业务中有 IO 操作，避免并发过大导致阻塞），从而提高处理效率。
//最大线程数控制 private static int MAX_THREADS= 5; //跑批分页大小 private static int EXPIRED_PAGE_SIZE = 30; private void dataHandler(List<SyncFileDto> list) { //处理数据数量 int listSize = list.size(); //线程数 int runSize; if (listSize % EXPIRED_PAGE_SIZE == 0) { runSize = (listSize / EXPIRED_PAGE_SIZE); } else { runSize = (listSize / EXPIRED_PAGE_SIZE) + 1; } ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(runSize); CountDownLatch countDownLatch = new CountDownLatch(runSize); //最大并发线程数控制 final Semaphore semaphore = new Semaphore(MAX_THREADS); List handleList = null; for (int i = 0; i < runSize; i++) { if ((i + 1) == runSize) { int startIndex = i * EXPIRED_PAGE_SIZE; int endIndex = list.size(); handleList = list.subList(startIndex, endIndex); } else { int startIndex = i * EXPIRED_PAGE_SIZE; int endIndex = (i + 1) * EXPIRED_PAGE_SIZE; handleList = list.subList(startIndex, endIndex); } SyncTask task = new SyncTask(handleList, countDownLatch, semaphore); executor.execute(task); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); } class SyncTask implements Runnable { private List<SyncFileDto> list; private CountDownLatch countDownLatch; private Semaphore semaphore; public SyncSyncTask(List<SyncFileDto> list, CountDownLatch countDownLatch, Semaphore semaphore) { this.list = list; this.countDownLatch = countDownLatch; this.semaphore = semaphore; } @Override public void run() { if (!CollectionUtils.isEmpty(list)) { try { semaphore.acquire(); list.stream().forEach(fileDto -> { //业务处理 }); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } //线程任务完成 countDownLatch.countDown(); } }