I currently need to push a large CSV file into a mongo DB, and the order of the values needs to determine the keys of the database entries:
Example CSV file:
9,1557,358,286,Mutantville,4368,2358026,,M,0,0,0,1,0
9,1557,359,147,Wroogny,4853,2356061,,D,0,0,0,1,0
Code to parse it into arrays:
var fs = require("fs");
var csv = require("fast-csv");
fs.createReadStream("rank.txt")
.pipe(csv())
.on("data", function(data){
console.log(data);
})
.on("end", function(data){
console.log("Read Finished");
});
Output of the code:
[ '9',
  '1557',
  '358',
  '286',
  'Mutantville',
  '4368',
  '2358026',
  '',
  'M',
  '0',
  '0',
  '0',
  '1',
  '0' ]
[ '9',
  '1557',
  '359',
  '147',
  'Wroogny',
  '4853',
  '2356061',
  '',
  'D',
  '0',
  '0',
  '0',
  '1',
  '0' ]
How do I insert the arrays into my mongoose schema to go into mongo DB?
Schema:
var mongoose = require("mongoose");
var rankSchema = new mongoose.Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  number: Number,
  name: String,
  land: Number,
  networth: Number,
  tag: String,
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});
module.exports = mongoose.model("Rank", rankSchema);
The order of the array needs to match the order of the schema; for example, in the array the first number 9 always needs to be saved under the key "serverid", and so on. I'm using Node.js.
Solution:
You can do this with fast-csv by getting the headers from the schema definition, which will return the parsed lines as "objects". You actually have some mismatches, so I've marked them with corrections:
const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,   // <-- You have this as Number but it's a String
  networth: Number,
  tag: String,
  stuff: String,  // <-- The blank field in the CSV
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {

  try {
    const conn = await mongoose.connect(uri);

    // Clean all models for the demo run
    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    // Take the field names from the schema, minus the internal keys
    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            // Flush the buffer to the server in batches
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();
        })
        .on("end", async () => {
          try {
            // Send anything left over, then signal completion
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
            resolve();
          } catch(e) {
            stream.destroy(e);
          }
        });

    });

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()
As long as the schema actually lines up with the provided CSV, that's all there is to it. These are the corrections I can see, but if the actual field names need to be aligned differently then you will need to adjust. Basically, there was a Number in the position where a String appears, and essentially an extra field, which I presume is the blank one in the CSV.
The general idea is to get the array of field names from the schema and pass it into the options when creating the csv parser instance:
let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))
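For reference, Mongoose keeps the schema paths in definition order, so with the corrected schema above the filtered headers should come out something like:

[ 'serverid', 'resetid', 'rank', 'name', 'land', 'networth',
  'tag', 'stuff', 'gov', 'gdi', 'protection', 'vacation',
  'alive', 'deleted' ]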
Once you actually do that, you get an "Object" back instead of an array:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Don't worry about the "types" here, because Mongoose will cast the values according to the schema.
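As a quick illustration of that casting (a minimal sketch, assuming the Rank model defined above):

// Mongoose casts the string values to the schema types on assignment
const doc = new Rank({ serverid: "9", networth: "4368" });
console.log(typeof doc.serverid);   // "number" -- cast from the string "9"
console.log(typeof doc.networth);   // "number"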
The rest happens within the handler for the data event. For maximum efficiency we're using insertMany() to only write to the database once every 10,000 lines. How that actually goes to the server and processes depends on the MongoDB version, but 10,000 should be pretty reasonable based on the average number of fields you would import for a single collection, in terms of the "trade-off" between memory usage and keeping each network request a reasonable size. Make the number smaller if necessary.
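If you want that trade-off to be easy to tune, the batch size can be pulled out into a constant (a hypothetical refactoring, not part of the code above):

// Lower this for very wide documents or constrained memory
const BATCH_SIZE = 10000;

async function flushIfFull(model, buffer) {
  if (buffer.length >= BATCH_SIZE) {
    await model.insertMany(buffer);
    buffer.length = 0;  // empty the buffer in place
  }
}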
The important parts are marking these calls as async functions and awaiting the result of insertMany() before continuing. Also we need to pause() the stream and resume() on each item, otherwise we run the risk of overwriting the buffer of documents to insert before they are actually sent. The pause() and resume() are necessary to put "back-pressure" on the pipe, otherwise items just keep "coming out" and firing the data event.
Naturally the control for the 10,000 entries requires that we check the buffer both on each iteration and on stream completion, in order to empty it and send any remaining documents to the server.
That's really what you want to do, as you certainly don't want to fire off an async request to the server on "every" iteration through the data event, or essentially without waiting for each request to complete. You'll get away with not checking that for "very small files", but for any real world load you're certain to exceed the call stack due to "in flight" async calls which haven't yet completed.
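For contrast, this is the shape of the anti-pattern being described (a deliberately broken sketch, do not use):

// Fires one unawaited insert per row -- on a large file thousands of
// writes pile up "in flight" because nothing ever waits for them
stream.on("data", doc => {
  Rank.create(doc);   // returned Promise is ignored
});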
FYI, this is the package.json used. The mz module is optional, as it's just a modernized Promise-enabled library of standard node "built-in" libraries that I'm simply used to using. The code is of course completely interchangeable with the fs module.
{
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
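Since the listing above only ever calls fs.createReadStream(), swapping mz out really is a one-line change (a sketch of the substitution, nothing else needs to move):

const fs = require('fs');   // instead of require('mz/fs')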
Actually, with Node v8.9.x and above, we can even make this much simpler with an implementation of AsyncIterator through the stream-to-iterator module. It's still in Iterator<Promise<T>> mode, but it should do until Node v10.x becomes stable LTS:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,  // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()
Basically, all of the stream "event" handling and the pausing and resuming gets replaced by a simple for loop:
const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}
Much cleaner! This gets tidied up further in a later node implementation with for..await..of once it becomes more stable. But the above runs fine on the specified version and upwards.
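For reference, this is roughly what that future form looks like (a sketch assuming Node v10.x+, where readable streams are async iterable and the stream-to-iterator shim goes away entirely):

// inside an async function, with headers and Rank defined as above
const stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }));

let buffer = [];

for await (const doc of stream) {
  buffer.push(doc);
  if ( buffer.length >= 10000 ) {
    await Rank.insertMany(buffer);
    buffer = [];
  }
}

if ( buffer.length > 0 )
  await Rank.insertMany(buffer);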