-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathnhsNotifyLambda.ts
More file actions
111 lines (96 loc) · 3.35 KB
/
nhsNotifyLambda.ts
File metadata and controls
111 lines (96 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import {EventBridgeEvent} from "aws-lambda"
import {Logger} from "@aws-lambda-powertools/logger"
import {injectLambdaContext} from "@aws-lambda-powertools/logger/middleware"
import middy from "@middy/core"
import inputOutputLogger from "@middy/input-output-logger"
import errorHandler from "@nhs/fhir-middy-error-handler"
import {
addPrescriptionMessagesToNotificationStateStore,
checkCooldownForUpdate,
clearCompletedSQSMessages,
drainQueue,
PSUDataItemMessage
} from "./utils"
const logger = new Logger({serviceName: "nhsNotify"})
/**
* Handler for the scheduled trigger.
*
* @param event - The CloudWatch EventBridge scheduled event payload.
*/
export const lambdaHandler = async (event: EventBridgeEvent<string, string>): Promise<void> => {
// EventBridge jsonifies the details so the second type of the event is a string. That's unused here, though
logger.info("NHS Notify lambda triggered by scheduler", {event})
let messages: Array<PSUDataItemMessage>
let processed: Array<PSUDataItemMessage>
try {
messages = await drainQueue(logger, 100)
if (messages.length === 0) {
logger.info("No messages to process")
return
}
// Filter messages by checkCooldownForUpdate. This is done in two stages so we can check in parallel
const eligibility = await Promise.all(
messages.map(async (m) => ({
message: m,
allowed: await checkCooldownForUpdate(logger, m.PSUDataItem)
}))
)
const toProcess = eligibility
.filter((e) => e.allowed)
.map((e) => e.message)
// Log the results of checking the cooldown
const suppressedCount = messages.length - toProcess.length
if (toProcess.length === 0) {
logger.info("All messages suppressed by cooldown; nothing to notify",
{
suppressedCount,
totalFetched: messages.length
})
return
} else if (suppressedCount > 0) {
logger.info(`Suppressed ${suppressedCount} messages due to cooldown`,
{
suppressedCount,
totalFetched: messages.length
}
)
}
// Just for diagnostics for now
const toNotify = toProcess
.map((m) => ({
RequestID: m.PSUDataItem.RequestID,
TaskId: m.PSUDataItem.TaskID,
Message: "Notification Required"
}))
logger.info("Fetched prescription notification messages", {count: toNotify.length, toNotify})
// TODO: Notifications request will be done here.
processed = toProcess
} catch (err) {
logger.error("Error while draining SQS queue", {error: err})
throw err
}
try {
await addPrescriptionMessagesToNotificationStateStore(logger, processed)
} catch (err) {
logger.error("Error while pushing data to the PSU notification state data store", {err})
throw err
}
// By waiting until a message is successfully processed before deleting it from SQS,
// failed messages will eventually be retried by subsequent notify consumers.
try {
await clearCompletedSQSMessages(logger, processed)
} catch (err) {
logger.error("Error while deleting successfully processed messages from SQS", {error: err})
throw err
}
}
export const handler = middy(lambdaHandler)
.use(injectLambdaContext(logger, {clearState: true}))
.use(
inputOutputLogger({
logger: (request) => {
logger.info(request)
}
})
)
.use(errorHandler({logger: logger}))