-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrequest_validator.py
More file actions
113 lines (86 loc) · 4.37 KB
/
request_validator.py
File metadata and controls
113 lines (86 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import logging
import re
from collections.abc import Callable
from functools import wraps
from typing import Any
from mangum.types import LambdaContext, LambdaEvent
from eligibility_signposting_api.common.api_error_response import (
INVALID_CATEGORY_ERROR,
INVALID_CONDITION_FORMAT_ERROR,
INVALID_INCLUDE_ACTIONS_ERROR,
NHS_NUMBER_MISMATCH_ERROR,
NHS_NUMBER_MISSING_ERROR,
)
from eligibility_signposting_api.config.contants import NHS_NUMBER_HEADER
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# One condition token: one or more ASCII letters/digits, optionally padded with whitespace.
# re.IGNORECASE is deliberately NOT used here: both cases are already listed in the class, and in
# Unicode mode the flag makes [a-z]/[A-Z] additionally match U+0130, U+0131, U+017F and U+212A,
# which would let non-ASCII letters through validation.
condition_pattern = re.compile(r"^\s*[a-zA-Z0-9]+\s*$")

# Accepted category values, matched case-insensitively with optional surrounding whitespace.
# NOTE(review): with re.IGNORECASE in Unicode mode the literals "S" and "I" presumably also match
# U+017F / U+0130 / U+0131 -- confirm whether that matters for callers before tightening.
category_pattern = re.compile(r"^\s*(VACCINATIONS|SCREENING|ALL)\s*$", re.IGNORECASE)

# Accepted includeActions values: a single Y or N, case-insensitive, optional surrounding whitespace.
include_actions_pattern = re.compile(r"^\s*([YN])\s*$", re.IGNORECASE)
def validate_query_params(query_params: dict[str, str]) -> tuple[bool, dict[str, Any] | None]:
    """Validate the ``conditions``, ``category`` and ``includeActions`` query parameters.

    Checks run in that order and stop at the first failure. A missing parameter falls back
    to its default (``"ALL"``, ``"ALL"`` and ``"Y"`` respectively) before being checked.

    Args:
        query_params: Query string parameters keyed by name.

    Returns:
        ``(True, None)`` when every parameter is accepted, otherwise ``(False, response)``
        where ``response`` comes from the matching ``get_*_error_response`` helper.
    """
    # "conditions" is a comma separated list; every element must match on its own.
    condition_items = query_params.get("conditions", "ALL").split(",")
    rejected = next((item for item in condition_items if condition_pattern.search(item) is None), None)
    if rejected is not None:
        return False, get_condition_error_response(rejected)

    requested_category = query_params.get("category", "ALL")
    if category_pattern.search(requested_category) is None:
        return False, get_category_error_response(requested_category)

    actions_flag = query_params.get("includeActions", "Y")
    if include_actions_pattern.search(actions_flag) is None:
        return False, get_include_actions_error_response(actions_flag)

    return True, None
def validate_nhs_number(path_nhs: str, header_nhs: str) -> bool:
    """Check that the path and header NHS numbers are both present and identical.

    Both values are logged at info level on entry, and again at error level when the
    check fails.

    Args:
        path_nhs: NHS number supplied as the first value to compare.
        header_nhs: NHS number supplied as the second value to compare.

    Returns:
        ``True`` when both values are truthy and equal, otherwise ``False``.
    """
    logger.info("NHS numbers from the request", extra={"header_nhs": header_nhs, "path_nhs": path_nhs})

    # Work out which failure applies (if any), then log it in one place.
    if not (header_nhs and path_nhs):
        failure = "NHS number is not present"
    elif header_nhs != path_nhs:
        failure = "NHS number mismatch"
    else:
        return True

    logger.error(failure, extra={"header_nhs": header_nhs, "path_nhs": path_nhs})
    return False
def validate_request_params() -> Callable:
    """Create a decorator that validates a Lambda request before the handler runs.

    The wrapped handler is only invoked when:

    * the path parameters contain an ``id`` (the NHS number),
    * ``validate_nhs_number`` accepts the path/header pair, and
    * ``validate_query_params`` accepts the query string parameters, if any were supplied.

    Otherwise the corresponding error response is returned and the handler is not called.

    Returns:
        A decorator for ``(event, context)`` Lambda handlers.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(event: LambdaEvent, context: LambdaContext) -> dict[str, Any] | None:
            # Proxy events may carry these keys with a null value when the request has no
            # path parameters / headers; a ``.get(key, {})`` default does not cover that
            # case, so fall back to an empty mapping explicitly.
            path_params = event.get("pathParameters") or {}
            headers = event.get("headers") or {}
            path_nhs_no = path_params.get("id")
            header_nhs_no = headers.get(NHS_NUMBER_HEADER)

            if not path_nhs_no:
                message = "Missing required NHS Number from path parameters"
                return NHS_NUMBER_MISSING_ERROR.log_and_generate_response(
                    log_message=message, diagnostics=message, location_param="id"
                )

            if not validate_nhs_number(path_nhs_no, header_nhs_no):
                message = f"NHS Number {path_nhs_no or ''} does not match the header NHS Number {header_nhs_no or ''}"
                return NHS_NUMBER_MISMATCH_ERROR.log_and_generate_response(
                    log_message=message, diagnostics=message, location_param="id"
                )

            # Query parameters are optional; only validate them when some were sent.
            query_params = event.get("queryStringParameters")
            if query_params:
                is_valid, problem = validate_query_params(query_params)
                if not is_valid:
                    return problem

            return func(event, context)

        return wrapper

    return decorator
def get_include_actions_error_response(include_actions: str) -> dict[str, Any]:
    """Build the error response for a rejected ``includeActions`` query parameter.

    Args:
        include_actions: The value that failed validation; echoed in the log message
            and in the diagnostics text.

    Returns:
        Whatever ``INVALID_INCLUDE_ACTIONS_ERROR.log_and_generate_response`` returns.
    """
    return INVALID_INCLUDE_ACTIONS_ERROR.log_and_generate_response(
        location_param="includeActions",
        log_message=f"Invalid include actions query param: '{include_actions}'",
        diagnostics=f"{include_actions} is not a value that is supported by the API",
    )
def get_category_error_response(category: str) -> dict[str, Any]:
    """Build the error response for a rejected ``category`` query parameter.

    Args:
        category: The value that failed validation; echoed in the log message and in
            the diagnostics text.

    Returns:
        Whatever ``INVALID_CATEGORY_ERROR.log_and_generate_response`` returns.
    """
    return INVALID_CATEGORY_ERROR.log_and_generate_response(
        location_param="category",
        log_message=f"Invalid category query param: '{category}'",
        diagnostics=f"{category} is not a category that is supported by the API",
    )
def get_condition_error_response(condition: str) -> dict[str, Any]:
    """Build the error response for a rejected element of the ``conditions`` query parameter.

    Args:
        condition: The single condition string that failed validation; echoed in the
            log message and in the diagnostics text.

    Returns:
        Whatever ``INVALID_CONDITION_FORMAT_ERROR.log_and_generate_response`` returns.
    """
    explanation = (
        f"{condition} should be a single or comma separated list of condition "
        f"strings with no other punctuation or special characters"
    )
    log_text = f"Invalid condition query param: '{condition}'"
    return INVALID_CONDITION_FORMAT_ERROR.log_and_generate_response(
        location_param="conditions",
        log_message=log_text,
        diagnostics=explanation,
    )