Skip to content

Commit 63dc43c

Browse files
authored
New: [AEA-5206] - Check cooldown when sending a notification (#1621)
## Summary - ✨ New Feature ### Details When we have a batch of messages, filter them to remove ones which are recorded in the data store as the patient having recently received a notification. Default cooldown period is 15 minutes.
1 parent be703b4 commit 63dc43c

5 files changed

Lines changed: 297 additions & 15 deletions

File tree

.vscode/eps-prescription-status-update-api.code-workspace

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
"Codeable",
7575
"codeinline",
7676
"codesystem",
77+
"Cooldown",
7778
"cpsu",
7879
"dbaeumer",
7980
"devcontainer",

packages/nhsNotifyLambda/src/nhsNotifyLambda.ts

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import errorHandler from "@nhs/fhir-middy-error-handler"
88

99
import {
1010
addPrescriptionMessagesToNotificationStateStore,
11+
checkCooldownForUpdate,
1112
clearCompletedSQSMessages,
1213
drainQueue,
1314
PSUDataItemMessage
@@ -26,6 +27,7 @@ export const lambdaHandler = async (event: EventBridgeEvent<string, string>): Pr
2627
logger.info("NHS Notify lambda triggered by scheduler", {event})
2728

2829
let messages: Array<PSUDataItemMessage>
30+
let processed: Array<PSUDataItemMessage>
2931
try {
3032
messages = await drainQueue(logger, 100)
3133

@@ -34,26 +36,54 @@ export const lambdaHandler = async (event: EventBridgeEvent<string, string>): Pr
3436
return
3537
}
3638

37-
const toNotify = messages.map((m) => ({
38-
RequestID: m.PSUDataItem.RequestID,
39-
TaskId: m.PSUDataItem.TaskID,
40-
Message: "Notification Required"
41-
}))
39+
// Filter messages by checkCooldownForUpdate. This is done in two stages so we can check in parallel
40+
const eligibility = await Promise.all(
41+
messages.map(async (m) => ({
42+
message: m,
43+
allowed: await checkCooldownForUpdate(logger, m.PSUDataItem)
44+
}))
45+
)
46+
const toProcess = eligibility
47+
.filter((e) => e.allowed)
48+
.map((e) => e.message)
49+
50+
// Log the results of checking the cooldown
51+
const suppressedCount = messages.length - toProcess.length
52+
if (toProcess.length === 0) {
53+
logger.info("All messages suppressed by cooldown; nothing to notify",
54+
{
55+
suppressedCount,
56+
totalFetched: messages.length
57+
})
58+
return
59+
} else if (suppressedCount > 0) {
60+
logger.info(`Suppressed ${suppressedCount} messages due to cooldown`,
61+
{
62+
suppressedCount,
63+
totalFetched: messages.length
64+
}
65+
)
66+
}
67+
68+
// Just for diagnostics for now
69+
const toNotify = toProcess
70+
.map((m) => ({
71+
RequestID: m.PSUDataItem.RequestID,
72+
TaskId: m.PSUDataItem.TaskID,
73+
Message: "Notification Required"
74+
}))
4275
logger.info("Fetched prescription notification messages", {count: toNotify.length, toNotify})
4376

44-
// TODO: Notifications logic will be done here.
45-
// - query PrescriptionNotificationState
46-
// - process prescriptions, build NHS notify payload
47-
// - Make NHS notify request
48-
// Don't forget to make appropriate logs!
77+
// TODO: Notifications request will be done here.
78+
processed = toProcess
4979

5080
} catch (err) {
5181
logger.error("Error while draining SQS queue", {error: err})
5282
throw err
5383
}
5484

5585
try {
56-
await addPrescriptionMessagesToNotificationStateStore(logger, messages)
86+
await addPrescriptionMessagesToNotificationStateStore(logger, processed)
5787
} catch (err) {
5888
logger.error("Error while pushing data to the PSU notification state data store", {err})
5989
throw err
@@ -62,7 +92,7 @@ export const lambdaHandler = async (event: EventBridgeEvent<string, string>): Pr
6292
// By waiting until a message is successfully processed before deleting it from SQS,
6393
// failed messages will eventually be retried by subsequent notify consumers.
6494
try {
65-
await clearCompletedSQSMessages(logger, messages)
95+
await clearCompletedSQSMessages(logger, processed)
6696
} catch (err) {
6797
logger.error("Error while deleting successfully processed messages from SQS", {error: err})
6898
throw err

packages/nhsNotifyLambda/src/utils.ts

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
Message
77
} from "@aws-sdk/client-sqs"
88
import {DynamoDBClient} from "@aws-sdk/client-dynamodb"
9-
import {DynamoDBDocumentClient, PutCommand} from "@aws-sdk/lib-dynamodb"
9+
import {DynamoDBDocumentClient, GetCommand, PutCommand} from "@aws-sdk/lib-dynamodb"
1010

1111
import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes"
1212

@@ -208,3 +208,67 @@ export async function addPrescriptionMessagesToNotificationStateStore(
208208
}
209209
}
210210
}
211+
212+
/**
213+
* Returns TRUE if the patient HAS NOT received a recent notification.
214+
* Returns FALSE if the patient HAS received a recent notification
215+
*
216+
* @param logger - AWS logging object
217+
* @param update - The Prescription Status Update that we are checking
218+
* @param cooldownPeriod - Minimum time in seconds between notifications
219+
*/
220+
export async function checkCooldownForUpdate(
221+
logger: Logger,
222+
update: PSUDataItem,
223+
cooldownPeriod: number = 900
224+
): Promise<boolean> {
225+
226+
if (!dynamoTable) {
227+
logger.error("DynamoDB table not configured")
228+
throw new Error("TABLE_NAME not set")
229+
}
230+
231+
try {
232+
// Retrieve the last notification state for this patient/pharmacy combo
233+
const getCmd = new GetCommand({
234+
TableName: dynamoTable,
235+
Key: {
236+
NHSNumber: update.PatientNHSNumber,
237+
ODSCode: update.PharmacyODSCode
238+
}
239+
})
240+
const {Item} = await docClient.send(getCmd)
241+
242+
// If no previous record, we're okay to send a notification
243+
if (!Item?.LastNotificationRequestTimestamp) {
244+
logger.info("No previous notification state found. Notification allowed.")
245+
return true
246+
}
247+
248+
// Compute seconds since last notification
249+
const lastTs = new Date(Item.LastNotificationRequestTimestamp).getTime()
250+
const nowTs = Date.now()
251+
const secondsSince = Math.floor((nowTs - lastTs) / 1000)
252+
253+
if (secondsSince > cooldownPeriod) {
254+
logger.info("Cooldown period has passed. Notification allowed.", {
255+
NHSNumber: update.PatientNHSNumber,
256+
ODSCode: update.PharmacyODSCode,
257+
cooldownPeriod,
258+
secondsSince
259+
})
260+
return true
261+
} else {
262+
logger.info("Within cooldown period. Notification suppressed.", {
263+
NHSNumber: update.PatientNHSNumber,
264+
ODSCode: update.PharmacyODSCode,
265+
cooldownPeriod,
266+
secondsSince
267+
})
268+
return false
269+
}
270+
} catch (err) {
271+
logger.error("Error checking cooldown state", {error: err})
272+
throw err
273+
}
274+
}

packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@ import {constructPSUDataItem, constructPSUDataItemMessage} from "./testHelpers"
1111
const mockAddPrescriptionMessagesToNotificationStateStore = jest.fn()
1212
const mockClearCompletedSQSMessages = jest.fn()
1313
const mockDrainQueue = jest.fn()
14+
const mockCheckCooldownForUpdate = jest.fn()
15+
1416
jest.unstable_mockModule(
1517
"../src/utils",
1618
async () => ({
1719
__esModule: true,
1820
drainQueue: mockDrainQueue,
1921
addPrescriptionMessagesToNotificationStateStore: mockAddPrescriptionMessagesToNotificationStateStore,
20-
clearCompletedSQSMessages: mockClearCompletedSQSMessages
22+
clearCompletedSQSMessages: mockClearCompletedSQSMessages,
23+
checkCooldownForUpdate: mockCheckCooldownForUpdate
2124
})
2225
)
2326

@@ -60,6 +63,7 @@ describe("Unit test for NHS Notify lambda handler", () => {
6063
mockDrainQueue.mockImplementation(() => Promise.resolve([]))
6164
await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow()
6265

66+
expect(mockCheckCooldownForUpdate).not.toHaveBeenCalled()
6367
expect(mockInfo).toHaveBeenCalledWith("No messages to process")
6468
})
6569

@@ -72,9 +76,13 @@ describe("Unit test for NHS Notify lambda handler", () => {
7276
mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg1, msg2]))
7377
// deletion succeeds
7478
mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.resolve(undefined))
79+
// Checking cooldown
80+
mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true))
7581

7682
await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow()
7783

