@@ -3939,6 +3939,154 @@ async function runPayloadUploadParallelSingleFileV4(file, opts = {}) {
39393939 } ) ) ;
39403940}
39413941
/**
 * Read one newline-terminated line from `socket`.
 *
 * Resolves with the line (terminator stripped, trimmed). Rejects on socket
 * error, on timeout, or when the socket closes before any bytes arrived.
 * A close that leaves partial buffered data resolves with that partial text.
 *
 * @param {net.Socket} socket - connected socket to read from
 * @param {number} [timeoutMs] - max time to wait for the newline
 * @returns {Promise<string>}
 */
function readLineFromSocket(socket, timeoutMs = READ_TIMEOUT_MS) {
  return new Promise((resolve, reject) => {
    let pending = Buffer.alloc(0);
    let done = false;

    // Settle exactly once: detach listeners, stop the timer, then settle.
    const finish = (settle, value) => {
      if (done) return;
      done = true;
      clearTimeout(timer);
      socket.off('data', onData);
      socket.off('error', onError);
      socket.off('close', onClose);
      settle(value);
    };

    const onData = (chunk) => {
      pending = Buffer.concat([pending, chunk]);
      const nl = pending.indexOf(0x0a);
      if (nl !== -1) {
        finish(resolve, pending.subarray(0, nl).toString('utf8').trim());
      }
    };

    const onError = (err) => finish(reject, err);

    const onClose = () => {
      // Partial data at close is still useful; only an empty buffer is fatal.
      if (pending.length > 0) {
        finish(resolve, pending.toString('utf8').trim());
      } else {
        finish(reject, new Error('Socket closed before response'));
      }
    };

    const timer = setTimeout(
      () => finish(reject, new Error('Read timed out')),
      timeoutMs,
    );

    socket.on('data', onData);
    socket.on('error', onError);
    socket.on('close', onClose);
  });
}
3991+
/**
 * Upload many files in parallel over raw TCP using the payload's
 * UPLOAD_FAST command (one connection per file, no pack framing).
 *
 * @param {Array<{rel_path: string, abs_path: string, size: number}>} files
 * @param {object} [opts]
 * @param {string} opts.ip - payload host
 * @param {string} opts.destPath - destination root on the payload
 * @param {number} [opts.connections=8] - max concurrent sockets
 * @param {{value: boolean}} [opts.cancel] - flag polled between chunks
 * @param {boolean} [opts.chmodAfterUpload=false] - ask payload to chmod each file
 * @param {(sent: number, filesDone: number, rel: string) => void} [opts.onProgress]
 * @param {(msg: string) => void} [opts.log] - debug logger
 * @param {(file: object, err: Error) => void} [opts.onSkipFile] - called when a
 *   file is skipped because it is missing/unreadable locally
 * @throws rethrows the first non-skippable upload error
 */
