Skip to content

Commit 5404d44

Browse files
committed
Refactored message sender. Refactored ipc message into a wrapper class called plugin-message
1 parent 2106ba6 commit 5404d44

5 files changed

Lines changed: 110 additions & 97 deletions

File tree

src/plugins/ipc-message.ts

Lines changed: 0 additions & 27 deletions
This file was deleted.

src/plugins/message-sender.ts

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,90 +3,90 @@ import { ChildProcess } from 'node:child_process';
33
import { clearTimeout } from 'node:timers';
44

55
import { ctx } from '../events/context.js';
6-
import { ipcMessageValidator, returnMessageCmd } from './plugin-process.js';
6+
import { PluginMessage } from './plugin-message.js';
7+
import { ipcMessageValidator } from './plugin-process.js';
78

89
type Resolve<T> = (value: T) => void;
910
type Reject = (reason?: Error) => void;
1011

1112
// Default timeout is 10 minutes after last message, stdout, or stderr
1213
const TIMEOUT = 6_000_000
1314

14-
export class MessageForResultSender {
15-
cmd: string;
16-
resultCmd: string;
17-
promiseResolve: Resolve<IpcMessageV2>;
15+
export async function sendIpcMessageForResult(message: PluginMessage, process: ChildProcess, timeout = TIMEOUT): Promise<PluginMessage> {
16+
let handler: MessageForResultHandler;
17+
18+
return new Promise<PluginMessage>((resolve, reject) => {
19+
handler = new MessageForResultHandler(message, resolve, reject, timeout);
20+
21+
// Sets listeners
22+
process.on('message', handler.messageListener);
23+
process.stdout?.on('data', handler.startOrResetTimeout)
24+
process.stderr?.on('data', handler.startOrResetTimeout)
25+
26+
// Send the message
27+
process.send(message);
28+
}).finally(() => {
29+
// Removes all listeners
30+
process.removeListener('message', handler.messageListener);
31+
process.stdout?.removeListener('data', handler.startOrResetTimeout)
32+
process.stderr?.removeListener('data', handler.startOrResetTimeout)
33+
clearTimeout(handler.timerId);
34+
});
35+
}
36+
37+
class MessageForResultHandler {
38+
message: PluginMessage;
39+
promiseResolve: Resolve<PluginMessage>;
1840
promiseReject: Reject;
1941
timerId!: NodeJS.Timeout;
2042
timeout: number;
2143

22-
private constructor(
23-
cmd: string,
24-
resolve: Resolve<IpcMessageV2>,
44+
constructor(
45+
message: PluginMessage,
46+
resolve: Resolve<PluginMessage>,
2547
reject: Reject,
2648
timeout = TIMEOUT,
2749
) {
28-
this.cmd = cmd;
29-
this.resultCmd = returnMessageCmd(cmd);
50+
this.message = message;
3051
this.promiseResolve = resolve;
3152
this.promiseReject = reject;
3253
this.timeout = timeout;
3354

3455
this.startOrResetTimeout();
3556
}
3657

37-
static async send(message: IpcMessageV2, process: ChildProcess, timeout = TIMEOUT): Promise<IpcMessageV2> {
38-
let handler: MessageForResultSender;
39-
40-
return new Promise<IpcMessageV2>((resolve, reject) => {
41-
handler = new MessageForResultSender(message.cmd, resolve, reject, timeout);
42-
43-
// Sets listeners
44-
process.on('message', handler.messageListener);
45-
process.stdout?.on('data', handler.startOrResetTimeout)
46-
process.stderr?.on('data', handler.startOrResetTimeout)
47-
48-
process.send(message);
49-
}).finally(() => {
50-
51-
// Removes all listeners
52-
process.removeListener('message', handler.messageListener);
53-
process.stdout?.removeListener('data', handler.startOrResetTimeout)
54-
process.stderr?.removeListener('data', handler.startOrResetTimeout)
55-
clearTimeout(handler.timerId);
56-
});
57-
}
58-
59-
private messageListener = (incomingMessage: unknown) => {
58+
messageListener = (incomingMessage: unknown) => {
6059
ctx.debug(JSON.stringify(incomingMessage, null, 2));
6160

62-
if (!this.validateIpcMessage(incomingMessage)) {
61+
const message = PluginMessage.fromUnknown(incomingMessage);
62+
if (!message) {
6363
return this.reject(new Error(`Invalid message from plugin. ${JSON.stringify(incomingMessage, null, 2)}`))
6464
}
6565

66-
if (incomingMessage.cmd === this.resultCmd) {
67-
this.resolve(incomingMessage);
66+
if (this.message.isSameRequest(message)) {
67+
this.resolve(message);
6868
} else {
6969
this.startOrResetTimeout();
7070
}
7171
};
7272

73-
private reject = (err: Error) => {
73+
reject = (err: Error) => {
7474
if (this.timerId.hasRef()) {
7575
clearTimeout(this.timerId);
7676
}
7777

7878
this.promiseReject(err)
7979
}
8080

81-
private resolve = (value: IpcMessageV2) => {
81+
resolve = (value: PluginMessage) => {
8282
if (this.timerId.hasRef()) {
8383
clearTimeout(this.timerId);
8484
}
8585

8686
this.promiseResolve(value)
8787
}
8888

89-
private startOrResetTimeout = () => {
89+
startOrResetTimeout = () => {
9090
if (this.timerId?.hasRef()) {
9191
clearTimeout(this.timerId);
9292
}
@@ -95,11 +95,11 @@ export class MessageForResultSender {
9595

9696
// Use date here to convert ms to minutes
9797
const date = new Date(this.timeout)
98-
this.reject(new Error(`Plugin did not respond in ${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')} minutes: ${this.cmd}`))
98+
this.reject(new Error(`Plugin did not respond in ${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')} minutes: ${this.message.cmd}`))
9999
}, this.timeout);
100100
}
101101

102-
private validateIpcMessage(response: unknown): response is IpcMessageV2 {
102+
validateIpcMessage(response: unknown): response is IpcMessageV2 {
103103
return ipcMessageValidator(response);
104104
}
105105
}

src/plugins/plugin-message.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { IpcMessageV2, MessageStatus } from 'codify-schemas';
2+
import { nanoid } from 'nanoid'
3+
4+
import { ipcMessageValidator } from './plugin-process.js';
5+
6+
export class PluginMessage implements IpcMessageV2 {
7+
cmd!: string;
8+
requestId!: string;
9+
status?: MessageStatus | undefined;
10+
data!: unknown;
11+
12+
constructor(ipcMessage: IpcMessageV2) {
13+
Object.assign(this, ipcMessage);
14+
}
15+
16+
static fromUnknown(ipcMessage: unknown): PluginMessage | null {
17+
if(!ipcMessageValidator(ipcMessage)) {
18+
return null;
19+
}
20+
21+
return new PluginMessage(ipcMessage as unknown as IpcMessageV2);
22+
}
23+
24+
static create(cmd: string, data: any): PluginMessage {
25+
const requestId = nanoid();
26+
27+
return new PluginMessage({
28+
cmd,
29+
data,
30+
requestId
31+
})
32+
}
33+
34+
isSameRequest(message: IpcMessageV2): boolean {
35+
return message.requestId === this.requestId;
36+
}
37+
38+
isSuccessful(): boolean {
39+
return this.status === MessageStatus.SUCCESS;
40+
}
41+
}

src/plugins/plugin-process.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
import { IpcMessageSchema, IpcMessageV2, MessageCmd, SudoRequestData, SudoRequestDataSchema } from 'codify-schemas';
1+
import { IpcMessageV2Schema, IpcMessageV2, MessageCmd, SudoRequestData, SudoRequestDataSchema } from 'codify-schemas';
22
import { ChildProcess, fork } from 'node:child_process';
33
import { createRequire } from 'node:module';
44

55
import { Event, ctx } from '../events/context.js';
66
import { ajv } from '../utils/ajv.js';
7-
import { IpcMessageWrapper } from './ipc-message.js';
8-
import { MessageForResultSender } from './message-sender.js';
7+
import { PluginMessage } from './plugin-message.js';
8+
import { sendIpcMessageForResult } from './message-sender.js';
99

10-
export const ipcMessageValidator = ajv.compile(IpcMessageSchema);
10+
export const ipcMessageValidator = ajv.compile(IpcMessageV2Schema);
1111
export const sudoRequestValidator = ajv.compile(SudoRequestDataSchema);
1212

1313
export function returnMessageCmd(cmd: string) {
@@ -96,9 +96,9 @@ export class PluginProcess {
9696
return true;
9797
}
9898

99-
sendMessageForResult(cmd: string, data: unknown): Promise<IpcMessageV2> {
100-
const message = IpcMessageWrapper.create(cmd, data);
101-
return MessageForResultSender.send(message, this.process);
99+
sendMessageForResult(cmd: string, data: unknown): Promise<PluginMessage> {
100+
const message = PluginMessage.create(cmd, data);
101+
return sendIpcMessageForResult(message, this.process);
102102
}
103103
}
104104

src/plugins/plugin.ts

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
ImportResponseDataSchema,
66
InitializeResponseData,
77
InitializeResponseDataSchema,
8-
MessageStatus,
98
PlanResponseData,
109
PlanResponseDataSchema,
1110
ResourceConfig as SchemaResourceConfig,
@@ -57,49 +56,49 @@ export class Plugin {
5756

5857
async validate(configs: ResourceConfig[]): Promise<ValidateResponseData> {
5958
const rawConfigs = configs.map((c) => c.raw);
60-
const { data, status } = await this.process!.sendMessageForResult('validate', { configs: rawConfigs });
59+
const result = await this.process!.sendMessageForResult('validate', { configs: rawConfigs });
6160

62-
if (status === MessageStatus.ERROR) {
63-
throw new Error(`Initialize error for plugin: "${this.name}" \n\n` + data);
61+
if (!result.isSuccessful()) {
62+
throw new Error(`Initialize error for plugin: "${this.name}" \n\n` + result.data);
6463
}
6564

66-
if (!this.validateValidateResponse(data)) {
65+
if (!this.validateValidateResponse(result.data)) {
6766
throw new Error(`Plugin error: Invalid validate response from plugin: ${this.name}`);
6867
}
6968

70-
return data;
69+
return result.data;
7170
}
7271

7372
async getResourceInfo(type: string): Promise<GetResourceInfoResponseData> {
74-
const { data, status } = await this.process!.sendMessageForResult('getResourceInfo', { type });
73+
const result = await this.process!.sendMessageForResult('getResourceInfo', { type });
7574

76-
if (status === MessageStatus.ERROR) {
77-
throw new Error(`Unable to get info for resource: "${type}" from plugin: "${this.name}" \n\n` + data);
75+
if (!result.isSuccessful()) {
76+
throw new Error(`Unable to get info for resource: "${type}" from plugin: "${this.name}" \n\n` + result.data);
7877
}
7978

80-
if (!this.validateGetResourceInfoResponse(data)) {
79+
if (!this.validateGetResourceInfoResponse(result.data)) {
8180
throw new Error(`Plugin error: Invalid get resource info response from plugin: ${this.name}`);
8281
}
8382

84-
return data;
83+
return result.data;
8584
}
8685

8786
async import(config: SchemaResourceConfig): Promise<ImportResponseData> {
88-
const { data, status } = await this.process!.sendMessageForResult('import', { config });
87+
const result = await this.process!.sendMessageForResult('import', { config });
8988

90-
if (status === MessageStatus.ERROR) {
91-
throw new Error(`Unable import resource ${config.type} with plugin: "${this.name}" \n\n` + data);
89+
if (!result.isSuccessful()) {
90+
throw new Error(`Unable import resource ${config.type} with plugin: "${this.name}" \n\n` + result.data);
9291
}
9392

94-
if (!this.validateImportResponse(data)) {
93+
if (!this.validateImportResponse(result.data)) {
9594
throw new Error(`Plugin error: Invalid import response from plugin: ${this.name}`);
9695
}
9796

98-
return data;
97+
return result.data;
9998
}
10099

101100
async plan(request: PlanRequest): Promise<ResourcePlan> {
102-
const { data, status } = await this.process!.sendMessageForResult(
101+
const result = await this.process!.sendMessageForResult(
103102
'plan',
104103
{
105104
desired: request.desired,
@@ -108,21 +107,21 @@ export class Plugin {
108107
}
109108
);
110109

111-
if (status === MessageStatus.ERROR) {
112-
throw new Error(`Plan error for plugin: "${this.name}", resource: "${request.type}" \n\n` + data);
110+
if (!result.isSuccessful()) {
111+
throw new Error(`Plan error for plugin: "${this.name}", resource: "${request.type}" \n\n` + result.data);
113112
}
114113

115-
if (!this.validatePlanResponse(data)) {
114+
if (!this.validatePlanResponse(result.data)) {
116115
throw new Error(`Plugin error: plugin ${this.name} returned invalid plan response: ${JSON.stringify(planResponseValidator.errors, null, 2)}`)
117116
}
118117

119-
return new ResourcePlan(data);
118+
return new ResourcePlan(result.data);
120119
}
121120

122121
async apply(plan: ResourcePlan): Promise<void> {
123122
const result = await this.process!.sendMessageForResult('apply', { plan });
124123

125-
if (result.status === MessageStatus.ERROR) {
124+
if (!result.isSuccessful()) {
126125
throw new Error(`Apply error for plugin: "${this.name}", resource: "${plan.resourceType}" \n\n` + result.data);
127126
}
128127
}

0 commit comments

Comments
 (0)