Skip to content

Commit 7a25b4f

Browse files
committed
fix(informer): treat TimeoutError as reconnectable, normalize timeout errors, add exponential backoff + integration test
1 parent d74afab commit 7a25b4f

5 files changed

Lines changed: 285 additions & 1 deletion

File tree

src/cache.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,19 @@ export interface ObjectCache<T> {
2424
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;
2525

2626
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
27+
private static readonly BASE_RECONNECT_DELAY_MS = 1000;
28+
private static readonly MAX_RECONNECT_DELAY_MS = 30000;
29+
2730
private objects: CacheMap<T> = new Map();
2831
private resourceVersion: string;
2932
private readonly indexCache: { [key: string]: T[] } = {};
3033
private readonly callbackCache: { [key: string]: (ObjectCallback<T> | ErrorCallback)[] } = {};
3134
private request: AbortController | undefined;
3235
private stopped: boolean = false;
36+
private reconnectDelayMs: number = 0;
37+
private hasConnected: boolean = false;
38+
private delayFn: (ms: number) => Promise<void> = (ms) =>
39+
new Promise((resolve) => setTimeout(resolve, ms));
3340
private readonly path: string;
3441
private readonly watch: Watch;
3542
private readonly listFn: ListPromise<T>;
@@ -63,6 +70,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
6370

6471
public async start(): Promise<void> {
6572
this.stopped = false;
73+
this.reconnectDelayMs = 0;
74+
this.hasConnected = false;
6675
await this.doneHandler(null);
6776
}
6877

@@ -151,6 +160,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
151160
((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410)
152161
) {
153162
this.resourceVersion = '';
163+
} else if (err && (err as { name?: string }).name === 'TimeoutError') {
164+
// Watch client-side timeout — reconnect from last known resourceVersion
154165
} else if (err) {
155166
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
156167
return;
@@ -186,6 +197,16 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
186197
if (this.fieldSelector !== undefined) {
187198
queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string');
188199
}
200+
if (this.reconnectDelayMs > 0 && this.hasConnected) {
201+
await this.delayFn(this.reconnectDelayMs);
202+
}
203+
if (this.hasConnected) {
204+
this.reconnectDelayMs = Math.min(
205+
this.reconnectDelayMs > 0 ? this.reconnectDelayMs * 2 : ListWatch.BASE_RECONNECT_DELAY_MS,
206+
ListWatch.MAX_RECONNECT_DELAY_MS,
207+
);
208+
}
209+
this.hasConnected = true;
189210
this.request = await this.watch.watch(
190211
this.path,
191212
queryParams,
@@ -236,6 +257,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
236257
// nothing to do, here for documentation, mostly.
237258
break;
238259
}
260+
this.reconnectDelayMs = 0;
239261
this.resourceVersion = obj.metadata ? obj.metadata!.resourceVersion || '' : '';
240262
}
241263
}

