44import subprocess # nosec
55import tempfile
66from dataclasses import dataclass
7+ from enum import Enum
8+ from inspect import cleandoc
79from pathlib import Path
810from re import search
911from typing import (
1012 Any ,
11- Union ,
1213)
1314
14- from pydantic import BaseModel
15+ from pydantic import (
16+ BaseModel ,
17+ ConfigDict ,
18+ )
1519
16- from exasol .toolbox .util .dependencies .shared_models import Package
20+ from exasol .toolbox .util .dependencies .shared_models import (
21+ Package ,
22+ poetry_files_from_latest_tag ,
23+ )
1724
1825PIP_AUDIT_VULNERABILITY_PATTERN = (
1926 r"^Found \d+ known vulnerabilit\w{1,3} in \d+ package\w?$"
@@ -32,7 +39,36 @@ def __init__(self, subprocess_output: subprocess.CompletedProcess) -> None:
3239 self .stderr = subprocess_output .stderr
3340
3441
35- class Vulnerability (Package ):
42+ class VulnerabilitySource (str , Enum ):
43+ CVE = "CVE"
44+ CWE = "CWE"
45+ GHSA = "GHSA"
46+ PYSEC = "PYSEC"
47+
48+ @classmethod
49+ def from_prefix (cls , name : str ) -> VulnerabilitySource | None :
50+ for el in cls :
51+ if name .upper ().startswith (el .value ):
52+ return el
53+ return None
54+
55+ def get_link (self , package : str , vuln_id : str ) -> str :
56+ if self == VulnerabilitySource .CWE :
57+ cwe_id = vuln_id .upper ().replace (f"{ VulnerabilitySource .CWE .value } -" , "" )
58+ return f"https://cwe.mitre.org/data/definitions/{ cwe_id } .html"
59+
60+ map_link = {
61+ VulnerabilitySource .CVE : "https://nvd.nist.gov/vuln/detail/{vuln_id}" ,
62+ VulnerabilitySource .GHSA : "https://github.com/advisories/{vuln_id}" ,
63+ VulnerabilitySource .PYSEC : "https://github.com/pypa/advisory-database/blob/main/vulns/{package}/{vuln_id}.yaml" ,
64+ }
65+ return map_link [self ].format (package = package , vuln_id = vuln_id )
66+
67+
68+ class Vulnerability (BaseModel ):
69+ model_config = ConfigDict (frozen = True , arbitrary_types_allowed = True )
70+
71+ package : Package
3672 id : str
3773 aliases : list [str ]
3874 fix_versions : list [str ]
@@ -46,23 +82,61 @@ def from_audit_entry(
4682 Create a Vulnerability from a pip-audit vulnerability entry
4783 """
4884 return cls (
49- name = package_name ,
50- version = version ,
85+ package = Package (name = package_name , version = version ),
5186 id = vuln_entry ["id" ],
5287 aliases = vuln_entry ["aliases" ],
5388 fix_versions = vuln_entry ["fix_versions" ],
5489 description = vuln_entry ["description" ],
5590 )
5691
5792 @property
58- def security_issue_entry (self ) -> dict [str , str | list [str ]]:
93+ def references (self ) -> list [str ]:
94+ return sorted ([self .id ] + self .aliases )
95+
96+ @property
97+ def reference_links (self ) -> tuple [str , ...]:
98+ return tuple (
99+ source .get_link (package = self .package .name , vuln_id = reference )
100+ for reference in self .references
101+ if (source := VulnerabilitySource .from_prefix (reference .upper ()))
102+ )
103+
104+ @property
105+ def security_issue_entry (self ) -> dict [str , str | list [str ] | tuple [str , ...]]:
59106 return {
60- "name" : self .name ,
61- "version" : str (self .version ),
62- "refs" : [ self .id ] + self . aliases ,
107+ "name" : self .package . name ,
108+ "version" : str (self .package . version ),
109+ "refs" : self .references ,
63110 "description" : self .description ,
111+ "coordinates" : self .package .coordinates ,
112+ "references" : self .reference_links ,
64113 }
65114
115+ @property
116+ def vulnerability_id (self ) -> str | None :
117+ """
118+ Ensure a consistent way of identifying a vulnerability for string generation.
119+ """
120+ for ref in self .references :
121+ ref_upper = ref .upper ()
122+ if ref_upper .startswith (VulnerabilitySource .CVE .value ):
123+ return ref
124+ if ref_upper .startswith (VulnerabilitySource .GHSA .value ):
125+ return ref
126+ if ref_upper .startswith (VulnerabilitySource .PYSEC .value ):
127+ return ref
128+ return self .references [0 ]
129+
130+ @property
131+ def subsection_for_changelog_summary (self ) -> str :
132+ """
133+ Create a subsection to be included in the Summary section of a versioned changelog.
134+ """
135+ links_join = "\n * " .join (sorted (self .reference_links ))
136+ references_subsection = f"\n #### References:\n \n * { links_join } \n \n "
137+ subsection = f"### { self .vulnerability_id } in { self .package .coordinates } \n \n { self .description } \n { references_subsection } "
138+ return cleandoc (subsection .strip ())
139+
66140
67141def audit_poetry_files (working_directory : Path ) -> str :
68142 """
@@ -141,7 +215,18 @@ def load_from_pip_audit(cls, working_directory: Path) -> Vulnerabilities:
141215 return Vulnerabilities (vulnerabilities = vulnerabilities )
142216
143217 @property
144- def security_issue_dict (self ) -> list [dict [str , str | list [str ]]]:
218+ def security_issue_dict (self ) -> list [dict [str , str | list [str ] | tuple [ str , ...] ]]:
145219 return [
146220 vulnerability .security_issue_entry for vulnerability in self .vulnerabilities
147221 ]
222+
223+
224+ def get_vulnerabilities (working_directory : Path ) -> list [Vulnerability ]:
225+ return Vulnerabilities .load_from_pip_audit (
226+ working_directory = working_directory
227+ ).vulnerabilities
228+
229+
230+ def get_vulnerabilities_from_latest_tag ():
231+ with poetry_files_from_latest_tag () as tmp_dir :
232+ return get_vulnerabilities (working_directory = tmp_dir )
0 commit comments