Skip to content

Commit 31c6de4

Browse files
authored
New: [AEA-5296] - Log message for ITOC monitoring (#1787)
## Summary - ✨ New Feature ### Details Log the queue attributes at the start of the processor run. - timestamp - Number of messages on queue - Number of invisible messages - Number of delayed message There should always be 0 delayed messages.
1 parent c2dbb72 commit 31c6de4

6 files changed

Lines changed: 142 additions & 11 deletions

File tree

packages/nhsNotifyLambda/src/nhsNotifyLambda.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
addPrescriptionMessagesToNotificationStateStore,
1313
checkCooldownForUpdate,
1414
removeSQSMessages,
15+
reportQueueStatus,
1516
drainQueue,
1617
makeBatchNotifyRequest,
1718
NotifyDataItemMessage
@@ -108,6 +109,7 @@ export const lambdaHandler = async (
108109
logger.info("NHS Notify lambda triggered by scheduler", {event})
109110
logger.info("Routing Plan ID:", {routingId})
110111

112+
await reportQueueStatus(logger)
111113
await drainAndProcess(routingId)
112114
}
113115

packages/nhsNotifyLambda/src/utils.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import {
33
SQSClient,
44
ReceiveMessageCommand,
55
DeleteMessageBatchCommand,
6-
Message
6+
Message,
7+
GetQueueAttributesCommand
78
} from "@aws-sdk/client-sqs"
89
import {DynamoDBClient} from "@aws-sdk/client-dynamodb"
910
import {DynamoDBDocumentClient, GetCommand, PutCommand} from "@aws-sdk/lib-dynamodb"
@@ -58,6 +59,37 @@ function chunkArray<T>(arr: Array<T>, size: number): Array<Array<T>> {
5859
return chunks
5960
}
6061

62+
export async function reportQueueStatus(logger: Logger): Promise<void> {
63+
if (!sqsUrl) {
64+
logger.error("Notifications SQS URL not configured")
65+
throw new Error("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
66+
}
67+
68+
const attrsCmd = new GetQueueAttributesCommand({
69+
QueueUrl: sqsUrl,
70+
AttributeNames: [
71+
"ApproximateNumberOfMessages",
72+
"ApproximateNumberOfMessagesNotVisible",
73+
"ApproximateNumberOfMessagesDelayed"
74+
]
75+
})
76+
const {Attributes} = await sqs.send(attrsCmd)
77+
78+
// Fall back to a negative value so missing data can be identified
79+
const ApproximateNumberOfMessages = parseInt(Attributes?.ApproximateNumberOfMessages ?? "-1")
80+
const ApproximateNumberOfMessagesNotVisible = parseInt(Attributes?.ApproximateNumberOfMessagesNotVisible ?? "-1")
81+
const ApproximateNumberOfMessagesDelayed = parseInt(Attributes?.ApproximateNumberOfMessagesDelayed ?? "-1")
82+
83+
logger.info(
84+
"Current queue attributes (if a value failed to fetch, it will be reported as -1):",
85+
{
86+
ApproximateNumberOfMessages,
87+
ApproximateNumberOfMessagesNotVisible,
88+
ApproximateNumberOfMessagesDelayed
89+
}
90+
)
91+
}
92+
6193
// This is an extension of the SQS message interface, which explicitly parses the PSUDataItem
6294
// and helps track the nhs notify results
6395
export interface NotifyDataItemMessage extends Message {
@@ -78,7 +110,7 @@ export interface NotifyDataItemMessage extends Message {
78110
* - messages the array of parsed NotifyDataItemMessage
79111
* - isEmpty: true if the last receive returned fewer than 5 messages (or none),
80112
* indicating the queue is effectively drained.
81-
*/
113+
*/
82114
export async function drainQueue(
83115
logger: Logger,
84116
maxTotal = 100

packages/nhsNotifyLambda/tests/testNhsNotifyLambda.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ jest.unstable_mockModule(
1919

2020
const mockAddPrescriptionMessagesToNotificationStateStore = jest.fn()
2121
const mockRemoveSQSMessages = jest.fn()
22+
const mockReportQueueStatus = jest.fn()
2223
const mockDrainQueue = jest.fn()
2324
const mockCheckCooldownForUpdate = jest.fn()
2425
const mockMakeBatchNotifyRequest = jest.fn()
@@ -27,6 +28,7 @@ jest.unstable_mockModule(
2728
"../src/utils",
2829
async () => ({
2930
__esModule: true,
31+
reportQueueStatus: mockReportQueueStatus,
3032
drainQueue: mockDrainQueue,
3133
addPrescriptionMessagesToNotificationStateStore: mockAddPrescriptionMessagesToNotificationStateStore,
3234
removeSQSMessages: mockRemoveSQSMessages,

packages/nhsNotifyLambda/tests/testUtils.test.ts

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import nock from "nock"
44

55
import {Logger} from "@aws-lambda-powertools/logger"
66
import {DynamoDBDocumentClient, GetCommand, PutCommand} from "@aws-sdk/lib-dynamodb"
7-
import {DeleteMessageBatchCommand, Message} from "@aws-sdk/client-sqs"
7+
import {GetQueueAttributesCommand, DeleteMessageBatchCommand, Message} from "@aws-sdk/client-sqs"
88

99
import {constructMessage, constructPSUDataItemMessage, mockSQSClient} from "./testHelpers"
1010

@@ -36,8 +36,9 @@ jest.unstable_mockModule(
3636

3737
const {
3838
addPrescriptionMessagesToNotificationStateStore,
39-
removeSQSMessages: clearCompletedSQSMessages,
39+
removeSQSMessages,
4040
checkCooldownForUpdate,
41+
reportQueueStatus,
4142
drainQueue,
4243
makeBatchNotifyRequest
4344
} = await import("../src/utils")
@@ -155,7 +156,7 @@ describe("NHS notify lambda helper functions", () => {
155156
})
156157
})
157158

158-
describe("clearCompletedSQSMessages", () => {
159+
describe("removeSQSMessages", () => {
159160
let logger: Logger
160161
let errorSpy: SpiedFunction<(msg: string, ...meta: Array<unknown>) => void>
161162
let infoSpy: SpiedFunction<(msg: string, ...meta: Array<unknown>) => void>
@@ -179,7 +180,7 @@ describe("NHS notify lambda helper functions", () => {
179180
// successful delete (no .Failed)
180181
sqsMockSend.mockImplementationOnce(() => Promise.resolve({}))
181182

182-
await expect(clearCompletedSQSMessages(logger, messages))
183+
await expect(removeSQSMessages(logger, messages))
183184
.resolves
184185
.toBeUndefined()
185186

@@ -205,7 +206,7 @@ describe("NHS notify lambda helper functions", () => {
205206
// succeed both batches
206207
sqsMockSend.mockImplementation(() => Promise.resolve({}))
207208

208-
await clearCompletedSQSMessages(logger, messages)
209+
await removeSQSMessages(logger, messages)
209210
expect(sqsMockSend).toHaveBeenCalledTimes(2)
210211

211212
// first batch of 10
@@ -234,7 +235,7 @@ describe("NHS notify lambda helper functions", () => {
234235
// partial failure
235236
sqsMockSend.mockImplementationOnce(() => Promise.resolve({Failed: failedEntries}))
236237

237-
await clearCompletedSQSMessages(logger, messages)
238+
await removeSQSMessages(logger, messages)
238239

239240
expect(errorSpy).toHaveBeenCalledWith(
240241
"Some messages failed to delete in this batch",
@@ -756,4 +757,63 @@ describe("NHS notify lambda helper functions", () => {
756757
})
757758
})
758759
})
760+
761+
describe("reportQueueStatus", () => {
762+
let logger: Logger
763+
let infoSpy: SpiedFunction<(msg: string, ...meta: Array<unknown>) => void>
764+
let errorSpy: SpiedFunction<(msg: string, ...meta: Array<unknown>) => void>
765+
766+
beforeEach(() => {
767+
jest.resetModules()
768+
jest.clearAllMocks()
769+
770+
process.env = {...ORIGINAL_ENV}
771+
logger = new Logger({serviceName: "test-service"})
772+
infoSpy = jest.spyOn(logger, "info")
773+
errorSpy = jest.spyOn(logger, "error")
774+
})
775+
776+
it("logs current queue attributes when SQS returns attributes", async () => {
777+
const attrs = {
778+
ApproximateNumberOfMessages: "7",
779+
ApproximateNumberOfMessagesNotVisible: "4",
780+
ApproximateNumberOfMessagesDelayed: "1"
781+
}
782+
783+
sqsMockSend.mockImplementationOnce((cmd) => {
784+
expect(cmd).toBeInstanceOf(GetQueueAttributesCommand)
785+
expect((cmd as GetQueueAttributesCommand).input).toEqual({
786+
QueueUrl: process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL,
787+
AttributeNames: [
788+
"ApproximateNumberOfMessages",
789+
"ApproximateNumberOfMessagesNotVisible",
790+
"ApproximateNumberOfMessagesDelayed"
791+
]
792+
})
793+
return Promise.resolve({Attributes: attrs})
794+
})
795+
796+
await reportQueueStatus(logger)
797+
798+
expect(infoSpy).toHaveBeenCalledWith(
799+
"Current queue attributes (if a value failed to fetch, it will be reported as -1):",
800+
{
801+
ApproximateNumberOfMessages: 7,
802+
ApproximateNumberOfMessagesNotVisible: 4,
803+
ApproximateNumberOfMessagesDelayed: 1
804+
}
805+
)
806+
})
807+
808+
it("throws if the SQS URL is not configured", async () => {
809+
delete process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL
810+
const {reportQueueStatus: rqs} = await import("../src/utils")
811+
812+
await expect(rqs(logger)).rejects.toThrow(
813+
"NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set"
814+
)
815+
expect(errorSpy).toHaveBeenCalledWith("Notifications SQS URL not configured")
816+
})
817+
})
818+
759819
})

packages/nhsNotifyUpdateCallback/src/lambdaHandler.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,23 @@ const lambdaHandler = async (event: APIGatewayProxyEvent): Promise<APIGatewayPro
3535
let payload: MessageStatusResponse
3636
try {
3737
payload = JSON.parse(event.body)
38-
logger.info("Payload parsed", {payload})
3938
} catch (error) {
4039
logger.error("Failed to parse payload", {error, payload: event.body})
4140
return response(400, {message: "Request body failed to parse"})
4241
}
4342

43+
payload.data.forEach(m => {
44+
logger.info(
45+
"Message state updated",
46+
{
47+
messageStatus: m.attributes.messageStatus,
48+
messageReference: m.attributes.messageReference,
49+
messageId: m.attributes.messageId,
50+
receivedTimestamp: m.attributes.timestamp
51+
}
52+
)
53+
})
54+
4455
try {
4556
await updateNotificationsTable(logger, payload)
4657
} catch (error) {

packages/nhsNotifyUpdateCallback/tests/testNhsNotifyCallbackLambda.test.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,19 @@ describe("NHS Notify update callback lambda handler", () => {
9090
})
9191

9292
it("returns 500 if updateNotificationsTable throws", async () => {
93-
const payload = {status: "foo"}
93+
const payload = {
94+
status: "foo",
95+
data: [
96+
{
97+
attributes: {
98+
messageStatus: "messageStatus",
99+
messageReference: "messageReference",
100+
messageId: "messageId",
101+
timestamp: "timestamp"
102+
}
103+
}
104+
]
105+
}
94106
const event = generateMockEvent(payload)
95107
event.headers["x-request-id"] = "abc"
96108
mockCheckSignature.mockImplementation(() => undefined)
@@ -110,7 +122,19 @@ describe("NHS Notify update callback lambda handler", () => {
110122
})
111123

112124
it("returns 202 and 'OK' when everything succeeds", async () => {
113-
const payload = {status: "ok"}
125+
const payload = {
126+
status: "ok",
127+
data: [
128+
{
129+
attributes: {
130+
messageStatus: "messageStatus",
131+
messageReference: "messageReference",
132+
messageId: "messageId",
133+
timestamp: "timestamp"
134+
}
135+
}
136+
]
137+
}
114138
const event = generateMockEvent(payload)
115139
event.headers["x-request-id"] = "abc"
116140
mockCheckSignature.mockImplementation(() => undefined)

0 commit comments

Comments
 (0)