Skip to content

Commit 880f795

Browse files
committed
optimise performance 1
1 parent 5116fc1 commit 880f795

12 files changed

Lines changed: 130 additions & 196 deletions

libs/core/CSVError.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,24 @@ function CSVError(err,index,extra){
44
Error.call(this,"");
55
this.err=err;
66
this.line=index;
7+
this.extra=extra;
78
this.message="Error: "+err+". JSON Line number: "+index+ (extra?" near: "+extra:"");
89
this.name="CSV Error";
910
}
1011
util.inherits(CSVError,Error);
1112

13+
CSVError.prototype.toString=function(){
14+
return JSON.stringify([this.err,this.line,this.extra]);
15+
}
16+
1217
CSVError.column_mismatched=function(index,extra){
1318
return new CSVError("column_mismatched",index,extra);
1419
}
1520

1621
CSVError.unclosed_quote=function(index,extra){
1722
return new CSVError("unclosed_quote",index,extra);
23+
}
24+
25+
CSVError.fromArray=function(arr){
26+
return new CSVError(arr[0],arr[1],arr[2]);
1827
}

libs/core/Converter.js

Lines changed: 45 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,35 @@ function Converter(params,options) {
3737
this.processEnd = false;
3838
this.sequenceBuffer = [];
3939
this._needJson=null;
40-
this.on("data", function() {});
40+
this._needEmitResult=null;
41+
this._needEmitFinalResult=null;
42+
this._needPush=null;
43+
this.finalResult=[];
44+
// this.on("data", function() {});
4145
this.on("error", function() {});
4246
this.initWorker();
43-
this.initEnd();
4447
return this;
4548
}
4649
util.inherits(Converter, Transform);
4750
Converter.prototype._transform = function(data, encoding, cb) {
51+
if (this._needEmitFinalResult === null){
52+
this._needEmitFinalResult=this.listeners("end_parsed").length > 0
53+
}
54+
if (this._needEmitResult===null){
55+
this._needEmitResult=this.listeners("record_parsed").length>0
56+
}
57+
if (this._needJson === null){
58+
this._needJson=this._needEmitFinalResult || this._needEmitResult || this.transform || this._options.objectMode;
59+
}
60+
if (this._needPush === null){
61+
this._needPush = this.listeners("data").length > 0 || this.listeners("readable").length>0
62+
// this._needPush=false;
63+
}
4864
if (this.param.toArrayString && this.started === false) {
4965
this.started = true;
50-
this.push("[" + eol, "utf8");
66+
if (this._needPush){
67+
this.push("[" + eol, "utf8");
68+
}
5169
}
5270
data=data.toString("utf8");
5371
var self=this;
@@ -59,28 +77,6 @@ Converter.prototype._transform = function(data, encoding, cb) {
5977
}
6078
})
6179
};
62-
Converter.prototype.initEnd=function(){
63-
var self=this;
64-
function endHandler(){
65-
var finalResult = self.param.constructResult ? self.resultObject.getBuffer() : {};
66-
self.emit("end_parsed", finalResult);
67-
if (self.workerMgr){
68-
self.workerMgr.destroyWorker();
69-
}
70-
}
71-
this.on("end", endHandler);
72-
// if (this.param.workerNum<=1){
73-
74-
// }else{
75-
// workerMgr.drain=function(){
76-
// console.log("flushed",self.flushed)
77-
// if (self.flushed){
78-
// endHandler();
79-
// }
80-
81-
// }
82-
// }
83-
}
8480
Converter.prototype.prepareData=function(data){
8581
return this._csvLineBuffer+data;
8682
}
@@ -119,8 +115,9 @@ Converter.prototype.initWorker=function(){
119115
Converter.prototype.workerProcess=function(data,cb){
120116
var self=this;
121117
var line=fileline(data,this.param)
118+
var eol=this.getEol(data)
122119
this.setPartialData(line.partial)
123-
this.workerMgr.sendWorker(line.lines.join("\n"),this.lastIndex,cb,function(results,lastIndex){
120+
this.workerMgr.sendWorker(line.lines.join(eol)+eol,this.lastIndex,cb,function(results,lastIndex){
124121
var cur=self.sequenceBuffer[0];
125122
if (cur.idx === lastIndex){
126123
cur.result=results;
@@ -180,7 +177,7 @@ Converter.prototype.processHead=function(data,cb){
180177
}
181178
}
182179
Converter.prototype.processResult=function(result){
183-
180+
184181
for (var i=0;i<result.length;i++){
185182
var r=result[i];
186183
if (r.err){
@@ -193,9 +190,6 @@ Converter.prototype.processResult=function(result){
193190
// cb();
194191
}
195192
Converter.prototype.emitResult=function(r){
196-
if (this._needJson === null){
197-
this._needJson=this.listeners("record_parsed").length>0 || this.transform || this._options.objectMode;
198-
}
199193
var index=r.index;
200194
var row=r.row;
201195
var result=r.json;
@@ -212,146 +206,49 @@ Converter.prototype.emitResult=function(r){
212206
row=JSON.parse(row)
213207
}
214208
}
209+
if (this.param.constructResult && this._needEmitFinalResult){
210+
this.finalResult.push(resultJson)
211+
}
215212
if (this.transform && typeof this.transform==="function"){
216213
this.transform(resultJson,row,index);
217214
}
218-
if (this.listeners("record_parsed").length>0){
215+
if (this._needEmitResult){
219216
this.emit("record_parsed", resultJson, row, index);
220217
}
221-
if (this.param.toArrayString && index > 0) {
218+
if (this.param.toArrayString && index > 0 && this._needPush) {
222219
this.push("," + eol);
223220
}
224221
if (this._options && this._options.objectMode){
225222
this.push(resultJson);
226223
}else{
227-
if (resultStr===null){
228-
resultStr=JSON.stringify(resultJson)
224+
if (this._needPush){
225+
if (resultStr===null){
226+
resultStr=JSON.stringify(resultJson)
227+
}
228+
this.push(!this.param.toArrayString?resultStr+eol:resultStr, "utf8");
229229
}
230-
this.push(resultStr, "utf8");
231230
}
232231
}
233-
// Converter.prototype.initNoFork = function() {
234-
// // function onError() {
235-
// // var args = Array.prototype.slice.call(arguments, 0);
236-
// // args.unshift("error");
237-
// // this.hasError = true;
238-
// // this.emit.apply(this, args);
239-
// // };
240-
// this._lineBuffer = "";
241-
// this._csvLineBuffer = "";
242-
// // this.lineParser = new CSVLine(this.param);
243-
// // this.lineParser.on("error", onError.bind(this));
244-
// if (this.param.delimiter instanceof Array || this.param.delimiter.toLowerCase()==="auto"){
245-
// this.param.needCheckDelimiter=true;
246-
// }else{
247-
// this.param.needCheckDelimiter=false;
248-
// }
249-
// this.processor = new Processor(this.param);
250-
// // this.processor.on("error", onError.bind(this));
251-
// // var syncWorker = new Worker(this.param, true);
252-
// // // syncWorker.on("error",onError);
253-
// // this.processor.addWorker(syncWorker);
254-
// // if (this.param.workerNum > 1) {
255-
// // for (var i = 1; i < this.param.workerNum; i++) {
256-
// // var worker = new Worker(this.param, false);
257-
// // // worker.on("error",onError);
258-
// // this.processor.addWorker(worker);
259-
// // }
260-
// // } else if (this.param.workerNum < 1) {
261-
// // this.param.workerNum = 1;
262-
// // }
263-
// if (!this.param.constructResult) {
264-
// this.resultObject.disableConstruct();
265-
// }
266-
// // this.lineParser.pipe(this.processor);
267-
// var syncLock = false;
268-
// // this.processor.on("record_parsed", function(resultRow, row, index) {
269-
// // // this.emit("record_parsed", resultRow, row, index);
270-
// // this.sequenceBuffer[index] = {
271-
// // resultRow: resultRow,
272-
// // row: row,
273-
// // index: index
274-
// // };
275-
// // //critical area
276-
// // if (!syncLock) {
277-
// // syncLock = true;
278-
// // this.flushBuffer();
279-
// // syncLock = false;
280-
// // }
281-
// // }.bind(this));
282-
// // this.processor.on("end_parse", function() {
283-
// // this.processEnd = true;
284-
// // this.flushBuffer();
285-
// // this.checkAndFlush();
286-
// // }.bind(this));
287-
// this._transform = this._transformNoFork;
288-
// this._flush = this._flushNoFork;
289-
// }
290232

291-
292-
Converter.prototype.flushBuffer = function(cb) {
293-
while (this.sequenceBuffer[this.recordNum]) {
294-
var r=this.sequenceBuffer[this.recordNum];
295-
this.sequenceBuffer[this.recordNum] = undefined;
296-
this.emitResult(r);
297-
}
298-
// this.checkAndFlush();
299-
cb();
300-
}
301233
Converter.prototype.preProcessRaw=function(data,cb){
302234
cb(data);
303235
}
304236

305-
// Converter.prototype.processCSVLines = function(csvLines, cb) {
306-
// // for (var i=0;i<csvLines.length;i++){
307-
// // this.push(csvLines[i].data);
308-
// // }
309-
// // cb();
310-
// // return;
311-
// this.runningProcess++;
312-
// this.processor.rows(csvLines, function(err, resArr) {
313-
// this.runningProcess--;
314-
// if (err) {
315-
// this.emit("error","row_process",err);
316-
// } else {
317-
// for (var i = 0; i < resArr.length; i++) {
318-
// this.sequenceBuffer[resArr[i].index] = {
319-
// resultJSONStr: resArr[i].jsonRaw,
320-
// row: resArr[i].row,
321-
// index: resArr[i].index
322-
// }
323-
// }
324-
// this.flushBuffer();
325-
// }
326-
// }.bind(this), cb);
327-
// }
328-
// Converter.prototype.toLines = function(data) {
329-
// data = this._lineBuffer + data;
330-
// var eol = this.getEol(data);
331-
// return data.split(eol);
332-
// }
333237
Converter.prototype.preProcessLine=function(line,lineNumber){
334238
return line;
335239
}
336-
// Converter.prototype.toCSVLines = function(fileLines, last) {
337-
// var recordLine = "";
338-
// var lines = [];
339-
// while (fileLines.length > 1) {
340-
// this.lineNumber++;
341-
// var line = this.preProcessLine(fileLines.shift(),this.lineNumber);
342-
// if (line && line.length>0){
343-
// lines = lines.concat(this._line(line));
344-
// }
345-
// }
346-
// this._lineBuffer = fileLines[0];
347-
// if (last && this._csvLineBuffer.length > 0) {
348-
// this.emit("error", "unclosed_quote", this._csvLineBuffer)
349-
// }
350-
// return lines;
351-
// }
352240
Converter.prototype._flush = function(cb) {
353241
var self = this;
354-
this.flushCb=cb;
242+
this.flushCb=function(){
243+
self.emit("end_parsed",self.finalResult);
244+
if (self.workerMgr){
245+
self.workerMgr.destroyWorker();
246+
}
247+
cb()
248+
if (!self._needPush){
249+
self.emit("end")
250+
}
251+
};
355252
if (this._csvLineBuffer.length > 0) {
356253
if (this._csvLineBuffer[this._csvLineBuffer.length-1] != this.getEol()){
357254
this._csvLineBuffer+=this.getEol();
@@ -375,7 +272,7 @@ Converter.prototype.checkAndFlush = function() {
375272
if (this._csvLineBuffer.length !== 0) {
376273
this.emit("error", CSVError.unclosed_quote(this.recordNum,this._csvLineBuffer), this._csvLineBuffer);
377274
}
378-
if (this.param.toArrayString) {
275+
if (this.param.toArrayString && this._needPush) {
379276
this.push(eol + "]", "utf8");
380277
}
381278
if (this.workerMgr && this.workerMgr.isRunning()){

libs/core/defParam.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module.exports=function(params){
77
checkType: true, //whether check column type
88
toArrayString: false, //stream down stringified json array instead of string of json. (useful if downstream is file writer etc)
99
ignoreEmpty: false, //Ignore empty value while parsing. if a value of the column is empty, it will be skipped parsing.
10-
workerNum: 1, //number of parallel workers. If multi-core CPU available, increase the number will get better performance for large csv data.
10+
workerNum: getEnv("CSV_WORKER",1), //number of parallel workers. If multi-core CPU available, increase the number will get better performance for large csv data.
1111
fork: false, //use another CPU core to convert the csv stream
1212
noheader: false, //indicate if first line of CSV file is header or not.
1313
headers: null, //an array of header strings. If noheader is false and headers is array, csv header will be ignored.
@@ -22,4 +22,13 @@ module.exports=function(params){
2222
}
2323
};
2424
return _param;
25+
}
26+
27+
28+
function getEnv(key,def){
29+
if (process.env[key]){
30+
return process.env[key];
31+
}else{
32+
return def;
33+
}
2534
}

libs/core/index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
module.exports=constructor;
12
module.exports.Converter = require("./Converter.js");
23
module.exports.Parser = require("./parser.js");
34
module.exports.parserMgr = require("./parserMgr.js");
5+
6+
7+
function constructor(param,options){
8+
return new module.exports.Converter(param,options)
9+
}

0 commit comments

Comments
 (0)