diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index c916979..70b8537 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -9,6 +9,7 @@ on: branches: - develop - main + workflow_dispatch: permissions: id-token: write # This is required for requesting the JWT @@ -24,7 +25,26 @@ env: SMTP_SENDER: ${{ secrets.SMTP_SENDER }} jobs: - build: + + fetch-ecr-password: + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + steps: + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.OIDC_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + + - id: get-ecr-pw + run: | + echo "ECR_PW=$(aws ecr get-login-password --region ${{ env.AWS_REGION }} --output text)" >> "$GITHUB_OUTPUT" + outputs: + ECR_PW: ${{ steps.get-ecr-pw.outputs.ecr_pw }} + + set-environment-variables: runs-on: ubuntu-latest steps: - name: Select Environment @@ -45,9 +65,6 @@ jobs: echo "TMP_TASK_DEFINITION_ARN_PARAMETER_NAME=${{ vars.PROD_TMP_TASK_DEFINITION_ARN_PARAMETER_NAME }}" >> $GITHUB_ENV echo "CALLBACK_ENDPOINT_PARAMETER_NAME=${{ vars.PROD_CALLBACK_ENDPOINT_PARAMETER_NAME }}" >> $GITHUB_ENV echo "SES_SMTP_CREDENTIALS_SECRET_NAME=${{ vars.PROD_SES_SMTP_CREDENTIALS_SECRET_NAME }}" >> $GITHUB_ENV - echo "SMTP_USER=${{ secrets.PROD_SMTP_USER }}" >> $GITHUB_ENV - echo "SMTP_PASSWORD=${{ secrets.PROD_SMTP_PASSWORD }}" >> $GITHUB_ENV - echo "DD_API_KEY_SECRET_NAME=${{ secrets.PROD_DD_API_KEY_SECRET_NAME }}" >> $GITHUB_ENV elif [ "${{ github.ref }}" == "refs/heads/develop" ]; then echo "ENVIRONMENT=stage" >> $GITHUB_ENV echo "SERVICE_CPU=${{ vars.STAGE_SERVICE_CPU }}" >> $GITHUB_ENV @@ -64,52 +81,125 @@ jobs: echo "TMP_TASK_DEFINITION_ARN_PARAMETER_NAME=${{ vars.STAGE_TMP_TASK_DEFINITION_ARN_PARAMETER_NAME }}" >> $GITHUB_ENV echo "CALLBACK_ENDPOINT_PARAMETER_NAME=${{ vars.STAGE_CALLBACK_ENDPOINT_PARAMETER_NAME }}" >> $GITHUB_ENV echo "SES_SMTP_CREDENTIALS_SECRET_NAME=${{ vars.STAGE_SES_SMTP_CREDENTIALS_SECRET_NAME }}" >> $GITHUB_ENV - echo "SMTP_USER=${{ secrets.STAGE_SMTP_USER }}" >> $GITHUB_ENV - echo "SMTP_PASSWORD=${{ secrets.STAGE_SMTP_PASSWORD }}" >> $GITHUB_ENV - echo "DD_API_KEY_SECRET_NAME=${{ secrets.STAGE_DD_API_KEY_SECRET_NAME }}" >> $GITHUB_ENV fi + echo "IMAGE_TAG=${{ github.sha }}" >> $GITHUB_ENV + outputs: + ENVIRONMENT: ${{ env.ENVIRONMENT }} + SERVICE_CPU: ${{ env.SERVICE_CPU }} + SERVICE_MEMORY: ${{ env.SERVICE_MEMORY }} + OUTBOX_TABLE_NAME_PARAMETER_NAME: ${{ env.OUTBOX_TABLE_NAME_PARAMETER_NAME }} + MC_EML_EFS_ACCESS_POINT_ARN_PARAMETER_NAME: ${{ env.MC_EML_EFS_ACCESS_POINT_ARN_PARAMETER_NAME }} + MC_EML_EFS_ACCESS_POINT_ID_PARAMETER_NAME: ${{ env.MC_EML_EFS_ACCESS_POINT_ID_PARAMETER_NAME }} + MC_EML_EFS_ID_PARAMETER_NAME: ${{ env.MC_EML_EFS_ID_PARAMETER_NAME }} + REPOSITORY_NAME_PARAMETER_NAME: ${{ env.REPOSITORY_NAME_PARAMETER_NAME }} + MD_REST_EFS_ID_PARAMETER_NAME: ${{ env.MD_REST_EFS_ID_PARAMETER_NAME }} + MD_REST_ACCESS_POINT_ID_PARAMETER_NAME: ${{ env.MD_REST_ACCESS_POINT_ID_PARAMETER_NAME }} + MD_REST_ACCESS_POINT_ARN_PARAMETER_NAME: ${{ env.MD_REST_ACCESS_POINT_ARN_PARAMETER_NAME }} + TASK_DEFINITION_ARN_PARAMETER_NAME: ${{ env.TASK_DEFINITION_ARN_PARAMETER_NAME }} + TMP_TASK_DEFINITION_ARN_PARAMETER_NAME: ${{ env.TMP_TASK_DEFINITION_ARN_PARAMETER_NAME }} + CALLBACK_ENDPOINT_PARAMETER_NAME: ${{ env.CALLBACK_ENDPOINT_PARAMETER_NAME }} + SES_SMTP_CREDENTIALS_SECRET_NAME: ${{ env.SES_SMTP_CREDENTIALS_SECRET_NAME }} + IMAGE_TAG: ${{ env.IMAGE_TAG }} + IMAGE_NAME: ${{ env.ENVIRONMENT }}-${{ env.SERVICE_NAME }} - # Step 1: Checkout the repository + build-image: + env: + IMAGE_TAG: ${{ needs.set-environment-variables.outputs.IMAGE_TAG }} + IMAGE_NAME: ${{ needs.set-environment-variables.outputs.IMAGE_NAME }} + runs-on: ubuntu-latest + needs: + - set-environment-variables + steps: - name: Checkout Code uses: actions/checkout@v3 - # Step 2: Configure AWS credentials - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v4 with: role-to-assume: ${{ secrets.OIDC_ROLE_ARN }} aws-region: ${{ vars.AWS_REGION }} - # Step 3: Login to ECR registry - name: Login ECR id: login-ecr uses: aws-actions/amazon-ecr-login@v2 - # Step 4: Build & push image - name: Image build + continue-on-error: false id: build-image env: ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} - IMAGE_NAME: ${{ env.ENVIRONMENT }}-${{ env.SERVICE_NAME }} + IMAGE_NAME: ${{ env.IMAGE_NAME }} + IMAGE_TAG: ${{ env.IMAGE_TAG }} run: | docker build -t "$IMAGE_NAME:latest" -f Dockerfile . docker tag $IMAGE_NAME:latest $ECR_REGISTRY/$IMAGE_NAME:latest - docker tag $IMAGE_NAME:latest $ECR_REGISTRY/$IMAGE_NAME:$GITHUB_SHA + docker tag $IMAGE_NAME:latest $ECR_REGISTRY/$IMAGE_NAME:$IMAGE_TAG docker push $ECR_REGISTRY/$IMAGE_NAME --all-tags - echo "image=$ECR_REGISTRY/$IMAGE_NAME:$GITHUB_SHA" >> $GITHUB_OUTPUT - echo "image-tag=$GITHUB_SHA" >> $GITHUB_OUTPUT - # Step 5: Deploy updated task definition stack - - name: Deploy CDK Stack - continue-on-error: false - uses: arnaskro/aws-cdk-v2-github-actions@v2.3.0 + deploy-cdk: + env: + ENVIRONMENT: ${{ needs.set-environment-variables.outputs.ENVIRONMENT }} + IMAGE_NAME: ${{ needs.set-environment-variables.outputs.IMAGE_NAME }} + IMAGE_TAG: ${{ needs.set-environment-variables.outputs.IMAGE_TAG }} + TMP_TASK_DEFINITION_ARN_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.TMP_TASK_DEFINITION_ARN_PARAMETER_NAME }} + TASK_DEFINITION_ARN_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.TASK_DEFINITION_ARN_PARAMETER_NAME }} + SERVICE_CPU: ${{ needs.set-environment-variables.outputs.SERVICE_CPU }} + SERVICE_MEMORY: ${{ needs.set-environment-variables.outputs.SERVICE_MEMORY }} + OUTBOX_TABLE_NAME_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.OUTBOX_TABLE_NAME_PARAMETER_NAME }} + MC_EML_EFS_ACCESS_POINT_ARN_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.MC_EML_EFS_ACCESS_POINT_ARN_PARAMETER_NAME }} + MC_EML_EFS_ACCESS_POINT_ID_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.MC_EML_EFS_ACCESS_POINT_ID_PARAMETER_NAME }} + MC_EML_EFS_ID_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.MC_EML_EFS_ID_PARAMETER_NAME }} + REPOSITORY_NAME_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.REPOSITORY_NAME_PARAMETER_NAME }} + MD_REST_EFS_ID_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.MD_REST_EFS_ID_PARAMETER_NAME }} + MD_REST_ACCESS_POINT_ID_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.MD_REST_ACCESS_POINT_ID_PARAMETER_NAME }} + MD_REST_ACCESS_POINT_ARN_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.MD_REST_ACCESS_POINT_ARN_PARAMETER_NAME }} + CALLBACK_ENDPOINT_PARAMETER_NAME: ${{ needs.set-environment-variables.outputs.CALLBACK_ENDPOINT_PARAMETER_NAME }} + SES_SMTP_CREDENTIALS_SECRET_NAME: ${{ needs.set-environment-variables.outputs.SES_SMTP_CREDENTIALS_SECRET_NAME }} + runs-on: ubuntu-latest + needs: + - fetch-ecr-password + - set-environment-variables + - build-image + container: + image: 823598220965.dkr.ecr.eu-west-1.amazonaws.com/alpine-cdk-runner:0ce104344ee2d098f181b1d785bfa55fa68b6e9f + credentials: + username: AWS + password: ${{ needs.fetch-ecr-password.outputs.ECR_PW }} + steps: + - name: Checkout Code + uses: actions/checkout@v3 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 with: - cdk_subcommand: 'deploy' - cdk_args: '-c environment=${{ env.ENVIRONMENT }} -c image_tag=${{ steps.build-image.outputs.image-tag }} --require-approval never' - actions_comment: false - working_dir: cdk + role-to-assume: ${{ secrets.OIDC_ROLE_ARN }} + aws-region: ${{ vars.AWS_REGION }} + + - name: Inject cross repo shared token + working-directory: cdk + run: | + mv requirements.txt temp_requirements.txt + sed -e "s/__SHARED_TOKEN__/${{ secrets.SHARED_TOKEN }}/g" temp_requirements.txt > requirements.txt + + - name: Set CDK environment variables + run: | + echo "CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)" >> $GITHUB_ENV + echo "CDK_DEFAULT_REGION=${{ vars.AWS_REGION }}" >> $GITHUB_ENV + + - name: Deploy CDK Stack + working-directory: cdk + run: | + python3 -m venv venv + . venv/bin/activate + pip install -r requirements.txt + cdk deploy -c environment=${{ env.ENVIRONMENT }} \ + -c image_tag=${{ env.IMAGE_TAG }} \ + -c dd_api_key_secret_name=${{ ( env.ENVIRONMENT == 'prod' ) && secrets.PROD_DD_API_KEY_SECRET_NAME || secrets.STAGE_DD_API_KEY_SECRET_NAME }} \ + -c smtp_user=${{ ( env.ENVIRONMENT == 'prod' ) && secrets.PROD_SMTP_USER || secrets.STAGE_SMTP_USER }} \ + -c smtp_password=${{ ( env.ENVIRONMENT == 'prod' ) && secrets.PROD_SMTP_PASSWORD || secrets.STAGE_SMTP_PASSWORD }} \ + --require-approval never \ + --all - # Step 6: Retrieve updated temporary task definition arn - name: Retrieve image updated task definition continue-on-error: false id: get-tmp-task-definition @@ -117,21 +207,20 @@ jobs: TMP_TASK_DEFINITION_ARN=$(aws ssm get-parameter --name ${{ env.TMP_TASK_DEFINITION_ARN_PARAMETER_NAME }} --query Parameter.Value --output text) echo "task-definition-arn=$TMP_TASK_DEFINITION_ARN" >> $GITHUB_OUTPUT - # Step 7: Deploy new task definition and wait for service to be stable - name: Deploy task definition id: task-definition-deploy run: | aws ecs update-service \ --cluster ${{ env.ENVIRONMENT }} \ --service ${{ env.SERVICE_NAME }} \ - --task-definition ${{ steps.get-tmp-task-definition.outputs.task-definition-arn }} + --task-definition ${{ steps.get-tmp-task-definition.outputs.task-definition-arn }} \ + --no-paginate aws ecs wait services-stable \ --cluster ${{ env.ENVIRONMENT }} \ --services ${{ env.SERVICE_NAME }} - # Step 8a: Update task definition ssm parameter if the deployment succeeds - name: Update the task definition ssm parameter -# if: ${{ steps.task-definition-deploy.outcome == 'success' }} + if: ${{ steps.task-definition-deploy.outcome == 'success' }} continue-on-error: false run: | aws ssm put-parameter \ @@ -140,9 +229,8 @@ jobs: --type String \ --overwrite - # Step 8b: Deregister the task definition if the deployment fails - name: Delete task definition if deploy fails if: ${{ steps.task-definition-deploy.outcome == 'failure' }} run: | aws ecs deregister-task-definition \ - --task-definition ${{ steps.get-tmp-task-definition.outputs.task-definition-arn }} + --task-definition ${{ steps.get-tmp-task-definition.outputs.task-definition-arn }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 07c7037..c630aeb 100644 --- a/.gitignore +++ b/.gitignore @@ -6,9 +6,10 @@ deployments/build !**/.gitkeep .venv +**/requirements.local.txt *.iml cdk/cdk.out -.idea \ No newline at end of file +.idea diff --git a/cdk/app.py b/cdk/app.py index a9c6e7a..93383c9 100644 --- a/cdk/app.py +++ b/cdk/app.py @@ -5,28 +5,46 @@ Tags ) +from os import environ + from get_env_variables import GetEnvVariables from task_definition_stack import TaskDefinitionStack +from multidialogo_cdk_shared.environment_secrets_resolver import EnvironmentSecretsResolver +from multidialogo_cdk_shared.enums import EnvironmentsEnum + if __name__ == "__main__": app = App() selected_environment = app.node.try_get_context('environment') image_tag = app.node.try_get_context('image_tag') + dd_api_key_secret_name = app.node.try_get_context('dd_api_key_secret_name') + smtp_user = app.node.try_get_context('smtp_user') + smtp_password = app.node.try_get_context('smtp_password') env_parameters = GetEnvVariables(selected_environment).env_dict - environment = Environment(account=env_parameters['ACCOUNT_ID'], region=env_parameters['AWS_REGION']) + account = environ.get('CDK_DEFAULT_ACCOUNT') + region = environ.get('CDK_DEFAULT_REGION') + + environment = Environment(account=account, region=region) + + environment_secrets_resolver = EnvironmentSecretsResolver( + selected_environment=EnvironmentsEnum[selected_environment.upper()] + ) TaskDefinitionStack( app, f"{env_parameters['SELECTED_ENVIRONMENT']}-multicarrier-email-daemon-task-definition-stack", env_parameters=env_parameters, image_tag=image_tag, - env=environment + env=environment, + environment_secrets_resolver=environment_secrets_resolver, + smtp_user=smtp_user, + smtp_password=smtp_password ) Tags.of(app).add('env', selected_environment) Tags.of(app).add('ecs_cluster_name', selected_environment) - app.synth() + app.synth() \ No newline at end of file diff --git a/cdk/get_env_variables.py b/cdk/get_env_variables.py index 822b90c..ac3e5c9 100644 --- a/cdk/get_env_variables.py +++ b/cdk/get_env_variables.py @@ -21,10 +21,7 @@ 'TMP_TASK_DEFINITION_ARN_PARAMETER_NAME', 'CALLBACK_ENDPOINT_PARAMETER_NAME', 'SES_SMTP_CREDENTIALS_SECRET_NAME', - 'SMTP_USER', - 'SMTP_PASSWORD', - 'SMTP_SENDER', - 'DD_API_KEY_SECRET_NAME' + 'SMTP_SENDER' ] class GetEnvVariables: @@ -38,4 +35,4 @@ def __init__( } for i in ENVIRONMENT_VARIABLES: - self.env_dict[i] = os.environ[i] + self.env_dict[i] = os.environ[i] \ No newline at end of file diff --git a/cdk/requirements.txt b/cdk/requirements.txt index a69275d..e3a8a83 100644 --- a/cdk/requirements.txt +++ b/cdk/requirements.txt @@ -1,2 +1,3 @@ aws-cdk-lib -cdk-nag \ No newline at end of file +cdk-nag +multidialogo_cdk_shared @ git+https://__SHARED_TOKEN__@github.com/Multidialogo/multidialogo-cdk-shared.git@main \ No newline at end of file diff --git a/cdk/task_definition_stack.py b/cdk/task_definition_stack.py index a5a2a63..f6c2a82 100644 --- a/cdk/task_definition_stack.py +++ b/cdk/task_definition_stack.py @@ -11,6 +11,7 @@ Tags, Duration ) from constructs import Construct +from multidialogo_cdk_shared.environment_secrets_resolver import EnvironmentSecretsResolver MD_REST_VOLUME_NAME = 'rest-volume' MC_VOLUME_NAME = 'mc-volume' @@ -25,6 +26,9 @@ def __init__( id: str, env_parameters: dict, image_tag: str, + environment_secrets_resolver: EnvironmentSecretsResolver, + smtp_user: str, + smtp_password: str, **kwargs ) -> None: super().__init__( @@ -53,10 +57,7 @@ def __init__( tmp_task_definition_arn_parameter_name = env_parameters['TMP_TASK_DEFINITION_ARN_PARAMETER_NAME'] ses_smtp_credentials_secret_name = env_parameters['SES_SMTP_CREDENTIALS_SECRET_NAME'] callback_endpoint_parameter_name = env_parameters['CALLBACK_ENDPOINT_PARAMETER_NAME'] - smtp_user = env_parameters['SMTP_USER'] - smtp_password = env_parameters['SMTP_PASSWORD'] smtp_sender = env_parameters['SMTP_SENDER'] - dd_api_key_secret_name = env_parameters['DD_API_KEY_SECRET_NAME'] task_definition_family = f'{selected_environment}-{service_name}' @@ -65,12 +66,21 @@ def __init__( parameter_name=md_rest_access_point_arn_parameter_name ) + task_execution_role = iam.Role( + scope=self, + id='execution-role', + assumed_by=iam.ServicePrincipal( + 'ecs-tasks.amazonaws.com' + ) + ) + task_definition = ecs.FargateTaskDefinition( scope=self, id=f'{service_name}-task-definition', cpu=int(service_cpu), family=task_definition_family, - memory_limit_mib=int(service_memory) + memory_limit_mib=int(service_memory), + execution_role=task_execution_role ) task_definition.apply_removal_policy( @@ -150,17 +160,6 @@ def __init__( ) ) - task_definition.add_to_execution_role_policy( - statement=iam.PolicyStatement( - actions=[ - 'secretsmanager:GetSecretValue' - ], - resources=[ - f'arn:aws:secretsmanager:{self.region}:{self.account}:secret:{dd_api_key_secret_name}-*' - ] - ) - ) - log_group_retainment = RemovalPolicy.RETAIN if selected_environment == 'prod' else RemovalPolicy.DESTROY log_group = logs.LogGroup( @@ -250,8 +249,9 @@ def __init__( dd_api_key_secret = secretsmanager.Secret.from_secret_name_v2( scope=self, id='dd-api-key-secret', - secret_name=dd_api_key_secret_name, + secret_name=environment_secrets_resolver.monitoring_datadog_fargate_key_secret_name, ) + dd_api_key_secret.grant_read(task_execution_role) datadog_container = task_definition.add_container( id='datadog-container', @@ -333,7 +333,7 @@ def __init__( container.add_environment( name='EML_STORAGE_PATH', - value=mc_email_efs_folder_name + "/eml" + value=mc_email_efs_folder_name + "/emls" ) container.add_environment( @@ -386,6 +386,53 @@ def __init__( value=ses_smtp_credentials_secret_name ) + container.add_environment( + name='DYNAMODB_PIPELINES_ENABLED', + value='true' + ) + + container.add_environment( + name='MYSQL_PIPELINES_ENABLED', + value='true' + ) + + db_secret = secretsmanager.Secret.from_secret_name_v2( + scope=self, + id='db-secret', + secret_name=environment_secrets_resolver.rds_instances_multicarrier_credentials_admin_secret_name + ) + db_secret.grant_read(task_execution_role) + + container.add_secret( + name='MYSQL_HOST', + secret=ecs.Secret.from_secrets_manager(secret=db_secret, field='host') + ) + + container.add_secret( + name='MYSQL_PORT', + secret=ecs.Secret.from_secrets_manager(secret=db_secret, field='port') + ) + + container.add_secret( + name='MYSQL_USER', + secret=ecs.Secret.from_secrets_manager(secret=db_secret, field='username') + ) + + container.add_secret( + name='MYSQL_PASSWORD', + secret=ecs.Secret.from_secrets_manager(secret=db_secret, field='password') + ) + + container.add_secret( + name='MYSQL_DATABASE', + secret=ecs.Secret.from_secrets_manager(secret=db_secret, field='dbname') + ) + + container.add_environment( + name='MYSQL_TLS', + value='true' + ) + ssm.StringParameter( scope=self, id='temporary-task-definition-arn', diff --git a/cmd/main/config/config.yaml b/cmd/main/config/config.yaml index 3adea99..8d1ebf6 100644 --- a/cmd/main/config/config.yaml +++ b/cmd/main/config/config.yaml @@ -16,12 +16,25 @@ health-check: server: port: 8080 +mysql: + host: "${MYSQL_HOST}" + port: ${MYSQL_PORT} + user: "${MYSQL_USER}" + password: "${MYSQL_PASSWORD}" + database: "${MYSQL_DATABASE}" + outbox: table-name: "${EMAIL_OUTBOX_TABLE}" pipeline: interval: ${PIPELINE_INTERVAL} +pipelines: + dynamodb: + enabled: ${DYNAMODB_PIPELINES_ENABLED} + mysql: + enabled: ${MYSQL_PIPELINES_ENABLED} + smtp: host: "${SMTP_HOST}" port: ${SMTP_PORT} diff --git a/compose.yml b/compose.yml index 57f38cd..8b352d6 100755 --- a/compose.yml +++ b/compose.yml @@ -25,6 +25,14 @@ services: ATTACHMENTS_BASE_PATH: 'testdata/attachments' EMAIL_OUTBOX_TABLE: 'Outbox' EML_STORAGE_PATH: 'testdata/.out/eml' + MYSQL_HOST: '127.0.0.1' + MYSQL_PORT: '3306' + MYSQL_USER: 'root' + MYSQL_PASSWORD: 'test' + MYSQL_DATABASE: 'mailculator_test' + MYSQL_TLS: 'false' + DYNAMODB_PIPELINES_ENABLED: 'true' + MYSQL_PIPELINES_ENABLED: 'true' command: > sh -c "go mod tidy && ./scripts/coverage.sh unit && @@ -153,3 +161,64 @@ services: AWS_SECRET_ACCESS_KEY: 'local' networks: - mailculator-processor-deployments-net + + mysql: &mysql-base + profiles: + - 'none' + image: mariadb:10.11 + environment: + MARIADB_ROOT_PASSWORD: 'test' + MARIADB_DATABASE: 'mailculator_test' + volumes: + - './docker/mysql/migrations:/docker-entrypoint-initdb.d/migrations:ro' + - './docker/mysql/init.sh:/docker-entrypoint-initdb.d/zzz-init.sh:ro' + healthcheck: + test: ['CMD-SHELL', 'test -f /var/lib/mysql/zz-finish && mysqladmin ping -h localhost -ptest'] + interval: 2s + timeout: 5s + retries: 30 + ports: + - '127.0.0.1:3306:3306' + networks: + - mailculator-processor-deployments-net + + mysql-test: + <<: *mysql-base + container_name: mailculator_processor_mysql_test + profiles: + - 'test-deps' + + mysql-devcontainer: + <<: *mysql-base + container_name: mailculator_processor_mysql_devcontainer + profiles: + - 'devcontainer-deps' + + wait-for-mysql-test: + container_name: mailculator_processor_wait_for_mysql_test + profiles: + - 'test-deps' + image: golang:1.25-alpine + command: ['echo', 'Service mysql-test is ready'] + networks: + - mailculator-processor-deployments-net + depends_on: + mysql-test: + condition: service_healthy + + phpmyadmin: + container_name: mailculator_processor_phpmyadmin_devcontainer + profiles: + - 'devcontainer-deps' + image: phpmyadmin:latest + ports: + - '127.0.0.1:9003:80' + environment: + PMA_HOST: 'mysql-devcontainer' + PMA_USER: 'root' + PMA_PASSWORD: 'test' + networks: + - mailculator-processor-deployments-net + depends_on: + mysql-devcontainer: + condition: service_healthy diff --git a/docker/mysql/init.sh b/docker/mysql/init.sh new file mode 100755 index 0000000..2631358 --- /dev/null +++ b/docker/mysql/init.sh @@ -0,0 +1,14 @@ +#!/bin/bash +set -e + +echo "Running MariaDB migrations..." + +for f in /docker-entrypoint-initdb.d/migrations/*.sql; do + echo "Executing $f..." + mysql -u root -p"$MARIADB_ROOT_PASSWORD" "$MARIADB_DATABASE" < "$f" +done + +echo "Migrations completed. Creating finish marker..." +touch /var/lib/mysql/zz-finish + +echo "MariaDB initialization complete." diff --git a/docker/mysql/migrations/001_create_emails_table.sql b/docker/mysql/migrations/001_create_emails_table.sql new file mode 100644 index 0000000..2186760 --- /dev/null +++ b/docker/mysql/migrations/001_create_emails_table.sql @@ -0,0 +1,18 @@ +CREATE TABLE IF NOT EXISTS emails ( + id CHAR(36) PRIMARY KEY, + status ENUM( + 'ACCEPTED','INTAKING','READY','PROCESSING', + 'SENT','FAILED','INVALID', + 'CALLING-SENT-CALLBACK','CALLING-FAILED-CALLBACK', + 'SENT-ACKNOWLEDGED','FAILED-ACKNOWLEDGED' + ) NOT NULL, + eml_file_path VARCHAR(500), + payload_file_path VARCHAR(500), + reason TEXT, + version INT NOT NULL DEFAULT 1, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + + INDEX idx_status (status), + INDEX idx_status_updated (status, updated_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/docker/mysql/migrations/002_create_email_statuses_table.sql b/docker/mysql/migrations/002_create_email_statuses_table.sql new file mode 100644 index 0000000..360e8d8 --- /dev/null +++ b/docker/mysql/migrations/002_create_email_statuses_table.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS email_statuses ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + email_id CHAR(36) NOT NULL, + status VARCHAR(50) NOT NULL, + reason TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + + INDEX idx_email_id (email_id), + FOREIGN KEY (email_id) REFERENCES emails(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/docs/database.md b/docs/database.md index 7a87e35..da1d886 100644 --- a/docs/database.md +++ b/docs/database.md @@ -17,6 +17,7 @@ type Email struct { UpdatedAt string // Timestamp RFC3339 dell'ultimo aggiornamento Reason string // Motivo dell'ultimo stato TTL *int64 // Timestamp Unix in secondi per DynamoDB TTL (nil se non presente) + Version int // Versione per optimistic locking (MySQL) o 0 (DynamoDB) } ``` @@ -34,7 +35,7 @@ DynamoDB TTL è configurato per eliminare automaticamente i record obsoleti. L'a ttl := time.Now().Add(7 * 24 * time.Hour).Unix() ``` -## Pattern di Versionamento +## Pattern di Versionamento (DynamoDB) ### Status Meta - **Costante**: `StatusMeta = "_META"` @@ -63,6 +64,101 @@ INSERT INTO "table" VALUE {'Id': ?, 'Status': ?, 'Attributes': ?, 'TTL': ?} **Nota**: Il TTL viene sempre sincronizzato tra il record _META e i record di stato. Quando un TTL è presente, viene impostato sia alla radice del record _META che alla radice del nuovo record di stato. +## Unificazione dei Tipi + +Entrambi i backend (DynamoDB e MySQL) utilizzano ora lo stesso tipo `Email` con tutti i campi necessari: + +- **DynamoDB**: `Version = 0` (non usa locking basato su versione) +- **MySQL**: `Version` popolato dal database per optimistic locking +- **TTL**: Presente per DynamoDB, `nil` per MySQL (non supportato) + +Questo approccio elimina la duplicazione dei tipi e semplifica l'architettura. + +--- + +## MySQL Schema + +### Tabella `emails` +Tabella principale per la gestione delle email. + +```sql +CREATE TABLE IF NOT EXISTS emails ( + id CHAR(36) PRIMARY KEY, + status ENUM( + 'ACCEPTED','INTAKING','READY','PROCESSING', + 'SENT','FAILED','INVALID', + 'CALLING-SENT-CALLBACK','CALLING-FAILED-CALLBACK', + 'SENT-ACKNOWLEDGED','FAILED-ACKNOWLEDGED' + ) NOT NULL, + eml_file_path VARCHAR(500), + payload_file_path VARCHAR(500), + reason TEXT, + version INT NOT NULL DEFAULT 1, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + + INDEX idx_status (status), + INDEX idx_status_updated (status, updated_at) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` + +**Nota**: A differenza di DynamoDB, MySQL non supporta TTL nativo. La pulizia dei record obsoleti deve essere gestita esternamente. + +### Tabella `email_statuses` +Tabella per lo storico dei cambi di stato (history). + +```sql +CREATE TABLE IF NOT EXISTS email_statuses ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + email_id CHAR(36) NOT NULL, + status VARCHAR(50) NOT NULL, + reason TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + + INDEX idx_email_id (email_id), + FOREIGN KEY (email_id) REFERENCES emails(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` + +### Optimistic Locking (MySQL) +MySQL utilizza optimistic locking basato su: +- Campo `Version` nel tipo `Email` per tracciare le modifiche +- Campo `status` per validare la transizione di stato + +Ogni update incrementa la versione e verifica lo stato atteso: +```sql +UPDATE emails +SET status = ?, reason = ?, version = version + 1 +WHERE id = ? AND status = ? +``` + +Se `affected_rows = 0`, l'operazione restituisce `ErrLockNotAcquired`. + +**Nota**: DynamoDB non usa version-based locking, quindi restituisce sempre `Version = 0`. + +### Query con SKIP LOCKED +Le query di lettura utilizzano `FOR UPDATE SKIP LOCKED` per: +- Evitare blocchi su righe già in uso da altri worker +- Migliorare il throughput in scenari con più processori concorrenti + +```sql +SELECT id, status, eml_file_path, payload_file_path, reason, version, updated_at +FROM emails +WHERE status = ? +ORDER BY updated_at ASC +LIMIT ? +FOR UPDATE SKIP LOCKED +``` + +### Transazioni +Le operazioni di update e insert history sono eseguite in transazione per garantire atomicità: +1. `BEGIN` +2. `UPDATE emails ...` +3. `INSERT INTO email_statuses ...` +4. `COMMIT` (o `ROLLBACK` in caso di errore) + +--- + ## Stati Disponibili - `ACCEPTED` - Email accettato, in attesa di intake - `INTAKING` - Email in fase di elaborazione intake @@ -76,7 +172,7 @@ INSERT INTO "table" VALUE {'Id': ?, 'Status': ?, 'Attributes': ?, 'TTL': ?} - `SENT-ACKNOWLEDGED` - Callback per email inviato completato - `FAILED-ACKNOWLEDGED` - Callback per email fallito completato -## Query Pattern +## Query Pattern (DynamoDB) ### Query per Stato ```sql @@ -90,7 +186,7 @@ WHERE Status=? AND Attributes.Latest =? - Utilizza `NextToken` di DynamoDB per paginazione automatica - Interrompe quando raggiunge il limite di 25 record -## PartiQL Operations +## PartiQL Operations (DynamoDB) ### ExecuteStatement Utilizzato per query con parametri e paginazione. diff --git a/docs/error-handling.md b/docs/error-handling.md index a6f16ec..eedc875 100644 --- a/docs/error-handling.md +++ b/docs/error-handling.md @@ -19,6 +19,32 @@ Il sistema effettua retry automatico per le seguenti eccezioni DynamoDB: - **Formula**: Durata casuale tra 0 e min(2^attempt * base_delay, max_delay) +## Retry MySQL + +### Condizioni di Retry +Il sistema effettua retry automatico per i seguenti errori MySQL: +- `1205` - Lock wait timeout exceeded +- `1213` - Deadlock found when trying to get lock +- `1040` - Too many connections +- `1203` - User already has more than max_user_connections active connections +- `driver.ErrBadConn` - Connessione al database persa + +### Errori NON Soggetti a Retry +- `ErrLockNotAcquired` - Conflitto di lock ottimistico (il record è stato modificato da un altro processo) + +### Backoff Strategy +- **Max Attempts**: 8 tentativi +- **Base Delay**: 30 millisecondi +- **Max Delay**: 1 secondo +- **Formula**: Durata casuale tra 0 e min(2^attempt * base_delay, max_delay) + +### Transazioni +Le operazioni MySQL (Update, Ready, Create) sono eseguite in transazione: +- In caso di errore, viene eseguito automaticamente il rollback +- In caso di successo, viene eseguito il commit +- Gli errori transitori sono gestiti con retry (la transazione viene ritentata dall'inizio) + + ## Retry Callback HTTP ### Condizioni di Retry @@ -43,6 +69,7 @@ Quando il callback HTTP fallisce dopo tutti i retry: Quando non riesce ad acquisire il lock di processamento: - Operazione saltata - Log warning: "failed to acquire processing lock" +- **Nessun retry**: `ErrLockNotAcquired` indica che un altro worker sta già processando il record ## Context Cancellation Tutti i retry rispettano il context cancellation: diff --git a/docs/migration-guide.md b/docs/migration-guide.md new file mode 100644 index 0000000..06c5682 --- /dev/null +++ b/docs/migration-guide.md @@ -0,0 +1,429 @@ +# Migration Guide: DynamoDB to MariaDB to Laravel/SQS + +This document describes the complete migration path from the current DynamoDB-based system to MariaDB, and subsequently to Laravel with SQS. + +## Table of Contents + +1. [Current State](#current-state) +2. [Phase 1: Producer Switch to MySQL](#phase-1-producer-switch-to-mysql) +3. [Phase 2: Drain DynamoDB](#phase-2-drain-dynamodb) +4. [Phase 3: Disable DynamoDB Pipelines](#phase-3-disable-dynamodb-pipelines) +5. [Phase 4: Migration to Laravel + SQS](#phase-4-migration-to-laravel--sqs) +6. [Rollback Procedures](#rollback-procedures) + +--- + +## Current State + +After implementing the MySQL support, the system now has **parallel pipelines**: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Go Application │ +│ │ +│ ┌─────────────────────────┐ ┌─────────────────────────┐ │ +│ │ DynamoDB Pipelines │ │ MySQL Pipelines │ │ +│ │ (processing old msgs) │ │ (idle - no messages) │ │ +│ │ │ │ │ │ +│ │ - IntakePipeline │ │ - IntakePipeline │ │ +│ │ - SenderPipeline │ │ - SenderPipeline │ │ +│ │ - SentCallbackPipeline │ │ - SentCallbackPipeline │ │ +│ │ - FailedCallbackPipe │ │ - FailedCallbackPipe │ │ +│ └───────────┬─────────────┘ └───────────┬─────────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌──────────┐ ┌──────────┐ │ +│ │ DynamoDB │ │ MariaDB │ │ +│ └──────────┘ └──────────┘ │ +└─────────────────────────────────────────────────────────────┘ + │ + │ Producer writes to DynamoDB + ▼ + ┌──────────────┐ + │ Producer │ + │ (external) │ + └──────────────┘ +``` + +### Configuration + +Current environment variables: + +```bash +# Pipeline toggles +DYNAMODB_PIPELINES_ENABLED=true # DynamoDB pipelines active +MYSQL_PIPELINES_ENABLED=true # MySQL pipelines active (but idle) + +# MariaDB connection +MYSQL_HOST= +MYSQL_PORT=3306 +MYSQL_USER= +MYSQL_PASSWORD= +MYSQL_DATABASE=mailculator +``` + +--- + +## Phase 1: Producer Switch to MariaDB + +### Prerequisites + +1. MariaDB instance is provisioned and accessible +2. Schema has been applied (migrations executed) +3. Application is deployed with both pipeline types enabled +4. MariaDB pipelines have been tested (manual test messages) + +### Steps + +#### 1.1 Verify MariaDB Connectivity + +```bash +# From application container +mysql -h $MYSQL_HOST -u $MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "SELECT 1" +``` + +#### 1.2 Test MariaDB Pipelines (Optional) + +Insert a test message directly into MariaDB: + +```sql +INSERT INTO emails (id, status, payload_file_path, ttl) +VALUES (UUID(), 'ACCEPTED', '/path/to/test/payload.json', UNIX_TIMESTAMP() + 3600); +``` + +Verify: +- Message transitions through states +- Email is sent successfully +- Callback is executed + +#### 1.3 Switch Producer + +Update the producer application to write to MariaDB instead of DynamoDB. + +**For the producer (external system):** + +```sql +-- New email insertion (MariaDB) +INSERT INTO emails (id, status, payload_file_path, ttl) +VALUES (?, 'ACCEPTED', ?, ?); + +INSERT INTO email_status_history (email_id, status, reason) +VALUES (?, 'ACCEPTED', 'Created by producer'); +``` + +#### 1.4 Verify + +After switching: + +```sql +-- Check new messages are being created in MariaDB +SELECT COUNT(*) FROM emails WHERE created_at > NOW() - INTERVAL 5 MINUTE; + +-- Check DynamoDB is no longer receiving new messages +-- (via AWS Console or CLI) +``` + +--- + +## Phase 2: Drain DynamoDB + +### Timeline + +With a TTL of 10-14 days, DynamoDB will naturally drain within 2 weeks after the producer switch. + +### Monitoring + +#### Check DynamoDB Queue Depth + +Monitor these states in DynamoDB: + +```bash +# Via AWS CLI or Console +# Count messages in each "active" state: +# - ACCEPTED +# - INTAKING +# - READY +# - PROCESSING +# - SENT +# - FAILED +# - CALLING-SENT-CALLBACK +# - CALLING-FAILED-CALLBACK +``` + +#### Application Logs + +Monitor logs for DynamoDB pipeline activity: + +```bash +# Look for log entries from DynamoDB pipelines +# When queue is empty, pipelines will log 0 messages found +``` + +### Expected Behavior + +``` +Day 0: Producer switches to MySQL +Day 1: DynamoDB: ~30k messages (backlog), MySQL: ~30k messages (new) +Day 2: DynamoDB: ~25k messages, MySQL: ~60k messages +... +Day 7: DynamoDB: ~5k messages, MySQL: ~210k messages +Day 14: DynamoDB: ~0 messages (TTL expired), MySQL: ~420k messages +``` + +--- + +## Phase 3: Disable DynamoDB Pipelines + +### Prerequisites + +1. DynamoDB has been empty for at least 24 hours +2. All messages have been processed (no ACCEPTED/READY/PROCESSING states) +3. Monitoring confirms no activity on DynamoDB pipelines + +### Steps + +#### 3.1 Disable DynamoDB Pipelines + +Update environment variables: + +```bash +DYNAMODB_PIPELINES_ENABLED=false +MYSQL_PIPELINES_ENABLED=true +``` + +Deploy the application with new configuration. + +#### 3.2 Verify Application Health + +```bash +# Check health endpoint +curl http://localhost:8080/health + +# Check logs for successful startup with only MySQL pipelines +``` + +#### 3.3 Remove DynamoDB Infrastructure + +After confirming stability (wait 1-2 days): + +1. Delete DynamoDB table via AWS Console or CDK +2. Remove DynamoDB-related environment variables +3. (Optional) Remove DynamoDB code from application + +--- + +## Phase 4: Migration to Laravel + SQS + +### Architecture Target + +``` +┌─────────────┐ +│ Producer │ +│ (Laravel) │ +└──────┬──────┘ + │ dispatch() + ▼ +┌──────────────────────────────────────────────────────────────┐ +│ SQS Queues │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ intake │ │ send │ │ sent │ │ failed │ │ +│ │ queue │ │ queue │ │ callback │ │ callback │ │ +│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ +└───────┼─────────────┼─────────────┼─────────────┼────────────┘ + ▼ ▼ ▼ ▼ +┌──────────────────────────────────────────────────────────────┐ +│ Laravel Workers │ +│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ +│ │ Intake │ │ Send │ │ Sent │ │ Failed │ │ +│ │ Job │ │ Job │ │ Callback │ │ Callback │ │ +│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────┐ │ +│ │ MySQL │ │ +│ │ (existing) │ │ +│ └──────────────┘ │ +└──────────────────────────────────────────────────────────────┘ +``` + +### Implementation Steps + +#### 4.1 Create Laravel Project + +```bash +laravel new mailculator-laravel +cd mailculator-laravel + +# Install dependencies +composer require aws/aws-sdk-php-laravel +``` + +#### 4.2 Configure SQS + +```php +// config/queue.php +'connections' => [ + 'sqs' => [ + 'driver' => 'sqs', + 'key' => env('AWS_ACCESS_KEY_ID'), + 'secret' => env('AWS_SECRET_ACCESS_KEY'), + 'prefix' => env('SQS_PREFIX'), + 'queue' => env('SQS_QUEUE', 'default'), + 'region' => env('AWS_DEFAULT_REGION', 'eu-west-1'), + 'after_commit' => true, + ], +], +``` + +#### 4.3 Create Job Classes + +Create four job classes corresponding to the current pipelines: + +```php +// app/Jobs/Email/ProcessIntakeJob.php +// app/Jobs/Email/SendEmailJob.php +// app/Jobs/Email/SentCallbackJob.php +// app/Jobs/Email/FailedCallbackJob.php +``` + +Each job should: +1. Implement `ShouldQueue` +2. Use optimistic locking via version column +3. Dispatch the next job in the chain upon success + +#### 4.4 Database Configuration + +Use the same MariaDB database: + +```php +// config/database.php +'mysql' => [ + 'driver' => 'mysql', + 'host' => env('DB_HOST', '127.0.0.1'), + 'port' => env('DB_PORT', '3306'), + 'database' => env('DB_DATABASE', 'mailculator'), + 'username' => env('DB_USERNAME', 'root'), + 'password' => env('DB_PASSWORD', ''), + // ... +], +``` + +#### 4.5 Migration Strategy + +Similar to DynamoDB → MySQL migration: + +1. **Deploy Laravel workers (idle)** - No messages in SQS yet +2. **Switch producer** - Start dispatching to SQS instead of inserting directly +3. **Run both systems in parallel** - Go processes MariaDB backlog, Laravel processes new SQS messages +4. **Drain Go application** - Wait for MySQL queue to empty +5. **Decommission Go** - Remove Go application + +### Producer Code (Laravel) + +```php +// When creating a new email +public function sendEmail(Request $request) +{ + // Validate request + $validated = $request->validate([...]); + + // Store payload file + $payloadPath = $this->storePayload($validated); + + // Create email record + $email = Email::create([ + 'id' => Str::uuid(), + 'status' => 'ACCEPTED', + 'payload_file_path' => $payloadPath, + 'ttl' => now()->addDays(14)->timestamp, + ]); + + // Dispatch intake job + ProcessIntakeJob::dispatch($email->id); + + return response()->json(['id' => $email->id], 202); +} +``` + +--- + +## Rollback Procedures + +### Rollback from Phase 1 (Producer Switch) + +If issues occur after switching the producer to MariaDB: + +1. Switch producer back to DynamoDB +2. MariaDB messages will still be processed by MariaDB pipelines +3. New messages will go to DynamoDB + +### Rollback from Phase 3 (DynamoDB Disabled) + +If issues occur after disabling DynamoDB: + +1. Re-enable DynamoDB pipelines: + ```bash + DYNAMODB_PIPELINES_ENABLED=true + ``` +2. Switch producer back to DynamoDB (if needed) + +### Rollback from Phase 4 (Laravel Migration) + +If issues occur during Laravel migration: + +1. Stop Laravel workers +2. Switch producer back to direct MariaDB insert +3. Go application will continue processing + +--- + +## Monitoring Checklist + +### During Phase 1-2 + +- [ ] MariaDB pipeline processing rate +- [ ] DynamoDB pipeline processing rate (should decrease) +- [ ] Email delivery success rate +- [ ] Callback success rate +- [ ] Error logs + +### During Phase 3 + +- [ ] Application startup logs +- [ ] Health check endpoint +- [ ] MariaDB connection pool metrics +- [ ] No DynamoDB errors (pipelines disabled) + +### During Phase 4 + +- [ ] SQS queue depth +- [ ] Laravel worker logs +- [ ] Job failure rate +- [ ] Dead Letter Queue (DLQ) messages +- [ ] Database query performance + +--- + +## Timeline Summary + +| Phase | Duration | Description | +|-------|----------|-------------| +| Phase 1 | 1 day | Switch producer to MariaDB | +| Phase 2 | 10-14 days | Wait for DynamoDB to drain (TTL) | +| Phase 3 | 1 day | Disable DynamoDB pipelines | +| Phase 4 | 2-3 weeks | Develop and deploy Laravel + SQS | + +**Total estimated time: 4-6 weeks** + +--- + +## Cost Impact + +| Phase | DynamoDB Cost | MariaDB Cost | SQS Cost | +|-------|---------------|------------|----------| +| Before | ~$1,600/month | $0 | $0 | +| Phase 1-2 | ~$800/month (decreasing) | ~$116/month | $0 | +| Phase 3+ | $0 | ~$116/month | $0 | +| Phase 4+ | $0 | ~$116/month | ~$1/month | + +**Annual savings: ~$17,800** diff --git a/go.mod b/go.mod index 446158d..1a94fa1 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.0 toolchain go1.25.3 require ( + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/aws/aws-sdk-go-v2 v1.36.3 github.com/aws/aws-sdk-go-v2/config v1.29.14 github.com/aws/aws-sdk-go-v2/credentials v1.17.67 @@ -14,6 +15,7 @@ require ( github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6 github.com/emersion/go-smtp v0.21.3 github.com/go-playground/validator/v10 v10.25.0 + github.com/go-sql-driver/mysql v1.9.3 github.com/google/uuid v1.6.0 github.com/h2non/filetype v1.1.3 github.com/stretchr/testify v1.10.0 @@ -21,6 +23,7 @@ require ( ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect diff --git a/go.sum b/go.sum index 858c41c..750f35a 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= github.com/aws/aws-sdk-go-v2/config v1.29.14 h1:f+eEi/2cKCg9pqKBoAIwRGzVb70MRKqWX4dg1BDcSJM= @@ -50,10 +54,13 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.25.0 h1:5Dh7cjvzR7BRZadnsVOzPhWsrwUr0nmsZJxEAnFLNO8= github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/h2non/filetype v1.1.3 h1:FKkx9QbD7HR/zjK1Ia5XiBsq9zdLi5Kf3zGyFTAFkGg= github.com/h2non/filetype v1.1.3/go.mod h1:319b3zT68BvV+WRj7cwy856M2ehB3HqNOt6sy1HndBY= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/internal/app/app.go b/internal/app/app.go index 855110d..3c3d71c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,6 +2,7 @@ package app import ( "context" + "database/sql" "fmt" "log/slog" "sync" @@ -9,9 +10,11 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" + _ "github.com/go-sql-driver/mysql" "mailculator-processor/internal/eml" "mailculator-processor/internal/healthcheck" + "mailculator-processor/internal/mysql_outbox" "mailculator-processor/internal/outbox" "mailculator-processor/internal/pipeline" "mailculator-processor/internal/smtp" @@ -25,6 +28,7 @@ type App struct { pipes []pipelineProcessor interval int healthCheckServer *healthcheck.Server + mysqlDB *sql.DB // Keep reference for cleanup } type configProvider interface { @@ -36,27 +40,81 @@ type configProvider interface { GetSmtpConfig() smtp.Config GetEmlStoragePath() string GetAttachmentsBasePath() string + GetMySQLDSN() string + DynamoDBPipelinesEnabled() bool + MySQLPipelinesEnabled() bool } func New(cp configProvider) (*App, error) { - db := dynamodb.NewFromConfig(cp.GetAwsConfig()) client := smtp.New(cp.GetSmtpConfig()) - outboxService := outbox.NewOutbox(db, cp.GetOutboxTableName()) emlStorage := eml.NewEMLStorage(cp.GetEmlStoragePath()) - - intakePipe := pipeline.NewIntakePipeline(outboxService, emlStorage, cp.GetAttachmentsBasePath()) - mainSenderPipe := pipeline.NewMainSenderPipeline(outboxService, client) callbackConfig := cp.GetCallbackConfig() - sentCallbackPipe := pipeline.NewSentCallbackPipeline(outboxService, callbackConfig) - failedCallbackPipe := pipeline.NewFailedCallbackPipeline(outboxService, callbackConfig) - - pipes := []pipelineProcessor{intakePipe, mainSenderPipe, sentCallbackPipe, failedCallbackPipe} healthCheckServer := healthcheck.NewServer(cp.GetHealthCheckServerPort()) + var pipes []pipelineProcessor + var mysqlDB *sql.DB + + // Create DynamoDB pipelines if enabled + if cp.DynamoDBPipelinesEnabled() { + slog.Info("DynamoDB pipelines enabled, initializing...") + db := dynamodb.NewFromConfig(cp.GetAwsConfig()) + dynamoOutbox := outbox.NewOutbox(db, cp.GetOutboxTableName()) + + pipes = append(pipes, + pipeline.NewIntakePipeline(dynamoOutbox, emlStorage, cp.GetAttachmentsBasePath()), + pipeline.NewMainSenderPipeline(dynamoOutbox, client), + pipeline.NewSentCallbackPipeline(dynamoOutbox, callbackConfig), + pipeline.NewFailedCallbackPipeline(dynamoOutbox, callbackConfig), + ) + slog.Info("DynamoDB pipelines initialized", "count", 4) + } + + // Create MySQL pipelines if enabled + if cp.MySQLPipelinesEnabled() { + dsn := cp.GetMySQLDSN() + if dsn == "" { + return nil, fmt.Errorf("MySQL pipelines enabled but MySQL DSN is empty") + } + + slog.Info("MySQL pipelines enabled, initializing...") + var err error + mysqlDB, err = sql.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("failed to open MySQL connection: %w", err) + } + + // Configure connection pool + mysqlDB.SetMaxOpenConns(25) + mysqlDB.SetMaxIdleConns(5) + mysqlDB.SetConnMaxLifetime(5 * time.Minute) + + // Test connection + if err := mysqlDB.Ping(); err != nil { + return nil, fmt.Errorf("failed to ping MySQL: %w", err) + } + + mysqlOutbox := mysql_outbox.NewOutbox(mysqlDB) + + pipes = append(pipes, + pipeline.NewIntakePipeline(mysqlOutbox, emlStorage, cp.GetAttachmentsBasePath()), + pipeline.NewMainSenderPipeline(mysqlOutbox, client), + pipeline.NewSentCallbackPipeline(mysqlOutbox, callbackConfig), + pipeline.NewFailedCallbackPipeline(mysqlOutbox, callbackConfig), + ) + slog.Info("MySQL pipelines initialized", "count", 4) + } + + if len(pipes) == 0 { + return nil, fmt.Errorf("no pipelines enabled, at least one of DynamoDB or MySQL must be enabled") + } + + slog.Info("App initialized", "total_pipelines", len(pipes)) + return &App{ pipes: pipes, interval: cp.GetPipelineInterval(), healthCheckServer: healthCheckServer, + mysqlDB: mysqlDB, }, nil } @@ -90,4 +148,11 @@ func (a *App) Run(ctx context.Context) { }() wg.Wait() + + // Cleanup MySQL connection if it was opened + if a.mysqlDB != nil { + if err := a.mysqlDB.Close(); err != nil { + slog.Error("failed to close MySQL connection", "error", err) + } + } } diff --git a/internal/app/app_test.go b/internal/app/app_test.go index 0af188a..0fa6a2b 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -69,6 +69,18 @@ func (cp *configProviderMock) GetAttachmentsBasePath() string { return "/base/attachments/path/" } +func (cp *configProviderMock) GetMySQLDSN() string { + return "" +} + +func (cp *configProviderMock) DynamoDBPipelinesEnabled() bool { + return true +} + +func (cp *configProviderMock) MySQLPipelinesEnabled() bool { + return false +} + func TestAppInstance(t *testing.T) { app, errNew := New(newConfigProviderMock()) require.NoError(t, errNew) diff --git a/internal/config/config.go b/internal/config/config.go index 50c84d9..10a9535 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,12 +3,13 @@ package config import ( "context" "fmt" - "gopkg.in/yaml.v3" "io" "os" "strings" "time" + "gopkg.in/yaml.v3" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/go-playground/validator/v10" @@ -61,14 +62,33 @@ type EmlStorageConfig struct { Path string `yaml:"path" validate:"required"` } +type MySQLConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + Password string `yaml:"password"` + Database string `yaml:"database"` +} + +type PipelineToggle struct { + Enabled bool `yaml:"enabled"` +} + +type PipelinesConfig struct { + DynamoDB PipelineToggle `yaml:"dynamodb"` + MySQL PipelineToggle `yaml:"mysql"` +} + type Config struct { Attachments AttachmentsConfig `yaml:"attachments,flow" validate:"required"` Aws AwsConfig `yaml:"aws,flow"` Callback CallbacksConfig `yaml:"callback,flow" validate:"required"` EmlStorage EmlStorageConfig `yaml:"eml-storage,flow" validate:"required"` HealthCheck HealthCheckConfig `yaml:"health-check,flow" validate:"required"` + MySQL MySQLConfig `yaml:"mysql,flow"` Outbox OutboxConfig `yaml:"outbox,flow" validate:"required"` Pipeline PipelineConfig `yaml:"pipeline,flow" validate:"required"` + Pipelines PipelinesConfig `yaml:"pipelines,flow"` Smtp SmtpConfig `yaml:"smtp,flow" validate:"required"` } @@ -157,3 +177,24 @@ func (c *Config) GetEmlStoragePath() string { func (c *Config) GetAttachmentsBasePath() string { return c.Attachments.BasePath } + +func (c *Config) GetMySQLConfig() MySQLConfig { + return c.MySQL +} + +func (c *Config) GetMySQLDSN() string { + cfg := c.MySQL + if cfg.Host == "" { + return "" + } + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", + cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Database) +} + +func (c *Config) DynamoDBPipelinesEnabled() bool { + return c.Pipelines.DynamoDB.Enabled +} + +func (c *Config) MySQLPipelinesEnabled() bool { + return c.Pipelines.MySQL.Enabled +} diff --git a/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml b/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml index bcfc410..8383821 100644 --- a/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml +++ b/internal/config/testdata/valid-with-envvar-in-outbox-table-name.yaml @@ -13,12 +13,25 @@ health-check: server: port: 8080 +mysql: + host: "localhost" + port: 3306 + user: "root" + password: "test" + database: "mailculator_test" + outbox: table-name: "${TEST_ENV_VAR}" pipeline: interval: 3 +pipelines: + dynamodb: + enabled: true + mysql: + enabled: true + smtp: host: dummy-host port: 12345 diff --git a/internal/config/testdata/valid.yaml b/internal/config/testdata/valid.yaml index 6629534..a8c2339 100644 --- a/internal/config/testdata/valid.yaml +++ b/internal/config/testdata/valid.yaml @@ -13,12 +13,25 @@ health-check: server: port: 8080 +mysql: + host: "localhost" + port: 3306 + user: "root" + password: "test" + database: "mailculator_test" + outbox: table-name: "Outbox" pipeline: interval: 3 +pipelines: + dynamodb: + enabled: true + mysql: + enabled: true + smtp: host: dummy-host port: 12345 diff --git a/internal/mysql_outbox/outbox.go b/internal/mysql_outbox/outbox.go new file mode 100644 index 0000000..3b4b05a --- /dev/null +++ b/internal/mysql_outbox/outbox.go @@ -0,0 +1,338 @@ +package mysql_outbox + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "math/rand" + "time" + + "mailculator-processor/internal/outbox" + + "github.com/go-sql-driver/mysql" +) + +const ( + maxAttempts = 8 + baseDelay = 30 * time.Millisecond + maxDelay = 1 * time.Second +) + +var ErrLockNotAcquired = errors.New("lock not acquired: record was modified by another process") + +// MySQL error numbers for retryable errors +var retryableErrNos = map[uint16]bool{ + 1205: true, // Lock wait timeout exceeded + 1213: true, // Deadlock found + 1040: true, // Too many connections + 1203: true, // Max user connections exceeded +} + +// sqlDBInterface defines the minimal interface for database operations +type sqlDBInterface interface { + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) +} + +type Outbox struct { + db sqlDBInterface +} + +func NewOutbox(db *sql.DB) *Outbox { + return &Outbox{ + db: db, + } +} + +// NewOutboxWithDB creates an Outbox with a custom database interface (for testing) +func NewOutboxWithDB(db sqlDBInterface) *Outbox { + return &Outbox{ + db: db, + } +} + +// shouldRetryMySQL checks if the error is a transient MySQL error that should be retried. +// It returns false for ErrLockNotAcquired (optimistic lock conflict). +func (o *Outbox) shouldRetryMySQL(err error) bool { + if err == nil { + return false + } + + // Don't retry optimistic lock conflicts + if errors.Is(err, ErrLockNotAcquired) { + return false + } + + // Check for MySQL-specific errors + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) { + return retryableErrNos[mysqlErr.Number] + } + + // Check for connection errors + if errors.Is(err, driver.ErrBadConn) { + return true + } + + return false +} + +// backoffDuration calculates the backoff duration for a given retry attempt +// using exponential backoff with jitter. +func (o *Outbox) backoffDuration(attempt int) time.Duration { + max := min(time.Duration(1< INTAKING -> READY -> PROCESSING -> SENT -> CALLING-SENT-CALLBACK -> SENT-ACKNOWLEDGED + id := "test-transitions-id" + defer func() { + _ = sut.Delete(context.TODO(), id) + }() + + // Create in ACCEPTED status + err = sut.Create(context.TODO(), id, outbox.StatusAccepted, "/path/to/payload.json") + require.NoError(t, err) + + // ACCEPTED -> INTAKING + err = sut.Update(context.TODO(), id, outbox.StatusIntaking, "", nil) + require.NoError(t, err) + + status, err := facade.GetEmailStatus(context.TODO(), id) + require.NoError(t, err) + assert.Equal(t, outbox.StatusIntaking, status) + + // INTAKING -> READY (using Ready method) + err = sut.Ready(context.TODO(), id, "/path/to/email.eml", nil) + require.NoError(t, err) + + status, err = facade.GetEmailStatus(context.TODO(), id) + require.NoError(t, err) + assert.Equal(t, outbox.StatusReady, status) + + // READY -> PROCESSING + err = sut.Update(context.TODO(), id, outbox.StatusProcessing, "", nil) + require.NoError(t, err) + + // PROCESSING -> SENT + err = sut.Update(context.TODO(), id, outbox.StatusSent, "", nil) + require.NoError(t, err) + + // SENT -> CALLING-SENT-CALLBACK + err = sut.Update(context.TODO(), id, outbox.StatusCallingSentCallback, "", nil) + require.NoError(t, err) + + // CALLING-SENT-CALLBACK -> SENT-ACKNOWLEDGED + err = sut.Update(context.TODO(), id, outbox.StatusSentAcknowledged, "", nil) + require.NoError(t, err) + + status, err = facade.GetEmailStatus(context.TODO(), id) + require.NoError(t, err) + assert.Equal(t, outbox.StatusSentAcknowledged, status) +} diff --git a/internal/mysql_outbox/outbox_test.go b/internal/mysql_outbox/outbox_test.go new file mode 100644 index 0000000..9a5ffcc --- /dev/null +++ b/internal/mysql_outbox/outbox_test.go @@ -0,0 +1,331 @@ +//go:build unit + +package mysql_outbox + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "mailculator-processor/internal/outbox" +) + +func TestQuery_WhenDatabaseHasRecords_ShouldReturnEmails(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + now := time.Now() + + rows := sqlmock.NewRows([]string{"id", "status", "eml_file_path", "payload_file_path", "reason", "version", "updated_at"}). + AddRow("test-id-1", "READY", "/path/to/eml", "/path/to/payload", "", 1, now). + AddRow("test-id-2", "READY", "/path/to/eml2", "/path/to/payload2", "some reason", 2, now) + + mock.ExpectQuery("SELECT id, status, eml_file_path, payload_file_path, reason, version, updated_at FROM emails"). + WithArgs("READY", 25). + WillReturnRows(rows) + + sut := NewOutboxWithDB(db) + + emails, err := sut.Query(context.TODO(), outbox.StatusReady, 25) + + assert.NoError(t, err) + require.Len(t, emails, 2) + assert.Equal(t, "test-id-1", emails[0].Id) + assert.Equal(t, "READY", emails[0].Status) + assert.Equal(t, "/path/to/eml", emails[0].EmlFilePath) + assert.Equal(t, "test-id-2", emails[1].Id) + + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestQuery_WhenDatabaseReturnsError_ShouldReturnError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + expectedError := errors.New("database error") + mock.ExpectQuery("SELECT").WillReturnError(expectedError) + + sut := NewOutboxWithDB(db) + + _, err = sut.Query(context.TODO(), outbox.StatusReady, 25) + + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestQuery_WhenDatabaseHasNoRecords_ShouldReturnEmptySlice(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + rows := sqlmock.NewRows([]string{"id", "status", "eml_file_path", "payload_file_path", "reason", "version", "updated_at"}) + + mock.ExpectQuery("SELECT"). + WithArgs("READY", 10). + WillReturnRows(rows) + + sut := NewOutboxWithDB(db) + + emails, err := sut.Query(context.TODO(), outbox.StatusReady, 10) + + assert.NoError(t, err) + assert.Len(t, emails, 0) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestUpdate_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectBegin() + mock.ExpectExec("UPDATE emails"). + WithArgs("PROCESSING", "", "test-id", "READY"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO email_statuses"). + WithArgs("test-id", "PROCESSING", ""). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + sut := NewOutboxWithDB(db) + + err = sut.Update(context.TODO(), "test-id", outbox.StatusProcessing, "", nil) + + assert.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestUpdate_WhenNoRowsAffected_ShouldReturnLockError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectBegin() + mock.ExpectExec("UPDATE emails"). + WithArgs("PROCESSING", "", "test-id", "READY"). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectRollback() + + sut := NewOutboxWithDB(db) + + err = sut.Update(context.TODO(), "test-id", outbox.StatusProcessing, "", nil) + + assert.Error(t, err) + assert.ErrorIs(t, err, ErrLockNotAcquired) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestUpdate_WhenDatabaseReturnsError_ShouldReturnError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + expectedError := errors.New("database error") + mock.ExpectBegin() + mock.ExpectExec("UPDATE emails"). + WithArgs("PROCESSING", "", "test-id", "READY"). + WillReturnError(expectedError) + mock.ExpectRollback() + + sut := NewOutboxWithDB(db) + + err = sut.Update(context.TODO(), "test-id", outbox.StatusProcessing, "", nil) + + assert.Error(t, err) + assert.Equal(t, expectedError, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestReady_WhenUpdateSucceeds_ShouldReturnNoError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectBegin() + mock.ExpectExec("UPDATE emails"). + WithArgs("READY", "/path/to/eml", "test-id", "INTAKING"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO email_statuses"). + WithArgs("test-id", "READY", ""). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + sut := NewOutboxWithDB(db) + + err = sut.Ready(context.TODO(), "test-id", "/path/to/eml", nil) + + assert.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestReady_WhenNoRowsAffected_ShouldReturnLockError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectBegin() + mock.ExpectExec("UPDATE emails"). + WithArgs("READY", "/path/to/eml", "test-id", "INTAKING"). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectRollback() + + sut := NewOutboxWithDB(db) + + err = sut.Ready(context.TODO(), "test-id", "/path/to/eml", nil) + + assert.Error(t, err) + assert.ErrorIs(t, err, ErrLockNotAcquired) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestGetExpectedFromStatus_ShouldReturnCorrectTransitions(t *testing.T) { + t.Parallel() + + testCases := []struct { + toStatus string + fromStatus string + }{ + {outbox.StatusIntaking, outbox.StatusAccepted}, + {outbox.StatusReady, outbox.StatusIntaking}, + {outbox.StatusProcessing, outbox.StatusReady}, + {outbox.StatusSent, outbox.StatusProcessing}, + {outbox.StatusFailed, outbox.StatusProcessing}, + {outbox.StatusInvalid, outbox.StatusIntaking}, + {outbox.StatusCallingSentCallback, outbox.StatusSent}, + {outbox.StatusCallingFailedCallback, outbox.StatusFailed}, + {outbox.StatusSentAcknowledged, outbox.StatusCallingSentCallback}, + {outbox.StatusFailedAcknowledged, outbox.StatusCallingFailedCallback}, + } + + for _, tc := range testCases { + t.Run(tc.toStatus, func(t *testing.T) { + result := getExpectedFromStatus(tc.toStatus) + assert.Equal(t, tc.fromStatus, result) + }) + } +} + +func TestCreate_WhenInsertSucceeds_ShouldReturnNoError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectBegin() + mock.ExpectExec("INSERT INTO emails"). + WithArgs("test-id", "ACCEPTED", "/path/to/payload"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("INSERT INTO email_statuses"). + WithArgs("test-id", "ACCEPTED", ""). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + sut := NewOutboxWithDB(db) + + err = sut.Create(context.TODO(), "test-id", outbox.StatusAccepted, "/path/to/payload") + + assert.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestDelete_WhenDeleteSucceeds_ShouldReturnNoError(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectExec("DELETE FROM emails"). + WithArgs("test-id"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + sut := NewOutboxWithDB(db) + + err = sut.Delete(context.TODO(), "test-id") + + assert.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestShouldRetryMySQL_WhenErrLockNotAcquired_ShouldReturnFalse(t *testing.T) { + t.Parallel() + + db, _, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + sut := NewOutboxWithDB(db) + + result := sut.shouldRetryMySQL(ErrLockNotAcquired) + + assert.False(t, result) +} + +func TestShouldRetryMySQL_WhenNilError_ShouldReturnFalse(t *testing.T) { + t.Parallel() + + db, _, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + sut := NewOutboxWithDB(db) + + result := sut.shouldRetryMySQL(nil) + + assert.False(t, result) +} + +func TestShouldRetryMySQL_WhenGenericError_ShouldReturnFalse(t *testing.T) { + t.Parallel() + + db, _, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + sut := NewOutboxWithDB(db) + + result := sut.shouldRetryMySQL(errors.New("some generic error")) + + assert.False(t, result) +} + +func TestBackoffDuration_ShouldReturnPositiveDuration(t *testing.T) { + t.Parallel() + + db, _, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + sut := NewOutboxWithDB(db) + + for attempt := 0; attempt < 10; attempt++ { + duration := sut.backoffDuration(attempt) + assert.GreaterOrEqual(t, int64(duration), int64(0)) + assert.LessOrEqual(t, duration, maxDelay) + } +} diff --git a/internal/outbox/outbox.go b/internal/outbox/outbox.go index c8a5d2a..0652889 100644 --- a/internal/outbox/outbox.go +++ b/internal/outbox/outbox.go @@ -47,6 +47,7 @@ type Email struct { UpdatedAt string Reason string TTL *int64 + Version int } type dynamodbInterface interface { @@ -368,6 +369,7 @@ func (m *emailMarshaller) UnmarshalList(attrsList []map[string]types.AttributeVa UpdatedAt: fmt.Sprint(item.Attributes["UpdatedAt"]), Reason: fmt.Sprint(item.Attributes["Reason"]), TTL: ttl, + Version: 0, // DynamoDB doesn't use version-based locking }) } diff --git a/internal/testutils/facades/mysql_outbox.go b/internal/testutils/facades/mysql_outbox.go new file mode 100644 index 0000000..34c1575 --- /dev/null +++ b/internal/testutils/facades/mysql_outbox.go @@ -0,0 +1,159 @@ +package facades + +import ( + "context" + "database/sql" + "fmt" + "os" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/google/uuid" +) + +type MySQLOutboxFacade struct { + db *sql.DB +} + +func NewMySQLConfigFromEnv() string { + host := os.Getenv("MYSQL_HOST") + port := os.Getenv("MYSQL_PORT") + user := os.Getenv("MYSQL_USER") + password := os.Getenv("MYSQL_PASSWORD") + database := os.Getenv("MYSQL_DATABASE") + tls := os.Getenv("MYSQL_TLS") + + if host == "" { + host = "127.0.0.1" + } + if port == "" { + port = "3306" + } + if user == "" { + user = "root" + } + if password == "" { + password = "test" + } + if database == "" { + database = "mailculator_test" + } + if tls == "" { + tls = "true" + } + + return fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&tls=%s", user, password, host, port, database, tls) +} + +func NewMySQLOutboxFacade() (*MySQLOutboxFacade, error) { + dsn := NewMySQLConfigFromEnv() + + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("failed to open mysql connection: %w", err) + } + + // Test the connection + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("failed to ping mysql: %w", err) + } + + return &MySQLOutboxFacade{db: db}, nil +} + +func (f *MySQLOutboxFacade) GetDB() *sql.DB { + return f.db +} + +func (f *MySQLOutboxFacade) Close() error { + return f.db.Close() +} + +func (f *MySQLOutboxFacade) AddEmail(ctx context.Context, emlFilePath string) (string, error) { + if emlFilePath == "" { + emlFilePath = "testdata/smol.EML" + } + + id := uuid.NewString() + status := "READY" + + query := ` + INSERT INTO emails (id, status, eml_file_path) + VALUES (?, ?, ?) + ` + + _, err := f.db.ExecContext(ctx, query, id, status, emlFilePath) + if err != nil { + return "", fmt.Errorf("failed to insert email: %w", err) + } + + // Insert initial history + historyQuery := ` + INSERT INTO email_statuses (email_id, status, reason) + VALUES (?, ?, ?) + ` + _, _ = f.db.ExecContext(ctx, historyQuery, id, status, "test fixture") + + return id, nil +} + +func (f *MySQLOutboxFacade) AddEmailWithStatus(ctx context.Context, status string, emlFilePath string) (string, error) { + if emlFilePath == "" { + emlFilePath = "testdata/smol.EML" + } + + id := uuid.NewString() + + query := ` + INSERT INTO emails (id, status, eml_file_path) + VALUES (?, ?, ?) + ` + + _, err := f.db.ExecContext(ctx, query, id, status, emlFilePath) + if err != nil { + return "", fmt.Errorf("failed to insert email: %w", err) + } + + // Insert initial history + historyQuery := ` + INSERT INTO email_statuses (email_id, status, reason) + VALUES (?, ?, ?) + ` + _, _ = f.db.ExecContext(ctx, historyQuery, id, status, "test fixture") + + return id, nil +} + +func (f *MySQLOutboxFacade) DeleteEmail(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("id is required") + } + + // History will be deleted via CASCADE + query := `DELETE FROM emails WHERE id = ?` + _, err := f.db.ExecContext(ctx, query, id) + return err +} + +func (f *MySQLOutboxFacade) GetEmailStatus(ctx context.Context, id string) (string, error) { + var status string + err := f.db.QueryRowContext(ctx, "SELECT status FROM emails WHERE id = ?", id).Scan(&status) + return status, err +} + +// WaitForReady waits for MariaDB to be ready by checking for the zz-finish marker file +// This is used in tests to wait for MariaDB initialization to complete +func WaitForMySQLReady(timeout time.Duration) error { + deadline := time.Now().Add(timeout) + + for time.Now().Before(deadline) { + facade, err := NewMySQLOutboxFacade() + if err == nil { + facade.Close() + return nil + } + time.Sleep(500 * time.Millisecond) + } + + return fmt.Errorf("timeout waiting for MySQL to be ready") +} diff --git a/run-tests-local.sh b/run-tests-local.sh index b975f86..ea9fb6e 100755 --- a/run-tests-local.sh +++ b/run-tests-local.sh @@ -21,6 +21,14 @@ export PIPELINE_CALLBACK_URL=http://127.0.0.1:8080 export ATTACHMENTS_BASE_PATH=testdata/attachments export EMAIL_OUTBOX_TABLE=Outbox export EML_STORAGE_PATH=testdata/.out/eml +export MYSQL_HOST=127.0.0.1 +export MYSQL_PORT=3306 +export MYSQL_USER=root +export MYSQL_PASSWORD=test +export MYSQL_DATABASE=mailculator_test +export MYSQL_TLS=false +export DYNAMODB_PIPELINES_ENABLED=true +export MYSQL_PIPELINES_ENABLED=true if ! docker compose -f "$script_dir/compose.yml" --profile test-deps up -d --build --force-recreate; then echo "Could not start test dependencies"