@@ -73,8 +73,8 @@ export interface ISocketInstance {
7373 receivedSubBatches : Map < number , Set < number > > ;
7474 }
7575 > ;
76- send : ( payload : IBDDataQuery ) => void ;
77- query : ( params : IBDQuery ) => void ;
76+ send : ( payload : IBDDataQuery ) => Promise < void > ;
77+ query : ( params : IBDQuery ) => Promise < void > ;
7878 bumpActivity : ( ) => void ;
7979 socket ?: WebSocket ;
8080 queryCallbacks : Map < string , IBDCallbacks > ;
@@ -123,10 +123,29 @@ export class BoilingData {
123123 queries : new Map ( ) , // no queries yet
124124 queryCallbacks : new Map ( ) , // no queries yet, so no query specific callbacks either
125125 lastActivity : Date . now ( ) ,
126- send : ( payload : IBDDataQuery ) => {
126+ send : async ( payload : IBDDataQuery ) => {
127127 this . logger . debug ( "PAYLOAD(send):\n" , JSON . stringify ( payload ) ) ;
128- this . socketInstance . socket ?. send ( JSON . stringify ( payload ) ) ;
129- this . execEventCallback ( { eventType : EEvent . REQUEST , requestId : payload . requestId , payload } ) ;
128+ try {
129+ await new Promise < void > ( ( resolve , reject ) => {
130+ if ( ! this . socketInstance . socket ) {
131+ return reject ( { message : "No socket instance, need to connect." } ) ;
132+ }
133+ if ( this . socketInstance . socket . readyState != WebSocket . OPEN ) {
134+ return reject ( {
135+ message : `Socket is not OPEN(1) (${ this . socketInstance . socket . readyState } ), need to re-connect` ,
136+ } ) ;
137+ }
138+ this . socketInstance . socket . send ( JSON . stringify ( payload ) , err => {
139+ if ( err ) reject ( err ) ;
140+ resolve ( ) ;
141+ } ) ;
142+ } ) ;
143+ this . execEventCallback ( { eventType : EEvent . REQUEST , requestId : payload . requestId , payload } ) ;
144+ } catch ( error ) {
145+ console . error ( error ) ;
146+ this . execEventCallback ( { eventType : EEvent . LOG_ERROR , requestId : payload . requestId , payload : { error } } ) ;
147+ return ;
148+ }
130149 } ,
131150 bumpActivity : ( ) => {
132151 this . socketInstance . lastActivity = Date . now ( ) ;
@@ -251,7 +270,29 @@ export class BoilingData {
251270 }
252271 }
253272
254- public execQuery ( params : IBDQuery ) : void {
273+ private getSocketReadyStateString (
274+ readyState :
275+ | typeof WebSocket . CONNECTING
276+ | typeof WebSocket . OPEN
277+ | typeof WebSocket . CLOSING
278+ | typeof WebSocket . CLOSED
279+ | undefined ,
280+ ) : string {
281+ switch ( readyState ) {
282+ case WebSocket . CONNECTING :
283+ return "CONNECTING" ;
284+ case WebSocket . OPEN :
285+ return "OPEN" ;
286+ case WebSocket . CLOSED :
287+ return "CLOSED" ;
288+ case WebSocket . CLOSING :
289+ return "CLOSING" ;
290+ default :
291+ return "UNKNOWN" ;
292+ }
293+ }
294+
295+ public async execQuery ( params : IBDQuery ) : Promise < void > {
255296 this . validateJsHooks ( params ) ;
256297 this . logger . info ( "execQuery:" , params ) ;
257298 this . socketInstance . bumpActivity ( ) ;
@@ -289,7 +330,9 @@ export class BoilingData {
289330 onQueryFinished : params . callbacks ?. onQueryFinished ,
290331 } ) ;
291332 this . logger . debug ( "PAYLOAD:\n" , payload ) ;
292- this . socketInstance . send ( payload ) ;
333+ this . logger . debug ( "WebSocket.readyState:" , this . getSocketReadyStateString ( this . socketInstance . socket ?. readyState ) ) ;
334+ if ( this . socketInstance . socket ?. readyState != WebSocket . OPEN ) await this . connect ( ) ;
335+ await this . socketInstance . send ( payload ) ;
293336 }
294337
295338 private processBatchInfo ( message : unknown ) : void {
0 commit comments