Skip to content

Commit 5d58841

Browse files
committed
refactor parser
1 parent e6919b7 commit 5d58841

20 files changed

Lines changed: 950 additions & 183 deletions

libs/core/CSVError.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
var util=require("util");
2+
module.exports=CSVError;
3+
function CSVError(err,index){
4+
Error.call(this,"");
5+
this.err=err;
6+
this.line=index;
7+
this.message="Error: "+err+" Line number: "+index;
8+
this.name="CSV Error";
9+
}
10+
util.inherits(CSVError,Error);
11+

libs/core/Converter.js

Lines changed: 135 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,34 @@ var Readable = require("stream").Readable;
44
var Result = require("./Result");
55
var os = require("os");
66
var eol = os.EOL;
7-
var Processor = require("./Processor.js");
8-
var Worker = require("./Worker.js");
7+
// var Processor = require("./Processor.js");
98
var utils = require("./utils.js");
109
var async = require("async");
11-
10+
var defParam=require("./defParam");
11+
var csvline=require("./csvline");
12+
var fileline=require("./fileline");
13+
var dataToCSVLine=require("./dataToCSVLine");
14+
var linesToJson=require("./linesToJson");
1215
function Converter(params,options) {
1316
Transform.call(this,options);
14-
var _param = {
15-
constructResult: true, //set to false to not construct result in memory. suitable for big csv data
16-
delimiter: ',', // change the delimiter of csv columns. It is able to use an array to specify potencial delimiters. e.g. [",","|",";"]
17-
quote: '"', //quote for a column containing delimiter.
18-
trim: true, //trim column's space charcters
19-
checkType: true, //whether check column type
20-
toArrayString: false, //stream down stringified json array instead of string of json. (useful if downstream is file writer etc)
21-
ignoreEmpty: false, //Ignore empty value while parsing. if a value of the column is empty, it will be skipped parsing.
22-
workerNum: 1, //number of parallel workers. If multi-core CPU available, increase the number will get better performance for large csv data.
23-
fork: false, //use another CPU core to convert the csv stream
24-
noheader: false, //indicate if first line of CSV file is header or not.
25-
headers: null, //an array of header strings. If noheader is false and headers is array, csv header will be ignored.
26-
flatKeys: false, // Don't interpret dots and square brackets in header fields as nested object or array identifiers at all.
27-
maxRowLength: 0, //the max character a csv row could have. 0 means infinite. If max number exceeded, parser will emit "error" of "row_exceed". if a possibly corrupted csv data provided, give it a number like 65535 so the parser wont consume memory. default: 0
28-
checkColumn: false //whether check column number of a row is the same as headers. If column number mismatched headers number, an error of "mismatched_column" will be emitted.. default: false
29-
};
30-
if (params && typeof params === "object") {
31-
for (var key in params) {
32-
if (params.hasOwnProperty(key)) {
33-
_param[key] = params[key];
34-
}
35-
}
36-
} else if (typeof params === "boolean") { //backcompatible with older version
37-
console.warn("Parameter should be a JSON object like {'constructResult':false}");
38-
_param.constructResult = params;
39-
}
17+
_param=defParam(params);
4018
this._options=options || {};
4119
this.param = _param;
4220
this.param._options=this._options;
4321
this.resultObject = new Result(this);
4422
this.pipe(this.resultObject); // it is important to have downstream for a transform otherwise it will stuck
45-
this.started = false;
23+
this.started = false;//indicate if parsing has started.
4624
this.recordNum = 0;
4725
this.lineNumber=0;
4826
this.runningProcess = 0;
27+
this._csvLineBuffer="";
28+
this.lastIndex=0;
4929
//this._pipe(this.lineParser).pipe(this.processor);
5030
if (this.param.fork) {
5131
this.param.fork=false;
5232
this.param.workerNum=2;
5333
}
54-
this.initNoFork();
34+
// this.initNoFork();
5535
this.flushCb = null;
5636
this.processEnd = false;
5737
this.sequenceBuffer = [];
@@ -63,56 +43,121 @@ function Converter(params,options) {
6343
return this;
6444
}
6545
util.inherits(Converter, Transform);
66-
// Converter.prototype.initFork = function() {
67-
// var env = process.env;
68-
// env.params = JSON.stringify(this.param);
69-
// this.child = require("child_process").fork(__dirname + "/fork.js", {
70-
// env: env,
71-
// silent: true
72-
// });
73-
// this.child.stderr.on("data", function(d, e) {
74-
// process.stderr.write(d, e);
75-
// // this.push(d, e);
76-
// // this.emit("record_parsed");
77-
// }.bind(this));
78-
// // this.child.stdout.on("data",function(d){
79-
// // this.push(d.toString("utf8"));
80-
// // }.bind(this));
81-
// this.child.on("message", function(msg) {
82-
// if (msg.action === "record_parsed") {
83-
// //var recs = msg.arguments;
84-
// var args = msg.arguments;
85-
// //console.log(recs);
86-
// //var recs=args[0];
87-
// //for (var i=0;i<recs.length;i++){
88-
// //this.emit("record_parsed", recs[i][0], recs[i][1], recs[i][2]);
89-
// //}
90-
// this.emit("record_parsed", args[0], args[1], args[2]);
91-
// } else if (msg.action === "data") {
92-
// var args = msg.arguments;
93-
// this.push(new Buffer(args[0]), args[1]);
94-
// } else if (msg.action === "error") {
95-
// var args = msg.arguments;
96-
// args.unshift("error");
97-
// this.hasError = true;
98-
// this.emit.apply(this, args);
99-
// }
100-
// }.bind(this));
101-
// this._transform = this._transformFork;
102-
// this._flush = this._flushFork;
103-
// //child.on("message",function(msg){
104-
// //var syncLock=false;
105-
// //if (msg.action=="record_parsed"){
106-
// //this.sequenceBuffer[msg.index]=msg;
107-
// //if
108-
// //}
109-
// //}.bind(this));
110-
// //child.on("exit",function(code){
111-
// //this.processEnd=true;
112-
// //this.flushBuffer();
113-
// //this.checkAndFlush();
114-
// //}.bind(this));
115-
// }
46+
Converter.prototype._transform = function(data, encoding, cb) {
47+
if (this.param.toArrayString && this.started === false) {
48+
this.started = true;
49+
this.push("[" + eol, "utf8");
50+
}
51+
data=data.toString("utf8");
52+
var self=this;
53+
this.preProcessRaw(data,function(d){
54+
if (d && d.length>0){
55+
self.processData(self.prepareData(d), cb);
56+
}else{
57+
cb();
58+
}
59+
})
60+
};
61+
Converter.prototype.prepareData=function(data){
62+
return this._csvLineBuffer+data;
63+
}
64+
Converter.prototype.setPartialData=function(d){
65+
this._csvLineBuffer=d;
66+
}
67+
Converter.prototype.processData=function(data,cb){
68+
var params=this.param;
69+
if (!params._headers){ //header is not inited. init header
70+
this.processHead(data,cb);
71+
}else{
72+
if (params.workerNum===1){
73+
var lines=dataToCSVLine(data,params);
74+
this.setPartialData(lines.partial);
75+
var res=linesToJson(lines.lines,params,this.recordNum);
76+
this.processResult(res,cb);
77+
}else{
78+
this.workerProcess(data,cb);
79+
}
80+
}
81+
}
82+
Converter.prototype.initWorker=function(){
83+
var workerNum=this.param.workerNum-1;
84+
if (workerNum.length>0){
85+
workerMgr.initWorker(workerNum,this.param);
86+
}
87+
}
88+
Converter.prototype.workerProcess=function(data,cb){
89+
var self=this;
90+
workerMgr.sendWorker(data,this.lastIndex,function(length,partial){
91+
self.setPartialData(partial);
92+
self.lastIndex+=length;
93+
cb();
94+
},function(results){
95+
self.processResult(results,function(){},true);
96+
})
97+
}
98+
Converter.prototype.processHead=function(data,cb){
99+
var params=this.param;
100+
if (!params._headers){ //header is not inited. init header
101+
var lines=dataToCSVLine(data,params);
102+
this.setPartialData(lines.partial);
103+
if (params.noheader){
104+
if (params.headers){
105+
params._headers=params.headers;
106+
}else{
107+
params._headers=[];
108+
}
109+
}else{
110+
var headerRow=lines.lines.shift();
111+
if (params.headers){
112+
params._headers=params.headers;
113+
}else{
114+
params._headers=headerRow;
115+
}
116+
}
117+
var res=linesToJson(lines.lines,params,0);
118+
this.processResult(res,cb);
119+
}else{
120+
cb();
121+
}
122+
}
123+
Converter.prototype.processResult=function(result,cb,isAsync){
124+
for (var i=0;i<result.length;i++){
125+
var r=result[i];
126+
if (r.err){
127+
this.emit("error",r.err);
128+
}else{
129+
if (isAsync){
130+
this.sequenceBuffer[r.index]=r;
131+
}else{
132+
this.emitResult(r);
133+
}
134+
}
135+
}
136+
if (isAsync){
137+
this.flushBuffer(cb);
138+
}else{
139+
this.lastIndex+=result.length;
140+
cb();
141+
}
142+
}
143+
Converter.prototype.emitResult=function(r){
144+
var index=r.index;
145+
var row=r.row;
146+
var resultRow=r.json;
147+
if (this.transform && typeof this.transform==="function"){
148+
this.transform(resultRow,row,index);
149+
}
150+
this.emit("record_parsed", resultRow, row, index);
151+
if (this.param.toArrayString && index > 0) {
152+
this.push("," + eol);
153+
}
154+
if (this._options && this._options.objectMode){
155+
this.push(resultRow);
156+
}else{
157+
this.push(JSON.stringify(resultRow), "utf8");
158+
}
159+
this.recordNum=index+1;
160+
}
116161
Converter.prototype.initNoFork = function() {
117162
// function onError() {
118163
// var args = Array.prototype.slice.call(arguments, 0);
@@ -170,59 +215,19 @@ Converter.prototype.initNoFork = function() {
170215
this._transform = this._transformNoFork;
171216
this._flush = this._flushNoFork;
172217
}
173-
Converter.prototype.flushBuffer = function() {
218+
Converter.prototype.flushBuffer = function(cb) {
174219
while (this.sequenceBuffer[this.recordNum]) {
175-
var index = this.recordNum;
176-
var obj = this.sequenceBuffer[index];
177-
this.sequenceBuffer[index] = undefined;
178-
var resultJSONStr = obj.resultJSONStr;
179-
var resultRow = JSON.parse(resultJSONStr)
180-
var row = obj.row;
181-
if (this.transform && typeof this.transform==="function"){
182-
this.transform(resultRow,row,index);
183-
resultJSONStr=JSON.stringify(resultRow);
184-
}
185-
this.emit("record_parsed", resultRow, row, index);
186-
if (this.param.toArrayString && this.recordNum > 0) {
187-
this.push("," + eol);
188-
}
189-
if (this._options && this._options.objectMode){
190-
this.push(resultRow);
191-
}else{
192-
this.push(resultJSONStr, "utf8");
193-
}
194-
this.recordNum++;
220+
var r=this.sequenceBuffer[this.recordNum];
221+
this.sequenceBuffer[this.recordNum] = undefined;
222+
this.emitResult(r);
195223
}
196224
this.checkAndFlush();
225+
cb();
197226
}
198227
Converter.prototype.preProcessRaw=function(data,cb){
199228
cb(data);
200229
}
201230

202-
Converter.prototype._transformNoFork = function(data, encoding, cb) {
203-
if (this.param.toArrayString && this.started === false) {
204-
this.started = true;
205-
this.push("[" + eol, "utf8");
206-
}
207-
data=data.toString("utf8");
208-
var self=this;
209-
this.preProcessRaw(data,function(d){
210-
if (d && d.length>0){
211-
var lines = self.toCSVLines(self.toLines(d)); //lines of csv
212-
self.processCSVLines(lines, cb);
213-
}else{
214-
cb();
215-
}
216-
})
217-
// async.eachLimit(lines,1,function(line,scb){
218-
// this.push(line.data);
219-
// scb();
220-
// }.bind(this),function(err){
221-
// cb();
222-
// });
223-
//this.push(data,encoding);
224-
// cb();
225-
};
226231
Converter.prototype.processCSVLines = function(csvLines, cb) {
227232
// for (var i=0;i<csvLines.length;i++){
228233
// this.push(csvLines[i].data);
@@ -270,28 +275,11 @@ Converter.prototype.toCSVLines = function(fileLines, last) {
270275
}
271276
return lines;
272277
}
273-
Converter.prototype._line = function(line) {
274-
var lines = [];
275-
this._csvLineBuffer += line;
276-
if (this.param.maxRowLength && this._csvLineBuffer.length > this.param.maxRowLength) {
277-
this.hasError = true;
278-
this.emit("error", "row_exceed", this._csvLineBuffer);
279-
}
280-
if (!utils.isToogleQuote(this._csvLineBuffer, this.param.quote)) { //if a complete record is in buffer.push to result
281-
var data = this._csvLineBuffer;
282-
this._csvLineBuffer = '';
283-
lines.push(data);
284-
} else { //if the record in buffer is not a complete record (quote does not close). wait next line
285-
this._csvLineBuffer += this.getEol();
286-
}
287-
return lines;
288-
}
289-
Converter.prototype._flushNoFork = function(cb) {
278+
Converter.prototype._flush = function(cb) {
290279
var self = this;
291280
this.flushCb = cb;
292-
if (this._lineBuffer.length > 0) {
293-
var lines = this._line(this._lineBuffer);
294-
this.processCSVLines(lines, function() {
281+
if (this._csvLineBuffer.length > 0) {
282+
this.processData(this._csvLineBuffer,function(){
295283
this.checkAndFlush();
296284
}.bind(this));
297285
} else {
@@ -315,7 +303,7 @@ Converter.prototype.checkAndFlush = function() {
315303
this.push(eol + "]", "utf8");
316304
}
317305
this.flushCb();
318-
this.processor.releaseWorker();
306+
// this.processor.releaseWorker();
319307
this.flushCb = null;
320308
}
321309
}

0 commit comments

Comments
 (0)