|
1 | | -import boto3 |
| 1 | +import json |
| 2 | +import logging |
| 3 | +import os |
2 | 4 | import secrets |
3 | 5 | import string |
4 | | -import os |
5 | | -import logging |
6 | | -import json |
7 | 6 |
|
8 | | -SECRET_NAME = os.environ.get('SECRET_NAME') |
9 | | -REGION_NAME = os.environ.get('AWS_REGION') |
| 7 | +import boto3 |
| 8 | + |
| 9 | +SECRET_NAME = os.environ.get("SECRET_NAME") |
| 10 | +REGION_NAME = os.environ.get("AWS_REGION") |
10 | 11 |
|
11 | 12 | logger = logging.getLogger() |
12 | 13 | logger.setLevel(logging.INFO) |
13 | 14 |
|
| 15 | + |
14 | 16 | def generate_password(length=32): |
15 | 17 | """Generates a secure random password.""" |
16 | 18 | alphabet = string.ascii_letters + string.digits + "!@#$%^&*" |
17 | | - return ''.join(secrets.choice(alphabet) for i in range(length)) |
| 19 | + return "".join(secrets.choice(alphabet) for i in range(length)) |
| 20 | + |
18 | 21 |
|
19 | 22 | def lambda_handler(event, context): |
20 | | - sm_client = boto3.client('secretsmanager', region_name=REGION_NAME) |
| 23 | + sm_client = boto3.client("secretsmanager", region_name=REGION_NAME) |
21 | 24 |
|
22 | | - logger.info(json.dumps({ |
23 | | - 'event': 'rotation_started', |
24 | | - 'request_id': context.aws_request_id, |
25 | | - 'secret_name': SECRET_NAME, |
26 | | - 'function': 'create_pending_secret' |
27 | | - })) |
| 25 | + logger.info( |
| 26 | + json.dumps( |
| 27 | + { |
| 28 | + "event": "rotation_started", |
| 29 | + "request_id": context.aws_request_id, |
| 30 | + "secret_name": SECRET_NAME, |
| 31 | + "function": "create_pending_secret", |
| 32 | + } |
| 33 | + ) |
| 34 | + ) |
28 | 35 |
|
29 | 36 | try: |
30 | 37 | metadata = sm_client.describe_secret(SecretId=SECRET_NAME) |
31 | 38 | # Check if any version currently has the 'AWSPENDING' label |
32 | | - for version_id, stages in metadata.get('VersionIdsToStages', {}).items(): |
33 | | - if 'AWSPENDING' in stages: |
| 39 | + for version_id, stages in metadata.get("VersionIdsToStages", {}).items(): |
| 40 | + if "AWSPENDING" in stages: |
34 | 41 | msg = f"Pending version already exists with version_id: {version_id}." |
35 | 42 |
|
36 | | - logger.warning(json.dumps({ |
37 | | - 'event': 'rotation_aborted', |
38 | | - 'reason': 'pending_version_exists', |
39 | | - 'pending_version_id': version_id |
40 | | - })) |
| 43 | + logger.warning( |
| 44 | + json.dumps( |
| 45 | + { |
| 46 | + "event": "rotation_aborted", |
| 47 | + "reason": "pending_version_exists", |
| 48 | + "pending_version_id": version_id, |
| 49 | + } |
| 50 | + ) |
| 51 | + ) |
41 | 52 |
|
42 | 53 | raise Exception(msg) |
43 | 54 | except sm_client.exceptions.ResourceNotFoundException: |
44 | 55 | logger.info("Secret not found. Proceeding to create (assuming it will be initialized).") |
45 | | - pass |
46 | 56 |
|
47 | 57 | new_password = generate_password() |
48 | 58 |
|
49 | 59 | try: |
50 | | - resp = sm_client.put_secret_value( |
51 | | - SecretId=SECRET_NAME, |
52 | | - SecretString=new_password, |
53 | | - VersionStages=['AWSPENDING'] |
54 | | - ) |
| 60 | + resp = sm_client.put_secret_value(SecretId=SECRET_NAME, SecretString=new_password, VersionStages=["AWSPENDING"]) |
55 | 61 |
|
56 | | - logger.info(json.dumps({ |
57 | | - 'event': 'pending_version_created', |
58 | | - 'version_id': resp['VersionId'], |
59 | | - 'status': 'success' |
60 | | - })) |
61 | | - return { |
62 | | - "status": "success", |
63 | | - "secret_name": SECRET_NAME, |
64 | | - "version_id": resp['VersionId'] |
65 | | - } |
| 62 | + logger.info( |
| 63 | + json.dumps({"event": "pending_version_created", "version_id": resp["VersionId"], "status": "success"}) |
| 64 | + ) |
| 65 | + return {"status": "success", "secret_name": SECRET_NAME, "version_id": resp["VersionId"]} |
66 | 66 |
|
67 | 67 | except sm_client.exceptions.ResourceNotFoundException: |
68 | 68 | raise Exception(f"The secret '{SECRET_NAME}' was not found in region '{REGION_NAME}'.") |
69 | 69 | except Exception as e: |
70 | | - logger.error(json.dumps({ |
71 | | - 'event': 'rotation_failed', |
72 | | - 'error': str(e), |
73 | | - 'type': type(e).__name__ |
74 | | - })) |
75 | | - raise Exception(f"Error creating pending secret: {str(e)}") |
| 70 | + logger.error(json.dumps({"event": "rotation_failed", "error": str(e), "type": type(e).__name__})) |
| 71 | + raise Exception(f"Error creating pending secret: {e!s}") |
0 commit comments