@@ -13,6 +13,7 @@ var fileline=require("./fileline");
1313var dataToCSVLine = require ( "./dataToCSVLine" ) ;
1414var linesToJson = require ( "./linesToJson" ) ;
1515var CSVError = require ( "./CSVError" ) ;
16+ var workerMgr = require ( "./workerMgr" ) ;
1617function Converter ( params , options ) {
1718 Transform . call ( this , options ) ;
1819 _param = defParam ( params ) ;
@@ -27,19 +28,18 @@ function Converter(params,options) {
2728 this . _csvLineBuffer = "" ;
2829 this . lastIndex = 0 ; // index in result json array
2930 //this._pipe(this.lineParser).pipe(this.processor);
30- if ( this . param . fork ) {
31- this . param . fork = false ;
32- this . param . workerNum = 2 ;
33- }
3431 // this.initNoFork();
32+ if ( this . param . forked ) {
33+ this . param . forked = false ;
34+ this . workerNum = 2 ;
35+ }
3536 this . flushCb = null ;
3637 this . processEnd = false ;
3738 this . sequenceBuffer = [ ] ;
38- this . on ( "end" , function ( ) {
39- var finalResult = this . param . constructResult ? this . resultObject . getBuffer ( ) : { } ;
40- this . emit ( "end_parsed" , finalResult ) ;
41- } . bind ( this ) ) ;
39+
4240 this . on ( "error" , function ( ) { } ) ;
41+ this . initWorker ( ) ;
42+ this . initEnd ( ) ;
4343 return this ;
4444}
4545util . inherits ( Converter , Transform ) ;
@@ -58,6 +58,26 @@ Converter.prototype._transform = function(data, encoding, cb) {
5858 }
5959 } )
6060} ;
61+ Converter . prototype . initEnd = function ( ) {
62+ var self = this ;
63+ function endHandler ( ) {
64+ var finalResult = self . param . constructResult ? self . resultObject . getBuffer ( ) : { } ;
65+ self . emit ( "end_parsed" , finalResult ) ;
66+ workerMgr . destroyWorker ( ) ;
67+ }
68+ this . on ( "end" , endHandler ) ;
69+ // if (this.param.workerNum<=1){
70+
71+ // }else{
72+ // workerMgr.drain=function(){
73+ // console.log("flushed",self.flushed)
74+ // if (self.flushed){
75+ // endHandler();
76+ // }
77+
78+ // }
79+ // }
80+ }
6181Converter . prototype . prepareData = function ( data ) {
6282 return this . _csvLineBuffer + data ;
6383}
@@ -69,32 +89,32 @@ Converter.prototype.processData=function(data,cb){
6989 if ( ! params . _headers ) { //header is not inited. init header
7090 this . processHead ( data , cb ) ;
7191 } else {
72- // if (params.workerNum== =1){
92+ if ( params . workerNum < =1 ) {
7393 var lines = dataToCSVLine ( data , params ) ;
7494 this . setPartialData ( lines . partial ) ;
7595 var res = linesToJson ( lines . lines , params , this . recordNum ) ;
7696 this . processResult ( res , cb ) ;
77- // }else{
78- // this.workerProcess(data,cb);
79- // }
97+ } else {
98+ this . workerProcess ( data , cb ) ;
99+ }
80100 }
81101}
82102Converter . prototype . initWorker = function ( ) {
83103 var workerNum = this . param . workerNum - 1 ;
84- if ( workerNum . length > 0 ) {
104+ if ( workerNum > 0 ) {
85105 workerMgr . initWorker ( workerNum , this . param ) ;
86106 }
87107}
88- // Converter.prototype.workerProcess=function(data,cb){
89- // var self=this;
90- // workerMgr.sendWorker(data,this.lastIndex,function(length,partial){
91- // self.setPartialData(partial);
92- // self.lastIndex+=length;
93- // cb();
94- // },function(results){
95- // self.processResult(results,function(){},true);
96- // })
97- // }
108+ Converter . prototype . workerProcess = function ( data , cb ) {
109+ var self = this ;
110+ workerMgr . sendWorker ( data , this . lastIndex , function ( length , partial ) {
111+ self . setPartialData ( partial ) ;
112+ self . lastIndex += length ;
113+ cb ( ) ;
114+ } , function ( results ) {
115+ self . processResult ( results , function ( ) { } , true ) ;
116+ } )
117+ }
98118Converter . prototype . processHead = function ( data , cb ) {
99119 var params = this . param ;
100120 if ( ! params . _headers ) { //header is not inited. init header
@@ -114,27 +134,35 @@ Converter.prototype.processHead=function(data,cb){
114134 params . _headers = headerRow ;
115135 }
116136 }
137+ if ( this . param . workerNum > 1 ) {
138+ workerMgr . setParams ( params ) ;
139+ }
117140 var res = linesToJson ( lines . lines , params , 0 ) ;
141+
118142 this . processResult ( res , cb ) ;
119143 } else {
120144 cb ( ) ;
121145 }
122146}
123147Converter . prototype . processResult = function ( result , cb , isAsync ) {
148+
124149 for ( var i = 0 ; i < result . length ; i ++ ) {
125150 var r = result [ i ] ;
126151 if ( r . err ) {
127152 this . emit ( "error" , r . err ) ;
128153 } else {
129154 if ( isAsync ) {
130155 this . sequenceBuffer [ r . index ] = r ;
156+
131157 } else {
132158 this . emitResult ( r ) ;
133159 }
134160 }
135161 }
162+ // this.lastIndex+=result.length;
163+ // cb();
136164 if ( isAsync ) {
137- // this.flushBuffer(cb);
165+ this . flushBuffer ( cb ) ;
138166 } else {
139167 this . lastIndex += result . length ;
140168 cb ( ) ;
@@ -217,15 +245,15 @@ Converter.prototype.emitResult=function(r){
217245// }
218246
219247
220- // Converter.prototype.flushBuffer = function(cb) {
221- // while (this.sequenceBuffer[this.recordNum]) {
222- // var r=this.sequenceBuffer[this.recordNum];
223- // this.sequenceBuffer[this.recordNum] = undefined;
224- // this.emitResult(r);
225- // }
226- // this.checkAndFlush();
227- // cb();
228- // }
248+ Converter . prototype . flushBuffer = function ( cb ) {
249+ while ( this . sequenceBuffer [ this . recordNum ] ) {
250+ var r = this . sequenceBuffer [ this . recordNum ] ;
251+ this . sequenceBuffer [ this . recordNum ] = undefined ;
252+ this . emitResult ( r ) ;
253+ }
254+ // this.checkAndFlush();
255+ cb ( ) ;
256+ }
229257Converter . prototype . preProcessRaw = function ( data , cb ) {
230258 cb ( data ) ;
231259}
@@ -279,17 +307,16 @@ Converter.prototype.preProcessLine=function(line,lineNumber){
279307// }
280308Converter . prototype . _flush = function ( cb ) {
281309 var self = this ;
310+ this . flushCb = cb ;
282311 if ( this . _csvLineBuffer . length > 0 ) {
283312 if ( this . _csvLineBuffer [ this . _csvLineBuffer . length - 1 ] != this . getEol ( ) ) {
284313 this . _csvLineBuffer += this . getEol ( ) ;
285314 }
286315 this . processData ( this . _csvLineBuffer , function ( ) {
287316 this . checkAndFlush ( ) ;
288- cb ( ) ;
289317 } . bind ( this ) ) ;
290318 } else {
291319 this . checkAndFlush ( ) ;
292- cb ( ) ;
293320 }
294321 return ;
295322} ;
@@ -307,6 +334,13 @@ Converter.prototype.checkAndFlush = function() {
307334 if ( this . param . toArrayString ) {
308335 this . push ( eol + "]" , "utf8" ) ;
309336 }
337+ if ( workerMgr . isRunning ( ) ) {
338+ workerMgr . drain = function ( ) {
339+ this . flushCb ( ) ;
340+ } . bind ( this ) ;
341+ } else {
342+ this . flushCb ( ) ;
343+ }
310344}
311345Converter . prototype . getEol = function ( data ) {
312346 if ( ! this . param . eol && data ) {
0 commit comments