src/cache_test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,6 +1525,133 @@ describe('ListWatchCache', () => {
15251525

15261526
mockAgent.assertNoPendingInterceptors();
15271527
});
1528+
1529+
it('should apply exponential backoff on repeated reconnects', async () => {
1530+
const fakeWatch = mock.mock(Watch);
1531+
const listObj = {
1532+
metadata: { resourceVersion: '12345' } as V1ListMeta,
1533+
items: [] as V1Namespace[],
1534+
} as V1NamespaceList;
1535+
1536+
const listFn: ListPromise<V1Namespace> = () => Promise.resolve(listObj);
1537+
1538+
let watchCalls = 0;
1539+
const delayValues: number[] = [];
1540+
const promise = new Promise((resolve) => {
1541+
mock.when(
1542+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1543+
).thenCall(() => {
1544+
watchCalls++;
1545+
resolve(new AbortController());
1546+
return Promise.resolve(new AbortController());
1547+
});
1548+
});
1549+
1550+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
1551+
(cache as any).delayFn = (ms: number) => {
1552+
delayValues.push(ms);
1553+
return Promise.resolve();
1554+
};
1555+
await promise;
1556+
strictEqual(watchCalls, 1);
1557+
1558+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1559+
1560+
await doneHandler(null);
1561+
strictEqual(watchCalls, 2);
1562+
deepStrictEqual(delayValues, []);
1563+
1564+
await doneHandler(null);
1565+
strictEqual(watchCalls, 3);
1566+
deepStrictEqual(delayValues, [1000]);
1567+
1568+
await doneHandler(null);
1569+
strictEqual(watchCalls, 4);
1570+
deepStrictEqual(delayValues, [1000, 2000]);
1571+
1572+
await doneHandler(null);
1573+
strictEqual(watchCalls, 5);
1574+
deepStrictEqual(delayValues, [1000, 2000, 4000]);
1575+
});
1576+
1577+
it('should reset backoff after receiving a watch event', async () => {
1578+
const fakeWatch = mock.mock(Watch);
1579+
const listObj = {
1580+
metadata: { resourceVersion: '12345' } as V1ListMeta,
1581+
items: [] as V1Namespace[],
1582+
} as V1NamespaceList;
1583+
1584+
const listFn: ListPromise<V1Namespace> = () => Promise.resolve(listObj);
1585+
1586+
const delayValues: number[] = [];
1587+
const promise = new Promise((resolve) => {
1588+
mock.when(
1589+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1590+
).thenCall(() => {
1591+
resolve(new AbortController());
1592+
return Promise.resolve(new AbortController());
1593+
});
1594+
});
1595+
1596+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
1597+
(cache as any).delayFn = (ms: number) => {
1598+
delayValues.push(ms);
1599+
return Promise.resolve();
1600+
};
1601+
await promise;
1602+
1603+
const [, , watchHandler, doneHandler] = mock.capture(fakeWatch.watch).last();
1604+
1605+
await doneHandler(null);
1606+
await doneHandler(null);
1607+
deepStrictEqual(delayValues, [1000]);
1608+
1609+
watchHandler('ADDED', {
1610+
metadata: { name: 'reset', namespace: 'default', resourceVersion: '99' } as V1ObjectMeta,
1611+
} as V1Namespace);
1612+
1613+
delayValues.length = 0;
1614+
await doneHandler(null);
1615+
deepStrictEqual(delayValues, []);
1616+
1617+
await doneHandler(null);
1618+
deepStrictEqual(delayValues, [1000]);
1619+
});
1620+
1621+
it('should reconnect with backoff on TimeoutError', async () => {
1622+
const fakeWatch = mock.mock(Watch);
1623+
const listObj = {
1624+
metadata: { resourceVersion: '12345' } as V1ListMeta,
1625+
items: [] as V1Namespace[],
1626+
} as V1NamespaceList;
1627+
1628+
const listFn: ListPromise<V1Namespace> = () => Promise.resolve(listObj);
1629+
1630+
let watchCalls = 0;
1631+
let errorEmitted = false;
1632+
const promise = new Promise((resolve) => {
1633+
mock.when(
1634+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1635+
).thenCall(() => {
1636+
watchCalls++;
1637+
resolve(new AbortController());
1638+
return Promise.resolve(new AbortController());
1639+
});
1640+
});
1641+
1642+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
1643+
(cache as any).delayFn = () => Promise.resolve();
1644+
await promise;
1645+
cache.on('error', () => (errorEmitted = true));
1646+
strictEqual(watchCalls, 1);
1647+
1648+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1649+
1650+
const timeoutError = new DOMException('The operation was aborted due to timeout', 'TimeoutError');
1651+
await doneHandler(timeoutError);
1652+
strictEqual(watchCalls, 2);
1653+
strictEqual(errorEmitted, false);
1654+
});
15281655
});
15291656

