JDBC读取常见关系型数据库中大数据

需要对大量数据进行处理时,全部读取放入内存会导致内存溢出。针对这种处理大数据出现的常见问题,可以采用流式读取数据的方法。对于不同的关系型数据库,流式读取数据的方式略有不同,下面对常见的关系型数据库流式读取数据的方式做一点个人总结。
前提:使用JDBC方式读取

  1. MySQL

MySQL中需要特别注意的是数据库连接的url,必须配置useCursorFetch=true。例如:

jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&useSSL=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&autoReconnect=true

具体流式读取大数据的代码:

public static int fetchData(Connection conn, int fetchSize, String sql) throws Exception {
        logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize);
        List<Map<String, String>> rows = new ArrayList();
        int rowIndex = 0;
        int batchIndex = 0;
        Statement stmt = conn.createStatement(1003, 1007);
        stmt.setFetchSize(fetchSize);
        stmt.setFetchDirection(1000);
        ResultSet rs = stmt.executeQuery(sql);

        while(rs.next()) {
            Map<String, String> row = new HashMap();

            for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) {
                row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i));
            }

            rows.add(row);
            ++rowIndex;
            if (rows.size() % fetchSize == 0) {
                // TODO 数据处理逻辑
                doSomething()
                
                rows.clear();
                ++batchIndex;
            }
        }

        if (!rows.isEmpty()) {
            // TODO 数据处理逻辑
             doSomething()
        }

        return rowIndex;
    }
参数说明:
conn : 数据库连接对象;fetchSize :  流式读取数据窗口大小;sql :查询数据SQL语句。
  1. Oracle、SQLServer、Sybase
    Oracle、SQLServer、Sybase这三种没有特殊配置,具体代码如MySQL。具体流式读取大数据的代码:
public static int fetchData(Connection conn, int fetchSize, String sql) throws Exception {
        logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize);
        List<Map<String, String>> rows = new ArrayList();
        int rowIndex = 0;
        int batchIndex = 0;
        Statement stmt = conn.createStatement(1003, 1007);
        stmt.setFetchSize(fetchSize);
        stmt.setFetchDirection(1000);
        ResultSet rs = stmt.executeQuery(sql);

        while(rs.next()) {
            Map<String, String> row = new HashMap();

            for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) {
                row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i));
            }

            rows.add(row);
            ++rowIndex;
            if (rows.size() % fetchSize == 0) {
                // TODO 数据处理逻辑
                doSomething()
                
                rows.clear();
                ++batchIndex;
            }
        }

        if (!rows.isEmpty()) {
            // TODO 数据处理逻辑
             doSomething()
        }

        return rowIndex;
    }
参数说明:
conn : 数据库连接对象;fetchSize :  流式读取数据窗口大小;sql :查询数据SQL语句。
  1. Postgresql
    Postgresql的数据库连接没有特殊配置,但是读取数据需要关闭自动提交事务,否则流式读取不生效。具体代码如下:
public int fetchData(Connection conn, int fetchSize, String sql) throws Exception {
        logger.info("流式读取数据:{};读取窗口:{}.", sql, fetchSize);
        List<Map<String, String>> rows = new ArrayList();
        int rowIndex = 0;
        int batchIndex = 0;
        Statement stmt = conn.createStatement(1003, 1007);
        conn.setAutoCommit(false); // 关闭自动提交事务
        stmt.setFetchSize(fetchSize);
        ResultSet rs = stmt.executeQuery(sql);

        while(rs.next()) {
            Map<String, String> row = new HashMap();

            for(int i = 1; i <= rs.getMetaData().getColumnCount(); ++i) {
                row.put(rs.getMetaData().getColumnLabel(i).toUpperCase(), rs.getString(i));
            }

            rows.add(row);
            ++rowIndex;
            if (rows.size() % fetchSize == 0) {
                 // TODO 数据处理逻辑
                doSomething()
                
                rows.clear();
                ++batchIndex;
            }
        }

        if (!rows.isEmpty()) {
             // TODO 数据处理逻辑
                doSomething()
        }

        return rowIndex;
    }
参数说明:
conn : 数据库连接对象;fetchSize :  流式读取数据窗口大小;sql :查询数据SQL语句。

其他关系型数据库流式读取数据的需求还没有遇到,后期会继续完善。

上一篇:JDBC连接数据库:判断是否登录成功


下一篇:IDEA中使用JDBC访问SQL SERVER(五)修改结果集数据