Skip to content

Commit d81a184

Browse files
committed
WIP
1 parent 17b9ccc commit d81a184

3 files changed

Lines changed: 402 additions & 187 deletions

File tree

packages/javascript-api/src/lib/services/graphql/__fixtures__/graphql-subscriptions-fixture.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ export class GraphQLSubscriptionsFixture {
4040
return (this.graphqlService as any).subscriptions.length;
4141
}
4242

43-
getGraphqlServiceSubscriptionObserverMapSize(): number {
44-
return Object.keys(this.getGraphqlServiceSubscriptionObserverMap()).length;
43+
getMessageSubscribersSize(): number {
44+
return (this.graphqlService as any).messageSubscribers.size;
4545
}
4646

47-
getGraphqlServiceSubscriptionObserverMap(): Record<string, Observer<object>> {
48-
return (this.graphqlService as any).subscriptionObserverMap;
47+
hasMessageSubscriber(id: string): boolean {
48+
return (this.graphqlService as any).messageSubscribers.has(id);
4949
}
5050

5151
async waitForConnection() {
@@ -125,6 +125,7 @@ export class GraphQLSubscriptionsFixture {
125125
}
126126

127127
async cleanup() {
128+
(this.graphqlService as any).clearSubscriptionRetry();
128129
WS.clean();
129130
await this.server.closed;
130131
}

packages/javascript-api/src/lib/services/graphql/__tests__/graphql-subscriptions.spec.ts

Lines changed: 176 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
import gql from 'graphql-tag';
22
import fetchMock from 'jest-fetch-mock';
33
import { WebSocket } from 'mock-socket';
4-
import { Subscriber } from 'rxjs';
54
import { ConnectionStatus } from '../../../model/connection-status';
65
import { GraphQLSubscriptionsFixture } from '../__fixtures__/graphql-subscriptions-fixture';
7-
import { QminderGraphQLError } from '../graphql.service';
86

97
jest.mock('isomorphic-ws', () => WebSocket);
108
jest.mock('../../../util/sleep-ms/sleep-ms', () => ({
@@ -96,24 +94,20 @@ describe('GraphQL subscriptions', () => {
9694
});
9795

9896
it('cleans up internal state when unsubscribing', async () => {
99-
// start the test with an empty observer-map
100-
expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0);
97+
expect(fixture.getMessageSubscribersSize()).toBe(0);
10198
const subscription = fixture.triggerSubscription();
10299
await fixture.handleConnectionInit();
103100
await fixture.consumeSubscribeMessage();
104-
// the observer map should equal { "1": Subscriber => spy }
105-
expect(fixture.getGraphqlServiceSubscriptionObserverMap()).toEqual({
106-
'1': expect.any(Subscriber),
107-
});
101+
expect(fixture.getMessageSubscribersSize()).toBe(1);
102+
expect(fixture.hasMessageSubscriber('1')).toBe(true);
108103

109-
// unsubscribing should clean up
110104
subscription.unsubscribe();
111105
await fixture.consumeAnyMessage();
112-
expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0);
106+
expect(fixture.getMessageSubscribersSize()).toBe(0);
113107
});
114108

115109
it('when receiving a published message for a subscription that does not exist anymore, it does not throw', async () => {
116-
expect(fixture.getGraphqlServiceSubscriptionObserverMapSize()).toBe(0);
110+
expect(fixture.getMessageSubscribersSize()).toBe(0);
117111
const subscription = fixture.triggerSubscription();
118112

119113
await fixture.handleConnectionInit();
@@ -335,43 +329,191 @@ describe('GraphQL subscriptions', () => {
335329
subscription.unsubscribe();
336330
});
337331

338-
it('error messages are propagated to the subscriber', async () => {
339-
const ERRORS: QminderGraphQLError[] = [
340-
{
341-
message:
342-
"Invalid Syntax : offending token 'createdTickets' at line 1 column 1",
343-
sourcePreview:
344-
'createdTickets(locationId: 673) {\n' +
345-
' id\n' +
346-
' firstName\n' +
347-
' lastName\n',
348-
offendingToken: 'createdTickets',
349-
locations: [],
350-
errorType: 'InvalidSyntax',
351-
extensions: null,
352-
path: null,
353-
},
354-
];
332+
it('GQL_ERROR does not kill the subscription or trigger reconnect', async () => {
333+
const reconnectSpy = jest.spyOn(
334+
fixture.graphqlService as any,
335+
'handleConnectionDrop',
336+
);
355337
const errorSpy = jest.fn();
356338
const subscription = fixture.triggerSubscription('subscription { baba }', {
357339
error: errorSpy,
358340
});
359341
await fixture.handleConnectionInit();
360342
await fixture.consumeSubscribeMessage();
343+
344+
fixture.server.send({
345+
id: '1',
346+
type: 'error',
347+
payload: {
348+
data: null,
349+
errors: [{ message: 'Subscription limit reached' }],
350+
},
351+
});
352+
353+
await new Promise((r) => setTimeout(r, 10));
354+
355+
expect(errorSpy).not.toHaveBeenCalled();
356+
expect(reconnectSpy).not.toHaveBeenCalled();
357+
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1);
358+
expect(fixture.hasMessageSubscriber('1')).toBe(true);
359+
360+
subscription.unsubscribe();
361+
});
362+
363+
it('GQL_ERROR emits true on the subscription error observable', async () => {
364+
const values: boolean[] = [];
365+
fixture.graphqlService
366+
.getSubscriptionErrorObservable()
367+
.subscribe((v) => values.push(v));
368+
369+
const subscription = fixture.triggerSubscription('subscription { baba }');
370+
await fixture.handleConnectionInit();
371+
await fixture.consumeSubscribeMessage();
372+
373+
fixture.server.send({
374+
id: '1',
375+
type: 'error',
376+
payload: {
377+
data: null,
378+
errors: [
379+
{
380+
message:
381+
'The maximum subscription limit of 100 has been reached',
382+
},
383+
],
384+
},
385+
});
386+
387+
await new Promise((r) => setTimeout(r, 10));
388+
389+
expect(values).toEqual([false, true]);
390+
391+
subscription.unsubscribe();
392+
});
393+
394+
it('retries failed subscriptions after delay and clears error state', async () => {
395+
(fixture.graphqlService as any).subscriptionRetryDelayMs = 50;
396+
397+
const values: boolean[] = [];
398+
fixture.graphqlService
399+
.getSubscriptionErrorObservable()
400+
.subscribe((v) => values.push(v));
401+
402+
const subscription = fixture.triggerSubscription('subscription { baba }');
403+
await fixture.handleConnectionInit();
404+
await fixture.consumeSubscribeMessage();
405+
361406
fixture.server.send({
362407
id: '1',
363408
type: 'error',
364409
payload: {
365410
data: null,
366-
errors: ERRORS,
411+
errors: [{ message: 'Limit reached' }],
367412
},
368413
});
369414

370-
expect(errorSpy).toHaveBeenCalledWith(ERRORS);
371-
// Cleans up as well
372-
expect(
373-
(fixture.graphqlService as any).subscriptionObserverMap['1'],
374-
).toBeUndefined();
415+
await new Promise((r) => setTimeout(r, 10));
416+
expect(values).toEqual([false, true]);
417+
418+
await new Promise((r) => setTimeout(r, 60));
419+
420+
expect(values).toEqual([false, true, false]);
421+
expect(await fixture.getNextMessage()).toEqual({
422+
id: '1',
423+
type: 'start',
424+
payload: { query: 'subscription { baba }' },
425+
});
426+
427+
subscription.unsubscribe();
428+
});
429+
430+
it('does not send GQL_STOP when server sends GQL_COMPLETE', async () => {
431+
const completeSpy = jest.fn();
432+
const subscription = fixture.triggerSubscription('subscription { baba }', {
433+
next: () => {},
434+
complete: completeSpy,
435+
});
436+
await fixture.handleConnectionInit();
437+
await fixture.consumeSubscribeMessage();
438+
439+
fixture.sendMessageToClient({
440+
id: '1',
441+
type: 'complete',
442+
});
443+
444+
await new Promise((r) => setTimeout(r, 10));
445+
446+
expect(completeSpy).toHaveBeenCalled();
447+
expect(fixture.hasMessageSubscriber('1')).toBe(false);
448+
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0);
449+
expect(fixture.server.messagesToConsume.pendingItems).toHaveLength(0);
450+
451+
subscription.unsubscribe();
452+
});
453+
454+
it('GQL_ERROR keeps subscription tracked so it re-subscribes on natural reconnect and clears error state', async () => {
455+
const values: boolean[] = [];
456+
fixture.graphqlService
457+
.getSubscriptionErrorObservable()
458+
.subscribe((v) => values.push(v));
459+
460+
const subscription = fixture.triggerSubscription('subscription { baba }');
461+
await fixture.handleConnectionInit();
462+
await fixture.consumeSubscribeMessage();
463+
464+
fixture.sendMessageToClient({
465+
id: '1',
466+
type: 'error',
467+
payload: {
468+
data: null,
469+
errors: [{ message: 'Limit reached' }],
470+
},
471+
});
472+
473+
await new Promise((r) => setTimeout(r, 10));
474+
475+
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(1);
476+
expect(fixture.hasMessageSubscriber('1')).toBe(true);
477+
expect(values).toEqual([false, true]);
478+
479+
await fixture.closeWithCode(1001);
480+
fixture.openServer();
481+
await fixture.handleConnectionInit();
482+
expect(await fixture.getNextMessage()).toEqual({
483+
id: '1',
484+
type: 'start',
485+
payload: { query: 'subscription { baba }' },
486+
});
487+
488+
expect(values).toEqual([false, true, false]);
489+
490+
subscription.unsubscribe();
491+
});
492+
493+
it('cleans up subscription on unknown message type with errors', async () => {
494+
const errorSpy = jest.fn();
495+
const subscription = fixture.triggerSubscription('subscription { baba }', {
496+
error: errorSpy,
497+
});
498+
await fixture.handleConnectionInit();
499+
await fixture.consumeSubscribeMessage();
500+
501+
fixture.sendMessageToClient({
502+
id: '1',
503+
type: 'unknown_type',
504+
payload: {
505+
errors: [{ message: 'Something went wrong' }],
506+
},
507+
});
508+
509+
await new Promise((r) => setTimeout(r, 10));
510+
511+
expect(errorSpy).toHaveBeenCalledWith(
512+
expect.objectContaining({ message: 'Something went wrong' }),
513+
);
514+
expect(errorSpy.mock.calls[0][0]).toBeInstanceOf(Error);
515+
expect(fixture.getGraphqlServiceActiveSubscriptionCount()).toBe(0);
516+
expect(fixture.hasMessageSubscriber('1')).toBe(false);
375517

376518
subscription.unsubscribe();
377519
});

0 commit comments

Comments
 (0)