导入Neo4j的方式有三种,大概如下:
1. 读取外部文件到内存中,然后使用create 语句导入之后建立关系。
2. 用load csv 读取csv 文件
3. 从JDBC直接load 到neo4j 中
这里测试导入的场景如下:
主要包含四个字段: 卡号|用户名称|转账卡号|转账金额。主要逻辑是: 卡号按序号递增,取值 0 - 9999;用户名称为 "user" + 序号(user0 - user9999);转账卡号是下一张卡的卡号(最后一张卡 9999 转给卡号 0,首尾相连成环);转账金额也随序号递增(与卡号相同)。
下面的构造数据统一采用从内存中构造数据,构造1W条数据,也就是1W个node,1W条关系。
private static final Integer DATA_SIZE = 10000; private static List<Map<String, Object>> generateData() { List<Map<String, Object>> datas = new ArrayList<>(DATA_SIZE); Map<String, Object> tmpMap = null; for (int i = 0; i < DATA_SIZE; i++) { tmpMap = new HashMap<>(); datas.add(tmpMap); tmpMap.put("cardNum", i); tmpMap.put("userName", "user" + i); tmpMap.put("transferCardNum", (i + 1) % 10000); // 每个卡给自己的下一个卡转钱 tmpMap.put("transferAmount", i); } return datas; }
0. pom 引入如下依赖
<!-- neo4j 相关的API --> <dependency> <groupId>org.neo4j.driver</groupId> <artifactId>neo4j-java-driver</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.neo4j</groupId> <artifactId>neo4j</artifactId> <version>3.3.4</version> </dependency>
1. 外部文件导入neo4j
这里省略读取外部文件的过程,直接在内存中模拟 1W 条数据。
/** * 测试手动插入数据以及维护关系 */ private static void inertNeo4jTest() { // 构造数据, 数据和pg 库里面的数据一样 List<Map<String, Object>> datas = generateData(); Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j.")); Session session = driver.session(); StopWatch stopWatch = new StopWatch(); stopWatch.start(); // 手动create to neo4j String createCQLTemplate = "create (n:transferDetail {cardNum: '$cardNum', userName: '$userName', transferCardNum: '$transferCardNum', transferAmount: '$transferAmount'})"; datas.forEach(data -> { String createCQL = createCQLTemplate.replace("$cardNum", MapUtils.getString(data, "cardNum")) .replace("$userName", MapUtils.getString(data, "userName")) .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum")) .replace("$transferAmount", MapUtils.getString(data, "transferAmount")); session.run(createCQL); }); System.out.println("插入成功耗时: " + stopWatch.getTime() + " ms"); // 手动维护关系 String mergeCQLTemplate = "match (a:transferDetail{cardNum: '$cardNum1'}), (b:transferDetail{cardNum: '$cardNum2'}) MERGE(a)-[:TRANSFER{transferAmount: '$transferAmount'}]->(b)"; datas.forEach(data -> { String mergeCQL = mergeCQLTemplate.replace("$cardNum1", MapUtils.getString(data, "cardNum")) .replace("$cardNum2", MapUtils.getString(data, "transferCardNum")) .replace("$transferAmount", MapUtils.getString(data, "transferAmount")); session.run(mergeCQL); }); stopWatch.stop(); System.out.println("转换关系成功,耗时: " + stopWatch.getTime() + " ms"); // close resource session.close(); driver.close(); }
代码逻辑很简单: 在内存中构造 1W 条数据 → 逐条 create 到 neo4j → 手动维护关系。测试结果耗时如下:
插入成功耗时: 50976 ms
转换关系成功,耗时: 285111 ms
可以看到总耗时约 285 秒,不到 5 分钟。注意两次打印的耗时都是从开始插入起累计计算的: 创建节点约 51 秒,建立关系约 234 秒。到 neo4j 中查看数据如下:
MATCH (n:transferDetail) RETURN count(n)
2. 测试load csv
1. pom 引入如下依赖:
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-csv</artifactId> <version>1.3</version> </dependency>
2. 测试方法: 生成csv 文件,然后load csv
private static void loadCSVtest() throws Exception { // 读取CSV 文件 // Reader fileReader = new FileReader(fileName); // Iterable<CSVRecord> records = CSVFormat.RFC4180.withFirstRecordAsHeader().parse(fileReader); // for (CSVRecord record : records) { // System.out.println(record.get("instanceId") + record.get("regionId") + record.get("zoneId")) // } // 1. 写入一个csv 到本地, 构造相同的数据 StopWatch stopWatch = new StopWatch(); stopWatch.start(); Appendable fileWriter = new FileWriter("E:\\neo4j3.5\\neo4j-community-3.5.5\\import\\transfer.csv"); CSVPrinter printer = CSVFormat.RFC4180.withHeader("cardnum", "username", "transfercardnum", "transferamount").print(fileWriter); List<Map<String, Object>> datas = generateData(); datas.forEach(data -> { try { printer.printRecord(MapUtils.getString(data, "cardNum"), MapUtils.getString(data, "userName"), MapUtils.getString(data, "transferCardNum"), MapUtils.getString(data, "transferAmount")); } catch (IOException ignore) { // ignore } }); printer.close(); System.out.println("csv 文件输出完成, 耗时: " + stopWatch.getTime()); // 2. 
csv load 到neo4j Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j.")); Session session = driver.session(); String deleteCQL = "match (n:transferdetail) detach delete n"; session.run(deleteCQL); System.out.println("neo4j 清空数据库成功, 耗时: " + stopWatch.getTime() + "ms"); String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique"; session.run(constraintCQL); String createCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as line create(n:transferdetail{cardnum:line.cardnum, username:line.username, transfercardnum:line.transfercardnum, transferamount:line.transferamount})"; session.run(createCQL); String relateCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as row match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)"; session.run(relateCQL); stopWatch.stop(); System.out.println("load csv导入成功, 耗时: " + stopWatch.getTime() + "ms"); session.close(); driver.close(); }
结果: 可以看到非常快,比手动创建节点再建立关系快得多:
csv 文件输出完成, 耗时: 269
neo4j 清空数据库成功, 耗时: 2730ms
load csv导入成功, 耗时: 3070ms
3. 测试从 RDBMS 加载数据
这里使用 APOC 通过 JDBC 从 PG(PostgreSQL)数据库加载数据。
1. 首先下载apoc 插件
https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases
2. 将下载的jar 包和pg 的驱动包放到 %neo4j%\plugins 目录下,如下:
3. 修改%neo4j-community-3.5.5%\conf文件夹下面neo4j.conf文件, 最后增加如下配置:
dbms.security.procedures.unrestricted=apoc.*
apoc.export.file.enabled=true
4. 重启neo4j server
5. 查看apoc 版本: 查看到版本证明apoc 插件安装成功
return apoc.version()
结果:
6. 修改程序采用apoc 从jdbc load 数据
(1) 增加pg 驱动
<dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.2.2</version> </dependency>
(2) 编写测试类
public static final void inertFromPGTest() throws Exception { // 1. 构造数据 List<Map<String, Object>> datas = generateData(); // 2. 插入到PG库 StopWatch stopWatch = new StopWatch(); stopWatch.start(); Connection connection = null; try { Class.forName("org.postgresql.Driver"); connection = DriverManager .getConnection("jdbc:postgresql://127.0.0.1:5432/qlq_test", "postgres", "postgres"); } catch (Exception e) { e.printStackTrace(); } Assert.notNull(connection, "链接失败"); java.sql.Statement statement = connection.createStatement(); // 清空数据库 String trunacteSQL = "truncate table transferdetail"; statement.execute(trunacteSQL); System.out.println("删除pg数据库成功, 耗时: " + stopWatch.getTime() + "ms"); // 插入数据 List<String> insertSQLs = new ArrayList<>(); String valueSQL = "('$cardNum', '$userName', '$transferCardNum', '$transferAmount')"; datas.forEach(data -> { insertSQLs.add(valueSQL.replace("$cardNum", MapUtils.getString(data, "cardNum")) .replace("$userName", MapUtils.getString(data, "userName")) .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum")) .replace("$transferAmount", MapUtils.getString(data, "transferAmount"))); }); String sql = "insert into transferdetail(cardNum, userName, transferCardNum, transferAmount) values"; sql += StringUtils.join(insertSQLs, ","); statement.execute(sql); System.out.println("插入pg数据库成功, 耗时: " + stopWatch.getTime() + "ms"); // 3. 
PG 库转换到neo4j 库 Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j.")); Session session = driver.session(); String deleteCQL = "match (n:transferdetail) detach delete n"; session.run(deleteCQL); System.out.println("neo4j 清空数据库成功, 耗时: " + stopWatch.getTime() + "ms"); String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique"; session.run(constraintCQL); // cypher = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"MERGE(n:transferdetail{cardnum:row.cardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount}) with * MERGE(m:transferdetail{cardnum:row.transfercardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount}) with * create p=(n)-[r:transfer{transferamount:row.transferamount}]->(m)\",{batchSize:10000,iterateList:true})"; //连接postgresSQL数据库和设计创建neo4j图数据库数据模型 String createCQL = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"create(n:transferdetail{cardnum:row.cardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount})\",{batchSize:10000,iterateList:true})"; //连接postgresSQL数据库和设计创建neo4j图数据库数据模型 session.run(createCQL); String relateCQL = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)\",{batchSize:10000,iterateList:true})"; //连接postgresSQL数据库和设计创建neo4j图数据库数据模型 // String 
cypher2 = "CALL apoc.periodic.iterate(\"CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\\\"SELECT * FROM transferdetail\\\")\",\"match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) create p=(n)-[r:transfer{transferamount:row.transferamount}]->(m)\",{batchSize:10000,iterateList:true})"; //连接postgresSQL数据库和设计创建neo4j图数据库数据模型 session.run(relateCQL); stopWatch.stop(); System.out.println("apoc导入成功, 耗时: " + stopWatch.getTime() + "ms"); session.close(); driver.close(); }
测试结果如下:
删除pg数据库成功, 耗时: 546ms
插入pg数据库成功, 耗时: 976ms
neo4j 清空数据库成功, 耗时: 2578ms
apoc导入成功, 耗时: 3013ms
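可以得出结论: 从 JDBC 加载数据和 load csv 在性能上差不多,两者都远快于手动创建节点并维护关系。需要注意的是,load csv 和 APOC 两种方式在导入前都先在 cardnum 上创建了唯一约束(约束自带索引),而手动方式没有建立任何索引。因此手动方式建立关系时,每次 match 很可能都要扫描全部节点,这也是其耗时高的主要原因之一。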