批量插入数据(基于Mybatis的实现-Oracle)

-----------------------------------

20170528 第二次编辑:主要是补充mysql相关内容。

-----------------------------------

mysql支持batch提交改进方案:声明:mysql仍然没有内部游标,让数据库支持executeBatch的方式处理。

MySql 的批量操作,要加rewriteBatchedStatements参数

引用“

MySql的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。

例如: String connectionUrl="jdbc:mysql://192.168.1.100:3306/test?rewriteBatchedStatements=true" ;

还要保证Mysql JDBC驱的版本。MySql的JDBC驱动的批量插入操作性能是很优秀的。

原文链接:http://elf8848.iteye.com/blog/770032  (建议去看看这个链接的评论区)

参考资料:https://www.oschina.net/question/2553117_2162171?sort=time

20170528亲测,插入26663条数据,

加上rewriteBatchedStatements后,耗时:3734毫秒
不加rewriteBatchedStatements前,耗时:672551毫秒

mysql装在本机上,行字段数多,仅从本次测试看,性能提高了180倍。

 ---------------------------------------------------

第一版:20170526 原创

 ---------------------------------------------------

前言:做一个数据同步项目,要求:同步数据不丢失的情况下,提高插入性能。

项目DB框架:Mybatis。DataBase:Oracle。

----------------------------------------------------------------------------

批量插入数据方式:

一、Mybatis 全局设置批处理;

二、Mybatis 局部设置批处理;

三、Mybatis foreach批量插入:

①SELECT UNION ALL;

②BEGIN INSERT INTO ...;INSERT INTO...;...;END;

四、java自带的批处理插入;

五、其他方式

-----------------------------------------------------------------------------

先说结论:Mybatis(全局/局部)批处理和java自带的批处理 性能上差不多,属于最优处理办法,我这边各种测试后,最后采用Mybatis局部批处理方式。

一、Mybatis 全局设置批处理

先上Spring-Mybatis.xml 配置信息

 <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd"> <!-- 自动扫描(自动注入) -->
<context:annotation-config/>
<context:component-scan base-package="com.company.dao"/> <!-- 动态数据源 -->
<bean id="dataSource" class="com.company.dao.datasource.DataSource">
<property name="myConfigFile" value="mySource.xml"/>
</bean> <!-- mybatis配置 -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="mapperLocations" value="classpath*:mapper/*/*/*.xml"/>
<property name="configLocation" value="classpath:/mybatisConfig.xml"/>
</bean> <!-- 自动创建映射器,不用单独为每个 mapper映射-->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.company.dao.mapper"/>
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
</bean> <!-- 事务管理器配置,单数据源事务 -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean> <tx:annotation-driven transaction-manager="transactionManager"/> </beans>

Spring-Mybatis.xml

再上mybatisConfig.xml(在本项目中,我没有设置setting。最终采用的局部批处理,因此未设置全局批处理,具体原因后面再说。)

 <?mapper.xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration> <settings>
<!-- 配置默认的执行器。SIMPLE 就是普通的执行器;REUSE 执行器会重用预处理语句(prepared statements); BATCH 执行器将重用语句并执行批量更新。-->
<setting name="defaultExecutorType" value="BATCH"/>
<!--详见:http://www.mybatis.org/mybatis-3/zh/configuration.html-->
</settings> <!-- 别名列表 -->
<typeAliases>
<!-- typeAliases 中的配置都是配置别名,在此就不贴出来了 -->
</typeAliases> </configuration>

mybatisConfig.xml

这样子设置好后,在BaseService开放saveBatch(List<T> list)方法

 @Override
public void save(List<T> list) {
for (int i = 0;i < list.size();i++){
mapper.insert(list.get(i));
}
} @Override
public void saveBatch(List<T> list) {
int size = list.size();
int unitNum = 500;
int startIndex = 0;
int endIndex = 0;
while (size > 0){
if(size > unitNum){
endIndex = startIndex+unitNum;
}else {
endIndex = startIndex+size;
}
List<T> insertData = list.subList(startIndex,endIndex);
save(insertData);
size = size - unitNum;
startIndex = endIndex;
}
}

BaseService.saveBatch(List list)

虽然看上去是500条记录,一次次INSERT INTO,但由于在全局已经设置Mybatis是批处理执行器,所以这500条INSERT INTO只会与Oracle数据库通信一次。

全局设置批处理的局限性在哪里呢?

先附上mybatis官方的讨论列表中最很关键的一句:“If the BATCH executor is in use, the update counts are being lost. ”

设置全局批处理后,DB里的insert、Update和delete方法,都无法返回进行DML影响DB_TABLE的行数。

