Skip to content

Commit a27e854

Browse files
committed
[CODE-55][CODE-101] Refactored plugin message sender and added smart timeout that only happens if 10 minutes elapses past the last message, stderr or stdout
1 parent 72dc1a5 commit a27e854

4 files changed

Lines changed: 256 additions & 90 deletions

File tree

src/plugins/message-sender.test.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { MessageForResultSender } from './message-sender.js';
2+
import { ChildProcess } from 'node:child_process';
3+
import { EventEmitter } from 'node:events';
4+
import { Readable } from 'stream';
5+
import { describe, expect, it } from 'vitest';
6+
import { returnMessageCmd } from './plugin-process.js';
7+
import { clearInterval, clearTimeout } from 'node:timers';
8+
9+
describe('Message sender tests', async () => {
10+
const mockChildProcess = () => {
11+
const process = new ChildProcess();
12+
process.stdout = new EventEmitter() as Readable;
13+
process.stderr = new EventEmitter() as Readable
14+
process.send = () => true;
15+
16+
return process;
17+
}
18+
19+
it('Is able to send a message and await a result', async () => {
20+
const cProcess = mockChildProcess();
21+
22+
const result = await Promise.all([
23+
MessageForResultSender.send({ cmd: 'message1', data: null }, cProcess),
24+
setTimeout(() => {
25+
cProcess.emit('message', { cmd: returnMessageCmd('message1'), data: null });
26+
}, 100),
27+
])
28+
29+
expect(result[0]).toMatchObject({
30+
cmd: returnMessageCmd('message1')
31+
})
32+
})
33+
34+
it('Clears all listeners', async () => {
35+
const cProcess = mockChildProcess();
36+
37+
const result = await Promise.all([
38+
MessageForResultSender.send({ cmd: 'message1', data: null }, cProcess),
39+
setTimeout(() => {
40+
cProcess.emit('message', { cmd: returnMessageCmd('message1'), data: null });
41+
}, 100),
42+
])
43+
44+
expect(cProcess.listenerCount('messsage')).to.eq(0);
45+
expect(cProcess.stdout!.listenerCount('data')).to.eq(0);
46+
expect(cProcess.stderr!.listenerCount('data')).to.eq(0);
47+
})
48+
49+
it('Is able to send a message and timeout if not received', async () => {
50+
const cProcess = mockChildProcess();
51+
52+
await expect(() => Promise.all([
53+
MessageForResultSender.send({ cmd: 'message1', data: null }, cProcess, 100),
54+
setTimeout(() => {
55+
cProcess.emit('message', { cmd: returnMessageCmd('message1'), data: null });
56+
}, 200),
57+
])).rejects.toThrowError();
58+
})
59+
60+
it('Is able to send a message and not timeout if stdout is returned', async () => {
61+
const cProcess = mockChildProcess();
62+
63+
// Explanation:
64+
// 1. Send a message and set the timeout to be 100ms
65+
// 2. Make the return message come back in 200ms
66+
// 3. Periodically send a stdout every 50ms. This should not throw
67+
const [result, timer, interval] = await Promise.all([
68+
MessageForResultSender.send({ cmd: 'message1', data: null }, cProcess, 100),
69+
setTimeout(() => {
70+
cProcess.emit('message', { cmd: returnMessageCmd('message1'), data: null });
71+
}, 200),
72+
setInterval(() => {
73+
cProcess.stdout!.emit('data', 'message');
74+
}, 50),
75+
])
76+
77+
clearInterval(interval);
78+
clearTimeout(timer);
79+
expect(result).toMatchObject({
80+
cmd: returnMessageCmd('message1')
81+
})
82+
})
83+
84+
it('Is able to send a message and not timeout if stderr is returned', async () => {
85+
const cProcess = mockChildProcess();
86+
87+
// Explanation:
88+
// 1. Send a message and set the timeout to be 100ms
89+
// 2. Make the return message come back in 200ms
90+
// 3. Periodically send a stdout every 50ms. This should not throw
91+
const [result, timer, interval] = await Promise.all([
92+
MessageForResultSender.send({ cmd: 'message1', data: null }, cProcess, 100),
93+
setTimeout(() => {
94+
cProcess.emit('message', { cmd: returnMessageCmd('message1'), data: null });
95+
}, 200),
96+
setInterval(() => {
97+
cProcess.stderr!.emit('data', 'message');
98+
}, 50),
99+
])
100+
101+
clearInterval(interval);
102+
clearTimeout(timer);
103+
expect(result).toMatchObject({
104+
cmd: returnMessageCmd('message1')
105+
})
106+
})
107+
108+
it('Is able to send a message and not timeout if a non-resolving message is sent (like a sudo request)', async () => {
109+
const cProcess = mockChildProcess();
110+
111+
// Explanation:
112+
// 1. Send a message and set the timeout to be 100ms
113+
// 2. Make the return message come back in 200ms
114+
// 3. Periodically send a stdout every 50ms. This should not throw
115+
const [result, timer, interval] = await Promise.all([
116+
MessageForResultSender.send({ cmd: 'message1', data: null }, cProcess, 100),
117+
setTimeout(() => {
118+
cProcess.emit('message', { cmd: returnMessageCmd('message1'), data: null });
119+
}, 200),
120+
setInterval(() => {
121+
cProcess.emit('message', { cmd: 'non-resolving', data: null })
122+
}, 50),
123+
])
124+
125+
clearInterval(interval);
126+
clearTimeout(timer);
127+
expect(result).toMatchObject({
128+
cmd: returnMessageCmd('message1')
129+
})
130+
})
131+
132+
});

