单条数据的插入
@Test
public void insertOne(){
ProfilerUtil.start("insertOne");
BlogPO blog = this.init();
blogMapper.insert(blog);
ProfilerUtil.end("insertOne");
}
使用时间:
【insertOne】 use time:【590】ms
循环插入10000条
@Test
public void insertLoop(){
ProfilerUtil.start("insertLoop");
for(int i = 0; i< 1000; i++){
BlogPO blog = this.init();
blogMapper.insert(blog);
}
ProfilerUtil.end("insertLoop");
}
使用时间:
【insertLoop】 use time:【44939】ms
10000条批量插入
@Test
public void batchInsert(){
ProfilerUtil.start("batchInsert");
List<BlogPO> lists = new ArrayList<>();
for(int i = 0; i< 1000; i++){
BlogPO blog = this.init();
lists.add(blog);
}
blogService.saveBatch(lists);
ProfilerUtil.end("batchInsert");
}
使用时间:
【batchInsert】 use time:【4279】ms
使用多线程异步插入10000数据(每次插入2500,开启4个线程)
@Test
public void batchInsertExecutor(){
ProfilerUtil.start("batchInsertExecutor");
List<BlogPO> lists = new ArrayList<>();
for(int i = 0; i< 10000; i++){
BlogPO blog = this.init();
lists.add(blog);
}
saveBatch(lists);
ProfilerUtil.end("batchInsertExecutor");
}
/**
* 批量保存
* @param lists
*/
private void saveBatch(List<BlogPO> lists){
if (CollectionUtils.isEmpty(lists)) {
return;
}
int size = lists.size();
int batchSize = 2500;
int batchCount = size % batchSize == 0 ? size / batchSize : size / batchSize + 1;
AtomicInteger commitCount = new AtomicInteger();
AtomicBoolean rollBack = new AtomicBoolean();
CountDownLatch countDownLatch = new CountDownLatch(1);
List<BlogPO> blogList = new ArrayList<>(batchSize);
int scStockRealPOSSize = 0;
//async to db operation
for (int i = 0; i < size; i++) {
blogList.add(lists.get(i));
scStockRealPOSSize++;
if (batchSize != scStockRealPOSSize && size - 1 != i) {
continue;
}
//async to batch insert
List<BlogPO> batchWithTxScStockRealPOS = blogList;
commonThreadPool.execute(() -> batchWithTx(batchWithTxScStockRealPOS, batchSize, commitCount, rollBack, countDownLatch));
blogList = new ArrayList<>(batchSize);
scStockRealPOSSize = 0;
}
//check all threads result
while (commitCount.get() != batchCount && !rollBack.get()) {
}
//tell other all threads to continue
countDownLatch.countDown();
}
/**
* 手动控制事务
* @param scStockRealPOS
* @param batchSize
* @param commitCount
* @param rollBack
* @param countDownLatch
*/
private void batchWithTx(List<BlogPO> scStockRealPOS, int batchSize, AtomicInteger commitCount, AtomicBoolean rollBack,
CountDownLatch countDownLatch) {
tx.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
//batch db operate
batchInsert(scStockRealPOS, batchSize, commitCount, rollBack);
//wait all end
try {
countDownLatch.await();
} catch (InterruptedException e) {
rollBack.set(true);
log.error("saveBatch countDownLatch.await() error", e);
throw new RuntimeException("aveBatch countDownLatch.await() error", e);
}
//tx rollback for when need
if (rollBack.get()) {
status.setRollbackOnly();
}
}
});
}
/**
* 插入数据库
* @param batchSize
* @param commitedNum
* @param rollBack
*/
private void batchInsert(List<BlogPO> blogList, int batchSize, AtomicInteger commitedNum, AtomicBoolean rollBack) {
try {
blogService.saveBatch(blogList, batchSize);
commitedNum.incrementAndGet();
} catch (Throwable e) {
rollBack.set(true);
log.error("asyncBatchInsert error", e);
}
}
使用时间(经测试2500每条为最佳条数,使用时间最少):
【batchInsertExecutor】 use time:【3795】ms
github地址:https://github.com/gao-tao/performance