diff --git a/.vscode/eps-prescription-status-update-api.code-workspace b/.vscode/eps-prescription-status-update-api.code-workspace index 6c09d0bdb8..f9c9088de3 100644 --- a/.vscode/eps-prescription-status-update-api.code-workspace +++ b/.vscode/eps-prescription-status-update-api.code-workspace @@ -128,6 +128,7 @@ "sourcetype", "timonwong", "Truststore", + "Upserted", "URID", "URPID", "uuidv4", diff --git a/SAMtemplates/tables/main.yaml b/SAMtemplates/tables/main.yaml index 95f94942eb..443f1b869d 100644 --- a/SAMtemplates/tables/main.yaml +++ b/SAMtemplates/tables/main.yaml @@ -417,7 +417,7 @@ Resources: - kms:Encrypt - kms:ReEncrypt* - kms:Decrypt - Resource: !GetAtt PrescriptionStatusUpdatesKMSKey.Arn + Resource: !GetAtt PrescriptionNotificationStateKMSKey.Arn PrescriptionNotificationStateTable: Type: AWS::DynamoDB::Table @@ -426,29 +426,19 @@ Resources: PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: true AttributeDefinitions: - - AttributeName: PrescriptionID - AttributeType: S - AttributeName: NHSNumber AttributeType: S + - AttributeName: ODSCode + AttributeType: S KeySchema: - - AttributeName: PrescriptionID - KeyType: HASH + - AttributeName: NHSNumber + KeyType: HASH # Partition key + - AttributeName: ODSCode + KeyType: RANGE # Sort key BillingMode: !If - EnableDynamoDBAutoScalingCondition - PROVISIONED - PAY_PER_REQUEST - GlobalSecondaryIndexes: - - IndexName: NotificationNHSNumberIndex - KeySchema: - - AttributeName: NHSNumber - KeyType: HASH - Projection: - ProjectionType: ALL - ProvisionedThroughput: !If - - EnableDynamoDBAutoScalingCondition - - ReadCapacityUnits: 1 - WriteCapacityUnits: !Ref MinWritePrescriptionNotificationStateCapacity - - !Ref "AWS::NoValue" ProvisionedThroughput: !If - EnableDynamoDBAutoScalingCondition - ReadCapacityUnits: 1 @@ -524,59 +514,6 @@ Resources: PredefinedMetricSpecification: PredefinedMetricType: DynamoDBReadCapacityUtilization - # Scaling for the indexes - NotificationNHSNumberIndexScalingWriteTarget: - Type: AWS::ApplicationAutoScaling::ScalableTarget - DependsOn: PrescriptionNotificationStateTable - Condition: EnableDynamoDBAutoScalingCondition - Properties: - MinCapacity: !Ref MinWritePrescriptionNotificationStateCapacity - MaxCapacity: !Ref MaxWritePrescriptionNotificationStateCapacity - ResourceId: !Sub table/${PrescriptionNotificationStateTable}/index/NotificationNHSNumberIndex - RoleARN: !GetAtt DynamoDbScalingRole.Arn - ScalableDimension: "dynamodb:index:WriteCapacityUnits" - ServiceNamespace: dynamodb - - NotificationNHSNumberIndexScalingWritePolicy: - Type: AWS::ApplicationAutoScaling::ScalingPolicy - Condition: EnableDynamoDBAutoScalingCondition - Properties: - PolicyName: NotificationNHSNumberIndexScalingWritePolicy - PolicyType: TargetTrackingScaling - ScalingTargetId: !Ref NotificationNHSNumberIndexScalingWriteTarget - TargetTrackingScalingPolicyConfiguration: - TargetValue: 50 - ScaleInCooldown: 600 - ScaleOutCooldown: 0 - PredefinedMetricSpecification: - PredefinedMetricType: DynamoDBWriteCapacityUtilization - - NotificationNHSNumberIndexScalingReadTarget: - Type: AWS::ApplicationAutoScaling::ScalableTarget - DependsOn: PrescriptionNotificationStateTable - Condition: EnableDynamoDBAutoScalingCondition - Properties: - MinCapacity: 1 - MaxCapacity: 100 - ResourceId: !Sub table/${PrescriptionNotificationStateTable}/index/NotificationNHSNumberIndex - RoleARN: !GetAtt DynamoDbScalingRole.Arn - ScalableDimension: "dynamodb:index:ReadCapacityUnits" - ServiceNamespace: dynamodb - - NotificationNHSNumberIndexScalingReadPolicy: - Type: AWS::ApplicationAutoScaling::ScalingPolicy - Condition: EnableDynamoDBAutoScalingCondition - Properties: - PolicyName: NotificationNHSNumberIndexReadScalingPolicy - PolicyType: TargetTrackingScaling - ScalingTargetId: !Ref NotificationNHSNumberIndexScalingReadTarget - TargetTrackingScalingPolicyConfiguration: - TargetValue: 70 - ScaleInCooldown: 60 - ScaleOutCooldown: 60 - PredefinedMetricSpecification: - PredefinedMetricType: DynamoDBReadCapacityUtilization - Outputs: PrescriptionStatusUpdatesTableName: Description: PrescriptionStatusUpdates table name diff --git a/packages/nhsNotifyLambda/jest.config.ts b/packages/nhsNotifyLambda/jest.config.ts index c8c575153b..7b0c4931ea 100644 --- a/packages/nhsNotifyLambda/jest.config.ts +++ b/packages/nhsNotifyLambda/jest.config.ts @@ -3,8 +3,15 @@ import type {JestConfigWithTsJest} from "ts-jest" const jestConfig: JestConfigWithTsJest = { ...defaultConfig, - "rootDir": "./", - setupFiles: ["/.jest/setEnvVars.js"] + rootDir: "./", + setupFiles: ["/.jest/setEnvVars.js"], + coveragePathIgnorePatterns: ["/tests/"], + coverageReporters: [ + "clover", + "json", + "text", + ["lcov", {projectRoot: "../../"}] + ] } export default jestConfig diff --git a/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts b/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts index c14cdff563..542e83004b 100644 --- a/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts +++ b/packages/nhsNotifyLambda/src/nhsNotifyLambda.ts @@ -6,8 +6,12 @@ import middy from "@middy/core" import inputOutputLogger from "@middy/input-output-logger" import errorHandler from "@nhs/fhir-middy-error-handler" -import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes" -import {clearCompletedSQSMessages, drainQueue} from "./utils" +import { + addPrescriptionMessagesToNotificationStateStore, + clearCompletedSQSMessages, + drainQueue, + PSUDataItemMessage +} from "./utils" const logger = new Logger({serviceName: "nhsNotify"}) @@ -21,7 +25,7 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr logger.info("NHS Notify lambda triggered by scheduler", {event}) - let messages + let messages: Array try { messages = await drainQueue(logger, 100) @@ -30,19 +34,9 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr return } - // parse & log each PSUDataItem as a placeholder for now. - const items = messages.map((m) => { - try { - return JSON.parse(m.Body!) as PSUDataItem - } catch (err) { - logger.error("Failed to parse message body", {body: m.Body, error: err}) - return null - } - }).filter((i): i is PSUDataItem => i !== null) - - const toNotify = items.map((m) => ({ - RequestID: m.RequestID, - TaskId: m.TaskID, + const toNotify = messages.map((m) => ({ + RequestID: m.PSUDataItem.RequestID, + TaskId: m.PSUDataItem.TaskID, Message: "Notification Required" })) logger.info("Fetched prescription notification messages", {count: toNotify.length, toNotify}) @@ -58,10 +52,17 @@ export const lambdaHandler = async (event: EventBridgeEvent): Pr throw err } + try { + await addPrescriptionMessagesToNotificationStateStore(logger, messages) + } catch (err) { + logger.error("Error while pushing data to the PSU notification state data store", {err}) + throw err + } + // By waiting until a message is successfully processed before deleting it from SQS, // failed messages will eventually be retried by subsequent notify consumers. try { - await clearCompletedSQSMessages(messages, logger) + await clearCompletedSQSMessages(logger, messages) } catch (err) { logger.error("Error while deleting successfully processed messages from SQS", {error: err}) throw err diff --git a/packages/nhsNotifyLambda/src/utils.ts b/packages/nhsNotifyLambda/src/utils.ts index e7f4b95454..436f161555 100644 --- a/packages/nhsNotifyLambda/src/utils.ts +++ b/packages/nhsNotifyLambda/src/utils.ts @@ -10,6 +10,8 @@ import {DynamoDBDocumentClient, PutCommand} from "@aws-sdk/lib-dynamodb" import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes" +const TTL_DELTA = 60 * 60 * 24 * 7 // Keep records for a week + const dynamoTable = process.env.TABLE_NAME const sqsUrl = process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL @@ -18,26 +20,52 @@ const sqs = new SQSClient({region: process.env.AWS_REGION}) const dynamo = new DynamoDBClient({region: process.env.AWS_REGION}) const docClient = DynamoDBDocumentClient.from(dynamo) +/** + * Returns the original array, chunked in batches of up to + * + * @param arr - Array to be chunked + * @param size - The maximum size of each chunk. The final chunk may be smaller. + * @returns - an (N+1) dimensional array + */ +function chunkArray(arr: Array, size: number): Array> { + const chunks: Array> = [] + for (let i = 0; i < arr.length; i += size) { + chunks.push(arr.slice(i, i + size)) + } + return chunks +} + +// This is an extension of the SQS message interface, which explicitly parses the PSUDataItem +export interface PSUDataItemMessage extends Message { + PSUDataItem: PSUDataItem +} + /** * Pulls up to `maxTotal` messages off the queue (in batches of up to 10), * logs them, and deletes them. */ -export async function drainQueue(logger: Logger, maxTotal = 100): Promise> { +export async function drainQueue(logger: Logger, maxTotal = 100): Promise> { let receivedSoFar = 0 - const allMessages: Array = [] + const allMessages: Array = [] if (!sqsUrl) { logger.error("Notifications SQS URL not configured") throw new Error("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set") } + let pollingIteration = 0 while (receivedSoFar < maxTotal) { + pollingIteration = pollingIteration + 1 + const toFetch = Math.min(10, maxTotal - receivedSoFar) const receiveCmd = new ReceiveMessageCommand({ QueueUrl: sqsUrl, MaxNumberOfMessages: toFetch, - WaitTimeSeconds: 0, - VisibilityTimeout: 30 + WaitTimeSeconds: 20, // Use long polling to avoid getting empty responses when the queue is small + MessageAttributeNames: [ + "MessageDeduplicationId", + "MessageGroupId" + ] }) const {Messages} = await sqs.send(receiveCmd) @@ -45,64 +73,125 @@ export async function drainQueue(logger: Logger, maxTotal = 100): Promise m.MessageId) + } + ) + + const parsedMessages: Array = Messages.map((m) => { + if (!m.Body) { + logger.error("Failed to parse SQS message - aborting this notification processor check.", {offendingMessage: m}) + throw new Error(`Received an invalid SQS message. Message ID ${m.MessageId}`) + } + + const parsedBody: PSUDataItem = JSON.parse(m.Body) as PSUDataItem + + return { + ...m, + PSUDataItem: parsedBody + } + }) + allMessages.push(...parsedMessages) receivedSoFar += Messages.length + + // if the last batch of messages was small, then break the loop + // This is to prevent a slow-loris style breakdown if the queue has + // barely enough messages to keep the processors alive + if (!Messages || Messages.length < 5) { + logger.info("Received a small number of messages. Considering the queue drained.", {batchLength: Messages.length}) + break + } } + logger.info(`In sum, retrieved ${allMessages.length} messages from SQS`) + return allMessages } /** - * For each message given, delete it from the notifications SQS. Throws an error if it fails + * For each message given, delete it from the notifications SQS in batches of up to 10. + * Throws an error if any batch fails, but previous batches will remain deleted. * - * @param messages - The messages that were received from SQS, and are to be deleted. * @param logger - the logging object + * @param messages - The messages that were received from SQS, and are to be deleted. */ export async function clearCompletedSQSMessages( - messages: Array, - logger: Logger + logger: Logger, + messages: Array ): Promise { if (!sqsUrl) { logger.error("Notifications SQS URL not configured") throw new Error("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set") } - const deleteMessages = messages.map((m) => ({ - Id: m.MessageId!, - ReceiptHandle: m.ReceiptHandle! - })) + const batches = chunkArray(messages, 10) - const deleteCmd = new DeleteMessageBatchCommand({ - QueueUrl: sqsUrl, - Entries: deleteMessages - }) - const delResult = await sqs.send(deleteCmd) + for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) { + const batch = batches[batchIndex] + const entries = batch.map((m) => ({ + Id: m.MessageId!, + ReceiptHandle: m.ReceiptHandle! + })) - if (delResult.Failed) { - logger.error("Some messages failed to delete", {failed: delResult.Failed}) - throw new Error("Failed to delete fetched messages from SQS") + logger.info(`Deleting batch ${batchIndex + 1}/${batches.length}`, { + batchSize: entries.length, + messageIds: entries.map((e) => e.Id) + }) + + const deleteCmd = new DeleteMessageBatchCommand({ + QueueUrl: sqsUrl, + Entries: entries + }) + const delResult = await sqs.send(deleteCmd) + + if (delResult.Failed && delResult.Failed.length > 0) { + logger.error("Some messages failed to delete in this batch", {failed: delResult.Failed}) + throw new Error(`Failed to delete ${delResult.Failed.length} messages from SQS`) + } + + logger.info(`Successfully deleted SQS message batch ${batchIndex + 1}`, { + result: delResult, + messageIds: entries.map((e) => e.Id) + }) } +} - logger.info("Successfully deleted messages from SQS", {result: delResult}) +export interface LastNotificationStateType { + NHSNumber: string + ODSCode: string + RequestId: string // x-request-id header + MessageID: string // The SQS message ID + LastNotifiedPrescriptionStatus: string + DeliveryStatus: string + LastNotificationRequestTimestamp: string // ISO-8601 string + ExpiryTime: number // DynamoDB expiration time (UNIX timestamp) } -export async function addPrescriptionToNotificationStateStore(logger: Logger, dataArray: Array) { +export async function addPrescriptionMessagesToNotificationStateStore( + logger: Logger, + dataArray: Array +) { if (!dynamoTable) { logger.error("DynamoDB table not configured") throw new Error("TABLE_NAME not set") } - logger.info("Pushing data to DynamoDB", {count: dataArray.length}) + if (dataArray.length) logger.info("Attempting to push data to DynamoDB", {count: dataArray.length}) + else logger.info("No data to push into DynamoDB.") for (const data of dataArray) { - const item = { - ...data, - // TTL for the item. - // Since we only care about notifications that happened within - // the cooldown period, a day of storage is more than enough for - // practical purposes. But: - // TODO: Do we need to store this for longer for auditing and crisis resolution? - ExpiryTime: 86400 + const item: LastNotificationStateType = { + NHSNumber: data.PSUDataItem.PatientNHSNumber, + ODSCode: data.PSUDataItem.PharmacyODSCode, + RequestId: data.PSUDataItem.RequestID, + MessageID: data.MessageId!, + LastNotifiedPrescriptionStatus: data.PSUDataItem.Status, + DeliveryStatus: "requested", + LastNotificationRequestTimestamp: new Date().toISOString(), + ExpiryTime: (Math.floor(+new Date() / 1000) + TTL_DELTA) } try { @@ -110,13 +199,9 @@ export async function addPrescriptionToNotificationStateStore(logger: Logger, da TableName: dynamoTable, Item: item })) - logger.info("Upserted prescription", { - PrescriptionID: data.PrescriptionID, - PatientNHSNumber: data.PatientNHSNumber - }) + logger.info("Upserted prescription") } catch (err) { logger.error("Failed to write to DynamoDB", { - PrescriptionID: data.PrescriptionID, error: err }) throw err diff --git a/packages/nhsNotifyLambda/tests/testHelpers.ts b/packages/nhsNotifyLambda/tests/testHelpers.ts index 81a9c8b438..79a9583ef1 100644 --- a/packages/nhsNotifyLambda/tests/testHelpers.ts +++ b/packages/nhsNotifyLambda/tests/testHelpers.ts @@ -3,6 +3,7 @@ import {jest} from "@jest/globals" import * as sqs from "@aws-sdk/client-sqs" import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes" +import {PSUDataItemMessage} from "../src/utils" // Similarly mock the SQS client export function mockSQSClient() { @@ -18,6 +19,22 @@ export function mockSQSClient() { return {mockSend} } +export function constructMessage(overrides: Partial = {}): sqs.Message { + return { + MessageId: "messageId", + Body: JSON.stringify(constructPSUDataItem()), + ...overrides + } +} + +export function constructPSUDataItemMessage(overrides: Partial = {}): PSUDataItemMessage { + return { + ...constructMessage(), + PSUDataItem: constructPSUDataItem(), + ...overrides + } +} + export function constructPSUDataItem(overrides: Partial = {}): PSUDataItem { return { LastModified: "2023-01-02T00:00:00Z", diff --git a/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts b/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts index 2e1c5f59f1..8b8a30a86e 100644 --- a/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts +++ b/packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts @@ -6,13 +6,17 @@ import { afterEach } from "@jest/globals" -const mockDrainQueue = jest.fn() +import {constructPSUDataItem, constructPSUDataItemMessage} from "./testHelpers" + +const mockAddPrescriptionMessagesToNotificationStateStore = jest.fn() const mockClearCompletedSQSMessages = jest.fn() +const mockDrainQueue = jest.fn() jest.unstable_mockModule( "../src/utils", async () => ({ __esModule: true, drainQueue: mockDrainQueue, + addPrescriptionMessagesToNotificationStateStore: mockAddPrescriptionMessagesToNotificationStateStore, clearCompletedSQSMessages: mockClearCompletedSQSMessages }) ) @@ -60,10 +64,10 @@ describe("Unit test for NHS Notify lambda handler", () => { }) it("Clears completed messages after successful processing", async () => { - const item1 = {TaskID: "t1", RequestID: "r1"} - const item2 = {TaskID: "t2", RequestID: "r2"} - const msg1 = {Body: JSON.stringify(item1)} - const msg2 = {Body: JSON.stringify(item2)} + const item1 = constructPSUDataItem({TaskID: "t1", RequestID: "r1"}) + const item2 = constructPSUDataItem({TaskID: "t2", RequestID: "r2"}) + const msg1 = constructPSUDataItemMessage({PSUDataItem: item1}) + const msg2 = constructPSUDataItemMessage({PSUDataItem: item2}) // drainQueue returns two messages mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg1, msg2])) // deletion succeeds @@ -84,15 +88,16 @@ describe("Unit test for NHS Notify lambda handler", () => { ) // ensure clearCompletedSQSMessages was called with the original messages array expect(mockClearCompletedSQSMessages).toHaveBeenCalledWith( - [msg1, msg2], - expect.any(Object) // the logger instance + expect.any(Object), // the logger instance + [msg1, msg2] ) }) it("Throws and logs if clearCompletedSQSMessages fails", async () => { - const item = {TaskID: "tx", RequestID: "rx"} - const msg = {Body: JSON.stringify(item)} + const item = constructPSUDataItem({TaskID: "tx", RequestID: "rx"}) + const msg = constructPSUDataItemMessage({PSUDataItem: item}) mockDrainQueue.mockImplementationOnce(() => Promise.resolve([msg])) + const deletionError = new Error("Delete failed") mockClearCompletedSQSMessages.mockImplementationOnce(() => Promise.reject(deletionError)) @@ -104,67 +109,42 @@ describe("Unit test for NHS Notify lambda handler", () => { ) }) - it("When drainQueue returns only valid JSON messages, all are processed", async () => { - const validItem = { - prescriptionId: "abc123", - TaskID: "task-1", - RequestID: "req-1" - } - mockDrainQueue.mockImplementation(() => - Promise.resolve([{Body: JSON.stringify(validItem)}]) + it("Throws and logs if addPrescriptionMessagesToNotificationStateStore fails", async () => { + mockDrainQueue.mockImplementationOnce(() => Promise.resolve([constructPSUDataItemMessage()])) + const thrownError = new Error("Failed") + mockAddPrescriptionMessagesToNotificationStateStore.mockImplementationOnce( + () => Promise.reject(thrownError) ) - await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() - - expect(mockError).not.toHaveBeenCalled() - expect(mockInfo).toHaveBeenCalledWith( - "Fetched prescription notification messages", - { - count: 1, - toNotify: [ - { - RequestID: "req-1", - TaskId: "task-1", - Message: "Notification Required" - } - ] - } + await expect(lambdaHandler(mockEventBridgeEvent)).rejects.toThrow("Failed") + expect(mockError).toHaveBeenCalledWith( + "Error while pushing data to the PSU notification state data store", + {err: thrownError} ) }) - it("Filters out invalid JSON and logs parse errors", async () => { - const validItem = { - foo: "bar", - TaskID: "task-2", - RequestID: "req-2" - } - const messages = [ - {Body: JSON.stringify(validItem)}, - {Body: "not-json"} - ] + it("When drainQueue returns only valid messages, all are processed", async () => { + const validItem = constructPSUDataItem({ + PrescriptionID: "abc123", + TaskID: "task-1", + RequestID: "req-1" + }) + const message = constructPSUDataItemMessage({PSUDataItem: validItem}) mockDrainQueue.mockImplementation(() => - Promise.resolve(messages) + Promise.resolve([message]) ) await expect(lambdaHandler(mockEventBridgeEvent)).resolves.not.toThrow() - // should have logged a parse‐error - expect(mockError).toHaveBeenCalledWith( - "Failed to parse message body", - expect.objectContaining({ - body: "not-json", - error: expect.any(Error) - }) - ) - // only the one valid item should make it through + expect(mockError).not.toHaveBeenCalled() expect(mockInfo).toHaveBeenCalledWith( "Fetched prescription notification messages", { count: 1, toNotify: [ { - RequestID: "req-2", - TaskId: "task-2", + RequestID: "req-1", + TaskId: "task-1", Message: "Notification Required" } ] diff --git a/packages/nhsNotifyLambda/tests/testUtils.test.ts b/packages/nhsNotifyLambda/tests/testUtils.test.ts index 4016ebd170..a584c232ca 100644 --- a/packages/nhsNotifyLambda/tests/testUtils.test.ts +++ b/packages/nhsNotifyLambda/tests/testUtils.test.ts @@ -5,11 +5,15 @@ import {Logger} from "@aws-lambda-powertools/logger" import {DynamoDBDocumentClient, PutCommand} from "@aws-sdk/lib-dynamodb" import {DeleteMessageBatchCommand, Message} from "@aws-sdk/client-sqs" -import {constructPSUDataItem, mockSQSClient} from "./testHelpers" +import {constructMessage, constructPSUDataItemMessage, mockSQSClient} from "./testHelpers" const {mockSend: sqsMockSend} = mockSQSClient() -const {addPrescriptionToNotificationStateStore, clearCompletedSQSMessages, drainQueue} = await import("../src/utils") +const { + addPrescriptionMessagesToNotificationStateStore, + clearCompletedSQSMessages, + drainQueue +} = await import("../src/utils") const ORIGINAL_ENV = {...process.env} @@ -18,6 +22,7 @@ describe("NHS notify lambda helper functions", () => { describe("drainQueue", () => { let logger: Logger let errorSpy: SpiedFunction<(msg: string, ...meta: Array) => void> + let infoSpy: SpiedFunction<(msg: string, ...meta: Array) => void> beforeEach(() => { jest.resetModules() @@ -26,20 +31,67 @@ describe("NHS notify lambda helper functions", () => { process.env = {...ORIGINAL_ENV} logger = new Logger({serviceName: "test-service"}) errorSpy = jest.spyOn(logger, "error") + infoSpy = jest.spyOn(logger, "info") }) it("Does not throw an error when the SQS fetch succeeds", async () => { - const payload = {Messages: Array.from({length: 10}, () => (constructPSUDataItem() as Message))} + const payload = {Messages: Array.from({length: 10}, () => (constructMessage()))} sqsMockSend.mockImplementationOnce(() => Promise.resolve(payload)) const messages = await drainQueue(logger, 10) expect(sqsMockSend).toHaveBeenCalledTimes(1) - expect(messages).toStrictEqual(payload.Messages) + expect(messages).toHaveLength(10) + expect(infoSpy).toHaveBeenCalledWith( + "Received some messages from the queue. Parsing them...", + expect.objectContaining({pollingIteration: 1, MessageIDs: expect.any(Array)}) + ) + }) + + it("Batches multiple fetches until maxTotal is reached and stops on empty response", async () => { + // First fetch returns 5, second fetch returns 5, third fetch empty + const first = {Messages: Array.from({length: 5}, () => constructMessage())} + const second = {Messages: Array.from({length: 5}, () => constructMessage())} + const empty = {Messages: []} + + sqsMockSend + .mockImplementationOnce(() => Promise.resolve(first)) + .mockImplementationOnce(() => Promise.resolve(second)) + .mockImplementationOnce(() => Promise.resolve(empty)) + + const messages = await drainQueue(logger, 15) + expect(sqsMockSend).toHaveBeenCalledTimes(3) + expect(messages).toHaveLength(10) + expect(infoSpy).toHaveBeenCalledTimes(3) + }) + + it("Does not return more than the maximum number of messages, even if more are available", async () => { + const constructMessageArray = {Messages: Array.from({length: 10}, () => constructMessage())} + const mockQueue = () => Promise.resolve(constructMessageArray) + sqsMockSend.mockImplementation(mockQueue) + + const messages = await drainQueue(logger, 20) + + expect(sqsMockSend).toHaveBeenCalledTimes(2) + expect(messages).toHaveLength(20) + expect(infoSpy).toHaveBeenCalledTimes(3) + }) + + it("Stops polling the queue if not enough messages are returned from the queue", async () => { + const first = {Messages: Array.from({length: 10}, () => constructMessage())} + const second = {Messages: Array.from({length: 4}, () => constructMessage())} + + sqsMockSend + .mockImplementationOnce(() => Promise.resolve(first)) + .mockImplementationOnce(() => Promise.resolve(second)) + + const messages = await drainQueue(logger, 20) + expect(sqsMockSend).toHaveBeenCalledTimes(2) + expect(messages).toHaveLength(14) }) it("returns empty array if queue is empty on first fetch", async () => { - sqsMockSend.mockImplementation(() => Promise.resolve({Messages: []})) + sqsMockSend.mockImplementationOnce(() => Promise.resolve({Messages: []})) const messages = await drainQueue(logger, 5) expect(messages).toEqual([]) @@ -51,11 +103,25 @@ describe("NHS notify lambda helper functions", () => { await expect(drainQueue(logger, 10)).rejects.toThrow("Fetch failed") }) + it("Throws an error if a message has no Body", async () => { + const badMsg = constructMessage({Body: undefined}) + sqsMockSend.mockImplementationOnce(() => Promise.resolve({Messages: [badMsg]})) + + await expect(drainQueue(logger, 1)).rejects.toThrow( + `Received an invalid SQS message. Message ID ${badMsg.MessageId}` + ) + expect(errorSpy).toHaveBeenCalledWith( + "Failed to parse SQS message - aborting this notification processor check.", + {offendingMessage: badMsg} + ) + }) + it("Throws an error if the SQS URL is not configured", async () => { delete process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL - const {drainQueue} = await import("../src/utils") - - await expect(drainQueue(logger)).rejects.toThrow("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set") + const {drainQueue: dq} = await import("../src/utils") + await expect(dq(logger)).rejects.toThrow( + "NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set" + ) expect(errorSpy).toHaveBeenCalledWith("Notifications SQS URL not configured") }) }) @@ -63,6 +129,7 @@ describe("NHS notify lambda helper functions", () => { describe("clearCompletedSQSMessages", () => { let logger: Logger let errorSpy: SpiedFunction<(msg: string, ...meta: Array) => void> + let infoSpy: SpiedFunction<(msg: string, ...meta: Array) => void> beforeEach(() => { jest.resetModules() @@ -71,18 +138,19 @@ describe("NHS notify lambda helper functions", () => { process.env = {...ORIGINAL_ENV} logger = new Logger({serviceName: "test-service"}) errorSpy = jest.spyOn(logger, "error") + infoSpy = jest.spyOn(logger, "info") }) - it("deletes messages successfully without error", async () => { + it("deletes messages in a single batch successfully", async () => { const messages: Array = [ - {MessageId: "msg1", ReceiptHandle: "rh1"}, - {MessageId: "msg2", ReceiptHandle: "rh2"} + constructMessage({MessageId: "msg1", ReceiptHandle: "rh1"}), + constructMessage({MessageId: "msg2", ReceiptHandle: "rh2"}) ] // successful delete (no .Failed) sqsMockSend.mockImplementationOnce(() => Promise.resolve({})) - await expect(clearCompletedSQSMessages(messages, logger)) + await expect(clearCompletedSQSMessages(logger, messages)) .resolves .toBeUndefined() @@ -98,14 +166,38 @@ describe("NHS notify lambda helper functions", () => { {Id: "msg2", ReceiptHandle: "rh2"} ] }) - expect(errorSpy).not.toHaveBeenCalled() }) + it("splits into batches of 10 when over the SQS limit", async () => { + const messages: Array = Array.from({length: 12}, (_, i) => + constructMessage({MessageId: `msg${i}`, ReceiptHandle: `rh${i}`}) + ) + // succeed both batches + sqsMockSend.mockImplementation(() => Promise.resolve({})) + + await clearCompletedSQSMessages(logger, messages) + expect(sqsMockSend).toHaveBeenCalledTimes(2) + + // first batch of 10 + const firstCmd = sqsMockSend.mock.calls[0][0] as DeleteMessageBatchCommand + expect(firstCmd.input.Entries).toHaveLength(10) + // second batch of 2 + const secondCmd = sqsMockSend.mock.calls[1][0] as DeleteMessageBatchCommand + expect(secondCmd.input.Entries).toHaveLength(2) + + expect(infoSpy).toHaveBeenCalledWith( + "Deleting batch 1/2", + expect.objectContaining({batchSize: 10, messageIds: expect.any(Array)}) + ) + expect(infoSpy).toHaveBeenCalledWith( + "Deleting batch 2/2", + expect.objectContaining({batchSize: 2, messageIds: expect.any(Array)}) + ) + }) + it("logs and throws if some deletions fail", async () => { - const messages: Array = [ - {MessageId: "msg1", ReceiptHandle: "rh1"} - ] + const messages: Array = [constructMessage({MessageId: "msg1", ReceiptHandle: "rh1"})] const failedEntries = [ {Id: "msg1", SenderFault: true, Code: "Error", Message: "fail"} ] @@ -113,28 +205,26 @@ describe("NHS notify lambda helper functions", () => { // partial failure sqsMockSend.mockImplementationOnce(() => Promise.resolve({Failed: failedEntries})) - await expect(clearCompletedSQSMessages(messages, logger)) + await expect(clearCompletedSQSMessages(logger, messages)) .rejects - .toThrow("Failed to delete fetched messages from SQS") + .toThrow("Failed to delete 1 messages from SQS") expect(errorSpy).toHaveBeenCalledWith( - "Some messages failed to delete", + "Some messages failed to delete in this batch", {failed: failedEntries} ) }) it("Throws an error if the SQS URL is not configured", async () => { delete process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL - const {clearCompletedSQSMessages} = await import("../src/utils") + const {clearCompletedSQSMessages: clearFunc} = await import("../src/utils") - await expect(clearCompletedSQSMessages([], logger)) - .rejects - .toThrow("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set") + await expect(clearFunc(logger, [])).rejects.toThrow("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set") expect(errorSpy).toHaveBeenCalledWith("Notifications SQS URL not configured") }) }) - describe("addPrescriptionToNotificationStateStore", () => { + describe("addPrescriptionMessagesToNotificationStateStore", () => { let logger: Logger let infoSpy: SpiedFunction<(msg: string, ...meta: Array) => void> let errorSpy: SpiedFunction<(msg: string, ...meta: Array) => void> @@ -154,10 +244,10 @@ describe("NHS notify lambda helper functions", () => { it("throws and logs error if TABLE_NAME is not set", async () => { delete process.env.TABLE_NAME - const {addPrescriptionToNotificationStateStore} = await import("../src/utils") + const {addPrescriptionMessagesToNotificationStateStore: addFn} = await import("../src/utils") await expect( - addPrescriptionToNotificationStateStore(logger, [constructPSUDataItem()]) + addFn(logger, [constructPSUDataItemMessage()]) ).rejects.toThrow("TABLE_NAME not set") expect(errorSpy).toHaveBeenCalledWith( @@ -168,65 +258,55 @@ describe("NHS notify lambda helper functions", () => { }) it("throws and logs error when a DynamoDB write fails", async () => { - const item = constructPSUDataItem() + const item = constructPSUDataItemMessage() const awsErr = new Error("AWS error") sendSpy.mockImplementationOnce(() => Promise.reject(awsErr)) await expect( - addPrescriptionToNotificationStateStore(logger, [item]) + addPrescriptionMessagesToNotificationStateStore(logger, [item]) ).rejects.toThrow("AWS error") // first info for count expect(infoSpy).toHaveBeenCalledWith( - "Pushing data to DynamoDB", + "Attempting to push data to DynamoDB", {count: 1} ) // error log includes PrescriptionID and the error expect(errorSpy).toHaveBeenCalledWith( "Failed to write to DynamoDB", { - PrescriptionID: item.PrescriptionID, error: awsErr } ) }) it("puts data in DynamoDB and logs correctly when configured", async () => { - const item = constructPSUDataItem() + const item = constructPSUDataItemMessage() sendSpy.mockImplementationOnce(() => Promise.resolve({})) - await addPrescriptionToNotificationStateStore(logger, [item]) + await addPrescriptionMessagesToNotificationStateStore(logger, [item]) - // 1st info: pushing batch - expect(infoSpy).toHaveBeenNthCalledWith( - 1, - "Pushing data to DynamoDB", + expect(infoSpy).toHaveBeenCalledWith( + "Attempting to push data to DynamoDB", {count: 1} ) + // send was called exactly once with a PutCommand expect(sendSpy).toHaveBeenCalledTimes(1) const cmd = sendSpy.mock.calls[0][0] as PutCommand expect(cmd).toBeInstanceOf(PutCommand) - // verify TTL injected - expect(cmd.input).toEqual({ - TableName: "dummy_table", - Item: { - ...item, - ExpiryTime: 86400 - } - }) - // 2nd info: upsert log - expect(infoSpy).toHaveBeenNthCalledWith( - 2, - "Upserted prescription", - { - PrescriptionID: item.PrescriptionID, - PatientNHSNumber: item.PatientNHSNumber - } - ) + expect(infoSpy).toHaveBeenCalledWith("Upserted prescription") + // No errors expect(errorSpy).not.toHaveBeenCalled() }) + + it("does nothing when passed an empty array", async () => { + await addPrescriptionMessagesToNotificationStateStore(logger, []) + expect(infoSpy).toHaveBeenCalledTimes(1) + expect(infoSpy).toHaveBeenCalledWith("No data to push into DynamoDB.") + expect(sendSpy).not.toHaveBeenCalled() + }) }) }) diff --git a/packages/updatePrescriptionStatus/src/utils/sqsClient.ts b/packages/updatePrescriptionStatus/src/utils/sqsClient.ts index 5f24871063..325dd924bd 100644 --- a/packages/updatePrescriptionStatus/src/utils/sqsClient.ts +++ b/packages/updatePrescriptionStatus/src/utils/sqsClient.ts @@ -93,10 +93,25 @@ export async function pushPrescriptionToNotificationSQS( // FIFO // We dedupe on both nhs number and ods code MessageDeduplicationId: saltedHash(logger, `${item.PatientNHSNumber}:${item.PharmacyODSCode}`), - MessageGroupId: requestId + MessageGroupId: requestId, + MessageAttributes: { + RequestId: { + DataType: "String", + StringValue: requestId + } + } })) // We could do a round of deduplications here, but benefits would be minimal and AWS SQS will do it for us anyway. + logger.info( + "For this batch, this is the results of filtering out unwanted statuses and parsing to SQS message entries", + { + batchLength: batch.length, + entriesLength: entries.length, + entriesStatuses: batch.map((el) => el.Status) + } + ) + if (!entries.length) { // Carry on if we have no updates to make. logger.info("No entries to post to the notifications SQS")