@@ -20,8 +20,8 @@ function Converter(params,options) {
2020 this . _options = options || { } ;
2121 this . param = _param ;
2222 this . param . _options = this . _options ;
23- this . resultObject = new Result ( this ) ;
24- this . pipe ( this . resultObject ) ; // it is important to have downstream for a transform otherwise it will stuck
23+ // this.resultObject = new Result(this);
24+ // this.pipe(this.resultObject); // it is important to have downstream for a transform otherwise it will stuck
2525 this . started = false ; //indicate if parsing has started.
2626 this . recordNum = 0 ;
2727 this . lineNumber = 0 ; //file line number
@@ -36,7 +36,8 @@ function Converter(params,options) {
3636 this . flushCb = null ;
3737 this . processEnd = false ;
3838 this . sequenceBuffer = [ ] ;
39-
39+ this . _needJson = null ;
40+ this . on ( "data" , function ( ) { } ) ;
4041 this . on ( "error" , function ( ) { } ) ;
4142 this . initWorker ( ) ;
4243 this . initEnd ( ) ;
@@ -63,7 +64,9 @@ Converter.prototype.initEnd=function(){
6364 function endHandler ( ) {
6465 var finalResult = self . param . constructResult ? self . resultObject . getBuffer ( ) : { } ;
6566 self . emit ( "end_parsed" , finalResult ) ;
66- workerMgr . destroyWorker ( ) ;
67+ if ( self . workerMgr ) {
68+ self . workerMgr . destroyWorker ( ) ;
69+ }
6770 }
6871 this . on ( "end" , endHandler ) ;
6972 // if (this.param.workerNum<=1){
@@ -92,8 +95,11 @@ Converter.prototype.processData=function(data,cb){
9295 if ( params . workerNum <= 1 ) {
9396 var lines = dataToCSVLine ( data , params ) ;
9497 this . setPartialData ( lines . partial ) ;
95- var res = linesToJson ( lines . lines , params , this . recordNum ) ;
96- this . processResult ( res , cb ) ;
98+ var jsonArr = linesToJson ( lines . lines , params , this . recordNum ) ;
99+ this . processResult ( jsonArr )
100+ this . lastIndex += jsonArr . length ;
101+ this . recordNum += jsonArr . length ;
102+ cb ( ) ;
97103 } else {
98104 this . workerProcess ( data , cb ) ;
99105 }
@@ -102,18 +108,45 @@ Converter.prototype.processData=function(data,cb){
/**
 * Create the per-instance worker manager when more than one worker is
 * configured.
 *
 * Only `param.workerNum - 1` workers are started (presumably because the main
 * process counts as one worker -- confirm against the worker manager). When
 * that number is not positive, `this.workerMgr` is left unset and callers
 * must check for it before use.
 */
Converter.prototype.initWorker = function () {
  var extraWorkers = this.param.workerNum - 1;
  if (extraWorkers > 0) {
    // Each converter owns its own manager instead of sharing module state.
    this.workerMgr = workerMgr();
    this.workerMgr.initWorker(extraWorkers, this.param);
  }
};
/**
 * Hand one chunk of input to the worker pool and emit the parsed records in
 * their original order.
 *
 * workerProcess does not support embedded multi-line fields: the chunk is
 * split into file lines and re-joined with "\n" before it is sent.
 *
 * @param {string} data - raw chunk text (split by `fileline`).
 * @param {Function} cb - passed straight to `workerMgr.sendWorker`, which
 *   decides when to invoke it.
 */
Converter.prototype.workerProcess = function (data, cb) {
  var self = this;
  var line = fileline(data, this.param);
  var startIndex = this.lastIndex;

  this.setPartialData(line.partial);

  // Queue the slot *before* dispatching so that a worker manager which
  // completes synchronously still finds its slot.
  this.sequenceBuffer.push({
    idx: startIndex,
    result: null
  });
  this.lastIndex += line.lines.length;

  this.workerMgr.sendWorker(line.lines.join("\n"), startIndex, cb, function (results, lastIndex) {
    var queue = self.sequenceBuffer;
    var i;

    // Store the result on the first still-pending slot for this start index.
    // Skipping filled slots keeps chunks that share an index (a chunk that
    // contributed zero complete lines) from overwriting each other.
    for (i = 0; i < queue.length; i++) {
      if (queue[i].idx === lastIndex && queue[i].result === null) {
        // A falsy result must still mark the slot as done, otherwise it
        // would block every later chunk forever.
        queue[i].result = results || [];
        break;
      }
    }

    // Drain every finished slot from the head; stop at the first pending one
    // so records are emitted in input order.
    var records = [];
    while (queue.length > 0 && queue[0].result !== null) {
      records = records.concat(queue.shift().result);
    }

    if (records.length > 0) {
      self.processResult(records);
      self.recordNum += records.length;
    }
  });
};
118151Converter . prototype . processHead = function ( data , cb ) {
119152 var params = this . param ;
@@ -135,56 +168,67 @@ Converter.prototype.processHead=function(data,cb){
135168 }
136169 }
137170 if ( this . param . workerNum > 1 ) {
138- workerMgr . setParams ( params ) ;
171+ this . workerMgr . setParams ( params ) ;
139172 }
140173 var res = linesToJson ( lines . lines , params , 0 ) ;
141-
142- this . processResult ( res , cb ) ;
174+ this . processResult ( res ) ;
175+ this . lastIndex += res . length ;
176+ this . recordNum += res . length ;
177+ cb ( ) ;
143178 } else {
144179 cb ( ) ;
145180 }
146181}
/**
 * Dispatch a batch of parsed records: entries carrying `err` are reported
 * through the "error" event, all others are forwarded to `emitResult`.
 *
 * Index/record bookkeeping and the stream callback are the caller's
 * responsibility; this method only emits.
 *
 * @param {Array} result - parsed entries; a missing batch is ignored.
 */
Converter.prototype.processResult = function (result) {
  if (!result) {
    return;
  }
  for (var i = 0; i < result.length; i++) {
    var entry = result[i];
    if (entry.err) {
      this.emit("error", entry.err);
    } else {
      this.emitResult(entry);
    }
  }
};
/**
 * Emit a single parsed record: run the optional `transform` hook, notify
 * "record_parsed" listeners and push the record downstream.
 *
 * `r.json` may be an object or an already-serialized JSON string. A string is
 * only parsed when something needs the object (a transform, a listener or
 * object mode); otherwise it is pushed through untouched.
 *
 * @param {Object} r - record with `index`, `row` and `json`.
 */
Converter.prototype.emitResult = function (r) {
  var index = r.index;
  var row = r.row;
  var payload = r.json;
  var resultStr = null;
  var resultJson = null;

  if (typeof payload === "string") {
    resultStr = payload;
  } else {
    resultJson = payload;
  }

  // Evaluated per record rather than cached on the first one, so a listener
  // attached after parsing started still receives a real object.
  var hasListener = this.listeners("record_parsed").length > 0;
  var hasTransform = typeof this.transform === "function";
  var objectMode = Boolean(this._options && this._options.objectMode);
  var exposedToUser = hasListener || hasTransform;
  this._needJson = exposedToUser || objectMode;

  if (resultStr !== null && this._needJson) {
    resultJson = JSON.parse(resultStr);
    if (typeof row === "string") {
      row = JSON.parse(row);
    }
    if (exposedToUser) {
      // User code may mutate the object; drop the stale string so the
      // output below is re-serialized from the (possibly changed) object.
      resultStr = null;
    }
  }

  if (hasTransform) {
    this.transform(resultJson, row, index);
  }
  if (hasListener) {
    this.emit("record_parsed", resultJson, row, index);
  }

  // In array-string mode every record after the first is comma-separated.
  if (this.param.toArrayString && index > 0) {
    this.push("," + eol);
  }

  if (objectMode) {
    this.push(resultJson);
  } else {
    if (resultStr === null) {
      resultStr = JSON.stringify(resultJson);
    }
    this.push(resultStr, "utf8");
  }
};
189233// Converter.prototype.initNoFork = function() {
190234// // function onError() {
@@ -329,13 +373,13 @@ Converter.prototype._flush = function(cb) {
329373// }
330374Converter . prototype . checkAndFlush = function ( ) {
331375 if ( this . _csvLineBuffer . length !== 0 ) {
332- this . emit ( "error" , CSVError . unclosed_quote ( this . lastIndex , this . _csvLineBuffer ) , this . _csvLineBuffer ) ;
376+ this . emit ( "error" , CSVError . unclosed_quote ( this . recordNum , this . _csvLineBuffer ) , this . _csvLineBuffer ) ;
333377 }
334378 if ( this . param . toArrayString ) {
335379 this . push ( eol + "]" , "utf8" ) ;
336380 }
337- if ( workerMgr . isRunning ( ) ) {
338- workerMgr . drain = function ( ) {
381+ if ( this . workerMgr && this . workerMgr . isRunning ( ) ) {
382+ this . workerMgr . drain = function ( ) {
339383 this . flushCb ( ) ;
340384 } . bind ( this ) ;
341385 } else {
0 commit comments