-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathutils.ts
More file actions
274 lines (237 loc) · 8.54 KB
/
utils.ts
File metadata and controls
274 lines (237 loc) · 8.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import {Logger} from "@aws-lambda-powertools/logger"
import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageBatchCommand,
Message
} from "@aws-sdk/client-sqs"
import {DynamoDBClient} from "@aws-sdk/client-dynamodb"
import {DynamoDBDocumentClient, GetCommand, PutCommand} from "@aws-sdk/lib-dynamodb"
import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes"
const TTL_DELTA = 60 * 60 * 24 * 7 // Keep records for a week
const dynamoTable = process.env.TABLE_NAME
const sqsUrl = process.env.NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL
// AWS clients
const sqs = new SQSClient({region: process.env.AWS_REGION})
const dynamo = new DynamoDBClient({region: process.env.AWS_REGION})
const docClient = DynamoDBDocumentClient.from(dynamo)
/**
* Returns the original array, chunked in batches of up to <size>
*
* @param arr - Array to be chunked
* @param size - The maximum size of each chunk. The final chunk may be smaller.
* @returns - an (N+1) dimensional array
*/
function chunkArray<T>(arr: Array<T>, size: number): Array<Array<T>> {
const chunks: Array<Array<T>> = []
for (let i = 0; i < arr.length; i += size) {
chunks.push(arr.slice(i, i + size))
}
return chunks
}
// This is an extension of the SQS message interface, which explicitly parses the PSUDataItem
export interface PSUDataItemMessage extends Message {
PSUDataItem: PSUDataItem
}
/**
* Pulls up to `maxTotal` messages off the queue (in batches of up to 10),
* logs them, and deletes them.
*/
export async function drainQueue(logger: Logger, maxTotal = 100): Promise<Array<PSUDataItemMessage>> {
let receivedSoFar = 0
const allMessages: Array<PSUDataItemMessage> = []
if (!sqsUrl) {
logger.error("Notifications SQS URL not configured")
throw new Error("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
}
let pollingIteration = 0
while (receivedSoFar < maxTotal) {
pollingIteration = pollingIteration + 1
const toFetch = Math.min(10, maxTotal - receivedSoFar)
const receiveCmd = new ReceiveMessageCommand({
QueueUrl: sqsUrl,
MaxNumberOfMessages: toFetch,
WaitTimeSeconds: 20, // Use long polling to avoid getting empty responses when the queue is small
MessageAttributeNames: [
"MessageDeduplicationId",
"MessageGroupId"
]
})
const {Messages} = await sqs.send(receiveCmd)
// if the queue is now empty, then break the loop
if (!Messages || Messages.length === 0) break
logger.info(
"Received some messages from the queue. Parsing them...",
{
pollingIteration,
MessageIDs: Messages.map((m) => m.MessageId)
}
)
const parsedMessages: Array<PSUDataItemMessage> = Messages.map((m) => {
if (!m.Body) {
logger.error("Failed to parse SQS message - aborting this notification processor check.", {offendingMessage: m})
throw new Error(`Received an invalid SQS message. Message ID ${m.MessageId}`)
}
const parsedBody: PSUDataItem = JSON.parse(m.Body) as PSUDataItem
return {
...m,
PSUDataItem: parsedBody
}
})
allMessages.push(...parsedMessages)
receivedSoFar += Messages.length
// if the last batch of messages was small, then break the loop
// This is to prevent a slow-loris style breakdown if the queue has
// barely enough messages to keep the processors alive
if (!Messages || Messages.length < 5) {
logger.info("Received a small number of messages. Considering the queue drained.", {batchLength: Messages.length})
break
}
}
logger.info(`In sum, retrieved ${allMessages.length} messages from SQS`)
return allMessages
}
/**
* For each message given, delete it from the notifications SQS in batches of up to 10.
* Throws an error if any batch fails, but previous batches will remain deleted.
*
* @param logger - the logging object
* @param messages - The messages that were received from SQS, and are to be deleted.
*/
export async function clearCompletedSQSMessages(
logger: Logger,
messages: Array<Message>
): Promise<void> {
if (!sqsUrl) {
logger.error("Notifications SQS URL not configured")
throw new Error("NHS_NOTIFY_PRESCRIPTIONS_SQS_QUEUE_URL not set")
}
const batches = chunkArray(messages, 10)
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex]
const entries = batch.map((m) => ({
Id: m.MessageId!,
ReceiptHandle: m.ReceiptHandle!
}))
logger.info(`Deleting batch ${batchIndex + 1}/${batches.length}`, {
batchSize: entries.length,
messageIds: entries.map((e) => e.Id)
})
const deleteCmd = new DeleteMessageBatchCommand({
QueueUrl: sqsUrl,
Entries: entries
})
const delResult = await sqs.send(deleteCmd)
if (delResult.Failed && delResult.Failed.length > 0) {
logger.error("Some messages failed to delete in this batch", {failed: delResult.Failed})
throw new Error(`Failed to delete ${delResult.Failed.length} messages from SQS`)
}
logger.info(`Successfully deleted SQS message batch ${batchIndex + 1}`, {
result: delResult,
messageIds: entries.map((e) => e.Id)
})
}
}
export interface LastNotificationStateType {
NHSNumber: string
ODSCode: string
RequestId: string // x-request-id header
MessageID: string // The SQS message ID
LastNotifiedPrescriptionStatus: string
DeliveryStatus: string
LastNotificationRequestTimestamp: string // ISO-8601 string
ExpiryTime: number // DynamoDB expiration time (UNIX timestamp)
}
export async function addPrescriptionMessagesToNotificationStateStore(
logger: Logger,
dataArray: Array<PSUDataItemMessage>
) {
if (!dynamoTable) {
logger.error("DynamoDB table not configured")
throw new Error("TABLE_NAME not set")
}
if (dataArray.length) logger.info("Attempting to push data to DynamoDB", {count: dataArray.length})
else logger.info("No data to push into DynamoDB.")
for (const data of dataArray) {
const item: LastNotificationStateType = {
NHSNumber: data.PSUDataItem.PatientNHSNumber,
ODSCode: data.PSUDataItem.PharmacyODSCode,
RequestId: data.PSUDataItem.RequestID,
MessageID: data.MessageId!,
LastNotifiedPrescriptionStatus: data.PSUDataItem.Status,
DeliveryStatus: "requested",
LastNotificationRequestTimestamp: new Date().toISOString(),
ExpiryTime: (Math.floor(+new Date() / 1000) + TTL_DELTA)
}
try {
await docClient.send(new PutCommand({
TableName: dynamoTable,
Item: item
}))
logger.info("Upserted prescription")
} catch (err) {
logger.error("Failed to write to DynamoDB", {
error: err
})
throw err
}
}
}
/**
* Returns TRUE if the patient HAS NOT received a recent notification.
* Returns FALSE if the patient HAS received a recent notification
*
* @param logger - AWS logging object
* @param update - The Prescription Status Update that we are checking
* @param cooldownPeriod - Minimum time in seconds between notifications
*/
export async function checkCooldownForUpdate(
logger: Logger,
update: PSUDataItem,
cooldownPeriod: number = 900
): Promise<boolean> {
if (!dynamoTable) {
logger.error("DynamoDB table not configured")
throw new Error("TABLE_NAME not set")
}
try {
// Retrieve the last notification state for this patient/pharmacy combo
const getCmd = new GetCommand({
TableName: dynamoTable,
Key: {
NHSNumber: update.PatientNHSNumber,
ODSCode: update.PharmacyODSCode
}
})
const {Item} = await docClient.send(getCmd)
// If no previous record, we're okay to send a notification
if (!Item?.LastNotificationRequestTimestamp) {
logger.info("No previous notification state found. Notification allowed.")
return true
}
// Compute seconds since last notification
const lastTs = new Date(Item.LastNotificationRequestTimestamp).getTime()
const nowTs = Date.now()
const secondsSince = Math.floor((nowTs - lastTs) / 1000)
if (secondsSince > cooldownPeriod) {
logger.info("Cooldown period has passed. Notification allowed.", {
NHSNumber: update.PatientNHSNumber,
ODSCode: update.PharmacyODSCode,
cooldownPeriod,
secondsSince
})
return true
} else {
logger.info("Within cooldown period. Notification suppressed.", {
NHSNumber: update.PatientNHSNumber,
ODSCode: update.PharmacyODSCode,
cooldownPeriod,
secondsSince
})
return false
}
} catch (err) {
logger.error("Error checking cooldown state", {error: err})
throw err
}
}