Skip to content

Commit f21022c

Browse files
committed
Re-open WS if not open
1 parent f7f7ded commit f21022c

1 file changed

Lines changed: 50 additions & 7 deletions

File tree

src/boilingdata/boilingdata.ts

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ export interface ISocketInstance {
7373
receivedSubBatches: Map<number, Set<number>>;
7474
}
7575
>;
76-
send: (payload: IBDDataQuery) => void;
77-
query: (params: IBDQuery) => void;
76+
send: (payload: IBDDataQuery) => Promise<void>;
77+
query: (params: IBDQuery) => Promise<void>;
7878
bumpActivity: () => void;
7979
socket?: WebSocket;
8080
queryCallbacks: Map<string, IBDCallbacks>;
@@ -123,10 +123,29 @@ export class BoilingData {
123123
queries: new Map(), // no queries yet
124124
queryCallbacks: new Map(), // no queries yet, so no query specific callbacks either
125125
lastActivity: Date.now(),
126-
send: (payload: IBDDataQuery) => {
126+
send: async (payload: IBDDataQuery) => {
127127
this.logger.debug("PAYLOAD(send):\n", JSON.stringify(payload));
128-
this.socketInstance.socket?.send(JSON.stringify(payload));
129-
this.execEventCallback({ eventType: EEvent.REQUEST, requestId: payload.requestId, payload });
128+
try {
129+
await new Promise<void>((resolve, reject) => {
130+
if (!this.socketInstance.socket) {
131+
return reject({ message: "No socket instance, need to connect." });
132+
}
133+
if (this.socketInstance.socket.readyState != WebSocket.OPEN) {
134+
return reject({
135+
message: `Socket is not OPEN(1) (${this.socketInstance.socket.readyState}), need to re-connect`,
136+
});
137+
}
138+
this.socketInstance.socket.send(JSON.stringify(payload), err => {
139+
if (err) reject(err);
140+
resolve();
141+
});
142+
});
143+
this.execEventCallback({ eventType: EEvent.REQUEST, requestId: payload.requestId, payload });
144+
} catch (error) {
145+
console.error(error);
146+
this.execEventCallback({ eventType: EEvent.LOG_ERROR, requestId: payload.requestId, payload: { error } });
147+
return;
148+
}
130149
},
131150
bumpActivity: () => {
132151
this.socketInstance.lastActivity = Date.now();
@@ -251,7 +270,29 @@ export class BoilingData {
251270
}
252271
}
253272

254-
public execQuery(params: IBDQuery): void {
273+
private getSocketReadyStateString(
274+
readyState:
275+
| typeof WebSocket.CONNECTING
276+
| typeof WebSocket.OPEN
277+
| typeof WebSocket.CLOSING
278+
| typeof WebSocket.CLOSED
279+
| undefined,
280+
): string {
281+
switch (readyState) {
282+
case WebSocket.CONNECTING:
283+
return "CONNECTING";
284+
case WebSocket.OPEN:
285+
return "OPEN";
286+
case WebSocket.CLOSED:
287+
return "CLOSED";
288+
case WebSocket.CLOSING:
289+
return "CLOSING";
290+
default:
291+
return "UNKNOWN";
292+
}
293+
}
294+
295+
public async execQuery(params: IBDQuery): Promise<void> {
255296
this.validateJsHooks(params);
256297
this.logger.info("execQuery:", params);
257298
this.socketInstance.bumpActivity();
@@ -289,7 +330,9 @@ export class BoilingData {
289330
onQueryFinished: params.callbacks?.onQueryFinished,
290331
});
291332
this.logger.debug("PAYLOAD:\n", payload);
292-
this.socketInstance.send(payload);
333+
this.logger.debug("WebSocket.readyState:", this.getSocketReadyStateString(this.socketInstance.socket?.readyState));
334+
if (this.socketInstance.socket?.readyState != WebSocket.OPEN) await this.connect();
335+
await this.socketInstance.send(payload);
293336
}
294337

295338
private processBatchInfo(message: unknown): void {

0 commit comments

Comments
 (0)