|
1 | | -import json, os, re, subprocess, sys |
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +PreProd resolver: |
| 4 | +- Determines the furthest-ahead successful TEST commit on main (sha + dev-* tag). |
| 5 | +- Resolves THIS run's sha + dev-* tag (auto via workflow_run, or manual via input tag). |
| 6 | +- Applies the stale-run guard (auto blocks older; manual requires allow_older=true). |
| 7 | +- Emits outputs for later steps. |
| 8 | +
|
| 9 | +Outputs (via $GITHUB_OUTPUT): |
| 10 | + this_sha, this_ref, latest_test_sha, latest_test_ref |
| 11 | +""" |
| 12 | + |
| 13 | +from __future__ import annotations |
| 14 | +import json |
| 15 | +import os |
| 16 | +import re |
| 17 | +import subprocess |
| 18 | +import sys |
| 19 | +from dataclasses import dataclass |
2 | 20 | from typing import List, Optional |
3 | 21 |
|
| 22 | + |
4 | 23 | WORKFLOW_NAME = os.getenv("WORKFLOW_NAME", "3. CD | Deploy to Test") |
5 | 24 | BRANCH = os.getenv("BRANCH", "main") |
6 | 25 | LIMIT = int(os.getenv("LIMIT", "30")) |
|
9 | 28 | HEAD_SHA_AUTO = os.getenv("WORKFLOW_RUN_HEAD_SHA", "") |
10 | 29 | MANUAL_REF = os.getenv("MANUAL_REF", "") |
11 | 30 | ALLOW_OLDER = os.getenv("ALLOW_OLDER", "false").lower() |
12 | | -REPO = os.getenv("GITHUB_REPOSITORY", "") |
13 | 31 |
|
14 | | -def run(cmd: List[str], check=True, capture=True) -> subprocess.CompletedProcess: |
15 | | - return subprocess.run(cmd, check=check, capture_output=capture, text=True) |
| 32 | + |
| 33 | +def fail(msg: str) -> "NoReturn": |
| 34 | + print(f"::error::{msg}", file=sys.stderr) |
| 35 | + sys.exit(1) |
| 36 | + |
| 37 | +def run(cmd: List[str], *, check: bool = True) -> subprocess.CompletedProcess: |
| 38 | + return subprocess.run(cmd, check=check, capture_output=True, text=True) |
16 | 39 |
|
17 | 40 | def git_ok(args: List[str]) -> bool: |
18 | 41 | return subprocess.run(["git", *args]).returncode == 0 |
19 | 42 |
|
20 | | -def is_ancestor(a: str, b: str) -> bool: |
21 | | - return subprocess.run(["git","merge-base","--is-ancestor", a, b]).returncode == 0 |
| 43 | +def is_ancestor(older: str, newer: str) -> bool: |
| 44 | + return subprocess.run(["git", "merge-base", "--is-ancestor", older, newer]).returncode == 0 |
22 | 45 |
|
23 | | -def fetch_graph(): |
24 | | - subprocess.run(["git","fetch","origin", BRANCH, "--quiet"], check=True) |
25 | | - subprocess.run(["git","fetch","--tags","--force","--quiet"], check=True) |
| 46 | +def fetch_graph() -> None: |
| 47 | + run(["git", "fetch", "origin", BRANCH, "--quiet"]) |
| 48 | + run(["git", "fetch", "--tags", "--force", "--quiet"]) |
| 49 | + |
| 50 | +def gh_json(args: List[str]) -> list: |
| 51 | + cp = run(["gh", *args]) |
| 52 | + raw = cp.stdout.strip() |
| 53 | + return json.loads(raw) if raw else [] |
26 | 54 |
|
27 | 55 | def dev_tag_for_sha(sha: str) -> Optional[str]: |
28 | | - cp = run(["git","tag","--points-at", sha], check=False) |
29 | | - for t in (cp.stdout or "").splitlines(): |
| 56 | + cp = run(["git", "tag", "--points-at", sha], check=False) |
| 57 | + for t in cp.stdout.splitlines(): |
30 | 58 | if t.startswith("dev-"): |
31 | 59 | return t |
32 | 60 | return None |
33 | 61 |
|
34 | 62 | def sha_for_tag(tag: str) -> Optional[str]: |
35 | | - cp = run(["git","rev-list","-n1", tag], check=False) |
| 63 | + cp = run(["git", "rev-list", "-n1", tag], check=False) |
36 | 64 | return cp.stdout.strip() or None |
37 | 65 |
|
| 66 | + |
| 67 | +@dataclass(frozen=True) |
| 68 | +class RefInfo: |
| 69 | + sha: str |
| 70 | + ref: str |
| 71 | + |
| 72 | + |
| 73 | +def require_token() -> None: |
| 74 | + if not (os.getenv("GH_TOKEN") or os.getenv("GITHUB_TOKEN")): |
| 75 | + fail("GH_TOKEN/GITHUB_TOKEN is required") |
| 76 | + |
38 | 77 | def list_successful_test_shas() -> List[str]: |
39 | | - cp = run([ |
40 | | - "gh","run","list", |
| 78 | + data = gh_json([ |
| 79 | + "run", "list", |
41 | 80 | "--workflow", WORKFLOW_NAME, |
42 | 81 | "--branch", BRANCH, |
43 | | - "--status","success", |
44 | | - "--json","headSha", |
| 82 | + "--status", "success", |
| 83 | + "--json", "headSha", |
45 | 84 | "--limit", str(LIMIT), |
46 | 85 | ]) |
47 | | - data = json.loads(cp.stdout or "[]") |
48 | | - return [it["headSha"] for it in data if "headSha" in it and it["headSha"]] |
| 86 | + return [it["headSha"] for it in data if it.get("headSha")] |
49 | 87 |
|
50 | 88 | def pick_furthest_ahead(shas: List[str]) -> str: |
51 | | - latest = None |
| 89 | + latest: Optional[str] = None |
52 | 90 | for cand in shas: |
53 | | - if latest is None: |
| 91 | + if latest is None or is_ancestor(latest, cand): |
54 | 92 | latest = cand |
55 | | - continue |
56 | | - if is_ancestor(latest, cand): |
57 | | - latest = cand |
58 | | - return latest |
59 | | - |
60 | | -def fail(msg: str) -> int: |
61 | | - print(f"::error::{msg}", file=sys.stderr) |
62 | | - return 1 |
63 | | - |
64 | | -def main() -> int: |
65 | | - if not (os.getenv("GH_TOKEN") or os.getenv("GITHUB_TOKEN")): |
66 | | - return fail("GH_TOKEN/GITHUB_TOKEN is required") |
67 | | - |
68 | | - fetch_graph() |
| 93 | + return latest or "" |
69 | 94 |
|
| 95 | +def resolve_latest_test() -> RefInfo: |
70 | 96 | shas = list_successful_test_shas() |
71 | 97 | if not shas: |
72 | | - return fail("No successful TEST runs found on branch") |
| 98 | + fail("No successful TEST runs found on branch") |
73 | 99 | shas = [s for s in shas if is_ancestor(s, f"origin/{BRANCH}")] |
74 | 100 | if not shas: |
75 | | - return fail("No TEST SHAs are ancestors of origin/main") |
76 | | - latest_test_sha = pick_furthest_ahead(shas) |
77 | | - if not latest_test_sha: |
78 | | - return fail("Could not determine latest TEST SHA") |
79 | | - latest_test_ref = dev_tag_for_sha(latest_test_sha) |
80 | | - if not latest_test_ref: |
81 | | - return fail(f"No dev-* tag found on latest TEST SHA ({latest_test_sha})") |
82 | | - |
| 101 | + fail("No TEST SHAs are ancestors of origin/main") |
| 102 | + latest_sha = pick_furthest_ahead(shas) |
| 103 | + if not latest_sha: |
| 104 | + fail("Could not determine latest TEST SHA") |
| 105 | + latest_ref = dev_tag_for_sha(latest_sha) |
| 106 | + if not latest_ref: |
| 107 | + fail(f"No dev-* tag found on latest TEST SHA ({latest_sha})") |
| 108 | + return RefInfo(sha=latest_sha, ref=latest_ref) |
| 109 | + |
| 110 | +def resolve_this_run() -> RefInfo: |
83 | 111 | if EVENT_NAME == "workflow_run": |
84 | 112 | if not HEAD_SHA_AUTO: |
85 | | - return fail("WORKFLOW_RUN_HEAD_SHA missing for workflow_run event") |
86 | | - this_sha = HEAD_SHA_AUTO |
87 | | - this_ref = dev_tag_for_sha(this_sha) or "" |
88 | | - if not this_ref: |
89 | | - return fail(f"No dev-* tag found on THIS SHA ({this_sha})") |
90 | | - elif EVENT_NAME == "workflow_dispatch": |
| 113 | + fail("WORKFLOW_RUN_HEAD_SHA missing for workflow_run event") |
| 114 | + ref = dev_tag_for_sha(HEAD_SHA_AUTO) |
| 115 | + if not ref: |
| 116 | + fail(f"No dev-* tag found on THIS SHA ({HEAD_SHA_AUTO})") |
| 117 | + return RefInfo(sha=HEAD_SHA_AUTO, ref=ref) |
| 118 | + |
| 119 | + if EVENT_NAME == "workflow_dispatch": |
91 | 120 | if not MANUAL_REF: |
92 | | - return fail("MANUAL_REF (inputs.ref) is required for manual dispatch") |
| 121 | + fail("MANUAL_REF (inputs.ref) is required for manual dispatch") |
93 | 122 | if not re.match(r"^dev-\d{14}$", MANUAL_REF): |
94 | | - return fail(f"Invalid dev-* tag format: {MANUAL_REF}") |
95 | | - if not git_ok(["rev-parse","-q","--verify", f"refs/tags/{MANUAL_REF}"]): |
96 | | - return fail(f"Tag not found: {MANUAL_REF}") |
97 | | - this_ref = MANUAL_REF |
98 | | - this_sha = sha_for_tag(this_ref) or "" |
99 | | - if not this_sha: |
100 | | - return fail(f"Cannot resolve SHA for {this_ref}") |
101 | | - if not is_ancestor(this_sha, f"origin/{BRANCH}"): |
102 | | - return fail(f"Chosen tag {this_ref} is not on origin/{BRANCH} history") |
103 | | - else: |
104 | | - return fail(f"Unsupported EVENT_NAME: {EVENT_NAME}") |
| 123 | + fail(f"Invalid dev-* tag format: {MANUAL_REF}") |
| 124 | + if not git_ok(["rev-parse", "-q", "--verify", f"refs/tags/{MANUAL_REF}"]): |
| 125 | + fail(f"Tag not found: {MANUAL_REF}") |
| 126 | + sha = sha_for_tag(MANUAL_REF) |
| 127 | + if not sha: |
| 128 | + fail(f"Cannot resolve SHA for {MANUAL_REF}") |
| 129 | + if not is_ancestor(sha, f"origin/{BRANCH}"): |
| 130 | + fail(f"Chosen tag {MANUAL_REF} is not on origin/{BRANCH} history") |
| 131 | + return RefInfo(sha=sha, ref=MANUAL_REF) |
| 132 | + |
| 133 | + fail(f"Unsupported EVENT_NAME: {EVENT_NAME}") |
| 134 | + raise AssertionError("unreachable") |
| 135 | + |
| 136 | +def enforce_guard(this_: RefInfo, latest: RefInfo) -> None: |
| 137 | + older_than_latest = (this_.sha != latest.sha) and is_ancestor(this_.sha, latest.sha) |
105 | 138 |
|
106 | 139 | if EVENT_NAME == "workflow_run": |
107 | | - # Stale guard: block if THIS is behind latest tested |
108 | | - if this_sha != latest_test_sha and is_ancestor(this_sha, latest_test_sha): |
109 | | - return fail( |
110 | | - f"Stale PreProd approval. Latest tested is {latest_test_ref} ({latest_test_sha}); " |
111 | | - f"this run is {this_ref} ({this_sha})." |
| 140 | + if older_than_latest: |
| 141 | + fail( |
| 142 | + f"Stale PreProd approval. Latest tested is {latest.ref} ({latest.sha}); " |
| 143 | + f"this run is {this_.ref} ({this_.sha})." |
112 | 144 | ) |
113 | 145 | else: |
114 | | - # Manual: allow older only if explicitly requested |
115 | | - if ALLOW_OLDER != "true" and this_sha != latest_test_sha and is_ancestor(this_sha, latest_test_sha): |
116 | | - return fail("Older than latest tested. Set allow_older=true if you intend to roll back deploy.") |
| 146 | + if ALLOW_OLDER != "true" and older_than_latest: |
| 147 | + fail("Older than latest tested. Set allow_older=true if you intend to backdeploy.") |
117 | 148 |
|
| 149 | +def write_outputs(this_: RefInfo, latest: RefInfo) -> None: |
118 | 150 | out = os.getenv("GITHUB_OUTPUT") |
119 | | - if out: |
120 | | - with open(out, "a") as f: |
121 | | - f.write(f"this_sha={this_sha}\n") |
122 | | - f.write(f"this_ref={this_ref}\n") |
123 | | - f.write(f"latest_test_sha={latest_test_sha}\n") |
124 | | - f.write(f"latest_test_ref={latest_test_ref}\n") |
125 | | - |
126 | | - print(f"THIS: {this_ref} ({this_sha})") |
127 | | - print(f"LATEST TEST: {latest_test_ref} ({latest_test_sha})") |
| 151 | + if not out: |
| 152 | + return |
| 153 | + with open(out, "a") as f: |
| 154 | + f.write(f"this_sha={this_.sha}\n") |
| 155 | + f.write(f"this_ref={this_.ref}\n") |
| 156 | + f.write(f"latest_test_sha={latest.sha}\n") |
| 157 | + f.write(f"latest_test_ref={latest.ref}\n") |
| 158 | + |
| 159 | + |
| 160 | +def main() -> int: |
| 161 | + require_token() |
| 162 | + fetch_graph() |
| 163 | + latest = resolve_latest_test() |
| 164 | + current = resolve_this_run() |
| 165 | + enforce_guard(current, latest) |
| 166 | + write_outputs(current, latest) |
| 167 | + print(f"THIS: {current.ref} ({current.sha})") |
| 168 | + print(f"LATEST TEST: {latest.ref} ({latest.sha})") |
128 | 169 | return 0 |
129 | 170 |
|
| 171 | + |
130 | 172 | if __name__ == "__main__": |
131 | 173 | sys.exit(main()) |
0 commit comments