15301657
describe('delete items', () => {

src/test/integration/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ import patchNamespace from './patchNamespace.js';
22
import cpFromPod from './cpFromPod.js';
33
import portForwardIntegration from './portForward.js';
44
import watchPods from './watchPods.js';
5+
import informerReconnect from './informerReconnect.js';
56

67
console.log('Integration testing');
78

89
await patchNamespace();
910
await cpFromPod();
1011
await portForwardIntegration();
1112
await watchPods();
13+
await informerReconnect();

src/test/integration/informerReconnect.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import assert from 'node:assert';
2+
import { setTimeout } from 'node:timers/promises';
3+
import { CoreV1Api, KubeConfig, V1ConfigMap, V1ConfigMapList } from '../../index.js';
4+
import { Watch } from '../../watch.js';
5+
import { ListWatch } from '../../cache.js';
6+
import { generateName } from './name.js';
7+
8+
export default async function informerReconnect() {
9+
const kc = new KubeConfig();
10+
kc.loadFromDefault();
11+
12+
const coreV1Client = kc.makeApiClient(CoreV1Api);
13+
const namespace = 'default';
14+
const labelKey = 'informer-test';
15+
const labelValue = generateName('run');
16+
const labelSelector = `${labelKey}=${labelValue}`;
17+
18+
console.log(`\n=== Informer Reconnect Integration Test (label=${labelValue}) ===`);
19+
20+
const watch = new Watch(kc);
21+
const listFn = async (): Promise<V1ConfigMapList> => {
22+
return coreV1Client.listNamespacedConfigMap({
23+
namespace,
24+
labelSelector,
25+
});
26+
};
27+
28+
const informer = new ListWatch<V1ConfigMap>(
29+
`/api/v1/namespaces/${namespace}/configmaps`,
30+
watch,
31+
listFn,
32+
false,
33+
labelSelector,
34+
);
35+
36+
const addedNames: string[] = [];
37+
let connectCount = 0;
38+
let errorCount = 0;
39+
40+
informer.on('add', (obj: V1ConfigMap) => {
41+
const name = obj.metadata?.name ?? 'unknown';
42+
console.log(`Informer event: add ${name}`);
43+
addedNames.push(name);
44+
});
45+
46+
informer.on('connect', () => {
47+
connectCount++;
48+
console.log(`Informer event: connect (#${connectCount})`);
49+
});
50+
51+
informer.on('error', (err: any) => {
52+
errorCount++;
53+
console.log(`Informer event: error ${err}`);
54+
});
55+
56+
const cm1Name = generateName('cm1');
57+
const cm2Name = generateName('cm2');
58+
59+
try {
60+
await informer.start();
61+
console.log('Informer started');
62+
63+
console.log(`Creating configmap ${cm1Name}`);
64+
await coreV1Client.createNamespacedConfigMap({
65+
namespace,
66+
body: {
67+
metadata: { name: cm1Name, labels: { [labelKey]: labelValue } },
68+
data: { key: 'value1' },
69+
},
70+
});
71+
72+
for (let i = 0; i < 30; i++) {
73+
if (addedNames.includes(cm1Name)) break;
74+
await setTimeout(500);
75+
}
76+
assert.ok(addedNames.includes(cm1Name), 'Should have received add event for cm1');
77+
console.log('✓ Received add event for cm1');
78+
79+
const initialConnects = connectCount;
80+
console.log(`Waiting for watch reconnection (up to 45s)...`);
81+
for (let i = 0; i < 90; i++) {
82+
if (connectCount > initialConnects) break;
83+
await setTimeout(500);
84+
}
85+
assert.ok(connectCount > initialConnects, 'Informer should have reconnected');
86+
console.log(`✓ Informer reconnected (connect count: ${connectCount})`);
87+
88+
console.log(`Creating configmap ${cm2Name}`);
89+
await coreV1Client.createNamespacedConfigMap({
90+
namespace,
91+
body: {
92+
metadata: { name: cm2Name, labels: { [labelKey]: labelValue } },
93+
data: { key: 'value2' },
94+
},
95+
});
96+
97+
for (let i = 0; i < 30; i++) {
98+
if (addedNames.includes(cm2Name)) break;
99+
await setTimeout(500);
100+
}
101+
assert.ok(addedNames.includes(cm2Name), 'Should have received add event for cm2 after reconnect');
102+
console.log('✓ Received add event for cm2 after reconnection');
103+
104+
const cm1Duplicates = addedNames.filter((n) => n === cm1Name).length;
105+
assert.strictEqual(
106+
cm1Duplicates,
107+
1,
108+
`cm1 should only appear once in add events, got ${cm1Duplicates}`,
109+
);
110+
console.log('✓ No duplicate add events for cm1 (delta-only after reconnect)');
111+
112+
assert.strictEqual(errorCount, 0, `Expected no errors, got ${errorCount}`);
113+
console.log('✓ No error events');
114+
} finally {
115+
await informer.stop();
116+
try {
117+
await coreV1Client.deleteNamespacedConfigMap({ name: cm1Name, namespace });
118+
} catch {
119+
// already deleted
120+
}
121+
try {
122+
await coreV1Client.deleteNamespacedConfigMap({ name: cm2Name, namespace });
123+
} catch {
124+
// already deleted
125+
}
126+
}
127+
128+
console.log('Informer reconnect integration test passed!');
129+
}

src/watch.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ export class Watch {
5151
if (!doneCalled) {
5252
doneCalled = true;
5353
controller.abort();
54-
done(err);
54+
if (err && timeoutSignal.aborted) {
55+
done(new DOMException('The operation was aborted due to timeout', 'TimeoutError'));
56+
} else {
57+
done(err);
58+
}
5559
}
5660
};
5761

0 commit comments

Comments
 (0)