-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrequest_validator.py
More file actions
104 lines (78 loc) · 3.98 KB
/
request_validator.py
File metadata and controls
104 lines (78 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
import re
from collections.abc import Callable
from functools import wraps
from typing import Any
from mangum.types import LambdaContext, LambdaEvent
from eligibility_signposting_api.common.api_error_response import (
INVALID_CATEGORY_ERROR,
INVALID_CONDITION_FORMAT_ERROR,
INVALID_INCLUDE_ACTIONS_ERROR,
NHS_NUMBER_MISMATCH_ERROR,
)
from eligibility_signposting_api.config.contants import NHS_NUMBER_HEADER
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# A single condition: one or more letters/digits, optionally surrounded by
# whitespace, matched case-insensitively.
condition_pattern = re.compile(r"^\s*[a-z0-9]+\s*$", re.IGNORECASE)
# The category must be one of VACCINATIONS, SCREENING or ALL (any letter case,
# surrounding whitespace allowed).
category_pattern = re.compile(r"^\s*(VACCINATIONS|SCREENING|ALL)\s*$", re.IGNORECASE)
# The includeActions flag must be a single Y or N (any letter case,
# surrounding whitespace allowed).
include_actions_pattern = re.compile(r"^\s*([YN])\s*$", re.IGNORECASE)
def validate_query_params(query_params: dict[str, str]) -> tuple[bool, dict[str, Any] | None]:
    """Validate the ``conditions``, ``category`` and ``includeActions`` query parameters.

    Absent parameters fall back to ``"ALL"``, ``"ALL"`` and ``"Y"`` respectively.
    ``conditions`` is split on commas and every piece is checked on its own.
    Checks run in the order conditions, category, includeActions, and the
    first failure wins.

    Returns:
        ``(True, None)`` when everything is acceptable, otherwise ``(False, response)``
        where ``response`` is the error response built by the matching
        ``get_*_error_response`` helper.
    """
    condition_items = query_params.get("conditions", "ALL").split(",")
    first_bad_condition = next(
        (item for item in condition_items if not condition_pattern.search(item)),
        None,
    )
    # Compare against None explicitly: an empty piece ("") is itself invalid
    # and must still be reported.
    if first_bad_condition is not None:
        return False, get_condition_error_response(first_bad_condition)

    category_value = query_params.get("category", "ALL")
    if category_pattern.search(category_value) is None:
        return False, get_category_error_response(category_value)

    include_actions_value = query_params.get("includeActions", "Y")
    if include_actions_pattern.search(include_actions_value) is None:
        return False, get_include_actions_error_response(include_actions_value)

    return True, None
def validate_nhs_number(path_nhs: str, header_nhs: str) -> bool:
    """Tell whether the NHS number in the path agrees with the one in the header.

    Both values must be present (truthy) and equal. Each failure is logged at
    error level; the incoming pair is always logged at info level first.

    NOTE(review): both values are written to the log as received - confirm
    that is acceptable for this kind of identifier.

    Returns:
        ``True`` when both values are present and identical, ``False`` otherwise.
    """
    log_fields = {"header_nhs": header_nhs, "path_nhs": path_nhs}
    logger.info("NHS numbers from the request", extra=log_fields)

    if not (header_nhs and path_nhs):
        logger.error("NHS number is not present", extra=log_fields)
        return False

    if header_nhs == path_nhs:
        return True

    logger.error("NHS number mismatch", extra=log_fields)
    return False
def validate_request_params() -> Callable:
    """Build a decorator that validates a Lambda event before the handler runs.

    The wrapped handler is called only when:

    * the ``id`` path parameter and the ``NHS_NUMBER_HEADER`` header are both
      present and equal (see ``validate_nhs_number``), and
    * any query string parameters supplied pass ``validate_query_params``.

    Otherwise the wrapper returns the corresponding error response and the
    handler is not invoked.

    Returns:
        A decorator taking a ``(event, context)`` handler and returning the
        validating wrapper.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(event: LambdaEvent, context: LambdaContext) -> dict[str, Any] | None:
            # A proxy event can carry an explicit null for these maps (key
            # present, value None), in which case ``.get(key, {})`` would
            # return None and the chained ``.get`` would raise. Normalise to
            # an empty dict so such requests are rejected cleanly instead.
            path_params = event.get("pathParameters") or {}
            headers = event.get("headers") or {}

            path_nhs_no = path_params.get("id")
            header_nhs_no = headers.get(NHS_NUMBER_HEADER)

            if not validate_nhs_number(path_nhs_no, header_nhs_no):
                message = "You are not authorised to request information for the supplied NHS Number"
                return NHS_NUMBER_MISMATCH_ERROR.log_and_generate_response(log_message=message, diagnostics=message)

            # Query parameters are optional; validate only when some were sent.
            query_params = event.get("queryStringParameters")
            if query_params:
                is_valid, problem = validate_query_params(query_params)
                if not is_valid:
                    return problem

            return func(event, context)

        return wrapper

    return decorator
def get_include_actions_error_response(include_actions: str) -> dict[str, Any]:
    """Build the error response for an unsupported ``includeActions`` value.

    Delegates to ``INVALID_INCLUDE_ACTIONS_ERROR.log_and_generate_response``,
    passing a log message, a diagnostics text that quotes the rejected value,
    and ``includeActions`` as the offending parameter.
    """
    return INVALID_INCLUDE_ACTIONS_ERROR.log_and_generate_response(
        log_message=f"Invalid include actions query param: '{include_actions}'",
        diagnostics=f"{include_actions} is not a value that is supported by the API",
        location_param="includeActions",
    )
def get_category_error_response(category: str) -> dict[str, Any]:
    """Build the error response for an unsupported ``category`` value.

    Delegates to ``INVALID_CATEGORY_ERROR.log_and_generate_response``, passing
    a log message, a diagnostics text that quotes the rejected value, and
    ``category`` as the offending parameter.
    """
    log_text = f"Invalid category query param: '{category}'"
    return INVALID_CATEGORY_ERROR.log_and_generate_response(
        log_message=log_text,
        diagnostics=f"{category} is not a category that is supported by the API",
        location_param="category",
    )
def get_condition_error_response(condition: str) -> dict[str, Any]:
    """Build the error response for a malformed entry in ``conditions``.

    Delegates to ``INVALID_CONDITION_FORMAT_ERROR.log_and_generate_response``,
    passing a log message, a diagnostics text that quotes the rejected entry,
    and ``conditions`` as the offending parameter.
    """
    log_text = f"Invalid condition query param: '{condition}'"
    return INVALID_CONDITION_FORMAT_ERROR.log_and_generate_response(
        log_message=log_text,
        diagnostics=(
            f"{condition} should be a single or comma separated list of condition "
            f"strings with no other punctuation or special characters"
        ),
        location_param="conditions",
    )