1.insert 无法返回影响的行数,这个好解决,一个批处理放在一个事务里,记录批处理失败次数,总数-批处理失败次数*单位批处理数据量,就能得到insert 影响DB_TABLE的行数;

2.但是update和delete就无法很简单的去统计影响行数了,如果做反复查询,反而降低了效率,得不偿失。

虽现在的项目尚未有需要反馈影响DB_TABLE行数的需求,但是为了更灵活,我们放弃了全局批处理的方式。

!这里提个疑问:为什么Mybatis官方,不将批处理的选择方式下沉到方法级别?方便开发者根据实际情况,灵活选择。我觉得这是个可以改进的地方,如有机会,可看源码去进行改进。

---------------------------------------------------------------------------------------------------------

二、Mybatis局部批处理方式

由于领导说全局批处理方式,不够灵活,不适宜项目所需,要另想办法支持。但是java自带的批处理,因为项目代码管理的要求,也不能采用。因此,在仔细阅读官方文档后,设想自己能否获取SQLSession后openSession,将这个会话设置为批处理呢?

先看MyBatis官方网站(须*):http://www.mybatis.org/mybatis-3/zh/getting-started.html

 SqlSession session = sqlSessionFactory.openSession();
try {
BlogMapper mapper = session.getMapper(BlogMapper.class);
// do work
} finally {
session.close();
}

官方建议的写法

后查阅Mybatis java API(须*):  http://www.mybatis.org/mybatis-3/zh/java-api.html

现在你有一个 SqlSessionFactory,可以用来创建 SqlSession 实例。

SqlSessionFactory

SqlSessionFactory 有六个方法可以用来创建 SqlSession 实例。通常来说,如何决定是你 选择下面这些方法时:

  • Transaction (事务): 你想为 session 使用事务或者使用自动提交(通常意味着很多 数据库和/或 JDBC 驱动没有事务)?
  • Connection (连接): 你想 MyBatis 获得来自配置的数据源的连接还是提供你自己
  • Execution (执行): 你想 MyBatis 复用预处理语句和/或批量更新语句(包括插入和 删除)

重载的 openSession()方法签名设置允许你选择这些可选中的任何一个组合。

 SqlSession openSession()
SqlSession openSession(boolean autoCommit)
SqlSession openSession(Connection connection)
SqlSession openSession(TransactionIsolationLevel level)
SqlSession openSession(ExecutorType execType,TransactionIsolationLevel level)
SqlSession openSession(ExecutorType execType)
SqlSession openSession(ExecutorType execType, boolean autoCommit)
SqlSession openSession(ExecutorType execType, Connection connection)
Configuration getConfiguration();

官方提供的openSession方法

因此出来了局部批处理第一套代码实现方式:

 public static void sqlSession(List<Student> data) throws IOException {
String resource = "mybatis-dataSource.xml";
InputStream inputStream = null;
SqlSession batchSqlSession = null;
try{
inputStream = Resources.getResourceAsStream(resource);
SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
int batchCount = 500;//每批commit的个数
for(int index = 0; index < data.size();index++){
Student stu = data.get(index);
batchSqlSession.getMapper(Student.class).insert(stu);
if(index !=0 && index%batchCount == 0){
batchSqlSession.commit();
}
}
batchSqlSession.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(batchSqlSession != null){
batchSqlSession.close();
}
if(inputStream != null){
inputStream.close();
}
}
}

sqlSession(List data)

 <?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<environments default="development">
<environment id="development">
<transactionManager type="JDBC"/>
<dataSource type="POOLED">
<property name="driver" value="${driver}"/>
<property name="url" value="${url}"/>
<property name="username" value="${username}"/>
<property name="password" value="${password}"/>
</dataSource>
</environment>
</environments>
<mappers>
<mapper resource="org/mybatis/example/Student.xml"/>
</mappers>
</configuration>

mybatis-dataSource.xml

已经在Spring-Mybatis.xml 中配置了SQLSessionFactory,那我为何还要自己去创建SQLSessionFactory呢?因此继续改良代码

 public static void mybatisSqlSession(List<Student> data){
DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory");
SqlSession batchSqlSession = null;
try{
batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
int batchCount = 500;//每批commit的个数
for(int index = 0; index < data.size();index++){
Student stu = data.get(index);
batchSqlSession.getMapper(StudentMapper.class).insert(stu);
if(index !=0 && index%batchCount == 0){
batchSqlSession.commit();
}
}
batchSqlSession.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(batchSqlSession != null){
batchSqlSession.close();
}
}
}

mybatisSqlSession(List data)

