@@ -12,6 +12,7 @@ var csvline=require("./csvline");
1212var fileline = require ( "./fileline" ) ;
1313var dataToCSVLine = require ( "./dataToCSVLine" ) ;
1414var linesToJson = require ( "./linesToJson" ) ;
15+ var CSVError = require ( "./CSVError" ) ;
1516function Converter ( params , options ) {
1617 Transform . call ( this , options ) ;
1718 _param = defParam ( params ) ;
@@ -22,10 +23,9 @@ function Converter(params,options) {
2223 this . pipe ( this . resultObject ) ; // it is important to have downstream for a transform otherwise it will stuck
2324 this . started = false ; //indicate if parsing has started.
2425 this . recordNum = 0 ;
25- this . lineNumber = 0 ;
26- this . runningProcess = 0 ;
26+ this . lineNumber = 0 ; //file line number
2727 this . _csvLineBuffer = "" ;
28- this . lastIndex = 0 ;
28+ this . lastIndex = 0 ; // index in result json array
2929 //this._pipe(this.lineParser).pipe(this.processor);
3030 if ( this . param . fork ) {
3131 this . param . fork = false ;
@@ -69,14 +69,14 @@ Converter.prototype.processData=function(data,cb){
6969 if ( ! params . _headers ) { //header is not inited. init header
7070 this . processHead ( data , cb ) ;
7171 } else {
72- if ( params . workerNum === 1 ) {
72+ // if (params.workerNum===1){
7373 var lines = dataToCSVLine ( data , params ) ;
7474 this . setPartialData ( lines . partial ) ;
7575 var res = linesToJson ( lines . lines , params , this . recordNum ) ;
7676 this . processResult ( res , cb ) ;
77- } else {
78- this . workerProcess ( data , cb ) ;
79- }
77+ // }else{
78+ // this.workerProcess(data,cb);
79+ // }
8080 }
8181}
8282Converter . prototype . initWorker = function ( ) {
@@ -85,16 +85,16 @@ Converter.prototype.initWorker=function(){
8585 workerMgr . initWorker ( workerNum , this . param ) ;
8686 }
8787}
88- Converter . prototype . workerProcess = function ( data , cb ) {
89- var self = this ;
90- workerMgr . sendWorker ( data , this . lastIndex , function ( length , partial ) {
91- self . setPartialData ( partial ) ;
92- self . lastIndex += length ;
93- cb ( ) ;
94- } , function ( results ) {
95- self . processResult ( results , function ( ) { } , true ) ;
96- } )
97- }
88+ // Converter.prototype.workerProcess=function(data,cb){
89+ // var self=this;
90+ // workerMgr.sendWorker(data,this.lastIndex,function(length,partial){
91+ // self.setPartialData(partial);
92+ // self.lastIndex+=length;
93+ // cb();
94+ // },function(results){
95+ // self.processResult(results,function(){},true);
96+ // })
97+ // }
9898Converter . prototype . processHead = function ( data , cb ) {
9999 var params = this . param ;
100100 if ( ! params . _headers ) { //header is not inited. init header
@@ -134,7 +134,7 @@ Converter.prototype.processResult=function(result,cb,isAsync){
134134 }
135135 }
136136 if ( isAsync ) {
137- this . flushBuffer ( cb ) ;
137+ // this.flushBuffer(cb);
138138 } else {
139139 this . lastIndex += result . length ;
140140 cb ( ) ;
@@ -158,132 +158,138 @@ Converter.prototype.emitResult=function(r){
158158 }
159159 this . recordNum = index + 1 ;
160160}
161- Converter . prototype . initNoFork = function ( ) {
162- // function onError() {
163- // var args = Array.prototype.slice.call(arguments, 0);
164- // args.unshift("error");
165- // this.hasError = true;
166- // this.emit.apply(this, args);
167- // };
168- this . _lineBuffer = "" ;
169- this . _csvLineBuffer = "" ;
170- // this.lineParser = new CSVLine(this.param);
171- // this.lineParser.on("error", onError.bind(this));
172- if ( this . param . delimiter instanceof Array || this . param . delimiter . toLowerCase ( ) === "auto" ) {
173- this . param . needCheckDelimiter = true ;
174- } else {
175- this . param . needCheckDelimiter = false ;
176- }
177- this . processor = new Processor ( this . param ) ;
178- // this.processor.on("error", onError.bind(this));
179- // var syncWorker = new Worker(this.param, true);
180- // // syncWorker.on("error",onError);
181- // this.processor.addWorker(syncWorker);
182- // if (this.param.workerNum > 1) {
183- // for (var i = 1; i < this.param.workerNum; i++) {
184- // var worker = new Worker(this.param, false);
185- // // worker.on("error",onError);
186- // this.processor.addWorker(worker);
187- // }
188- // } else if (this.param.workerNum < 1) {
189- // this.param.workerNum = 1;
190- // }
191- if ( ! this . param . constructResult ) {
192- this . resultObject . disableConstruct ( ) ;
193- }
194- // this.lineParser.pipe(this.processor);
195- var syncLock = false ;
196- // this.processor.on("record_parsed", function(resultRow, row, index) {
197- // // this.emit("record_parsed", resultRow, row, index);
198- // this.sequenceBuffer[index] = {
199- // resultRow: resultRow,
200- // row: row,
201- // index: index
202- // };
203- // //critical area
204- // if (!syncLock) {
205- // syncLock = true;
206- // this.flushBuffer();
207- // syncLock = false;
208- // }
209- // }.bind(this));
210- // this.processor.on("end_parse", function() {
211- // this.processEnd = true;
212- // this.flushBuffer();
213- // this.checkAndFlush();
214- // }.bind(this));
215- this . _transform = this . _transformNoFork ;
216- this . _flush = this . _flushNoFork ;
217- }
218- Converter . prototype . flushBuffer = function ( cb ) {
219- while ( this . sequenceBuffer [ this . recordNum ] ) {
220- var r = this . sequenceBuffer [ this . recordNum ] ;
221- this . sequenceBuffer [ this . recordNum ] = undefined ;
222- this . emitResult ( r ) ;
223- }
224- this . checkAndFlush ( ) ;
225- cb ( ) ;
226- }
161+ // Converter.prototype.initNoFork = function() {
162+ // // function onError() {
163+ // // var args = Array.prototype.slice.call(arguments, 0);
164+ // // args.unshift("error");
165+ // // this.hasError = true;
166+ // // this.emit.apply(this, args);
167+ // // };
168+ // this._lineBuffer = "";
169+ // this._csvLineBuffer = "";
170+ // // this.lineParser = new CSVLine(this.param);
171+ // // this.lineParser.on("error", onError.bind(this));
172+ // if (this.param.delimiter instanceof Array || this.param.delimiter.toLowerCase()==="auto"){
173+ // this.param.needCheckDelimiter=true;
174+ // }else{
175+ // this.param.needCheckDelimiter=false;
176+ // }
177+ // this.processor = new Processor(this.param);
178+ // // this.processor.on("error", onError.bind(this));
179+ // // var syncWorker = new Worker(this.param, true);
180+ // // // syncWorker.on("error",onError);
181+ // // this.processor.addWorker(syncWorker);
182+ // // if (this.param.workerNum > 1) {
183+ // // for (var i = 1; i < this.param.workerNum; i++) {
184+ // // var worker = new Worker(this.param, false);
185+ // // // worker.on("error",onError);
186+ // // this.processor.addWorker(worker);
187+ // // }
188+ // // } else if (this.param.workerNum < 1) {
189+ // // this.param.workerNum = 1;
190+ // // }
191+ // if (!this.param.constructResult) {
192+ // this.resultObject.disableConstruct();
193+ // }
194+ // // this.lineParser.pipe(this.processor);
195+ // var syncLock = false;
196+ // // this.processor.on("record_parsed", function(resultRow, row, index) {
197+ // // // this.emit("record_parsed", resultRow, row, index);
198+ // // this.sequenceBuffer[index] = {
199+ // // resultRow: resultRow,
200+ // // row: row,
201+ // // index: index
202+ // // };
203+ // // //critical area
204+ // // if (!syncLock) {
205+ // // syncLock = true;
206+ // // this.flushBuffer();
207+ // // syncLock = false;
208+ // // }
209+ // // }.bind(this));
210+ // // this.processor.on("end_parse", function() {
211+ // // this.processEnd = true;
212+ // // this.flushBuffer();
213+ // // this.checkAndFlush();
214+ // // }.bind(this));
215+ // this._transform = this._transformNoFork;
216+ // this._flush = this._flushNoFork;
217+ // }
218+
219+
220+ // Converter.prototype.flushBuffer = function(cb) {
221+ // while (this.sequenceBuffer[this.recordNum]) {
222+ // var r=this.sequenceBuffer[this.recordNum];
223+ // this.sequenceBuffer[this.recordNum] = undefined;
224+ // this.emitResult(r);
225+ // }
226+ // this.checkAndFlush();
227+ // cb();
228+ // }
227229Converter . prototype . preProcessRaw = function ( data , cb ) {
228230 cb ( data ) ;
229231}
230232
231- Converter . prototype . processCSVLines = function ( csvLines , cb ) {
232- // for (var i=0;i<csvLines.length;i++){
233- // this.push(csvLines[i].data);
234- // }
235- // cb();
236- // return;
237- this . runningProcess ++ ;
238- this . processor . rows ( csvLines , function ( err , resArr ) {
239- this . runningProcess -- ;
240- if ( err ) {
241- this . emit ( "error" , "row_process" , err ) ;
242- } else {
243- for ( var i = 0 ; i < resArr . length ; i ++ ) {
244- this . sequenceBuffer [ resArr [ i ] . index ] = {
245- resultJSONStr : resArr [ i ] . jsonRaw ,
246- row : resArr [ i ] . row ,
247- index : resArr [ i ] . index
248- }
249- }
250- this . flushBuffer ( ) ;
251- }
252- } . bind ( this ) , cb ) ;
253- }
254- Converter . prototype . toLines = function ( data ) {
255- data = this . _lineBuffer + data ;
256- var eol = this . getEol ( data ) ;
257- return data . split ( eol ) ;
258- }
233+ // Converter.prototype.processCSVLines = function(csvLines, cb) {
234+ // // for (var i=0;i<csvLines.length;i++){
235+ // // this.push(csvLines[i].data);
236+ // // }
237+ // // cb();
238+ // // return;
239+ // this.runningProcess++;
240+ // this.processor.rows(csvLines, function(err, resArr) {
241+ // this.runningProcess--;
242+ // if (err) {
243+ // this.emit("error","row_process",err);
244+ // } else {
245+ // for (var i = 0; i < resArr.length; i++) {
246+ // this.sequenceBuffer[resArr[i].index] = {
247+ // resultJSONStr: resArr[i].jsonRaw,
248+ // row: resArr[i].row,
249+ // index: resArr[i].index
250+ // }
251+ // }
252+ // this.flushBuffer();
253+ // }
254+ // }.bind(this), cb);
255+ // }
256+ // Converter.prototype.toLines = function(data) {
257+ // data = this._lineBuffer + data;
258+ // var eol = this.getEol(data);
259+ // return data.split(eol);
260+ // }
259261Converter . prototype . preProcessLine = function ( line , lineNumber ) {
260262 return line ;
261263}
262- Converter . prototype . toCSVLines = function ( fileLines , last ) {
263- var recordLine = "" ;
264- var lines = [ ] ;
265- while ( fileLines . length > 1 ) {
266- this . lineNumber ++ ;
267- var line = this . preProcessLine ( fileLines . shift ( ) , this . lineNumber ) ;
268- if ( line && line . length > 0 ) {
269- lines = lines . concat ( this . _line ( line ) ) ;
270- }
271- }
272- this . _lineBuffer = fileLines [ 0 ] ;
273- if ( last && this . _csvLineBuffer . length > 0 ) {
274- this . emit ( "error" , "unclosed_quote" , this . _csvLineBuffer )
275- }
276- return lines ;
277- }
264+ // Converter.prototype.toCSVLines = function(fileLines, last) {
265+ // var recordLine = "";
266+ // var lines = [];
267+ // while (fileLines.length > 1) {
268+ // this.lineNumber++;
269+ // var line = this.preProcessLine(fileLines.shift(),this.lineNumber);
270+ // if (line && line.length>0){
271+ // lines = lines.concat(this._line(line));
272+ // }
273+ // }
274+ // this._lineBuffer = fileLines[0];
275+ // if (last && this._csvLineBuffer.length > 0) {
276+ // this.emit("error", "unclosed_quote", this._csvLineBuffer)
277+ // }
278+ // return lines;
279+ // }
278280Converter . prototype . _flush = function ( cb ) {
279281 var self = this ;
280- this . flushCb = cb ;
281282 if ( this . _csvLineBuffer . length > 0 ) {
283+ if ( this . _csvLineBuffer [ this . _csvLineBuffer . length - 1 ] != this . getEol ( ) ) {
284+ this . _csvLineBuffer += this . getEol ( ) ;
285+ }
282286 this . processData ( this . _csvLineBuffer , function ( ) {
283287 this . checkAndFlush ( ) ;
288+ cb ( ) ;
284289 } . bind ( this ) ) ;
285290 } else {
286291 this . checkAndFlush ( ) ;
292+ cb ( ) ;
287293 }
288294 return ;
289295} ;
@@ -295,17 +301,12 @@ Converter.prototype._flush = function(cb) {
295301// this.child.on("exit", cb);
296302// }
297303Converter . prototype . checkAndFlush = function ( ) {
298- if ( this . runningProcess === 0 && this . flushCb ) {
299304 if ( this . _csvLineBuffer . length !== 0 ) {
300- this . emit ( "error" , " unclosed_quote" , this . _csvLineBuffer ) ;
305+ this . emit ( "error" , CSVError . unclosed_quote ( this . lastIndex , this . _csvLineBuffer ) , this . _csvLineBuffer ) ;
301306 }
302307 if ( this . param . toArrayString ) {
303308 this . push ( eol + "]" , "utf8" ) ;
304309 }
305- this . flushCb ( ) ;
306- // this.processor.releaseWorker();
307- this . flushCb = null ;
308- }
309310}
310311Converter . prototype . getEol = function ( data ) {
311312 if ( ! this . param . eol && data ) {
@@ -373,7 +374,7 @@ Converter.prototype.wrapCallback = function(cb, clean) {
373374 } . bind ( this ) ) ;
374375 this . once ( "error" , function ( err ) {
375376 this . hasError = true ;
376- cb ( Array . prototype . join . call ( arguments , ", " ) ) ;
377+ cb ( err ) ;
377378 clean ( ) ;
378379 } . bind ( this ) ) ;
379380}
0 commit comments