Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE drift_analysis_run
ADD COLUMN idempotency_key TEXT;

CREATE UNIQUE INDEX drift_analysis_run_repo_idem_key_uidx
ON drift_analysis_run (repository_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
10 changes: 10 additions & 0 deletions pkg/repository/drift_analysis_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type DriftAnalysisRepository interface {
CreateDriftAnalysisProject(ctx context.Context, params queries.CreateDriftAnalysisProjectParams) (queries.DriftAnalysisProject, error)
FindDriftAnalysisRunsByRepositoryID(ctx context.Context, repoId int64, page int) ([]queries.DriftAnalysisRun, error)
FindDriftAnalysisRunByUUID(ctx context.Context, uuid uuid.UUID) (queries.DriftAnalysisRun, error)
FindRunByRepoAndIdempotencyKey(ctx context.Context, repoId int64, idempotencyKey string) (queries.DriftAnalysisRun, error)
FindDriftAnalysisProjectsByRunId(ctx context.Context, runId uuid.UUID) ([]queries.DriftAnalysisProject, error)
GetRepositoryRunStats(ctx context.Context, repoId int64) (queries.GetRepositoryRunStatsRow, error)
GetLatestRunForRepository(ctx context.Context, repoId int64) (queries.DriftAnalysisRun, error)
Expand Down Expand Up @@ -54,6 +55,15 @@ func (r *DriftAnalysisRepo) FindDriftAnalysisRunByUUID(ctx context.Context, uuid
return r.db.Queries(ctx).FindDriftAnalysisRunByUUID(ctx, uuid)
}

// FindRunByRepoAndIdempotencyKey returns pgx.ErrNoRows when no matching run exists; callers
// should check with errors.Is(err, pgx.ErrNoRows).
func (r *DriftAnalysisRepo) FindRunByRepoAndIdempotencyKey(ctx context.Context, repoId int64, idempotencyKey string) (queries.DriftAnalysisRun, error) {
return r.db.Queries(ctx).FindDriftAnalysisRunByRepoAndIdempotencyKey(ctx, queries.FindDriftAnalysisRunByRepoAndIdempotencyKeyParams{
RepositoryID: repoId,
IdempotencyKey: &idempotencyKey,
})
}

func (r *DriftAnalysisRepo) FindDriftAnalysisProjectsByRunId(ctx context.Context, runId uuid.UUID) ([]queries.DriftAnalysisProject, error) {
return r.db.Queries(ctx).FindDriftAnalysisProjectsByRunId(ctx, runId)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/queries/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions pkg/repository/queries/drift_analysis.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
-- name: CreateDriftAnalysisRun :one
INSERT INTO drift_analysis_run (uuid, repository_id, total_projects, total_projects_drifted, total_projects_errored, total_projects_skipped, analysis_duration_millis)
VALUES (@uuid, @repository_id, @total_projects, @total_projects_drifted, @total_projects_errored, @total_projects_skipped, @analysis_duration_millis)
INSERT INTO drift_analysis_run (uuid, repository_id, total_projects, total_projects_drifted, total_projects_errored, total_projects_skipped, analysis_duration_millis, idempotency_key)
VALUES (@uuid, @repository_id, @total_projects, @total_projects_drifted, @total_projects_errored, @total_projects_skipped, @analysis_duration_millis, @idempotency_key)
RETURNING *;

-- name: FindDriftAnalysisRunByRepoAndIdempotencyKey :one
SELECT *
FROM drift_analysis_run
WHERE repository_id = @repository_id AND idempotency_key = @idempotency_key;

-- name: CreateDriftAnalysisProject :one
INSERT INTO drift_analysis_project (drift_analysis_run_id, dir, type, drifted, succeeded, init_output, plan_output, skipped_due_to_pr)
VALUES (@drift_analysis_run_id, @dir, @type, @drifted, @succeeded, @init_output, @plan_output, @skipped_due_to_pr)
Expand Down
49 changes: 42 additions & 7 deletions pkg/repository/queries/drift_analysis.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/queries/git_organization.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/queries/git_repository.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/repository/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/queries/sync_org.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/queries/sync_status_user.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/queries/users.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 45 additions & 6 deletions pkg/usecase/drift_stream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ import (
"github.com/gofiber/fiber/v3/log"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"time"
)

// pgUniqueViolation is the SQLSTATE code for unique_violation.
const pgUniqueViolation = "23505"

type DriftStateHandler struct {
cfg *config.Config
orgRepository repository.GitOrgRepository
Expand Down Expand Up @@ -97,6 +101,22 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
return c.SendStatus(fiber.StatusInternalServerError)
}

idemKey := strings.TrimSpace(c.Get("Idempotency-Key"))

// If the client sent an Idempotency-Key and we already have a run for it, return that run
// without re-inserting. This lets the CLI safely retry transient failures.
if idemKey != "" {
existing, err := d.driftAnalysisRepository.FindRunByRepoAndIdempotencyKey(c.Context(), repo.ID, idemKey)
if err == nil {
log.Infof("Idempotent replay for repository %d, key %s -> run %s", repo.ID, idemKey, existing.Uuid)
return c.JSON(buildAnalysisResponse(d.cfg.Frontend.FrontendURL, org, repo, existing.Uuid))
}
if !errors.Is(err, pgx.ErrNoRows) {
log.Errorf("Error looking up run by idempotency key: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
}

var state DriftDetectionResult
if err := c.Bind().Body(&state); err != nil {
return c.SendStatus(fiber.StatusBadRequest)
Expand All @@ -116,6 +136,11 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
}
}

var idemKeyPtr *string
if idemKey != "" {
idemKeyPtr = &idemKey
}

var runUUID uuid.UUID
err = d.driftAnalysisRepository.WithTx(c.Context(), func(ctx context.Context) error {
params := queries.CreateDriftAnalysisRunParams{
Expand All @@ -126,6 +151,7 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
TotalProjectsErrored: totalErrored,
TotalProjectsSkipped: state.TotalSkipped,
AnalysisDurationMillis: state.Duration.Milliseconds(),
IdempotencyKey: idemKeyPtr,
}

run, err := d.driftAnalysisRepository.CreateDriftAnalysisRun(ctx, params)
Expand Down Expand Up @@ -165,6 +191,19 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
})

if err != nil {
// Race: a concurrent retry with the same idempotency key may have inserted first.
// The partial unique index on (repository_id, idempotency_key) trips with SQLSTATE 23505;
// re-fetch and return the winning row.
if idemKey != "" {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == pgUniqueViolation {
existing, lookupErr := d.driftAnalysisRepository.FindRunByRepoAndIdempotencyKey(c.Context(), repo.ID, idemKey)
if lookupErr == nil {
log.Infof("Idempotent race resolved for repository %d, key %s -> run %s", repo.ID, idemKey, existing.Uuid)
return c.JSON(buildAnalysisResponse(d.cfg.Frontend.FrontendURL, org, repo, existing.Uuid))
}
}
}
log.Errorf("Error handling drift state update: %v", err)
return c.SendStatus(fiber.StatusInternalServerError)
}
Expand All @@ -176,21 +215,21 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
}
}

// Build dashboard URL: /:provider/:org/:repo/run/:run_uuid
return c.JSON(buildAnalysisResponse(d.cfg.Frontend.FrontendURL, org, repo, runUUID))
}

func buildAnalysisResponse(frontendURL string, org queries.GitOrganization, repo queries.GitRepository, runUUID uuid.UUID) DriftAnalysisResponse {
dashboardURL := fmt.Sprintf("%s/%s/%s/%s/run/%s",
d.cfg.Frontend.FrontendURL,
frontendURL,
providerToSlug(org.Provider),
org.Name,
repo.Name,
runUUID.String(),
)

response := DriftAnalysisResponse{
return DriftAnalysisResponse{
RunID: runUUID.String(),
DashboardURL: dashboardURL,
}

return c.JSON(response)
}

func (d *DriftStateHandler) ListRunsByRepoId(c fiber.Ctx) error {
Expand Down