postgresql的CopyManager流式数据入库

maven依赖如下:

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.5</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.52</version>
        </dependency>

数据库工具类:

/**
 * Minimal JDBC helper that opens connections to the demo PostgreSQL database.
 */
public final class DbUtil {

  /** JDBC URL of the demo database. */
  private static final String URL = "jdbc:postgresql://127.0.0.1:6677/db_test";

  /**
   * Opens a new connection to the demo database.
   *
   * <p>Failures are not propagated: the exception is printed to standard error and
   * {@code null} is returned, so callers must check the result before using it.
   *
   * @return an open connection, or {@code null} if the connection could not be established
   */
  public static Connection getConnection(){
    try {
      Properties properties = new Properties();
      properties.setProperty("user", "postgres");
      // The driver reads the password from the "password" property; the key was
      // previously misspelled, so the password never reached the driver.
      properties.setProperty("password", "123456");
      return DriverManager.getConnection(URL, properties);
    } catch (Exception e){
      e.printStackTrace();
    }
    return null;
  }

}

test代码如下:

/**
 * Demo that bulk-loads rows into PostgreSQL through the JDBC driver's {@code CopyManager}.
 *
 * <p>Rows are first serialised to a temporary delimiter-separated text file and then
 * streamed to the server with {@code COPY ... FROM STDIN}.
 */
public class CopyManagerTest {

  /** Column delimiter used both in the data file and in the COPY statement. */
  private static final char DELIMITER = ',';

  /** Text written for null values; also passed as the NULL marker of the COPY statement. */
  private static final String NULL_MARKER = "null";

  /** Charset used for writing and for reading back the data file, so both sides agree. */
  private static final java.nio.charset.Charset FILE_CHARSET =
      java.nio.charset.StandardCharsets.UTF_8;

  /**
   * Serialises the rows to a new temporary file in COPY text format.
   *
   * <p>Each element of {@code list} becomes one line; its values are written in the
   * iteration order of {@code values()}, separated by {@link #DELIMITER}. Rows are
   * separated by {@code '\n'} with no trailing newline after the last row.
   *
   * <p>NOTE(review): a non-null value whose string form is exactly {@code "null"} is
   * indistinguishable from the NULL marker and will be loaded as SQL NULL.
   *
   * @param list rows to serialise
   * @return path of the file that was written (created in the default temp directory)
   * @throws java.io.UncheckedIOException if the file cannot be created or fully written;
   *         the partial file is removed so that it can never be loaded by mistake
   */
  public String writeFile(List<JSONObject> list){
    File file;
    try {
      file = File.createTempFile("copy_", ".csv");
    } catch (IOException e){
      throw new java.io.UncheckedIOException("failed to create COPY data file", e);
    }
    try (java.io.Writer writer =
             java.nio.file.Files.newBufferedWriter(file.toPath(), FILE_CHARSET)) {
      boolean firstRow = true;
      for (JSONObject row : list){
        // Row separator goes between rows only.
        if (!firstRow){
          writer.write('\n');
        }
        firstRow = false;
        writeRow(writer, row.values().toArray());
      }
    } catch (IOException e){
      file.delete();
      throw new java.io.UncheckedIOException("failed to write COPY data file " + file, e);
    } catch (RuntimeException e){
      file.delete();
      throw e;
    }
    return file.getPath();
  }

  /** Writes one row: the values separated by the delimiter, nulls as the NULL marker. */
  private static void writeRow(java.io.Writer out, Object[] values) throws IOException {
    for (int i = 0; i < values.length; i++){
      if (i > 0){
        out.write(DELIMITER);
      }
      out.write(null == values[i] ? NULL_MARKER : escape(String.valueOf(values[i])));
    }
  }