src/plugins/message-sender.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { IpcMessage } from 'codify-schemas';
2+
import { ChildProcess } from 'node:child_process';
3+
import { clearTimeout } from 'node:timers';
4+
5+
import { ctx } from '../events/context.js';
6+
import { ipcMessageValidator, returnMessageCmd } from './plugin-process.js';
7+
8+
type Resolve<T> = (value: T) => void;
9+
type Reject = (reason?: Error) => void;
10+
11+
// Default timeout is 10 minutes after last message, stdout, or stderr
12+
const TIMEOUT = 6_000_000
13+
14+
export class MessageForResultSender {
15+
cmd: string;
16+
resultCmd: string;
17+
promiseResolve: Resolve<IpcMessage>;
18+
promiseReject: Reject;
19+
timerId!: NodeJS.Timeout;
20+
timeout: number;
21+
22+
private constructor(
23+
cmd: string,
24+
resolve: Resolve<IpcMessage>,
25+
reject: Reject,
26+
timeout = TIMEOUT,
27+
) {
28+
this.cmd = cmd;
29+
this.resultCmd = returnMessageCmd(cmd);
30+
this.promiseResolve = resolve;
31+
this.promiseReject = reject;
32+
this.timeout = timeout;
33+
34+
this.startOrResetTimeout();
35+
}
36+
37+
static async send(message: IpcMessage, process: ChildProcess, timeout = TIMEOUT): Promise<IpcMessage> {
38+
let handler: MessageForResultSender;
39+
40+
return new Promise<IpcMessage>((resolve, reject) => {
41+
handler = new MessageForResultSender(message.cmd, resolve, reject, timeout);
42+
43+
// Sets listeners
44+
process.on('message', handler.messageListener);
45+
process.stdout?.on('data', handler.startOrResetTimeout)
46+
process.stderr?.on('data', handler.startOrResetTimeout)
47+
48+
process.send(message);
49+
}).finally(() => {
50+
51+
// Removes all listeners
52+
process.removeListener('message', handler.messageListener);
53+
process.stdout?.removeListener('data', handler.startOrResetTimeout)
54+
process.stderr?.removeListener('data', handler.startOrResetTimeout)
55+
clearTimeout(handler.timerId);
56+
});
57+
}
58+
59+
private messageListener = (incomingMessage: unknown) => {
60+
ctx.debug(JSON.stringify(incomingMessage, null, 2));
61+
62+
if (!this.validateIpcMessage(incomingMessage)) {
63+
return this.reject(new Error(`Invalid message from plugin. ${JSON.stringify(incomingMessage, null, 2)}`))
64+
}
65+
66+
if (incomingMessage.cmd === this.resultCmd) {
67+
this.resolve(incomingMessage);
68+
} else {
69+
this.startOrResetTimeout();
70+
}
71+
};
72+
73+
private reject = (err: Error) => {
74+
if (this.timerId.hasRef()) {
75+
clearTimeout(this.timerId);
76+
}
77+
78+
this.promiseReject(err)
79+
}
80+
81+
private resolve = (value: IpcMessage) => {
82+
if (this.timerId.hasRef()) {
83+
clearTimeout(this.timerId);
84+
}
85+
86+
this.promiseResolve(value)
87+
}
88+
89+
private startOrResetTimeout = () => {
90+
if (this.timerId?.hasRef()) {
91+
clearTimeout(this.timerId);
92+
}
93+
94+
this.timerId = setTimeout(() => {
95+
96+
// Use date here to convert ms to minutes
97+
const date = new Date(this.timeout)
98+
this.reject(new Error(`Plugin did not respond in ${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')} minutes: ${this.cmd}`))
99+
}, this.timeout);
100+
}
101+
102+
private validateIpcMessage(response: unknown): response is IpcMessage {
103+
return ipcMessageValidator(response);
104+
}
105+
}

src/plugins/message.ts

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/plugins/plugin-process.ts

Lines changed: 19 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,14 @@ import { createRequire } from 'node:module';
44

55
import { ctx, Event } from '../events/context.js';
66
import { ajv } from '../utils/ajv.js';
7-
import { PluginMessage } from './message.js';
7+
import { MessageForResultSender } from './message-sender.js';
88