这个版本的局部批处理插入是比较满意的,最终采用的方式也是这个版本。

下面放出在IService接口定义和Service的具体实现代码:

IService接口定义

 /**
* 批处理插入数据(方法内部定义500条为一个批次进行提交)
* 使用注意事项:必须在XxxMappper.xml中实现<insert id="insert" ...>....<insert/>的sql
* @param data 批量插入的数据
* @param mClass 调用的XxxMaperr.class
* @auth robin
* Created on 2016/3/14
*/
void saveBatch(List<T> data,Class mClass);

saveBatch(List data,Class mClass)

Service实现

 @Override
public void saveBatch(List<T> data,Class mClass) {
DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory");
SqlSession batchSqlSession = null;
try{
batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
int batchCount = 500;//每批commit的个数
for(int index = 0; index < data.size();index++){
T t = data.get(index);
((BaseMapper)batchSqlSession.getMapper(mClass)).insert(t);
if(index !=0 && index%batchCount == 0){
batchSqlSession.commit();
}
}
batchSqlSession.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(batchSqlSession != null){
batchSqlSession.close();
}
}
}

saveBatch(List data,Class mClass)

局部和全局批处理插入对比:局部批处理,可以对特定一类的方法,进行数据批处理,不会影响其他DML语句,其他DML语句,可以正常返回影响DB_TABLE的行数。

!这样既能针对特殊需求(批处理)支持,也能支持未来需要返回影响数据行的要求。

注意:使用批处理方式进行DML操作,是无法反馈影响DB_TABLE行数的数据。无论是局部批处理还是java自带的批处理方式,皆无法反馈DB_TABLE count。

补充完善:

在我的Service实现中,通过注入的方式,获取mapper的实例

 public class BaseService<MAPPER extends BaseMapper, T, PK extends Serializable> implements IBaseService<T, PK> {

     protected T tt;
/**
* 实体操作的自动注入Mapper(随初始化一同注入,必须用set方法)
*/
protected MAPPER mapper; public MAPPER getMapper() {
return mapper;
} @Autowired
public void setMapper(MAPPER mapper) {
this.mapper = mapper;
}
//后续代码略
}

Service

前面的Service saveBatch方法中,还需要传入指定的Mapper.class.对本项目其他开发者来说,与之前的环境相比,多传一个参数感觉别扭。

那么为何我不继续封装,外部无需传入Mapper.class,而是通过内部注入的mapper实例获取Mapper.class.
改良后的代码:

 @Override
public T saveBatch(List<T> data) {
T back = null;
DefaultSqlSessionFactory sqlSessionFactory = (DefaultSqlSessionFactory) ServiceBeanConstant.CTX.getBean("sqlSessionFactory");
SqlSession batchSqlSession = null;
try{
batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
int batchCount = 500;//每批commit的个数
for(int index = 0; index < data.size();index++){
T t = data.get(index);
back = t;
Class<?>[] interfaces=mapper.getClass().getInterfaces();
Class clazz = null;
for (int i=0;i<interfaces.length;i++){
if(BaseMapper.class.isAssignableFrom(interfaces[i])){
clazz = interfaces[i];
}
}
if(clazz == null){
throw new Exception("user-defined exception:mapper not implements interfaces com.company.dao.mapper.BaseMapper");
}
BaseMapper baseMapper = (BaseMapper) batchSqlSession.getMapper(clazz);
baseMapper.insert(t);
if(index !=0 && index%batchCount == 0){
batchSqlSession.commit();
}
}
batchSqlSession.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(batchSqlSession != null){
batchSqlSession.close();
}
return back;
}
}

saveBatch(List data)

这里对mapper实例进行一个简短的说明:

1.mapper实例是通过java动态代理来实例化的;

2.mapper的SQLSession是使用mybatis统一的配置实例的;

3.mapper的默认执行器是SIMPLE(普通的执行器);

-------------------------------------------------------------------------------------

三、Mybatis foreach批量插入

Mybatis foreach 批量插入,如果批量插入的数据量大,不得不说这真是一个非常糟糕的做法

无论是SELECT ** UNION ALL 还是BEGIN ...;END; ,相对而言后者比前者稍微好点。

放出DB和我测试的结果:

耗时 占当时整个数据库CPU百分比 说明
15.5 98.33 union all方式拼接插入
16.4 97.75 begin end方式插入块
1.54 64.81 java 自带的batch方式插入

①foreach union all的批量插入,现已有大量的博客资源可供参考,我就不贴出自己的实现方式了。

如果有兴趣可以参阅:http://blog.csdn.net/sanyuesan0000/article/details/19998727 (打开浏览器,复制url)

