Skip to content

Commit 8c57dfa

Browse files
authored
New: [AEA-5274] - Deduplicate notification queue by NHS Number (#1562)
## Summary - ✨ New Feature ### Details We deduplicate using SQS FIFO deduplication IDs. Each ID is a salted hash of the patient's NHS number and the ODS code, so there is at most one notification per patient, per dispensing site, per 5-minute SQS deduplication window. Note that this is technically not a proper salted hash, since that would use a random salt. Because the hashes need to be deterministic, we instead use a secret value that is generated once during deployment and reused for all hashes thereafter.
1 parent 255fcb6 commit 8c57dfa

9 files changed

Lines changed: 282 additions & 80 deletions

File tree

SAMtemplates/functions/main.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,17 @@ Conditions:
6060
- !Ref DeployCheckPrescriptionStatusUpdate
6161

6262
Resources:
63+
SQSSaltSecret:
64+
Type: AWS::SecretsManager::Secret
65+
Properties:
66+
Name: !Sub ${StackName}-SqsSalt
67+
Description: Auto-generated salt for SQS_SALT
68+
GenerateSecretString:
69+
SecretStringTemplate: "{}"
70+
GenerateStringKey: salt
71+
PasswordLength: 32
72+
ExcludePunctuation: true
73+
6374
UpdatePrescriptionStatus:
6475
Type: AWS::Serverless::Function
6576
Properties:
@@ -71,6 +82,7 @@ Resources:
7182
Variables:
7283
TABLE_NAME: !Ref PrescriptionStatusUpdatesTableName
7384
NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL: !Ref NHSNotifyPrescriptionsSQSQueueUrl
85+
SQS_SALT: !Sub "{{resolve:secretsmanager:${SQSSaltSecret}:SecretString:salt}}"
7486
LOG_LEVEL: !Ref LogLevel
7587
ENVIRONMENT: !Ref Environment
7688
TEST_PRESCRIPTIONS_1: "None"

SAMtemplates/messaging/main.yaml

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ Resources:
4848
NHSNotifyPrescriptionsSQSQueue:
4949
Type: AWS::SQS::Queue
5050
Properties:
51-
QueueName: !Sub ${StackName}-NHSNotifyPrescriptions
51+
QueueName: !Sub ${StackName}-NHSNotifyPrescriptions.fifo
52+
FifoQueue: true
53+
ContentBasedDeduplication: false
5254
KmsMasterKeyId: !Ref NotificationSQSQueueKMSKeyAlias
5355
MessageRetentionPeriod: 86400 # 1 day in seconds
5456
RedrivePolicy:
@@ -59,7 +61,9 @@ Resources:
5961
NHSNotifyPrescriptionsDeadLetterQueue:
6062
Type: AWS::SQS::Queue
6163
Properties:
62-
QueueName: !Sub ${StackName}-NHSNotifyPrescriptionsDeadLetter
64+
QueueName: !Sub ${StackName}-NHSNotifyPrescriptionsDeadLetter.fifo
65+
FifoQueue: true
66+
ContentBasedDeduplication: false
6367
KmsMasterKeyId: !Ref NotificationSQSQueueKMSKeyAlias
6468
MessageRetentionPeriod: 604800 # 1 week in seconds
6569
VisibilityTimeout: 300
@@ -80,7 +84,7 @@ Resources:
8084
- kms:GenerateDataKey
8185
- kms:Decrypt
8286
Resource: !GetAtt NHSNotifyPrescriptionsSQSQueue.Arn
83-
87+
8488
WriteNHSNotifyPrescriptionsSQSQueuePolicy:
8589
Type: AWS::IAM::ManagedPolicy
8690
Properties:
@@ -96,7 +100,7 @@ Resources:
96100
- kms:GenerateDataKey
97101
- kms:Decrypt
98102
Resource: !GetAtt NHSNotifyPrescriptionsSQSQueue.Arn
99-
103+
100104
Outputs:
101105
NHSNotifyPrescriptionsSQSQueueUrl:
102106
Description: The URL of the NHS Notify Prescriptions SQS Queue

packages/nhsNotifyLambda/src/nhsNotifyLambda.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import inputOutputLogger from "@middy/input-output-logger"
77
import errorHandler from "@nhs/fhir-middy-error-handler"
88

99
import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes"
10-
import {drainQueue} from "./utils"
10+
import {clearCompletedSQSMessages, drainQueue} from "./utils"
1111

1212
const logger = new Logger({serviceName: "nhsNotify"})
1313

@@ -21,8 +21,9 @@ export const lambdaHandler = async (event: EventBridgeEvent<string, string>): Pr
2121

2222
logger.info("NHS Notify lambda triggered by scheduler", {event})
2323

24+
let messages
2425
try {
25-
const messages = await drainQueue(logger, 100)
26+
messages = await drainQueue(logger, 100)
2627

2728
if (messages.length === 0) {
2829
logger.info("No messages to process")
@@ -56,6 +57,15 @@ export const lambdaHandler = async (event: EventBridgeEvent<string, string>): Pr
5657
logger.error("Error while draining SQS queue", {error: err})
5758
throw err
5859
}
60+
61+
// By waiting until a message is successfully processed before deleting it from SQS,
62+
// failed messages will eventually be retried by subsequent notify consumers.
63+
try {
64+
await clearCompletedSQSMessages(messages, logger)
65+
} catch (err) {
66+
logger.error("Error while deleting successfully processed messages from SQS", {error: err})
67+
throw err
68+
}
5969
}
6070

6171
export const handler = middy(lambdaHandler)

packages/nhsNotifyLambda/src/utils.ts

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const docClient = DynamoDBDocumentClient.from(dynamo)
2222
* Pulls up to `maxTotal` messages off the queue (in batches of up to 10),
2323
* logs them, and returns them without deleting them from the queue.
2424
*/
25-
export async function drainQueue(logger: Logger, maxTotal = 100) {
25+
export async function drainQueue(logger: Logger, maxTotal = 100): Promise<Array<Message>> {
2626
let receivedSoFar = 0
2727
const allMessages: Array<Message> = []
2828

@@ -47,25 +47,43 @@ export async function drainQueue(logger: Logger, maxTotal = 100) {
4747

4848
allMessages.push(...Messages)
4949
receivedSoFar += Messages.length
50+
}
5051

51-
// delete this batch of messages from the queue
52-
const deleteEntries = Messages.map((m) => ({
53-
Id: m.MessageId!,
54-
ReceiptHandle: m.ReceiptHandle!
55-
}))
56-
const deleteCmd = new DeleteMessageBatchCommand({
57-
QueueUrl: sqsUrl,
58-
Entries: deleteEntries
59-
})
60-
const delResult = await sqs.send(deleteCmd)
52+
return allMessages
53+
}
6154

62-
if (delResult.Failed) {
63-
logger.error("Some messages failed to delete", {failed: delResult.Failed})
64-
throw new Error("Failed to delete fetched messages from SQS")
65-
}
55+
/**
56+
* Deletes the given messages from the notifications SQS queue in a single batch request. Throws an error if any deletion fails.
57+
*
58+
* @param messages - The messages that were received from SQS, and are to be deleted.
59+
* @param logger - the logging object
60+
*/
61+
export async function clearCompletedSQSMessages(
62+
messages: Array<Message>,
63+
logger: Logger
64+
): Promise<void> {
65+
if (!sqsUrl) {
66+
logger.error("Notifications SQS URL not configured")
67+
throw new Error("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
6668
}
6769

68-
return allMessages
70+
const deleteMessages = messages.map((m) => ({
71+
Id: m.MessageId!,
72+
ReceiptHandle: m.ReceiptHandle!
73+
}))
74+
75+
const deleteCmd = new DeleteMessageBatchCommand({
76+
QueueUrl: sqsUrl,
77+
Entries: deleteMessages
78+
})
79+
const delResult = await sqs.send(deleteCmd)
80+
81+
if (delResult.Failed) {
82+
logger.error("Some messages failed to delete", {failed: delResult.Failed})
83+
throw new Error("Failed to delete fetched messages from SQS")
84+
}
85+
86+
logger.info("Successfully deleted messages from SQS", {result: delResult})
6987
}
7088

7189
export async function addPrescriptionToNotificationStateStore(logger: Logger, dataArray: Array<PSUDataItem>) {

packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import {
77
} from "@jest/globals"
88

99
const mockDrainQueue = jest.fn()
10+
const mockClearCompletedSQSMessages = jest.fn()
1011
jest.unstable_mockModule(
1112
"../src/utils",
1213
async () => ({
1314
__esModule: true,
14-
drainQueue: mockDrainQueue
15+
drainQueue: mockDrainQueue,
16+
clearCompletedSQSMessages: mockClearCompletedSQSMessages
1517
})
1618
)
1719

@@ -57,6 +59,51 @@ describe("Unit test for NHS Notify lambda handler", () => {
5759
expect(mockInfo).toHaveBeenCalledWith("No messages to process")
5860
})
5961

62+
it("Clears completed messages after successful processing", async () => {
63+
const item1 = {TaskID: "t1", RequestID: "r1"}
64+
const item2 = {TaskID: "t2", RequestID: "r2"}
65+
const msg1 = {Body: JSON.stringify(item1)}
66+
const msg2 = {Body: JSON.stringify(item2)}
67+
// drainQueue returns two messages
68+
mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg1, msg2]))
69+
// deletion succeeds
70+
mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.resolve(undefined))
71+
72+
await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow()
73+
74+
// ensure we logged the fetched notifications
75+
expect(mockInfo).toHaveBeenCalledWith(
76+
"Fetched prescription notification messages",
77+
{
78+
count: 2,
79+
toNotify: [
80+
{RequestID: "r1", TaskId: "t1", Message: "Notification Required"},
81+
{RequestID: "r2", TaskId: "t2", Message: "Notification Required"}
82+
]
83+
}
84+
)
85+
// ensure clearCompletedSQSMessages was called with the original messages array
86+
expect(mockClearCompletedSQSMessages).toHaveBeenCalledWith(
87+
[msg1, msg2],
88+
expect.any(Object) // the logger instance
89+
)
90+
})
91+
92+
it("Throws and logs if clearCompletedSQSMessages fails", async () => {
93+
const item = {TaskID: "tx", RequestID: "rx"}
94+
const msg = {Body: JSON.stringify(item)}
95+
mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg]))
96+
const deletionError = new Error("Delete failed")
97+
mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.reject(deletionError))
98+
99+
await expect(lambdaHandler(mockEventBridgeEvent)).rejects.toThrow("Delete failed")
100+
101+
expect(mockError).toHaveBeenCalledWith(
102+
"Error while deleting successfully processed messages from SQS",
103+
{error: deletionError}
104+
)
105+
})
106+
60107
it("When drainQueue returns only valid JSON messages, all are processed", async () => {
61108
const validItem = {
62109
prescriptionId: "abc123",

packages/nhsNotifyLambda/tests/testUtils.test.ts

Lines changed: 75 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ import {SpiedFunction} from "jest-mock"
33

44
import {Logger} from "@aws-lambda-powertools/logger"
55
import {DynamoDBDocumentClient, PutCommand} from "@aws-sdk/lib-dynamodb"
6-
import {Message} from "@aws-sdk/client-sqs"
6+
import {DeleteMessageBatchCommand, Message} from "@aws-sdk/client-sqs"
77

88
import {constructPSUDataItem, mockSQSClient} from "./testHelpers"
99

1010
const {mockSend: sqsMockSend} = mockSQSClient()
1111

12-
const {addPrescriptionToNotificationStateStore, drainQueue} = await import("../src/utils")
12+
const {addPrescriptionToNotificationStateStore, clearCompletedSQSMessages, drainQueue} = await import("../src/utils")
1313

1414
const ORIGINAL_ENV = {...process.env}
1515

@@ -31,13 +31,10 @@ describe("NHS notify lambda helper functions", () => {
3131
it("Does not throw an error when the SQS fetch succeeds", async () => {
3232
const payload = {Messages: Array.from({length: 10}, () => (constructPSUDataItem() as Message))}
3333

34-
// Mock once for the fetch, and once for the delete
35-
sqsMockSend
36-
.mockImplementationOnce(() => Promise.resolve(payload))
37-
.mockImplementationOnce(() => Promise.resolve({Successful: []}))
34+
sqsMockSend.mockImplementationOnce(() => Promise.resolve(payload))
3835

3936
const messages = await drainQueue(logger, 10)
40-
expect(sqsMockSend).toHaveBeenCalledTimes(2)
37+
expect(sqsMockSend).toHaveBeenCalledTimes(1)
4138
expect(messages).toStrictEqual(payload.Messages)
4239
})
4340

@@ -47,39 +44,92 @@ describe("NHS notify lambda helper functions", () => {
4744
const messages = await drainQueue(logger, 5)
4845
expect(messages).toEqual([])
4946
expect(sqsMockSend).toHaveBeenCalledTimes(1)
50-
// no deletion attempted
5147
})
5248

5349
it("Throws an error if the SQS fetch fails", async () => {
5450
sqsMockSend.mockImplementation(() => Promise.reject(new Error("Fetch failed")))
5551
await expect(drainQueue(logger, 10)).rejects.toThrow("Fetch failed")
5652
})
5753

58-
it("Throws an error if the delete batch operation fails", async () => {
59-
const msg = constructPSUDataItem() as Message
60-
// first call: fetch, second call: delete
61-
sqsMockSend
62-
.mockImplementationOnce(() =>
63-
Promise.resolve({Messages: [msg]})
64-
)
65-
.mockImplementationOnce(() =>
66-
Promise.resolve({
67-
Failed: [{Id: msg.MessageId!, Message: "del-error", Code: "500"}]
68-
})
69-
)
70-
71-
await expect(drainQueue(logger, 1)).rejects.toThrow("Failed to delete fetched messages from SQS")
54+
it("Throws an error if the SQS URL is not configured", async () => {
55+
delete process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL
56+
const {drainQueue} = await import("../src/utils")
57+
58+
await expect(drainQueue(logger)).rejects.toThrow("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
59+
expect(errorSpy).toHaveBeenCalledWith("Notifications SQS URL not configured")
60+
})
61+
})
62+
63+
describe("clearCompletedSQSMessages", () => {
64+
let logger: Logger
65+
let errorSpy: SpiedFunction<(msg: string, ...meta: Array<unknown>) => void>
66+
67+
beforeEach(() => {
68+
jest.resetModules()
69+
jest.clearAllMocks()
70+
71+
process.env = {...ORIGINAL_ENV}
72+
logger = new Logger({serviceName: "test-service"})
73+
errorSpy = jest.spyOn(logger, "error")
74+
})
75+
76+
it("deletes messages successfully without error", async () => {
77+
const messages: Array<Message> = [
78+
{MessageId: "msg1", ReceiptHandle: "rh1"},
79+
{MessageId: "msg2", ReceiptHandle: "rh2"}
80+
]
81+
82+
// successful delete (no .Failed)
83+
sqsMockSend.mockImplementationOnce(() => Promise.resolve({}))
84+
85+
await expect(clearCompletedSQSMessages(messages, logger))
86+
.resolves
87+
.toBeUndefined()
88+
89+
expect(sqsMockSend).toHaveBeenCalledTimes(1)
90+
91+
const cmd = sqsMockSend.mock.calls[0][0]
92+
93+
expect(cmd).toBeInstanceOf(DeleteMessageBatchCommand)
94+
expect((cmd as DeleteMessageBatchCommand).input).toEqual({
95+
QueueUrl: process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL,
96+
Entries: [
97+
{Id: "msg1", ReceiptHandle: "rh1"},
98+
{Id: "msg2", ReceiptHandle: "rh2"}
99+
]
100+
})
101+
102+
expect(errorSpy).not.toHaveBeenCalled()
103+
})
104+
105+
it("logs and throws if some deletions fail", async () => {
106+
const messages: Array<Message> = [
107+
{MessageId: "msg1", ReceiptHandle: "rh1"}
108+
]
109+
const failedEntries = [
110+
{Id: "msg1", SenderFault: true, Code: "Error", Message: "fail"}
111+
]
112+
113+
// partial failure
114+
sqsMockSend.mockImplementationOnce(() => Promise.resolve({Failed: failedEntries}))
115+
116+
await expect(clearCompletedSQSMessages(messages, logger))
117+
.rejects
118+
.toThrow("Failed to delete fetched messages from SQS")
119+
72120
expect(errorSpy).toHaveBeenCalledWith(
73121
"Some messages failed to delete",
74-
{failed: expect.any(Array)}
122+
{failed: failedEntries}
75123
)
76124
})
77125

78126
it("Throws an error if the SQS URL is not configured", async () => {
79127
delete process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL
80-
const {drainQueue} = await import("../src/utils")
128+
const {clearCompletedSQSMessages} = await import("../src/utils")
81129

82-
await expect(drainQueue(logger)).rejects.toThrow("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
130+
await expect(clearCompletedSQSMessages([], logger))
131+
.rejects
132+
.toThrow("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
83133
expect(errorSpy).toHaveBeenCalledWith("Notifications SQS URL not configured")
84134
})
85135
})
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/* eslint-disable no-undef */
22
process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL = "dummy_notify_sqs";
33
process.env.AWS_REGION = "eu-west-2";
4+
process.env.SQS_SALT = "the quick brown fox something something"

0 commit comments

Comments
 (0)