9-
const ipcMessageValidator = ajv.compile(IpcMessageSchema);
10-
const sudoRequestValidator = ajv.compile(SudoRequestDataSchema);
9+
export const ipcMessageValidator = ajv.compile(IpcMessageSchema);
10+
export const sudoRequestValidator = ajv.compile(SudoRequestDataSchema);
1111

12-
13-
type Resolve<T> = (value: T) => void;
14-
type Reject = (reason?: Error) => void;
15-
16-
const resultFunctionName = (cmd: string) => `${cmd}_Response`;
12+
export function returnMessageCmd(cmd: string) {
13+
return `${cmd}_Response`;
14+
}
1715

1816
export class PluginProcess {
1917
process: ChildProcess;
@@ -53,31 +51,6 @@ export class PluginProcess {
5351
this.process = process;
5452
}
5553

56-
async sendMessageForResult(message: PluginMessage): Promise<IpcMessage> {
57-
return new Promise((resolve, reject) => {
58-
const handler = new SendMessageForResultHandler(message, this.process, resolve, reject);
59-
60-
this.process.on('message', handler.messageListener);
61-
this.process.send(message);
62-
});
63-
}
64-
65-
sendMessage(message: PluginMessage): void {
66-
this.process.send(message);
67-
}
68-
69-
// Tsx is only installed for dev builds. Only allow typescript plugins for testing.
70-
private static isTsxInstalled(): boolean {
71-
try {
72-
const require = createRequire(import.meta.url);
73-
require.resolve('tsx');
74-
} catch {
75-
return false;
76-
}
77-
78-
return true;
79-
}
80-
8154
private static handleSudoRequests(process: ChildProcess, pluginName: string) {
8255
// Listen for incoming sudo incoming sudo requests
8356
process.on('message', (message) => {
@@ -99,71 +72,31 @@ export class PluginProcess {
9972
ctx.on(Event.SUDO_REQUEST_GRANTED, (_pluginName, data) => {
10073
if (_pluginName === pluginName) {
10174
process.send({
102-
cmd: resultFunctionName(MessageCmd.SUDO_REQUEST),
75+
cmd: returnMessageCmd(MessageCmd.SUDO_REQUEST),
10376
data
10477
})
10578
}
10679
})
10780
}
108-
}
10981

110-
class SendMessageForResultHandler {
111-
messageToSend: PluginMessage;
112-
process: ChildProcess;
113-
promiseResolve: Resolve<IpcMessage>;
114-
promiseReject: Reject;
115-
timer: NodeJS.Timeout;
116-
117-
constructor(
118-
messageToSend: PluginMessage,
119-
process: ChildProcess,
120-
resolve: Resolve<IpcMessage>,
121-
reject: Reject,
122-
timeout = 600_000, // Default time is 10 minutes for a command
123-
) {
124-
this.messageToSend = messageToSend;
125-
this.process = process;
126-
this.promiseResolve = resolve;
127-
this.promiseReject = reject;
128-
this.timer = this.setResultTimeout(timeout);
129-
}
130-
131-
messageListener = (incomingMessage: unknown) => {
132-
ctx.debug(JSON.stringify(incomingMessage, null, 2));
133-
134-
if (!this.validateIpcMessage(incomingMessage)) {
135-
return this.reject(new Error(`Invalid message from plugin. ${JSON.stringify(incomingMessage, null, 2)}`))
136-
}
137-
138-
if (incomingMessage.cmd === resultFunctionName(this.messageToSend.cmd)) {
139-
this.resolve(incomingMessage);
140-
}
141-
};
142-
143-
reject = (err: Error) => {
144-
if (this.timer.hasRef()) {
145-
clearTimeout(this.timer);
146-
}
147-
148-
this.process.removeListener('message', this.messageListener);
149-
this.promiseReject(err);
82+
sendMessage(message: IpcMessage): void {
83+
this.process.send(message);
15084
}
15185

152-
private resolve = (value: IpcMessage) => {
153-
if (this.timer.hasRef()) {
154-
clearTimeout(this.timer);
86+
// Tsx is only installed for dev builds. Only allow typescript plugins for testing.
87+
private static isTsxInstalled(): boolean {
88+
try {
89+
const require = createRequire(import.meta.url);
90+
require.resolve('tsx');
91+
} catch {
92+
return false;
15593
}
15694

157-
this.process.removeListener('message', this.messageListener);
158-
this.promiseResolve(value);
95+
return true;
15996
}
16097

161-
private setResultTimeout = (timeout: number) => setTimeout(() => {
162-
this.reject(new Error(`Plugin did not respond in 10 minutes to call: ${this.messageToSend.cmd}`))
163-
}, timeout);
164-
165-
private validateIpcMessage(response: unknown): response is IpcMessage {
166-
return ipcMessageValidator(response);
98+
sendMessageForResult(message: IpcMessage): Promise<IpcMessage> {
99+
return MessageForResultSender.send(message, this.process);
167100
}
168101
}
169102

0 commit comments

Comments
 (0)