async function runPayloadUploadFastMultiFile(files, opts = {}) {
  const {
    ip,
    destPath,
    connections = 8,
    cancel = { value: false },
    chmodAfterUpload = false,
    onProgress = () => {},
    log = () => {},
    onSkipFile,
  } = opts;

  // Largest files first: keeps workers busy and avoids one huge file landing
  // at the tail of the queue on a single worker.
  const sorted = [...files].sort((a, b) => b.size - a.size);

  // Note: No need to pre-create directories — handle_upload_fast_wrapper on
  // the payload side calls mkdir_p() for each file's parent directory automatically.

  let fileIndex = 0;      // shared queue cursor (safe: single-threaded event loop)
  let totalSent = 0;      // bytes streamed across all workers
  let completedFiles = 0;

  const uploadOneFile = async (file) => {
    const socket = await createSocketWithTimeout(ip, TRANSFER_PORT);
    tuneUploadSocket(socket);

    let sentThisFile = 0; // tracked so a failed file's bytes can be rolled back
    try {
      const chmodToken = chmodAfterUpload ? 'CHMOD_END' : '0';
      const cmd = `UPLOAD_FAST ${escapeCommandPath(destPath)} ${escapeCommandPath(file.rel_path)} ${file.size} DIRECT ${chmodToken}\n`;
      socket.write(cmd);

      // Handshake: payload must answer READY before we stream data.
      const response = await readLineFromSocket(socket, READ_TIMEOUT_MS);
      if (!response.startsWith('READY')) {
        throw new Error(`Upload rejected: ${response}`);
      }

      // Stream file data with an 8MB read buffer.
      if (file.size > 0) {
        const fd = await fs.promises.open(file.abs_path, 'r');
        try {
          const ioBuf = Buffer.allocUnsafe(8 * 1024 * 1024);
          let remaining = file.size;
          let pos = 0;
          while (remaining > 0) {
            if (cancel.value) throw new Error('Upload cancelled');
            const take = Math.min(ioBuf.length, remaining);
            const { bytesRead } = await fd.read(ioBuf, 0, take, pos);
            if (bytesRead <= 0) throw new Error('Read failed');
            await writeAllRetry(socket, ioBuf.subarray(0, bytesRead), cancel, log);
            remaining -= bytesRead;
            pos += bytesRead;
            sentThisFile += bytesRead;
            totalSent += bytesRead;
            onProgress(totalSent, completedFiles, file.rel_path);
          }
        } finally {
          await fd.close().catch(() => {});
        }
      }

      // Final ack: payload reports OK once the file is fully persisted.
      const result = await readLineFromSocket(socket, READ_TIMEOUT_MS);
      if (!result.startsWith('OK')) {
        throw new Error(`Upload failed: ${result}`);
      }
      completedFiles++;
      // Report completion so the file counter advances immediately — this is
      // also the only progress callback a zero-byte file ever produces.
      onProgress(totalSent, completedFiles, file.rel_path);
    } catch (err) {
      // Roll back this file's bytes so a skipped/failed file does not
      // permanently inflate the overall progress counter.
      totalSent -= sentThisFile;
      throw err;
    } finally {
      try { socket.destroy(); } catch {}
    }
  };

  // Worker pool: each worker repeatedly pulls the next file off the queue.
  const workers = [];
  for (let w = 0; w < Math.min(connections, sorted.length); w++) {
    workers.push((async () => {
      while (true) {
        const idx = fileIndex++;
        if (idx >= sorted.length) break;
        if (cancel.value) break;
        const file = sorted[idx];
        try {
          await uploadOneFile(file);
        } catch (err) {
          // Missing/unreadable local files are skipped, not fatal.
          if (err.code === 'ENOENT' || err.code === 'EACCES') {
            log(`Skipping: ${file.rel_path}`);
            if (typeof onSkipFile === 'function') onSkipFile(file, err);
            continue;
          }
          throw err;
        }
      }
    })());
  }

  await Promise.all(workers);
}
4089+
39424090async function runPayloadUploadMadMaxSingleFile ( file , opts = { } ) {
39434091 const {
39444092 ip,
@@ -7630,29 +7778,59 @@ const emitLog = (message, level = 'info', force = false) => {
76307778 } ,
76317779 } ) ;
76327780 } else {
7633- emitLog ( `Payload parallel mode: ${ effectivePayloadConnections } workers.` , 'info' ) ;
7634- state . transferStatus = {
7635- ...state . transferStatus ,
7636- payload_transfer_path : 'parallel_files' ,
7637- payload_workers : effectivePayloadConnections ,
7638- } ;
7639- await runPayloadUploadParallelFiles ( attemptFiles , {
7640- connections : effectivePayloadConnections ,
7641- uploadInit,
7642- cancel : { get value ( ) { return state . transferCancel ; } } ,
7643- compression : effectiveCompression ,
7644- rateLimitBps,
7645- packLimitBytes : basePackLimit ,
7646- streamChunkBytes : baseChunkSize ,
7647- extraPaceMs,
7648- onSkipFile : ( file ) => missingFiles . add ( file . rel_path ) ,
7649- log : debugLog ,
7650- onProgress : ( sent , filesSent , currentFile ) => {
7651- payloadSent = BigInt ( sent ) ;
7652- payloadFilesSent = filesSent ;
7653- updateProgress ( currentFile ) ;
7654- } ,
7655- } ) ;
7781+ // Try fast per-file UPLOAD_FAST path first (no pack overhead)
7782+ try {
7783+ emitLog ( `Payload fast multi-file mode: ${ effectivePayloadConnections } workers.` , 'info' ) ;
7784+ state . transferStatus = {
7785+ ...state . transferStatus ,
7786+ payload_transfer_path : 'fast_multi_file' ,
7787+ payload_workers : effectivePayloadConnections ,
7788+ } ;
7789+ await runPayloadUploadFastMultiFile ( attemptFiles , {
7790+ ip : req . ip ,
7791+ destPath : req . dest_path ,
7792+ connections : effectivePayloadConnections ,
7793+ cancel : { get value ( ) { return state . transferCancel ; } } ,
7794+ chmodAfterUpload : ! ! req . chmod_after_upload ,
7795+ onSkipFile : ( file ) => missingFiles . add ( file . rel_path ) ,
7796+ log : debugLog ,
7797+ onProgress : ( sent , filesSent , currentFile ) => {
7798+ payloadSent = BigInt ( sent ) ;
7799+ payloadFilesSent = filesSent ;
7800+ updateProgress ( currentFile ) ;
7801+ } ,
7802+ } ) ;
7803+ } catch ( fastErr ) {
7804+ // Fall back to V4 pack path if UPLOAD_FAST is unsupported
7805+ if ( fastErr ?. message ?. includes ( 'PAYLOAD_UNSUPPORTED' ) ||
7806+ fastErr ?. message ?. includes ( 'Unknown command' ) ) {
7807+ emitLog ( `Fast multi-file not supported, falling back to V4 parallel mode.` , 'warn' ) ;
7808+ state . transferStatus = {
7809+ ...state . transferStatus ,
7810+ payload_transfer_path : 'parallel_files' ,
7811+ payload_workers : effectivePayloadConnections ,
7812+ } ;
7813+ await runPayloadUploadParallelFiles ( attemptFiles , {
7814+ connections : effectivePayloadConnections ,
7815+ uploadInit,
7816+ cancel : { get value ( ) { return state . transferCancel ; } } ,
7817+ compression : effectiveCompression ,
7818+ rateLimitBps,
7819+ packLimitBytes : basePackLimit ,
7820+ streamChunkBytes : baseChunkSize ,
7821+ extraPaceMs,
7822+ onSkipFile : ( file ) => missingFiles . add ( file . rel_path ) ,
7823+ log : debugLog ,
7824+ onProgress : ( sent , filesSent , currentFile ) => {
7825+ payloadSent = BigInt ( sent ) ;
7826+ payloadFilesSent = filesSent ;
7827+ updateProgress ( currentFile ) ;
7828+ } ,
7829+ } ) ;
7830+ } else {
7831+ throw fastErr ;
7832+ }
7833+ }
76567834 }
76577835 parsed = { files : payloadFilesSent || attemptFiles . length , bytes : Number ( payloadSent ) } ;
76587836 return ;
0 commit comments