  /**
   * Escapes a value for COPY text format: backslash, the delimiter, newline and carriage
   * return would otherwise be read as escape characters or as column/row separators.
   */
  private static String escape(String value){
    int firstSpecial = -1;
    for (int i = 0; i < value.length(); i++){
      char c = value.charAt(i);
      if (c == '\\' || c == DELIMITER || c == '\n' || c == '\r'){
        firstSpecial = i;
        break;
      }
    }
    // Fast path: the common case needs no escaping and no extra allocation.
    if (firstSpecial < 0){
      return value;
    }
    StringBuilder escaped = new StringBuilder(value.length() + 8);
    escaped.append(value, 0, firstSpecial);
    for (int i = firstSpecial; i < value.length(); i++){
      char c = value.charAt(i);
      if (c == '\n'){
        escaped.append("\\n");
      } else if (c == '\r'){
        escaped.append("\\r");
      } else {
        if (c == '\\' || c == DELIMITER){
          escaped.append('\\');
        }
        escaped.append(c);
      }
    }
    return escaped.toString();
  }

  /**
   * Loads the rows into the given table with {@code COPY ... FROM STDIN} and prints the
   * elapsed time in whole seconds.
   *
   * <p>Failures are printed to standard error and not propagated. The temporary data file
   * and the connection are always released.
   *
   * <p>NOTE(review): {@code tabName} is concatenated into the SQL text, so it must come
   * from a trusted source.
   *
   * @param tabName name of the target table
   * @param list rows to load; see {@link #writeFile(List)} for how they are serialised
   */
  public void copy(String tabName, List<JSONObject> list){
    String filePath = null;
    Connection connection = null;
    try {
      long startTime = System.currentTimeMillis();
      filePath = writeFile(list);
      connection = DbUtil.getConnection();
      if (null == connection){
        throw new IllegalStateException("no database connection available");
      }
      CopyManager copyManager = new CopyManager((BaseConnection) connection);
      String sql = "copy " + tabName + " from stdin delimiter as '" + DELIMITER
          + "' NULL as '" + NULL_MARKER + "'";
      try (java.io.Reader reader =
               java.nio.file.Files.newBufferedReader(new File(filePath).toPath(), FILE_CHARSET)) {
        copyManager.copyIn(sql, reader);
      }
      long endTime = System.currentTimeMillis();
      System.out.println((endTime-startTime)/1000);
    } catch (Exception e){
      e.printStackTrace();
    } finally {
      if (null != connection){
        try {
          connection.close();
        } catch (SQLException e){
          e.printStackTrace();
        }
      }
      // The data file is only an intermediate artefact; do not leave it on disk.
      if (null != filePath){
        new File(filePath).delete();
      }
    }
  }

  /**
   * Builds 4,000,000 sample rows in memory and loads them into {@code tab_test}.
   *
   * <p>NOTE(review): the column order of each written row is whatever order
   * {@code JSONObject.values()} iterates in, which is presumably not guaranteed to be
   * the insertion order below; confirm it matches the column order of the target table.
   */
  public static void main(String[] args) {
    List<JSONObject> list = new ArrayList<JSONObject>();
    for (int i = 0; i < 4000000; i++){
      JSONObject jsonObject = new JSONObject();
      jsonObject.put("id", i);
      jsonObject.put("key", "key_" + i);
      jsonObject.put("time", System.currentTimeMillis());
      jsonObject.put("value", Long.valueOf(i%100));
      list.add(jsonObject);
    }
    new CopyManagerTest().copy("tab_test", list);
  }

}

上面4000000条数据入库大概40秒完成,pg数据库的配置是4C4G。因为一次性入库的数据量比较大,在上述数据入库阶段,pg数据库的CPU出现冲高至200%-300%之间的情况。建议使用该方法入库时调整一次性入库的数据流大小,尽可能避免pg的CPU冲高,防止影响其他业务的数据库操作。

另外上面的例子是先把对象写进文件,再把文件解析成流,这样操作是比较麻烦的,在实际使用的过程中可以直接将对象转成流,把文件生成和转流这一步省略。

上一篇:Python 将数据写入CSV文件


下一篇:python开发ip2region 离线IP库地址文件