diff --git a/.vscode/eps-prescription-status-update-api.code-workspace b/.vscode/eps-prescription-status-update-api.code-workspace index f9c9088de3..d5cc30a36c 100644 --- a/.vscode/eps-prescription-status-update-api.code-workspace +++ b/.vscode/eps-prescription-status-update-api.code-workspace @@ -74,6 +74,7 @@ "Codeable", "codeinline", "codesystem", + "Cooldown", "cpsu", "dbaeumer", "devcontainer", diff --git a/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts b/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts index 542e83004b..b4175ccca1 100644 --- a/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts +++ b/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts @@ -8,6 +8,7 @@ import errorHandler from "@nhs/fhir-middy-error-handler" import { addPrescriptionMessagesToNotificationStateStore, + checkCooldownForUpdate, clearCompletedSQSMessages, drainQueue, PSUDataItemMessage @@ -26,6 +27,7 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr logger.info("NHS Notify lambda triggered by scheduler", {event}) let messages: Array + let processed: Array try { messages = await drainQueue(logger, 100) @@ -34,18 +36,46 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr return } - const toNotify = messages.map((m) => ({ - RequestID: m.PSUDataItem.RequestID, - TaskId: m.PSUDataItem.TaskID, - Message: "Notification Required" - })) + // Filter messages by checkCooldownForUpdate. This is done in two stages so we can check in parallel + const eligibility = await Promise.all( + messages.map(async (m) => ({ + message: m, + allowed: await checkCooldownForUpdate(logger, m.PSUDataItem) + })) + ) + const toProcess = eligibility + .filter((e) => e.allowed) + .map((e) => e.message) + + // Log the results of checking the cooldown + const suppressedCount = messages.length - toProcess.length + if (toProcess.length === 0) { + logger.info("All messages suppressed by cooldown; nothing to notify", + { + suppressedCount, + totalFetched: messages.length + }) + return + } else if (suppressedCount > 0) { + logger.info(`Suppressed ${suppressedCount} messages due to cooldown`, + { + suppressedCount, + totalFetched: messages.length + } + ) + } + + // Just for diagnostics for now + const toNotify = toProcess + .map((m) => ({ + RequestID: m.PSUDataItem.RequestID, + TaskId: m.PSUDataItem.TaskID, + Message: "Notification Required" + })) logger.info("Fetched prescription notification messages", {count: toNotify.length, toNotify}) - // TODO: Notifications logic will be done here. - // - query PrescriptionNotificationState - // - process prescriptions, build NHS notify payload - // - Make NHS notify request - // Don't forget to make appropriate logs! + // TODO: Notifications request will be done here. + processed = toProcess } catch (err) { logger.error("Error while draining SQS queue", {error: err}) @@ -53,7 +83,7 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr } try { - await addPrescriptionMessagesToNotificationStateStore(logger, messages) + await addPrescriptionMessagesToNotificationStateStore(logger, processed) } catch (err) { logger.error("Error while pushing data to the PSU notification state data store", {err}) throw err @@ -62,7 +92,7 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr // By waiting until a message is successfully processed before deleting it from SQS, // failed messages will eventually be retried by subsequent notify consumers. try { - await clearCompletedSQSMessages(logger, messages) + await clearCompletedSQSMessages(logger, processed) } catch (err) { logger.error("Error while deleting successfully processed messages from SQS", {error: err}) throw err diff --git a/packages/nhsNotifyLambda/src/utils.ts b/packages/nhsNotifyLambda/src/utils.ts index 436f161555..d45ee8c076 100644 --- a/packages/nhsNotifyLambda/src/utils.ts +++ b/packages/nhsNotifyLambda/src/utils.ts @@ -6,7 +6,7 @@ import { Message } from "@aws-sdk/client-sqs" import {DynamoDBClient} from "@aws-sdk/client-dynamodb" -import {DynamoDBDocumentClient, PutCommand} from "@aws-sdk/lib-dynamodb" +import {DynamoDBDocumentClient, GetCommand, PutCommand} from "@aws-sdk/lib-dynamodb" import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes" @@ -208,3 +208,67 @@ export async function addPrescriptionMessagesToNotificationStateStore( } } } + +/** + * Returns TRUE if the patient HAS NOT received a recent notification. + * Returns FALSE if the patient HAS received a recent notification + * + * @param logger - AWS logging object + * @param update - The Prescription Status Update that we are checking + * @param cooldownPeriod - Minimum time in seconds between notifications + */ +export async function checkCooldownForUpdate( + logger: Logger, + update: PSUDataItem, + cooldownPeriod: number = 900 +): Promise { + + if (!dynamoTable) { + logger.error("DynamoDB table not configured") + throw new Error("TABLE_NAME not set") + } + + try { + // Retrieve the last notification state for this patient/pharmacy combo + const getCmd = new GetCommand({ + TableName: dynamoTable, + Key: { + NHSNumber: update.PatientNHSNumber, + ODSCode: update.PharmacyODSCode + } + }) + const {Item} = await docClient.send(getCmd) + + // If no previous record, we're okay to send a notification + if (!Item?.LastNotificationRequestTimestamp) { + logger.info("No previous notification state found. Notification allowed.") + return true + } + + // Compute seconds since last notification + const lastTs = new Date(Item.LastNotificationRequestTimestamp).getTime() + const nowTs = Date.now() + const secondsSince = Math.floor((nowTs - lastTs) / 1000) + + if (secondsSince > cooldownPeriod) { + logger.info("Cooldown period has passed. Notification allowed.", { + NHSNumber: update.PatientNHSNumber, + ODSCode: update.PharmacyODSCode, + cooldownPeriod, + secondsSince + }) + return true + } else { + logger.info("Within cooldown period. Notification suppressed.", { + NHSNumber: update.PatientNHSNumber, + ODSCode: update.PharmacyODSCode, + cooldownPeriod, + secondsSince + }) + return false + } + } catch (err) { + logger.error("Error checking cooldown state", {error: err}) + throw err + } +} diff --git a/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts b/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts index 8b8a30a86e..52099da9f7 100644 --- a/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts +++ b/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts @@ -11,13 +11,16 @@ import {constructPSUDataItem, constructPSUDataItemMessage} from "./testHelpers" const mockAddPrescriptionMessagesToNotificationStateStore = jest.fn() const mockClearCompletedSQSMessages = jest.fn() const mockDrainQueue = jest.fn() +const mockCheckCooldownForUpdate = jest.fn() + jest.unstable_mockModule( "../src/utils", async () => ({ __esModule: true, drainQueue: mockDrainQueue, addPrescriptionMessagesToNotificationStateStore: mockAddPrescriptionMessagesToNotificationStateStore, - clearCompletedSQSMessages: mockClearCompletedSQSMessages + clearCompletedSQSMessages: mockClearCompletedSQSMessages, + checkCooldownForUpdate: mockCheckCooldownForUpdate }) ) @@ -60,6 +63,7 @@ describe("Unit test for NHS Notify lambda handler", () => { mockDrainQueue.mockImplementation(() => Promise.resolve([])) await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() + expect(mockCheckCooldownForUpdate).not.toHaveBeenCalled() expect(mockInfo).toHaveBeenCalledWith("No messages to process") }) @@ -72,9 +76,13 @@ describe("Unit test for NHS Notify lambda handler", () => { mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg1, msg2])) // deletion succeeds mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.resolve(undefined)) + // Checking cooldown + mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true)) await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() + expect(mockCheckCooldownForUpdate).toHaveBeenCalledTimes(2) + // ensure we logged the fetched notifications expect(mockInfo).toHaveBeenCalledWith( "Fetched prescription notification messages", @@ -97,6 +105,7 @@ describe("Unit test for NHS Notify lambda handler", () => { const item = constructPSUDataItem({TaskID: "tx", RequestID: "rx"}) const msg = constructPSUDataItemMessage({PSUDataItem: item}) mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg])) + mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true)) const deletionError = new Error("Delete failed") mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.reject(deletionError)) @@ -111,6 +120,7 @@ describe("Unit test for NHS Notify lambda handler", () => { it("Throws and logs if addPrescriptionMessagesToNotificationStateStore fails", async () => { mockDrainQueue.mockImplementationOnce(() => Promise.resolve([constructPSUDataItemMessage()])) + mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true)) const thrownError = new Error("Failed") mockAddPrescriptionMessagesToNotificationStateStore.mockImplementationOnce( () => Promise.reject(thrownError) @@ -133,6 +143,7 @@ describe("Unit test for NHS Notify lambda handler", () => { mockDrainQueue.mockImplementation(() => Promise.resolve([message]) ) + mockCheckCooldownForUpdate.mockImplementation(() => Promise.resolve(true)) await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() @@ -151,4 +162,65 @@ describe("Unit test for NHS Notify lambda handler", () => { } ) }) + + it("Filters out messages inside cooldown", async () => { + const fresh = constructPSUDataItem({RequestID: "fresh", TaskID: "t1"}) + const stale = constructPSUDataItem({RequestID: "stale", TaskID: "t2"}) + const msgFresh = constructPSUDataItemMessage({PSUDataItem: fresh}) + const msgStale = constructPSUDataItemMessage({PSUDataItem: stale}) + + mockDrainQueue.mockImplementation(() => Promise.resolve([msgFresh, msgStale])) + + // returns true if the request ID is "fresh" + mockCheckCooldownForUpdate.mockImplementation((logger, update) => { + const u = update as { RequestID: string } + return Promise.resolve(u.RequestID === "fresh") + }) + + mockClearCompletedSQSMessages.mockImplementation(() => Promise.resolve()) + mockAddPrescriptionMessagesToNotificationStateStore.mockImplementation(() => Promise.resolve()) + + await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() + + // we should only persist & delete the fresh one + expect(mockAddPrescriptionMessagesToNotificationStateStore) + .toHaveBeenCalledWith(expect.any(Object), [msgFresh]) + + expect(mockClearCompletedSQSMessages) + .toHaveBeenCalledWith(expect.any(Object), [msgFresh]) + + // and log how many were suppressed + expect(mockInfo).toHaveBeenCalledWith( + "Suppressed 1 messages due to cooldown", + {suppressedCount: 1, totalFetched: 2} + ) + }) + + it("Logs a message when all messages are inside cooldown", async () => { + const stale = constructPSUDataItem({RequestID: "stale", TaskID: "t1"}) + const msgStale = constructPSUDataItemMessage({PSUDataItem: stale}) + + mockDrainQueue.mockImplementation(() => Promise.resolve([msgStale])) + + // returns true if the request ID is "fresh" + mockCheckCooldownForUpdate.mockImplementation((logger, update) => { + const u = update as { RequestID: string } + return Promise.resolve(u.RequestID === "fresh") + }) + + mockClearCompletedSQSMessages.mockImplementation(() => Promise.resolve()) + mockAddPrescriptionMessagesToNotificationStateStore.mockImplementation(() => Promise.resolve()) + + await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() + + expect(mockAddPrescriptionMessagesToNotificationStateStore).not.toHaveBeenCalled() + expect(mockClearCompletedSQSMessages).not.toHaveBeenCalled() + + // and log that everything was suppressed + expect(mockInfo) + .toHaveBeenCalledWith( + "All messages suppressed by cooldown; nothing to notify", + {suppressedCount: 1, totalFetched: 1} + ) + }) }) diff --git a/packages/nhsNotifyLambda/tests/testUtils.test.ts b/packages/nhsNotifyLambda/tests/testUtils.test.ts index a584c232ca..10ee6ca62d 100644 --- a/packages/nhsNotifyLambda/tests/testUtils.test.ts +++ b/packages/nhsNotifyLambda/tests/testUtils.test.ts @@ -2,7 +2,7 @@ import {jest} from "@jest/globals" import {SpiedFunction} from "jest-mock" import {Logger} from "@aws-lambda-powertools/logger" -import {DynamoDBDocumentClient, PutCommand} from "@aws-sdk/lib-dynamodb" +import {DynamoDBDocumentClient, GetCommand, PutCommand} from "@aws-sdk/lib-dynamodb" import {DeleteMessageBatchCommand, Message} from "@aws-sdk/client-sqs" import {constructMessage, constructPSUDataItemMessage, mockSQSClient} from "./testHelpers" @@ -12,6 +12,7 @@ const {mockSend: sqsMockSend} = mockSQSClient() const { addPrescriptionMessagesToNotificationStateStore, clearCompletedSQSMessages, + checkCooldownForUpdate, drainQueue } = await import("../src/utils") @@ -309,4 +310,118 @@ describe("NHS notify lambda helper functions", () => { expect(sendSpy).not.toHaveBeenCalled() }) }) + + describe("checkCooldownForUpdate", () => { + let logger: Logger + let infoSpy: SpiedFunction<(msg: string, ...meta: Array) => void> + let errorSpy: SpiedFunction<(msg: string, ...meta: Array) => void> + let sendSpy: ReturnType + + beforeEach(async () => { + jest.resetModules() + jest.clearAllMocks() + + process.env = {...ORIGINAL_ENV, TABLE_NAME: "test-table"} + + logger = new Logger({serviceName: "test-service"}) + infoSpy = jest.spyOn(logger, "info") + errorSpy = jest.spyOn(logger, "error") + sendSpy = jest.spyOn(DynamoDBDocumentClient.prototype, "send") + }) + + afterAll(() => { + process.env = {...ORIGINAL_ENV} + }) + + it("throws if TABLE_NAME is not set", async () => { + delete process.env.TABLE_NAME + const {checkCooldownForUpdate: fn} = await import("../src/utils") + const update = constructPSUDataItemMessage().PSUDataItem + + await expect(fn(logger, update)).rejects.toThrow("TABLE_NAME not set") + expect(errorSpy).toHaveBeenCalledWith("DynamoDB table not configured") + }) + + it("returns true if no previous record exists", async () => { + // send resolves with no item + sendSpy.mockImplementationOnce(() => Promise.resolve({})) + + const update = constructPSUDataItemMessage().PSUDataItem + const result = await checkCooldownForUpdate(logger, update, 900) + + expect(sendSpy).toHaveBeenCalledWith(expect.any(GetCommand)) + expect(infoSpy).toHaveBeenCalledWith( + "No previous notification state found. Notification allowed." + ) + expect(result).toBe(true) + }) + + it("returns true when last notification is older than default cooldown", async () => { + const pastTs = new Date(Date.now() - (1000 * 901)).toISOString() // 901s ago + sendSpy.mockImplementationOnce(() => + Promise.resolve({Item: {LastNotificationRequestTimestamp: pastTs}}) + ) + + const update = constructPSUDataItemMessage().PSUDataItem + const result = await checkCooldownForUpdate(logger, update, 900) + + expect(infoSpy).toHaveBeenCalledWith( + "Cooldown period has passed. Notification allowed.", + expect.objectContaining({secondsSince: expect.any(Number)}) + ) + expect(result).toBe(true) + }) + + it("returns false when last notification is within default cooldown", async () => { + const recentTs = new Date(Date.now() - (1000 * 300)).toISOString() // 300s ago + sendSpy.mockImplementationOnce(() => + Promise.resolve({Item: {LastNotificationRequestTimestamp: recentTs}}) + ) + + const update = constructPSUDataItemMessage().PSUDataItem + const result = await checkCooldownForUpdate(logger, update, 900) + + expect(infoSpy).toHaveBeenCalledWith( + "Within cooldown period. Notification suppressed.", + expect.objectContaining({secondsSince: expect.any(Number)}) + ) + expect(result).toBe(false) + }) + + it("honours a custom cooldownPeriod", async () => { + // custom cooldown = 60 seconds, but timestamp is only 30s ago + const recentTs = new Date(Date.now() - 30000).toISOString() + sendSpy.mockImplementationOnce(() => + Promise.resolve({Item: {LastNotificationRequestTimestamp: recentTs}}) + ) + + const update = constructPSUDataItemMessage().PSUDataItem + const result = await checkCooldownForUpdate(logger, update, 60) + + expect(infoSpy).toHaveBeenCalledWith( + "Within cooldown period. Notification suppressed.", + expect.objectContaining({secondsSince: expect.any(Number)}) + ) + expect(result).toBe(false) + }) + + it("propagates and logs errors from DynamoDB", async () => { + const awsErr = new Error("DDB failure") + sendSpy.mockImplementationOnce(() => Promise.reject(awsErr)) + + const update = constructPSUDataItemMessage().PSUDataItem + await expect(checkCooldownForUpdate(logger, update)).rejects.toThrow("DDB failure") + expect(errorSpy).toHaveBeenCalledWith( + "Error checking cooldown state", + expect.objectContaining({error: awsErr}) + ) + }) + + it("does nothing when passed an empty array", async () => { + await addPrescriptionMessagesToNotificationStateStore(logger, []) + expect(infoSpy).toHaveBeenCalledTimes(1) + expect(infoSpy).toHaveBeenCalledWith("No data to push into DynamoDB.") + expect(sendSpy).not.toHaveBeenCalled() + }) + }) })