@@ -18,8 +18,8 @@ var Worker = require("./Worker.js");
1818var utils = require ( "./utils.js" ) ;
1919var async = require ( "async" ) ;
2020
21- function Converter ( params ) {
22- Transform . call ( this ) ; //TODO what does this do? -->This calls the constructor of Transform and initialise anything the Transform needs.(like var initialisation)
21+ function Converter ( params , options ) {
22+ Transform . call ( this , options ) ;
2323 var _param = {
2424 constructResult : true , //set to false to not construct result in memory. suitable for big csv data
2525 delimiter : ',' , // change the delimiter of csv columns. It is able to use an array to specify potencial delimiters. e.g. [",","|",";"]
@@ -46,11 +46,14 @@ function Converter(params) {
4646 console . warn ( "Parameter should be a JSON object like {'constructResult':false}" ) ;
4747 _param . constructResult = params ;
4848 }
49+ this . _options = options || { } ;
4950 this . param = _param ;
51+ this . param . _options = this . _options ;
5052 this . resultObject = new Result ( this ) ;
5153 this . pipe ( this . resultObject ) ; // it is important to have downstream for a transform otherwise it will stuck
5254 this . started = false ;
5355 this . recordNum = 0 ;
56+ this . lineNumber = 0 ;
5457 this . runningProcess = 0 ;
5558 //this._pipe(this.lineParser).pipe(this.processor);
5659 if ( this . param . fork ) {
@@ -192,21 +195,34 @@ Converter.prototype.flushBuffer = function() {
192195 if ( this . param . toArrayString && this . recordNum > 0 ) {
193196 this . push ( "," + eol ) ;
194197 }
195- this . push ( resultJSONStr , "utf8" ) ;
198+ if ( this . _options && this . _options . objectMode ) {
199+ this . push ( resultRow ) ;
200+ } else {
201+ this . push ( resultJSONStr , "utf8" ) ;
202+ }
196203 this . recordNum ++ ;
197204 }
198205 this . checkAndFlush ( ) ;
199206}
200- var size = 0 ;
207+ Converter . prototype . preProcessRaw = function ( data , cb ) {
208+ cb ( data ) ;
209+ }
201210
202211Converter . prototype . _transformNoFork = function ( data , encoding , cb ) {
203- size += data . length ;
204212 if ( this . param . toArrayString && this . started === false ) {
205213 this . started = true ;
206214 this . push ( "[" + eol , "utf8" ) ;
207215 }
208- var lines = this . toCSVLines ( this . toLines ( data , encoding ) ) ; //lines of csv
209- this . processCSVLines ( lines , cb ) ;
216+ data = data . toString ( "utf8" ) ;
217+ var self = this ;
218+ this . preProcessRaw ( data , function ( d ) {
219+ if ( d && d . length > 0 ) {
220+ var lines = self . toCSVLines ( self . toLines ( d ) ) ; //lines of csv
221+ self . processCSVLines ( lines , cb ) ;
222+ } else {
223+ cb ( ) ;
224+ }
225+ } )
210226 // async.eachLimit(lines,1,function(line,scb){
211227 // this.push(line.data);
212228 // scb();
@@ -239,20 +255,23 @@ Converter.prototype.processCSVLines = function(csvLines, cb) {
239255 }
240256 } . bind ( this ) , cb ) ;
241257}
242- Converter . prototype . toLines = function ( data , encoding ) {
243- if ( encoding === "buffer" ) {
244- encoding = "utf8" ;
245- }
246- data = this . _lineBuffer + data . toString ( encoding ) ;
258+ Converter . prototype . toLines = function ( data ) {
259+ data = this . _lineBuffer + data ;
247260 var eol = this . getEol ( data ) ;
248261 return data . split ( eol ) ;
249262}
263+ Converter . prototype . preProcessLine = function ( line , lineNumber ) {
264+ return line ;
265+ }
250266Converter . prototype . toCSVLines = function ( fileLines , last ) {
251267 var recordLine = "" ;
252268 var lines = [ ] ;
253269 while ( fileLines . length > 1 ) {
254- var line = fileLines . shift ( ) ;
255- lines = lines . concat ( this . _line ( line ) ) ;
270+ this . lineNumber ++ ;
271+ var line = this . preProcessLine ( fileLines . shift ( ) , this . lineNumber ) ;
272+ if ( line && line . length > 0 ) {
273+ lines = lines . concat ( this . _line ( line ) ) ;
274+ }
256275 }
257276 this . _lineBuffer = fileLines [ 0 ] ;
258277 if ( last && this . _csvLineBuffer . length > 0 ) {
@@ -417,7 +436,7 @@ function Processor(params) {
417436 this . runningWorker = 0 ;
418437 this . flushCb = null ;
419438 if ( this . param . workerNum > 1 ) {
420- for ( var i = 0 ; i < this . param . workerNum ; i ++ ) {
439+ for ( var i = 0 ; i < this . param . workerNum - 1 ; i ++ ) {
421440 var worker = new Worker ( this . param , false ) ;
422441 // worker.on("error",onError);
423442 this . addWorker ( worker ) ;
@@ -558,20 +577,40 @@ var Writable = require("stream").Writable;
558577var util = require ( "util" ) ;
559578var eol = require ( "os" ) . EOL ;
560579function Result ( csvParser ) {
561- Writable . call ( this ) ;
580+ Writable . call ( this , csvParser . _options ) ;
581+ this . _option = csvParser . _options || { } ;
562582 this . parser = csvParser ;
563583 this . param = csvParser . param ;
564- this . buffer = this . param . toArrayString ? "" :"[" + eol ;
584+ this . buffer = this . _option . objectMode ? [ ] :"[" + eol ;
565585 this . started = false ;
566586 var self = this ;
567587 this . parser . on ( "end" , function ( ) {
568- if ( ! self . param . toArrayString ) {
588+ if ( typeof self . buffer === "string" ) {
569589 self . buffer += eol + "]" ;
570590 }
571591 } ) ;
592+ this . _write = this . _option . objectMode ?_writeObject :_writeBuffer ;
572593}
573594util . inherits ( Result , Writable ) ;
574- Result . prototype . _write = function ( data , encoding , cb ) {
595+
596+ Result . prototype . getBuffer = function ( ) {
597+ return typeof this . buffer === "string" ?JSON . parse ( this . buffer ) :this . buffer ;
598+ } ;
599+
600+ Result . prototype . disableConstruct = function ( ) {
601+ this . _write = function ( d , e , cb ) {
602+ // console.log(typeof d,d);
603+ cb ( ) ; //do nothing just dropit
604+ } ;
605+ } ;
606+
607+
608+ function _writeObject ( data , encoding , cb ) {
609+ this . buffer . push ( data ) ;
610+ cb ( ) ;
611+ } ;
612+
613+ function _writeBuffer ( data , encoding , cb ) {
575614 if ( encoding === "buffer" ) {
576615 encoding = "utf8" ;
577616 }
@@ -588,17 +627,6 @@ Result.prototype._write = function(data, encoding, cb) {
588627 cb ( ) ;
589628} ;
590629
591- Result . prototype . getBuffer = function ( ) {
592- // console.log(this.buffer);
593- return JSON . parse ( this . buffer ) ;
594- } ;
595-
596- Result . prototype . disableConstruct = function ( ) {
597- this . _write = function ( d , e , cb ) {
598- cb ( ) ; //do nothing just dropit
599- } ;
600- } ;
601-
602630module . exports = Result ;
603631
604632} , { "os" :35 , "stream" :54 , "util" :67 } ] , 6 :[ function ( require , module , exports ) {
@@ -772,21 +800,33 @@ module.exports = {
772800 var headArr = ( params . config && params . config . flatKeys ) ? [ fieldStr ] : fieldStr . split ( '.' ) ;
773801 var match , index , key , pointer ;
774802 //now the pointer is pointing the position to add a key/value pair.
775- pointer = processHead ( params . resultRow , headArr , arrReg , params . config && params . config . flatKeys ) ;
803+ var pointer = processHead ( params . resultRow , headArr , arrReg , params . config && params . config . flatKeys ) ;
776804 key = headArr . shift ( ) ;
777805 match = ( params . config && params . config . flatKeys ) ? false : key . match ( arrReg ) ;
778806 if ( match ) { // the last element is an array, we need check and treat it as an array.
779- key = key . replace ( match [ 0 ] , '' ) ;
780- if ( ! pointer [ key ] || ! ( pointer [ key ] instanceof Array ) ) {
781- pointer [ key ] = [ ] ;
782- }
783- index = match [ 1 ] ;
784- if ( index === '' ) {
785- index = pointer [ key ] . length ;
807+ try {
808+ key = key . replace ( match [ 0 ] , '' ) ;
809+ if ( ! pointer [ key ] || ! ( pointer [ key ] instanceof Array ) ) {
810+ pointer [ key ] = [ ] ;
811+ }
812+ if ( pointer [ key ] ) {
813+ index = match [ 1 ] ;
814+ if ( index === '' ) {
815+ index = pointer [ key ] . length ;
816+ }
817+ pointer [ key ] [ index ] = params . item ;
818+ } else {
819+ params . resultRow [ fieldStr ] = params . item ;
820+ }
821+ } catch ( e ) {
822+ params . resultRow [ fieldStr ] = params . item ;
786823 }
787- pointer [ key ] [ index ] = params . item ;
788824 } else {
789- pointer [ key ] = params . item ;
825+ if ( typeof pointer === "string" ) {
826+ params . resultRow [ fieldStr ] = params . item ;
827+ } else {
828+ pointer [ key ] = params . item ;
829+ }
790830 }
791831 }
792832} ;
@@ -847,7 +887,7 @@ function Parser(name, regExp, parser, processSafe) {
847887 this . parse = parser ;
848888 }
849889}
850- var numReg = / ^ [ - + ] ? [ 0 - 9 ] * \. ? [ 0 - 9 ] + $ / ;
890+ var numReg = / ^ [ - + ] ? [ 0 - 9 ] * \. ? [ 0 - 9 ] + ( [ e E ] [ - + ] ? [ 0 - 9 ] + ) ? $ / ;
851891Parser . prototype . convertType = function ( item ) {
852892 var type = this . type ;
853893 if ( type === 'number' ) {
@@ -1049,6 +1089,9 @@ function isQuoteClose(str,param){
10491089 return count % 2 !== 0 ;
10501090}
10511091function rowSplit ( rowStr , param ) {
1092+ if ( rowStr === "" ) {
1093+ return [ ] ;
1094+ }
10521095 var quote = param . quote ;
10531096 var trim = param . trim ;
10541097 if ( param . needCheckDelimiter === true ) {
@@ -1057,6 +1100,9 @@ function rowSplit(rowStr, param) {
10571100 }
10581101 var delimiter = param . delimiter ;
10591102 var rowArr = rowStr . split ( delimiter ) ;
1103+ if ( quote === "off" ) {
1104+ return rowArr ;
1105+ }
10601106 var row = [ ] ;
10611107 var inquote = false ;
10621108 var quoteBuff = '' ;
@@ -10147,7 +10193,7 @@ module.exports={
1014710193 "email" : "t3dodson@gmail.com"
1014810194 }
1014910195 ] ,
10150- "version" : "0.5.10 " ,
10196+ "version" : "1.0.1 " ,
1015110197 "keywords" : [
1015210198 "csv" ,
1015310199 "csvtojson" ,
@@ -10180,10 +10226,11 @@ module.exports={
1018010226 "mocha" : "^2.4.5"
1018110227 } ,
1018210228 "dependencies" : {
10183- "async" : "^1.2.1"
10229+ "async" : "^1.2.1" ,
10230+ "minimist" : "^1.2.0"
1018410231 } ,
10185- "scripts" :{
10186- "test" :"mocha ./test -R spec"
10232+ "scripts" : {
10233+ "test" : "mocha ./test -R spec"
1018710234 }
1018810235}
1018910236
0 commit comments