这篇博客。BEGIN END的方式,也是从这篇博客中得到启发。只不过他是把BEGIN END用在update中。

②foreach begin end 语句块

我的实现:

 <insert id="insertBatch" parameterType="java.util.List">
BEGIN
<foreach collection="list" item="item" index="index" separator=";" >
INSERT INTO TABLE.STUDENT (ID,AGE,NAME,STU_ID) VALUES
( DEMO.SEQ_EID.NEXTVAL,#{item.age},#{item.name},#{item.stuId} )
</foreach>
;END ;
</insert>

insertBatch

调用方式:

 @Override
public void saveBatch(List<T> list) {
int size = list.size();
int unitNum = 500;
int startIndex = 0;
int endIndex = 0;
while (size > 0){
if(size > unitNum){
endIndex = startIndex+unitNum;
}else {
endIndex = startIndex+size;
}
List<T> insertData = list.subList(startIndex,endIndex);
mapper.insertBatch(insertData);
size = size - unitNum;
startIndex = endIndex;
}

saveBatch(List list)

---------------------------------------------------------------------

四、java自带的批处理方式

废话不多说,直接上代码

 package DB;

 import base.Student;

 import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List; /**
* Created by robin on 2016/5/23.
*
* @author robin
*/
public class InsertTableDemo { public static void main(String args[]) throws SQLException {
Connection connection = null;
List<Student> dataList = getDataList(100000);
long startTime = 0;
try{
connection = getConn();
startTime=System.currentTimeMillis();
connection.setAutoCommit(false);
PreparedStatement statement = connection.prepareStatement("INSERT INTO STUDENT (ID,AGE,NAME,STU_ID) VALUES ( DEMO.SEQ_EID.NEXTVAL, ?,?,? ) ");
int num = 0;
for (int i = 0;i< dataList.size();i++){
Student s = dataList.get(i);
statement.setInt(1, s.getAge());
statement.setString(2, s.getName());
statement.setString(3, s.getStuId());
statement.addBatch();
num++;
if(num !=0 && num%500 == 0){
statement.executeBatch();
connection.commit();
num = 0;
}
}
statement.executeBatch();
connection.commit();
}catch (Exception e){
e.printStackTrace();
connection.rollback();
}finally {
if(connection != null){
connection.close();
}
long endTime=System.currentTimeMillis();
System.out.println("方法执行时间:"+(endTime-startTime)+"ms");
} } public static Connection getConn(){
String driver = "oracle.jdbc.driver.OracleDriver";
String url = "jdbc:oracle:thin:@//ip:port/DMEO"; //DMEO为数据库名
String user = "user";
String password = "pwd";
try{
Class.forName(driver);
Connection conn = DriverManager.getConnection(url, user, password);
return conn;
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
public static List<Student> getDataList(int f){
List<Student> data = new ArrayList<>();
for (int i =0;i<f;i++){
Student s = new Student(""+i,"小明" + i,i);
data.add(s);
}
return data;
} }

JDBC BATCH

这种批量插入大量数据的方式,性能上最好。但是因为我们小组代码管理所限制,因此这种方式不使用。

------------------------------------------------------------------------

五、其他方式

现在已经忘了,其他方式到底使用过哪些,但总归是比以上四种效果都更差,所以没什么印象了。

如果各位,还有什么其他更好的批量插入数据的方式,欢迎加入讨论,集思广益。

以上就是这两天,对在原项目基础上不进行大变动的基础上,提供批处理插入数据的所思所行。

-------------------------------------------------------------------------------

后记:

这里吐槽一句:希望大家不要把未经过自己验证的东西,言之凿凿地写到博客中去。

在我做批处理这件事的时候,领导也在参阅网上的博客。其中有一篇博客,说自己在oracle中批量插入数据,采用foreach insert into (field1,field2,...) values (v11,v12,...),(v21,v22,...) ,(v31,v32,...),...也可以。

虽然我明知不行,但是无可奈何还是要去演示给领导看,在oracle中,这种写法确实不适用。

领导问我为何他说可以,我想我也只能回答:他抄别人的博客呗,抄来抄去都不自己实践验证,就想当然地写到博客里。

所以,如果你看完了我这篇分享,希望您也能写个demo验证下,起码可以加深自己的理解。

感谢领导和DB同事,在此过程中的帮助。

以上内容,都经过本人实践验证过。若转发,请在标题上标记[转],并注明原文链接:http://www.cnblogs.com/robinjava77/p/5530681.html,作者名称:robin。并在文章最后一行附上本句话。否则,作者保留追究的权利。

上一篇:Erlang cowboy 架构


下一篇:contenteditable="true"让div可编辑