|
8 | 8 | # |
9 | 9 |
|
10 | 10 | import logging |
| 11 | +import re |
11 | 12 | import urllib |
12 | 13 |
|
13 | 14 | import requests |
14 | 15 | from bs4 import BeautifulSoup |
| 16 | +from cwe2.database import Database |
15 | 17 | from packageurl import PackageURL |
16 | 18 | from univers.version_constraint import VersionConstraint |
17 | 19 | from univers.version_range import ApacheVersionRange |
|
23 | 25 | from vulnerabilities.importer import Reference |
24 | 26 | from vulnerabilities.importer import VulnerabilitySeverity |
25 | 27 | from vulnerabilities.severity_systems import APACHE_HTTPD |
| 28 | +from vulnerabilities.utils import get_cwe_id |
26 | 29 | from vulnerabilities.utils import get_item |
27 | 30 |
|
28 | 31 | logger = logging.getLogger(__name__) |
@@ -102,11 +105,14 @@ def to_advisory(self, data): |
102 | 105 | ) |
103 | 106 | ) |
104 | 107 |
|
| 108 | + weaknesses = get_weaknesses(data) |
| 109 | + |
105 | 110 | return AdvisoryData( |
106 | 111 | aliases=[alias], |
107 | 112 | summary=description or "", |
108 | 113 | affected_packages=affected_packages, |
109 | 114 | references=[reference], |
| 115 | + weaknesses=weaknesses, |
110 | 116 | url=reference.url, |
111 | 117 | ) |
112 | 118 |
|
@@ -152,3 +158,70 @@ def fetch_links(url): |
152 | 158 | continue |
153 | 159 | links.append(urllib.parse.urljoin(url, link)) |
154 | 160 | return links |
| 161 | + |
| 162 | + |
| 163 | +def get_weaknesses(cve_data): |
| 164 | + """ |
| 165 | + Extract CWE IDs from CVE data. |
| 166 | +
|
| 167 | + Args: |
| 168 | + cve_data (dict): The CVE data in a dictionary format. |
| 169 | +
|
| 170 | + Returns: |
| 171 | + List[int]: A list of unique CWE IDs. |
| 172 | +
|
| 173 | + >>> mock_cve_data = { |
| 174 | + ... "containers": { |
| 175 | + ... "cna": { |
| 176 | + ... "providerMetadata": { |
| 177 | + ... "orgId": "f0158376-9dc2-43b6-827c-5f631a4d8d09" |
| 178 | + ... }, |
| 179 | + ... "title": "mod_macro buffer over-read", |
| 180 | + ... "problemTypes": [ |
| 181 | + ... { |
| 182 | + ... "descriptions": [ |
| 183 | + ... { |
| 184 | + ... "description": "CWE-125 Out-of-bounds Read", |
| 185 | + ... "lang": "en", |
| 186 | + ... "cweId": "CWE-125", |
| 187 | + ... "type": "CWE" |
| 188 | + ... } |
| 189 | + ... ] |
| 190 | + ... } |
| 191 | + ... ] |
| 192 | + ... } |
| 193 | + ... } |
| 194 | + ... } |
| 195 | + >>> get_weaknesses(mock_cve_data) |
| 196 | + [125] |
| 197 | + """ |
| 198 | + problem_types = cve_data.get("containers", {}).get("cna", {}).get("problemTypes", []) |
| 199 | + descriptions = problem_types[0].get("descriptions", []) if len(problem_types) > 0 else [] |
| 200 | + cwe_string = descriptions[0].get("cweId", "") if len(descriptions) > 0 else "" |
| 201 | + cwe_pattern = r"CWE-\d+" |
| 202 | + description = descriptions[0].get("description", "") if len(descriptions) > 0 else "" |
| 203 | + matches = re.findall(cwe_pattern, description) |
| 204 | + db = Database() |
| 205 | + weaknesses = [] |
| 206 | + cwe_string_from_description = "" |
| 207 | + if matches: |
| 208 | + cwe_string_from_description = matches[0] |
| 209 | + if cwe_string or cwe_string_from_description: |
| 210 | + if cwe_string: |
| 211 | + cwe_id = get_cwe_id(cwe_string) |
| 212 | + try: |
| 213 | + db.get(cwe_id) |
| 214 | + weaknesses.append(cwe_id) |
| 215 | + except Exception: |
| 216 | + logger.error("Invalid CWE id") |
| 217 | + elif cwe_string_from_description: |
| 218 | + cwe_id = get_cwe_id(cwe_string_from_description) |
| 219 | + try: |
| 220 | + db.get(cwe_id) |
| 221 | + weaknesses.append(cwe_id) |
| 222 | + except Exception: |
| 223 | + logger.error("Invalid CWE id") |
| 224 | + |
| 225 | + seen = set() |
| 226 | + unique_cwe = [x for x in weaknesses if not (x in seen or seen.add(x))] |
| 227 | + return unique_cwe |
0 commit comments