需要对大量数据进行处理时,全部读取放入内存会导致内存溢出。针对这种处理大数据出现的常见问题,可以采用流式读取数据的方法。对于不同的关系型数据库,流式读取数据的方式略有不同,下面对常见的关系型数据库流式读取数据的方式做一点个人总结。
前提:使用JDBC方式读取
- MySQL
MySQL中需要特别注意的是数据库连接的url,必须配置useCursorFetch=true。例如:
jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&autoReconnect=true
具体流式读取大数据的代码:
public static int fetchData(Connection conn, int fetchSize, String sql) throws Exception {
logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize);
List<Map<String, String>> rows = new ArrayList();
int rowIndex = 0;
int batchIndex = 0;
Statement stmt = conn.createStatement(1003, 1007);
stmt.setFetchSize(fetchSize);
stmt.setFetchDirection(1000);
ResultSet rs = stmt.executeQuery(sql);
while(rs.next()) {
Map<String, String> row = new HashMap();
for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) {
row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i));
}
rows.add(row);
++rowIndex;
if (rows.size() % fetchSize == 0) {
// TODO 数据处理逻辑
doSomething()
rows.clear();
++batchIndex;
}
}
if (!rows.isEmpty()) {
// TODO 数据处理逻辑
doSomething()
}
return rowIndex;
}
参数说明:
conn : 数据库连接对象;fetchSize : 流式读取数据窗口大小;sql :查询数据SQL语句。
- Oracle、SQLServer、Sybase
Oracle、SQLServer、Sybase这三种没有特殊配置,具体代码如MySQL。具体流式读取大数据的代码:
public static int fetchData(Connection conn, int fetchSize, String sql) throws Exception {
logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize);
List<Map<String, String>> rows = new ArrayList();
int rowIndex = 0;
int batchIndex = 0;
Statement stmt = conn.createStatement(1003, 1007);
stmt.setFetchSize(fetchSize);
stmt.setFetchDirection(1000);
ResultSet rs = stmt.executeQuery(sql);
while(rs.next()) {
Map<String, String> row = new HashMap();
for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) {
row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i));
}
rows.add(row);
++rowIndex;
if (rows.size() % fetchSize == 0) {
// TODO 数据处理逻辑
doSomething()
rows.clear();
++batchIndex;
}
}
if (!rows.isEmpty()) {
// TODO 数据处理逻辑
doSomething()
}
return rowIndex;
}
参数说明:
conn : 数据库连接对象;fetchSize : 流式读取数据窗口大小;sql :查询数据SQL语句。
- Postgresql
Postgresql的数据库连接没有特殊配置,但是读取数据需要关闭自动提交事务,否则流式读取不生效。具体代码如下:
public int fetchData(Connection conn, int fetchSize, String sql) throws Exception {
logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize);
List<Map<String, String>> rows = new ArrayList();
int rowIndex = 0;
int batchIndex = 0;
Statement stmt = conn.createStatement(1003, 1007);
conn.setAutoCommit(false); // 关闭自动提交事务
stmt.setFetchSize(fetchSize);
ResultSet rs = stmt.executeQuery(sql);
while(rs.next()) {
Map<String, String> row = new HashMap();
for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) {
row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i));
}
rows.add(row);
++rowIndex;
if (rows.size() % fetchSize == 0) {
// TODO 数据处理逻辑
doSomething()
rows.clear();
++batchIndex;
}
}
if (!rows.isEmpty()) {
// TODO 数据处理逻辑
doSomething()
}
return rowIndex;
}
参数说明:
conn : 数据库连接对象;fetchSize : 流式读取数据窗口大小;sql :查询数据SQL语句。
其他关系型数据库流式读取数据的需求还没有遇到,后期会继续完善。