84+
expect(mockCheckCooldownForUpdate).toHaveBeenCalledTimes(2)
85+
7886
// ensure we logged the fetched notifications
7987
expect(mockInfo).toHaveBeenCalledWith(
8088
"Fetched prescription notification messages",
@@ -97,6 +105,7 @@ describe("Unit test for NHS Notify lambda handler", () => {
97105
const item = constructPSUDataItem({TaskID: "tx", RequestID: "rx"})
98106
const msg = constructPSUDataItemMessage({PSUDataItem: item})
99107
mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg]))
108+
mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true))
100109

101110
const deletionError = new Error("Delete failed")
102111
mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.reject(deletionError))
@@ -111,6 +120,7 @@ describe("Unit test for NHS Notify lambda handler", () => {
111120

112121
it("Throws and logs if addPrescriptionMessagesToNotificationStateStore fails", async () => {
113122
mockDrainQueue.mockImplementationOnce(() => Promise.resolve([constructPSUDataItemMessage()]))
123+
mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true))
114124
const thrownError = new Error("Failed")
115125
mockAddPrescriptionMessagesToNotificationStateStore.mockImplementationOnce(
116126
() => Promise.reject(thrownError)
@@ -133,6 +143,7 @@ describe("Unit test for NHS Notify lambda handler", () => {
133143
mockDrainQueue.mockImplementation(() =>
134144
Promise.resolve([message])
135145
)
146+
mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true))
136147

137148
await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow()
138149

@@ -151,4 +162,65 @@ describe("Unit test for NHS Notify lambda handler", () => {
151162
}
152163
)
153164
})
165+
166+
it("Filters out messages inside cooldown", async () => {
167+
const fresh = constructPSUDataItem({RequestID: "fresh", TaskID: "t1"})
168+
const stale = constructPSUDataItem({RequestID: "stale", TaskID: "t2"})
169+
const msgFresh = constructPSUDataItemMessage({PSUDataItem: fresh})
170+
const msgStale = constructPSUDataItemMessage({PSUDataItem: stale})
171+
172+
mockDrainQueue.mockImplementation(() => Promise.resolve([msgFresh, msgStale]))
173+
174+
// returns true if the request ID is "fresh"
175+
mockCheckCooldownForUpdate.mockImplementation((logger, update) => {
176+
const u = update as { RequestID: string }
177+
return Promise.resolve(u.RequestID === "fresh")
178+
})
179+
180+
mockClearCompletedSQSMessages.mockImplementation(() => Promise.resolve())
181+
mockAddPrescriptionMessagesToNotificationStateStore.mockImplementation(() => Promise.resolve())
182+
183+
await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow()
184+
185+
// we should only persist & delete the fresh one
186+
expect(mockAddPrescriptionMessagesToNotificationStateStore)
187+
.toHaveBeenCalledWith(expect.any(Object), [msgFresh])
188+
189+
expect(mockClearCompletedSQSMessages)
190+
.toHaveBeenCalledWith(expect.any(Object), [msgFresh])
191+
192+
// and log how many were suppressed
193+
expect(mockInfo).toHaveBeenCalledWith(
194+
"Suppressed 1 messages due to cooldown",
195+
{suppressedCount: 1, totalFetched: 2}
196+
)
197+
})
198+
199+
it("Logs a message when all messages are inside cooldown", async () => {
200+
const stale = constructPSUDataItem({RequestID: "stale", TaskID: "t1"})
201+
const msgStale = constructPSUDataItemMessage({PSUDataItem: stale})
202+
203+
mockDrainQueue.mockImplementation(() => Promise.resolve([msgStale]))
204+
205+
// returns true if the request ID is "fresh"
206+
mockCheckCooldownForUpdate.mockImplementation((logger, update) => {
207+
const u = update as { RequestID: string }
208+
return Promise.resolve(u.RequestID === "fresh")
209+
})
210+
211+
mockClearCompletedSQSMessages.mockImplementation(() => Promise.resolve())
212+
mockAddPrescriptionMessagesToNotificationStateStore.mockImplementation(() => Promise.resolve())
213+
214+
await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow()
215+
216+
expect(mockAddPrescriptionMessagesToNotificationStateStore).not.toHaveBeenCalled()
217+
expect(mockClearCompletedSQSMessages).not.toHaveBeenCalled()
218+
219+
// and log that everything was suppressed
220+
expect(mockInfo)
221+
.toHaveBeenCalledWith(
222+
"All messages suppressed by cooldown; nothing to notify",
223+
{suppressedCount: 1, totalFetched: 1}
224+
)
225+
})
154226
})

0 commit comments

Comments
 (0)