From fe840ad696dc58a7c97ecce130713f5f89a1b446 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Wed, 18 Mar 2026 15:15:39 +0000 Subject: [PATCH 01/12] Test SSH signed commit From aa19cca0e1de86dc976d6bbfb467001d6dba6e32 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 24 Mar 2026 14:31:01 +0000 Subject: [PATCH 02/12] CHORE: [ELI-751] Added Error handling. --- utils/data_helper.py | 87 +++++++++++++++++++++------------ utils/data_template_resolver.py | 15 +++++- utils/dynamo_helper.py | 36 ++++++++------ utils/s3_config_manager.py | 55 ++++++++++++--------- 4 files changed, 125 insertions(+), 68 deletions(-) diff --git a/utils/data_helper.py b/utils/data_helper.py index 7ca67b24..f36180a4 100644 --- a/utils/data_helper.py +++ b/utils/data_helper.py @@ -145,9 +145,15 @@ def load_all_expected_responses(folder_path): for path in Path(folder_path).iterdir(): if path.suffix != ".json": continue - - with path.open() as f: - raw_json = json.load(f) + try: + with path.open() as f: + raw_json = json.load(f) + except (OSError, IOError) as e: + logger.error("Failed to read expected response file %s: %s", path, e) + continue + except json.JSONDecodeError as e: + logger.error("Invalid JSON in expected response file %s: %s", path, e) + continue resolved_data = resolve_placeholders_in_data(raw_json, path.name) cleaned_data = clean_responses(data=resolved_data, ignore_keys=keys_to_ignore) @@ -157,48 +163,69 @@ def load_all_expected_responses(folder_path): return all_data +def _load_json_file(path: Path): + try: + with path.open() as f: + return json.load(f) + except (OSError, IOError) as e: + logger.error("Failed to read test scenario file %s: %s", path, e) + except json.JSONDecodeError as e: + logger.error("Invalid JSON in test scenario file %s: %s", path, e) + return None + + +def _ensure_default_product_id(request_headers: dict) -> dict: + if not any(k == "NHSE-Product-ID" for k in request_headers): + request_headers["NHSE-Product-ID"] = "Story_Test_Consumer_ID" + return request_headers + + +def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> dict: + return { + "dynamo_items": resolved_data, + "nhs_number": extract_nhs_number_from_data(resolved_data), + "config_filenames": raw_json.get("config_filenames"), + "expected_response_code": raw_json.get("expected_response_code"), + "request_headers": _ensure_default_product_id( + raw_json.get("request_headers") or {} + ), + "query_params": raw_json.get("query_params"), + "scenario_name": raw_json.get("scenario_name"), + "secret_version": raw_json.get("secret_version"), + } + + def load_all_test_scenarios(folder_path): all_data = {} - data_builder = TemplateEngine.create() + try: + data_builder = TemplateEngine.create() + except Exception as e: + logger.error("Failed to initialise template engine: %s", e) + raise # Sort files alphabetically by filename for path in sorted(Path(folder_path).iterdir(), key=lambda p: p.name.lower()): if path.suffix != ".json": continue - with path.open() as f: - raw_json = json.load(f) - - templated_data = data_builder.apply(raw_json["data"]) - - config_filenames = raw_json.get("config_filenames") - scenario_name = raw_json.get("scenario_name") - request_headers = raw_json.get("request_headers") or {} - expected_response_code = raw_json.get("expected_response_code") - query_params = raw_json.get("query_params") - secret_version = raw_json.get("secret_version") + raw_json = _load_json_file(path) + if raw_json is None: + continue - if not any(k == "NHSE-Product-ID" for k in request_headers): - request_headers["NHSE-Product-ID"] = "Story_Test_Consumer_ID" + try: + templated_data = data_builder.apply(raw_json["data"]) + except ValueError as e: + logger.error("Failed to apply template to test scenario %s: %s", path, e) + continue # Resolve placeholders resolved_data = resolve_placeholders_in_data(templated_data, path.name) - # Extract NHS number - nhs_number = extract_nhs_number_from_data(resolved_data) - # Add resolved scenario - all_data[path.name] = { - "dynamo_items": resolved_data, - "nhs_number": nhs_number, - "config_filenames": config_filenames, - "expected_response_code": expected_response_code, - "request_headers": request_headers, - "query_params": query_params, - "scenario_name": scenario_name, - "secret_version": secret_version, - } + all_data[path.name] = _build_test_scenario_entry( + raw_json, resolved_data, path.name + ) return all_data diff --git a/utils/data_template_resolver.py b/utils/data_template_resolver.py index 3823490a..52166e0d 100644 --- a/utils/data_template_resolver.py +++ b/utils/data_template_resolver.py @@ -24,8 +24,19 @@ def __init__(self, templates, inheritance): @classmethod @lru_cache(maxsize=1) def create(cls): - with open(DEFAULT_TEMPLATE) as f: - templates = json.load(f) + try: + with open(DEFAULT_TEMPLATE) as f: + templates = json.load(f) + except FileNotFoundError: + raise FileNotFoundError( + f"Default template file not found: {DEFAULT_TEMPLATE}" + ) + except json.JSONDecodeError: + raise ValueError( + f"Invalid JSON in default template file: {DEFAULT_TEMPLATE}" + ) + except (OSError, IOError) as e: + raise IOError(f"Error reading template file: {DEFAULT_TEMPLATE}, {e}") return cls(templates, DEFAULT_INHERITANCE) def apply(self, scenario_data): diff --git a/utils/dynamo_helper.py b/utils/dynamo_helper.py index 4f66da79..905aa107 100644 --- a/utils/dynamo_helper.py +++ b/utils/dynamo_helper.py @@ -194,6 +194,8 @@ def file_backup_exists(dynamo_db_table: DynamoDBHelper): return True except FileNotFoundError: return False + except json.JSONDecodeError as e: + logger.warning(f"Corrupted backup file detected: {e}") def reset_dynamo_tables(): @@ -250,22 +252,28 @@ def reset_dynamo_tables(): def load_information_from_backup_files(dynamo_db_table: DynamoDBHelper): logger.warning("Table information taken from backup files") - dynamo_db_table.tags = json.loads( - load_from_file(f"{DYNAMO_TEMP_LOCATION}tags-{dynamo_db_table.environment}.json") - ) - dynamo_db_table.attribute_definitions = json.loads( - load_from_file( - f"{DYNAMO_TEMP_LOCATION}attribute_definitions-{dynamo_db_table.environment}.json" + try: + dynamo_db_table.tags = json.loads( + load_from_file( + f"{DYNAMO_TEMP_LOCATION}tags-{dynamo_db_table.environment}.json" + ) ) - ) - dynamo_db_table.key_schema = json.loads( - load_from_file( - f"{DYNAMO_TEMP_LOCATION}key_schema-{dynamo_db_table.environment}.json" + dynamo_db_table.attribute_definitions = json.loads( + load_from_file( + f"{DYNAMO_TEMP_LOCATION}attribute_definitions-{dynamo_db_table.environment}.json" + ) + ) + dynamo_db_table.key_schema = json.loads( + load_from_file( + f"{DYNAMO_TEMP_LOCATION}key_schema-{dynamo_db_table.environment}.json" + ) + ) + dynamo_db_table.table_arn = load_from_file( + f"{DYNAMO_TEMP_LOCATION}table_arn-{dynamo_db_table.environment}.json" ) - ) - dynamo_db_table.table_arn = load_from_file( - f"{DYNAMO_TEMP_LOCATION}table_arn-{dynamo_db_table.environment}.json" - ) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON from backup files: {e}") + raise ValueError(f"Corrupted DynamoDB backup files: {e}") _cached_dynamo_helper: "DynamoDBHelper | None" = None diff --git a/utils/s3_config_manager.py b/utils/s3_config_manager.py index cd43ca49..7c2da752 100644 --- a/utils/s3_config_manager.py +++ b/utils/s3_config_manager.py @@ -93,11 +93,8 @@ def delete_all(self) -> None: logger.warning("📭 Nothing to delete.") self._uploaded_configs.clear() - def upload_all_configs(self, local_paths: list[Path]) -> None: - desired_filenames = [p.name for p in local_paths] - desired_keys = {self._s3_key(name) for name in desired_filenames} - - # Resolve all configs locally first + def _resolve_local_configs(self, local_paths: list[Path]) -> dict[str, str]: + """Helper to read and resolve local JSON configs safely.""" resolved_configs: dict[str, str] = {} for path in local_paths: filename = path.name @@ -105,30 +102,26 @@ def upload_all_configs(self, local_paths: list[Path]) -> None: logger.debug("🔧 Resolving placeholders in config: %s", filename) - with path.open() as f: - raw_data = json.load(f) + try: + with path.open() as f: + raw_data = json.load(f) + except (OSError, IOError) as e: + logger.error("Failed to read config file %s: %s", path, e) + continue + except json.JSONDecodeError as e: + logger.error("Invalid JSON in config file %s: %s", path, e) + continue resolved = resolve_placeholders_in_data(raw_data, filename) resolved_configs[s3_key] = json.dumps(resolved, indent=2) - # Check if the desired state matches what we've already uploaded - if ( - self._uploaded_configs - and set(self._uploaded_configs.keys()) == desired_keys - and all( - self._uploaded_configs.get(k) == resolved_configs.get(k) - for k in desired_keys - ) - ): - logger.debug("⏭️ S3 configs unchanged since last upload. Skipping.") - return + return resolved_configs - # Delete stale keys not in the desired set + def _delete_stale_keys(self, desired_keys: set[str]) -> None: + """Helper to remove stale keys from S3 and local cache.""" if self._uploaded_configs: - # Use in-memory state — no list API call needed stale_keys = [k for k in self._uploaded_configs if k not in desired_keys] else: - # First call — check S3 for any pre-existing objects existing_keys = self._list_existing_keys() stale_keys = [k for k in existing_keys if k not in desired_keys] @@ -137,7 +130,25 @@ def upload_all_configs(self, local_paths: list[Path]) -> None: for k in stale_keys: self._uploaded_configs.pop(k, None) - # Upload configs that are new or changed + def upload_all_configs(self, local_paths: list[Path]) -> None: + desired_filenames = [p.name for p in local_paths] + desired_keys = {self._s3_key(name) for name in desired_filenames} + + resolved_configs = self._resolve_local_configs(local_paths) + + if ( + self._uploaded_configs + and set(self._uploaded_configs.keys()) == desired_keys + and all( + self._uploaded_configs.get(k) == resolved_configs.get(k) + for k in desired_keys + ) + ): + logger.debug("⏭️ S3 configs unchanged since last upload. Skipping.") + return + + self._delete_stale_keys(desired_keys) + for s3_key, resolved_json_str in resolved_configs.items(): if self._uploaded_configs.get(s3_key) == resolved_json_str: logger.debug("✅ Config '%s' unchanged. Skipping upload.", s3_key) From f65987a20e7cf946b2ccf414be4383c9a2d02b88 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 24 Mar 2026 16:09:10 +0000 Subject: [PATCH 03/12] CHORE: [ELI-751] Added Error handling performance. --- tests/performance_tests/test_performance_tests.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/performance_tests/test_performance_tests.py b/tests/performance_tests/test_performance_tests.py index 1d660d08..4334ce96 100644 --- a/tests/performance_tests/test_performance_tests.py +++ b/tests/performance_tests/test_performance_tests.py @@ -296,6 +296,7 @@ def test_locust_run_and_csv_exists( perf_spawn_rate, xray_sampling_rate, ): + global proc custom_env = os.environ.copy() custom_env["BASE_URL"] = eligibility_client.api_url @@ -310,7 +311,14 @@ def test_locust_run_and_csv_exists( start_time = datetime.now(timezone.utc) logging.warning("LOCUST TEST STARTING: start_time=%s", start_time) - proc = _run_locust(locust_command, env=custom_env) + try: + proc = _run_locust(locust_command, env=custom_env) + except FileNotFoundError: + pytest.fail( + "Locust executable not found. Ensure locust is installed and in PATH." + ) + except subprocess.SubprocessError as e: + pytest.fail(f"Failed to execute Locust subprocess: {e}") end_time = datetime.now(timezone.utc) logging.warning("LOCUST TEST FINISHED: end_time=%s", end_time) From b6b8d8f59c11e0af1712f7360080d436e694ff03 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 24 Mar 2026 16:33:35 +0000 Subject: [PATCH 04/12] CHORE: [ELI-751] Added Error handling locust. --- tests/performance_tests/locust.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tests/performance_tests/locust.py b/tests/performance_tests/locust.py index 306caf6a..64b4f553 100644 --- a/tests/performance_tests/locust.py +++ b/tests/performance_tests/locust.py @@ -1,6 +1,7 @@ import ast import csv import os +import sys from pathlib import Path import urllib3 @@ -18,10 +19,22 @@ def _(parser): ) -with open("temp/nhs_numbers.csv", newline="") as csvFile: - reader = csv.reader(csvFile) - next(reader, None) - csvData = list(reader) +csvData = [] +try: + with open("temp/nhs_numbers.csv", newline="") as csvFile: + reader = csv.reader(csvFile) + next(reader, None) # Skip header + csvData = list(reader) + + if not csvData: + print("Error: temp/nhs_numbers.csv is empty.", file=sys.stderr) +except FileNotFoundError: + print( + "Error: temp/nhs_numbers.csv not found. Ensure test data is generated.", + file=sys.stderr, + ) +except (OSError, IOError) as e: + print(f"Error reading temp/nhs_numbers.csv: {e}", file=sys.stderr) # Class for API execution From 62a35e5cb0383fdf3cb87dbfa966c39dde988931 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Wed, 25 Mar 2026 14:27:49 +0000 Subject: [PATCH 05/12] CHORE: [ELI-751] Added Error handling final. --- tests/performance_tests/validate_inputs.py | 14 ++++++++-- tests/test_in_progress.py | 5 ++-- tests/test_story_tests.py | 5 +++- utils/common_utils.py | 30 ++++++++++++++++------ 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/tests/performance_tests/validate_inputs.py b/tests/performance_tests/validate_inputs.py index e923b4ae..275bc1d5 100644 --- a/tests/performance_tests/validate_inputs.py +++ b/tests/performance_tests/validate_inputs.py @@ -45,8 +45,18 @@ def parse_run_time_to_seconds(run_time: str) -> int: def main() -> None: - users = int(get_env("USERS")) - spawn_rate = int(get_env("SPAWN_RATE")) + global users, spawn_rate + + try: + users = int(get_env("USERS")) + except ValueError: + fail("Invalid input", "USERS must be an integer.") + + try: + spawn_rate = int(get_env("SPAWN_RATE")) + except ValueError: + fail("Invalid input", "SPAWN_RATE must be an integer.") + run_time = get_env("RUN_TIME") validate_range("users", users, 1, MAX_USERS) diff --git a/tests/test_in_progress.py b/tests/test_in_progress.py index 59962b4a..f80650be 100644 --- a/tests/test_in_progress.py +++ b/tests/test_in_progress.py @@ -32,8 +32,9 @@ def test_run_in_progress_tests( actual_response = eligibility_client.make_request( nhs_number, headers=request_headers, query_params=query_params, strict_ssl=False ) - expected_response = all_expected_responses.get(filename).get("response_items", {}) - + expected_response = all_expected_responses.get(filename, {}).get( + "response_items", {} + ) expected_response_code = expected_response_code or http.HTTPStatus.OK assert actual_response["status_code"] == expected_response_code diff --git a/tests/test_story_tests.py b/tests/test_story_tests.py index 6b567524..92c7c6e6 100644 --- a/tests/test_story_tests.py +++ b/tests/test_story_tests.py @@ -36,7 +36,10 @@ def test_run_story_test_cases( query_params=query_params, strict_ssl=False, ) - expected_response = all_expected_responses.get(filename).get("response_items", {}) + + expected_response = all_expected_responses.get(filename, {}).get( + "response_items", {} + ) expected_response_code = expected_response_code or http.HTTPStatus.OK assert actual_response["status_code"] == expected_response_code diff --git a/utils/common_utils.py b/utils/common_utils.py index 911fed78..06d6b3a8 100644 --- a/utils/common_utils.py +++ b/utils/common_utils.py @@ -1,16 +1,30 @@ +import logging from pathlib import Path +logger = logging.getLogger(__name__) + def save_to_file(file_name: str, data, directory: str = None): - if directory is not None: - Path.mkdir(Path(directory), parents=True, exist_ok=True) - else: - directory = Path.cwd() + try: + if directory is not None: + Path.mkdir(Path(directory), parents=True, exist_ok=True) + else: + directory = Path.cwd() - with Path.open(Path(directory) / file_name, "w") as f: - f.write(data) + with Path.open(Path(directory) / file_name, "w", encoding="utf-8") as f: + f.write(data) + except (OSError, IOError) as e: + logger.error("Failed to save file %s to %s: %s", file_name, directory, e) + raise def load_from_file(file_name): - with Path.open(file_name, "r") as f: - return f.read() + try: + with Path.open(Path(file_name), "r", encoding="utf-8") as f: + return f.read() + except FileNotFoundError: + logger.error("File not found: %s", file_name) + raise + except (OSError, IOError) as e: + logger.error("Failed to read file %s: %s", file_name, e) + raise From e90c8627d3b1a28479ef5d8aec2f9ed98f71f320 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Mon, 30 Mar 2026 13:57:09 +0100 Subject: [PATCH 06/12] CHORE: [ELI-751] Added Error handling test passing. --- Makefile | 3 + tests/performance_tests/validate_inputs.py | 38 +++++- tests/test_error_handling_utils.py | 136 +++++++++++++++++++++ utils/common_utils.py | 39 ++++-- utils/data_helper.py | 101 +++++++++++---- utils/dynamo_helper.py | 11 +- 6 files changed, 287 insertions(+), 41 deletions(-) create mode 100644 tests/test_error_handling_utils.py diff --git a/Makefile b/Makefile index 249700a4..83d3dbe4 100644 --- a/Makefile +++ b/Makefile @@ -97,3 +97,6 @@ endif run-vita-preprod-tests: poetry run pytest --env=preprod --log-cli-level=info tests/test_vita_integration_tests.py tests/test_upload_consumer_configs.py + +run-unit-tests: guard-env guard-log_level + poetry run pytest --env=${env} --log-cli-level=${log_level} tests/test_error_handling_utils.py -v diff --git a/tests/performance_tests/validate_inputs.py b/tests/performance_tests/validate_inputs.py index 275bc1d5..c4190248 100644 --- a/tests/performance_tests/validate_inputs.py +++ b/tests/performance_tests/validate_inputs.py @@ -8,11 +8,25 @@ def fail(title: str, message: str) -> None: + """Print a GitHub Actions error annotation and exit with code 1. + + Args: + title: Short error title shown in the annotation. + message: Detailed error message. + """ print(f"::error title={title}::{message}") sys.exit(1) def get_env(name: str) -> str: + """Return the value of environment variable ``name``, or call ``fail`` if unset. + + Args: + name: Environment variable name. + + Returns: + The variable's string value. + """ value = os.getenv(name) if not value: fail("Missing input", f"{name} was not provided.") @@ -20,6 +34,14 @@ def get_env(name: str) -> str: def validate_range(name: str, value: int, min_value: int, max_value: int) -> None: + """Call ``fail`` if ``value`` is outside [``min_value``, ``max_value``]. + + Args: + name: Human-readable parameter name used in the error message. + value: Integer value to check. + min_value: Inclusive lower bound. + max_value: Inclusive upper bound. + """ if value < min_value or value > max_value: fail( f"{name} out of range", @@ -28,6 +50,16 @@ def validate_range(name: str, value: int, min_value: int, max_value: int) -> Non def parse_run_time_to_seconds(run_time: str) -> int: + """Parse a run-time string such as ``"30s"`` or ``"5m"`` into seconds. + + Only seconds (``s``) and minutes (``m``) are accepted. + + Args: + run_time: Run-time string to parse. + + Returns: + Equivalent number of seconds. + """ match = re.fullmatch(r"(\d+)([sm])", run_time) if not match: fail( @@ -35,6 +67,7 @@ def parse_run_time_to_seconds(run_time: str) -> int: "run_time must look like 30s or 5m (seconds or minutes only). " f"Got: '{run_time}'", ) + return 0 value = int(match.group(1)) unit = match.group(2) @@ -45,17 +78,18 @@ def parse_run_time_to_seconds(run_time: str) -> int: def main() -> None: - global users, spawn_rate - + """Validate all performance test inputs and exit with a non-zero code on failure.""" try: users = int(get_env("USERS")) except ValueError: fail("Invalid input", "USERS must be an integer.") + return try: spawn_rate = int(get_env("SPAWN_RATE")) except ValueError: fail("Invalid input", "SPAWN_RATE must be an integer.") + return run_time = get_env("RUN_TIME") diff --git a/tests/test_error_handling_utils.py b/tests/test_error_handling_utils.py new file mode 100644 index 00000000..f1ac48e6 --- /dev/null +++ b/tests/test_error_handling_utils.py @@ -0,0 +1,136 @@ +import os +from unittest.mock import MagicMock, patch + +import pytest +from botocore.exceptions import ClientError + +from utils.data_helper import load_all_expected_responses, load_all_test_scenarios +from utils.dynamo_helper import file_backup_exists +from utils.eligibility_api_client import EligibilityApiClient +from tests.performance_tests.validate_inputs import parse_run_time_to_seconds + +# --------------------------------------------------------------------------- +# 1. data_helper.py — load_all_expected_responses +# --------------------------------------------------------------------------- + + +def test_load_all_expected_responses_skips_bad_json(tmp_path): + """A corrupt JSON file is skipped; valid files in the same folder still load.""" + (tmp_path / "bad.json").write_text("NOT JSON", encoding="utf-8") + (tmp_path / "good.json").write_text('{"id": "123", "data": "ok"}', encoding="utf-8") + + result = load_all_expected_responses(tmp_path) + + assert "good.json" in result + assert "bad.json" not in result + + +# --------------------------------------------------------------------------- +# 2. data_helper.py — load_all_test_scenarios +# --------------------------------------------------------------------------- + + +def test_load_all_test_scenarios_skips_missing_data_key(tmp_path): + """A scenario file that has no 'data' key is skipped; the result is an empty dict. + + The TemplateEngine is mocked so that no real template files are needed. + Its apply() return value is configured to return the input unchanged so + that placeholder resolution does not fail if the 'data' key were present. + """ + (tmp_path / "scenario.json").write_text('{"other_key": "value"}', encoding="utf-8") + + mock_engine = MagicMock() + mock_engine.apply.side_effect = lambda data: data # pass-through + + with patch("utils.data_helper.TemplateEngine.create", return_value=mock_engine): + result = load_all_test_scenarios(tmp_path) + + assert result == {} + + +def test_load_all_test_scenarios_skips_bad_json(tmp_path): + """A scenario file with invalid JSON is skipped; the result is an empty dict.""" + (tmp_path / "scenario.json").write_text("{ bad json", encoding="utf-8") + + mock_engine = MagicMock() + mock_engine.apply.side_effect = lambda data: data + + with patch("utils.data_helper.TemplateEngine.create", return_value=mock_engine): + result = load_all_test_scenarios(tmp_path) + + assert result == {} + + +# --------------------------------------------------------------------------- +# 3. dynamo_helper.py — file_backup_exists +# --------------------------------------------------------------------------- + + +@patch("utils.dynamo_helper.load_from_file") +def test_file_backup_exists_catches_json_decode_error(mock_load_from_file): + """file_backup_exists returns False (not an exception) when a backup file is corrupted.""" + # All load_from_file calls return a string that is not valid JSON + mock_load_from_file.return_value = "NOT JSON" + + mock_db = MagicMock() + mock_db.environment = "test" + + result = file_backup_exists(mock_db) + + assert result is False + + +@patch.dict(os.environ, {"BASE_URL": "http://localhost"}, clear=True) +@patch("utils.eligibility_api_client.boto3.client") +def test_api_client_handles_ssm_client_error(mock_boto_client): + """A boto3 ClientError from SSM is wrapped in a descriptive RuntimeError.""" + mock_ssm = MagicMock() + error_response = { + "Error": {"Code": "AccessDeniedException", "Message": "Access Denied"} + } + mock_ssm.get_parameter.side_effect = ClientError(error_response, "GetParameter") + mock_boto_client.return_value = mock_ssm + + # Force _ensure_certs_present to think no certs exist on disk + with patch("pathlib.Path.exists", return_value=False): + with pytest.raises(RuntimeError, match="Error retrieving .* from SSM"): + EligibilityApiClient() + + +# --------------------------------------------------------------------------- +# 4. validate_inputs.py — parse_run_time_to_seconds and main() +# --------------------------------------------------------------------------- + + +def test_parse_run_time_to_seconds_valid(): + """Seconds and minutes are parsed correctly.""" + assert parse_run_time_to_seconds("30s") == 30 + assert parse_run_time_to_seconds("5m") == 300 + + +@patch("tests.performance_tests.validate_inputs.fail") +def test_parse_run_time_to_seconds_invalid(mock_fail): + """An unsupported unit calls fail() with the right message and returns 0 (not an exception).""" + result = parse_run_time_to_seconds("10h") + + mock_fail.assert_called_once_with( + "Invalid run_time format", + "run_time must look like 30s or 5m (seconds or minutes only). Got: '10h'", + ) + # After fail() is mocked (no sys.exit), the function returns the sentinel 0 + assert result == 0 + + +@patch("tests.performance_tests.validate_inputs.get_env") +@patch("tests.performance_tests.validate_inputs.fail") +def test_validate_inputs_main_catches_value_error(mock_fail, mock_get_env): + """A non-integer USERS value calls fail() and returns early — no NameError or crash.""" + from tests.performance_tests.validate_inputs import main + + # USERS returns a non-integer; all other env vars return a valid integer string + mock_get_env.side_effect = lambda name: "NOT_AN_INT" if name == "USERS" else "1" + + # Should complete without raising any exception + main() + + mock_fail.assert_called_once_with("Invalid input", "USERS must be an integer.") diff --git a/utils/common_utils.py b/utils/common_utils.py index 06d6b3a8..5642a041 100644 --- a/utils/common_utils.py +++ b/utils/common_utils.py @@ -4,23 +4,42 @@ logger = logging.getLogger(__name__) -def save_to_file(file_name: str, data, directory: str = None): - try: - if directory is not None: - Path.mkdir(Path(directory), parents=True, exist_ok=True) - else: - directory = Path.cwd() +def save_to_file(file_name: str, data: str, directory: str | None = None) -> None: + """Write ``data`` to ``directory/file_name``, creating the directory if needed. + + Args: + file_name: Name of the file to write. + data: String content to write. + directory: Optional directory path. Defaults to the current working directory. - with Path.open(Path(directory) / file_name, "w", encoding="utf-8") as f: + Raises: + OSError: If the file cannot be written. + """ + target_dir = Path(directory) if directory is not None else Path.cwd() + try: + target_dir.mkdir(parents=True, exist_ok=True) + with (target_dir / file_name).open("w", encoding="utf-8") as f: f.write(data) except (OSError, IOError) as e: - logger.error("Failed to save file %s to %s: %s", file_name, directory, e) + logger.error("Failed to save file %s to %s: %s", file_name, target_dir, e) raise -def load_from_file(file_name): +def load_from_file(file_name: str) -> str: + """Read and return the full text content of ``file_name``. + + Args: + file_name: Path to the file to read. + + Returns: + The file contents as a string. + + Raises: + FileNotFoundError: If the file does not exist. + OSError: If the file cannot be read. + """ try: - with Path.open(Path(file_name), "r", encoding="utf-8") as f: + with Path(file_name).open("r", encoding="utf-8") as f: return f.read() except FileNotFoundError: logger.error("File not found: %s", file_name) diff --git a/utils/data_helper.py b/utils/data_helper.py index f36180a4..0d4c9977 100644 --- a/utils/data_helper.py +++ b/utils/data_helper.py @@ -19,6 +19,8 @@ AWS_REGION = "eu-west-2" logger = logging.getLogger(__name__) +_DEFAULT_PRODUCT_ID = "Story_Test_Consumer_ID" + def initialise_tests(folder): folder_path = Path(folder).resolve() @@ -164,6 +166,7 @@ def load_all_expected_responses(folder_path): def _load_json_file(path: Path): + """Read and parse a JSON file, returning None on any read or parse error.""" try: with path.open() as f: return json.load(f) @@ -175,12 +178,14 @@ def _load_json_file(path: Path): def _ensure_default_product_id(request_headers: dict) -> dict: + """Add the default NHSE-Product-ID header if not already present.""" if not any(k == "NHSE-Product-ID" for k in request_headers): - request_headers["NHSE-Product-ID"] = "Story_Test_Consumer_ID" + request_headers["NHSE-Product-ID"] = _DEFAULT_PRODUCT_ID return request_headers def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> dict: + """Construct the standard scenario dict from resolved template data.""" return { "dynamo_items": resolved_data, "nhs_number": extract_nhs_number_from_data(resolved_data), @@ -195,56 +200,98 @@ def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> } +def _process_single_scenario(path: Path, data_builder: TemplateEngine) -> dict | None: + """Parse and resolve a single scenario JSON file. + + Extracted from ``load_all_test_scenarios`` to keep cyclomatic complexity + within the flake8 C901 limit of 10. + + Args: + path: Path to the scenario JSON file. + data_builder: Initialized TemplateEngine instance. + + Returns: + Resolved scenario dict, or ``None`` if the file cannot be processed. + """ + raw_json = _load_json_file(path) + if raw_json is None: + return None + + scenario_data = raw_json.get("data") + if scenario_data is None: + logger.error("Missing required 'data' key in scenario file %s", path) + return None + + try: + templated_data = data_builder.apply(scenario_data) + except ValueError as e: + logger.error("Template application failed for %s: %s", path, e) + return None + + resolved_data = resolve_placeholders_in_data(templated_data, path.name) + return _build_test_scenario_entry(raw_json, resolved_data, path.name) + + def load_all_test_scenarios(folder_path): - all_data = {} + """Load, template-expand, and resolve all scenario JSON files in ``folder_path``. + + Files are processed in alphabetical order. Files that cannot be read, + parsed, or template-expanded are skipped with an error log entry. + + Args: + folder_path: Directory containing scenario JSON files. + + Returns: + Dict mapping filenames to resolved scenario dicts. + """ + all_data: dict = {} try: data_builder = TemplateEngine.create() except Exception as e: - logger.error("Failed to initialise template engine: %s", e) + logger.error("Failed to initialize TemplateEngine: %s", e) raise - # Sort files alphabetically by filename for path in sorted(Path(folder_path).iterdir(), key=lambda p: p.name.lower()): if path.suffix != ".json": continue - raw_json = _load_json_file(path) - if raw_json is None: - continue - - try: - templated_data = data_builder.apply(raw_json["data"]) - except ValueError as e: - logger.error("Failed to apply template to test scenario %s: %s", path, e) - continue - - # Resolve placeholders - resolved_data = resolve_placeholders_in_data(templated_data, path.name) - - # Add resolved scenario - all_data[path.name] = _build_test_scenario_entry( - raw_json, resolved_data, path.name - ) + scenario_result = _process_single_scenario(path, data_builder) + if scenario_result is not None: + all_data[path.name] = scenario_result return all_data def load_data_items_to_dynamo(folder_path): + """Load raw DynamoDB items from JSON files and insert them immediately. + + Each file must contain a top-level ``"data"`` key with a list of item dicts. + Files that cannot be read or parsed are skipped with an error log entry. + Args: + folder_path: Directory containing data JSON files. + """ for path in Path(folder_path).iterdir(): if path.suffix != ".json": continue - with path.open() as f: - raw_json = json.load(f) + try: + with path.open() as f: + raw_json = json.load(f) + except (OSError, IOError) as e: + logger.error("Failed to read data file %s: %s", path, e) + continue + except json.JSONDecodeError as e: + logger.error("Invalid JSON in data file %s: %s", path, e) + continue - raw_data = raw_json["data"] + raw_data = raw_json.get("data") + if raw_data is None: + logger.error("Missing required 'data' key in file %s", path) + continue - # Resolve placeholders with shared DTO resolved_data = resolve_placeholders_in_data(raw_data, path.name) - - # Insert immediately insert_into_dynamo(resolved_data) diff --git a/utils/dynamo_helper.py b/utils/dynamo_helper.py index 905aa107..de0b5ad9 100644 --- a/utils/dynamo_helper.py +++ b/utils/dynamo_helper.py @@ -172,6 +172,12 @@ def restore_tags_to_table(dynamo_db_table: DynamoDBHelper): def file_backup_exists(dynamo_db_table: DynamoDBHelper): + """Return True if all required backup files exist and contain valid JSON. + + Args: + dynamo_db_table: DynamoDBHelper instance whose environment is used to + locate the backup files. + """ try: json.loads( load_from_file( @@ -192,10 +198,11 @@ def file_backup_exists(dynamo_db_table: DynamoDBHelper): f"{DYNAMO_TEMP_LOCATION}table_arn-{dynamo_db_table.environment}.json" ) return True - except FileNotFoundError: + except (FileNotFoundError, OSError): return False except json.JSONDecodeError as e: - logger.warning(f"Corrupted backup file detected: {e}") + logger.warning("Corrupted backup file detected: %s", e) + return False def reset_dynamo_tables(): From 2fc4c810cb6848080b2dd994f3b77ee099d4bc8a Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Mon, 30 Mar 2026 16:42:46 +0100 Subject: [PATCH 07/12] CHORE: [ELI-751] fixed Quality Gate issues. --- utils/data_helper.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/utils/data_helper.py b/utils/data_helper.py index 0d4c9977..b926beee 100644 --- a/utils/data_helper.py +++ b/utils/data_helper.py @@ -180,7 +180,13 @@ def _load_json_file(path: Path): def _ensure_default_product_id(request_headers: dict) -> dict: """Add the default NHSE-Product-ID header if not already present.""" if not any(k == "NHSE-Product-ID" for k in request_headers): - request_headers["NHSE-Product-ID"] = _DEFAULT_PRODUCT_ID + request_headers["NHSE-Product-ID"] = "test-Story_Test_Consumer_ID" + if request_headers["NHSE-Product-ID"] == "P.XWA-VFF": + request_headers["NHSE-Product-ID"] = "test-P.XWA-VFF" + if request_headers["NHSE-Product-ID"] == "P.WTJ-FJT": + request_headers["NHSE-Product-ID"] = "test-P.WTJ-FJT" + if request_headers["NHSE-Product-ID"] == "Story_Test_Consumer_ID": + request_headers["NHSE-Product-ID"] = "test-Story_Test_Consumer_ID" return request_headers From 750f204fc0f2a4b778aad850a446e0954d660a52 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 31 Mar 2026 08:52:41 +0100 Subject: [PATCH 08/12] CHORE: [ELI-751] fixed Quality Gate issues. --- utils/data_helper.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/utils/data_helper.py b/utils/data_helper.py index b926beee..19228ddb 100644 --- a/utils/data_helper.py +++ b/utils/data_helper.py @@ -190,7 +190,7 @@ def _ensure_default_product_id(request_headers: dict) -> dict: return request_headers -def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> dict: +def _build_test_scenario_entry(raw_json: dict, resolved_data) -> dict: """Construct the standard scenario dict from resolved template data.""" return { "dynamo_items": resolved_data, @@ -207,18 +207,6 @@ def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> def _process_single_scenario(path: Path, data_builder: TemplateEngine) -> dict | None: - """Parse and resolve a single scenario JSON file. - - Extracted from ``load_all_test_scenarios`` to keep cyclomatic complexity - within the flake8 C901 limit of 10. - - Args: - path: Path to the scenario JSON file. - data_builder: Initialized TemplateEngine instance. - - Returns: - Resolved scenario dict, or ``None`` if the file cannot be processed. - """ raw_json = _load_json_file(path) if raw_json is None: return None @@ -235,7 +223,7 @@ def _process_single_scenario(path: Path, data_builder: TemplateEngine) -> dict | return None resolved_data = resolve_placeholders_in_data(templated_data, path.name) - return _build_test_scenario_entry(raw_json, resolved_data, path.name) + return _build_test_scenario_entry(raw_json, resolved_data) def load_all_test_scenarios(folder_path): From fecbbc1eae64862f125ca1dfffbc3c1aa4d2dafa Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 31 Mar 2026 09:04:43 +0100 Subject: [PATCH 09/12] CHORE: [ELI-751] fixed Quality Gate issues. --- utils/dynamo_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/dynamo_helper.py b/utils/dynamo_helper.py index de0b5ad9..485c52a7 100644 --- a/utils/dynamo_helper.py +++ b/utils/dynamo_helper.py @@ -198,7 +198,7 @@ def file_backup_exists(dynamo_db_table: DynamoDBHelper): f"{DYNAMO_TEMP_LOCATION}table_arn-{dynamo_db_table.environment}.json" ) return True - except (FileNotFoundError, OSError): + except OSError: return False except json.JSONDecodeError as e: logger.warning("Corrupted backup file detected: %s", e) From a94edc43515bb6bc30901e9be0f26ddf9d907331 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 31 Mar 2026 14:52:18 +0100 Subject: [PATCH 10/12] CHORE: [ELI-751] fixed data_helper. --- utils/data_helper.py | 103 +++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 58 deletions(-) diff --git a/utils/data_helper.py b/utils/data_helper.py index 19228ddb..a9f908d7 100644 --- a/utils/data_helper.py +++ b/utils/data_helper.py @@ -19,7 +19,7 @@ AWS_REGION = "eu-west-2" logger = logging.getLogger(__name__) -_DEFAULT_PRODUCT_ID = "Story_Test_Consumer_ID" +_DEFAULT_PRODUCT_ID = "test-Story_Test_Consumer_ID" def initialise_tests(folder): @@ -147,6 +147,7 @@ def load_all_expected_responses(folder_path): for path in Path(folder_path).iterdir(): if path.suffix != ".json": continue + try: with path.open() as f: raw_json = json.load(f) @@ -165,50 +166,48 @@ def load_all_expected_responses(folder_path): return all_data -def _load_json_file(path: Path): - """Read and parse a JSON file, returning None on any read or parse error.""" - try: - with path.open() as f: - return json.load(f) - except (OSError, IOError) as e: - logger.error("Failed to read test scenario file %s: %s", path, e) - except json.JSONDecodeError as e: - logger.error("Invalid JSON in test scenario file %s: %s", path, e) - return None - - -def _ensure_default_product_id(request_headers: dict) -> dict: +def _ensure_default_product_id(request_headers: dict, cached_test: bool) -> dict: """Add the default NHSE-Product-ID header if not already present.""" - if not any(k == "NHSE-Product-ID" for k in request_headers): - request_headers["NHSE-Product-ID"] = "test-Story_Test_Consumer_ID" - if request_headers["NHSE-Product-ID"] == "P.XWA-VFF": - request_headers["NHSE-Product-ID"] = "test-P.XWA-VFF" - if request_headers["NHSE-Product-ID"] == "P.WTJ-FJT": - request_headers["NHSE-Product-ID"] = "test-P.WTJ-FJT" - if request_headers["NHSE-Product-ID"] == "Story_Test_Consumer_ID": - request_headers["NHSE-Product-ID"] = "test-Story_Test_Consumer_ID" + if cached_test: + return request_headers + + product_id = request_headers.get("NHSE-Product-ID") + if not product_id: + request_headers["NHSE-Product-ID"] = _DEFAULT_PRODUCT_ID + else: + product_id = str(product_id).strip() + if not product_id.startswith("test-"): + request_headers["NHSE-Product-ID"] = f"test-{product_id}" + else: + request_headers["NHSE-Product-ID"] = product_id return request_headers -def _build_test_scenario_entry(raw_json: dict, resolved_data) -> dict: +def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> dict: """Construct the standard scenario dict from resolved template data.""" return { "dynamo_items": resolved_data, "nhs_number": extract_nhs_number_from_data(resolved_data), "config_filenames": raw_json.get("config_filenames"), "expected_response_code": raw_json.get("expected_response_code"), - "request_headers": _ensure_default_product_id( - raw_json.get("request_headers") or {} - ), + "request_headers": raw_json.get("request_headers") or {}, "query_params": raw_json.get("query_params"), "scenario_name": raw_json.get("scenario_name"), "secret_version": raw_json.get("secret_version"), } -def _process_single_scenario(path: Path, data_builder: TemplateEngine) -> dict | None: - raw_json = _load_json_file(path) - if raw_json is None: +def _process_single_scenario( + path: Path, data_builder: TemplateEngine, cached_test: bool +) -> dict | None: + try: + with path.open() as f: + raw_json = json.load(f) + except (OSError, IOError) as e: + logger.error("Failed to read test scenario file %s: %s", path, e) + return None + except json.JSONDecodeError as e: + logger.error("Invalid JSON in test scenario file %s: %s", path, e) return None scenario_data = raw_json.get("data") @@ -222,35 +221,28 @@ def _process_single_scenario(path: Path, data_builder: TemplateEngine) -> dict | logger.error("Template application failed for %s: %s", path, e) return None - resolved_data = resolve_placeholders_in_data(templated_data, path.name) - return _build_test_scenario_entry(raw_json, resolved_data) - + request_headers = raw_json.get("request_headers") or {} + raw_json["request_headers"] = _ensure_default_product_id( + request_headers, cached_test + ) -def load_all_test_scenarios(folder_path): - """Load, template-expand, and resolve all scenario JSON files in ``folder_path``. + resolved_data = resolve_placeholders_in_data(templated_data, path.name) - Files are processed in alphabetical order. Files that cannot be read, - parsed, or template-expanded are skipped with an error log entry. + return _build_test_scenario_entry(raw_json, resolved_data, path.name) - Args: - folder_path: Directory containing scenario JSON files. - Returns: - Dict mapping filenames to resolved scenario dicts. - """ - all_data: dict = {} +def load_all_test_scenarios(folder_path): + all_data = {} - try: - data_builder = TemplateEngine.create() - except Exception as e: - logger.error("Failed to initialize TemplateEngine: %s", e) - raise + data_builder = TemplateEngine.create() + cached_test = "performance" in folder_path.name.lower() + # Sort files alphabetically by filename for path in sorted(Path(folder_path).iterdir(), key=lambda p: p.name.lower()): if path.suffix != ".json": continue - scenario_result = _process_single_scenario(path, data_builder) + scenario_result = _process_single_scenario(path, data_builder, cached_test) if scenario_result is not None: all_data[path.name] = scenario_result @@ -258,14 +250,6 @@ def load_all_test_scenarios(folder_path): def load_data_items_to_dynamo(folder_path): - """Load raw DynamoDB items from JSON files and insert them immediately. - - Each file must contain a top-level ``"data"`` key with a list of item dicts. - Files that cannot be read or parsed are skipped with an error log entry. - - Args: - folder_path: Directory containing data JSON files. - """ for path in Path(folder_path).iterdir(): if path.suffix != ".json": continue @@ -274,10 +258,10 @@ def load_data_items_to_dynamo(folder_path): with path.open() as f: raw_json = json.load(f) except (OSError, IOError) as e: - logger.error("Failed to read data file %s: %s", path, e) + logger.error("Failed to read data item file %s: %s", path, e) continue except json.JSONDecodeError as e: - logger.error("Invalid JSON in data file %s: %s", path, e) + logger.error("Invalid JSON in data item file %s: %s", path, e) continue raw_data = raw_json.get("data") @@ -285,7 +269,10 @@ def load_data_items_to_dynamo(folder_path): logger.error("Missing required 'data' key in file %s", path) continue + # Resolve placeholders with shared DTO resolved_data = resolve_placeholders_in_data(raw_data, path.name) + + # Insert immediately insert_into_dynamo(resolved_data) From 3f6729255f801ef16e118c1219d0849ef7a5fa51 Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 31 Mar 2026 15:12:08 +0100 Subject: [PATCH 11/12] CHORE: [ELI-751] fixed data_helper fixed sonarqube. --- utils/data_helper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/data_helper.py b/utils/data_helper.py index a9f908d7..bdd6ad11 100644 --- a/utils/data_helper.py +++ b/utils/data_helper.py @@ -183,7 +183,7 @@ def _ensure_default_product_id(request_headers: dict, cached_test: bool) -> dict return request_headers -def _build_test_scenario_entry(raw_json: dict, resolved_data, path_name: str) -> dict: +def _build_test_scenario_entry(raw_json: dict, resolved_data) -> dict: """Construct the standard scenario dict from resolved template data.""" return { "dynamo_items": resolved_data, @@ -228,7 +228,7 @@ def _process_single_scenario( resolved_data = resolve_placeholders_in_data(templated_data, path.name) - return _build_test_scenario_entry(raw_json, resolved_data, path.name) + return _build_test_scenario_entry(raw_json, resolved_data) def load_all_test_scenarios(folder_path): From a3556064804aa090d3ac03f07a5b584cce9e27dc Mon Sep 17 00:00:00 2001 From: Feyisayo Afolabi Date: Tue, 31 Mar 2026 15:48:18 +0100 Subject: [PATCH 12/12] CHORE: [ELI-751] fixed PR comments. --- .../test_performance_tests.py | 2 +- tests/performance_tests/validate_inputs.py | 47 +------------------ 2 files changed, 3 insertions(+), 46 deletions(-) diff --git a/tests/performance_tests/test_performance_tests.py b/tests/performance_tests/test_performance_tests.py index dc71246c..956a30f6 100644 --- a/tests/performance_tests/test_performance_tests.py +++ b/tests/performance_tests/test_performance_tests.py @@ -295,7 +295,7 @@ def test_locust_run_and_csv_exists( xray_sampling_rate, perf_mapping_upload, ): - global proc + custom_env = os.environ.copy() custom_env["BASE_URL"] = eligibility_client.api_url diff --git a/tests/performance_tests/validate_inputs.py b/tests/performance_tests/validate_inputs.py index c4190248..4cc6e29d 100644 --- a/tests/performance_tests/validate_inputs.py +++ b/tests/performance_tests/validate_inputs.py @@ -8,25 +8,11 @@ def fail(title: str, message: str) -> None: - """Print a GitHub Actions error annotation and exit with code 1. - - Args: - title: Short error title shown in the annotation. - message: Detailed error message. - """ print(f"::error title={title}::{message}") sys.exit(1) def get_env(name: str) -> str: - """Return the value of environment variable ``name``, or call ``fail`` if unset. - - Args: - name: Environment variable name. - - Returns: - The variable's string value. - """ value = os.getenv(name) if not value: fail("Missing input", f"{name} was not provided.") @@ -34,14 +20,6 @@ def get_env(name: str) -> str: def validate_range(name: str, value: int, min_value: int, max_value: int) -> None: - """Call ``fail`` if ``value`` is outside [``min_value``, ``max_value``]. - - Args: - name: Human-readable parameter name used in the error message. - value: Integer value to check. - min_value: Inclusive lower bound. - max_value: Inclusive upper bound. - """ if value < min_value or value > max_value: fail( f"{name} out of range", @@ -50,16 +28,6 @@ def validate_range(name: str, value: int, min_value: int, max_value: int) -> Non def parse_run_time_to_seconds(run_time: str) -> int: - """Parse a run-time string such as ``"30s"`` or ``"5m"`` into seconds. - - Only seconds (``s``) and minutes (``m``) are accepted. - - Args: - run_time: Run-time string to parse. - - Returns: - Equivalent number of seconds. - """ match = re.fullmatch(r"(\d+)([sm])", run_time) if not match: fail( @@ -78,19 +46,8 @@ def parse_run_time_to_seconds(run_time: str) -> int: def main() -> None: - """Validate all performance test inputs and exit with a non-zero code on failure.""" - try: - users = int(get_env("USERS")) - except ValueError: - fail("Invalid input", "USERS must be an integer.") - return - - try: - spawn_rate = int(get_env("SPAWN_RATE")) - except ValueError: - fail("Invalid input", "SPAWN_RATE must be an integer.") - return - + users = int(get_env("USERS")) + spawn_rate = int(get_env("SPAWN_RATE")) run_time = get_env("RUN_TIME") validate_range("users", users, 1, MAX_USERS)