-
-
Notifications
You must be signed in to change notification settings - Fork 303
Expand file tree
/
Copy pathzdi_importer.py
More file actions
136 lines (111 loc) · 4.37 KB
/
zdi_importer.py
File metadata and controls
136 lines (111 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging
import re
from datetime import datetime
from datetime import timezone
from typing import Iterable
from xml.etree import ElementTree
import dateparser
from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import GENERIC
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import find_all_cve
logger = logging.getLogger(__name__)
ZDI_RSS_YEAR_URL = "https://www.zerodayinitiative.com/rss/published/{year}/"
ZDI_START_YEAR = 2007
ZDI_ID_RE = re.compile(r"ZDI-\d+-\d+")
CVSS_RE = re.compile(r"CVSS rating of (\d+\.?\d*)")
class ZDIImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""Collect ZDI security advisories from the Zero Day Initiative RSS feeds."""
pipeline_id = "zdi_importer"
spdx_license_expression = "LicenseRef-scancode-proprietary-license"
license_url = "https://www.zerodayinitiative.com"
precedence = 200
@classmethod
def steps(cls):
return (cls.collect_and_store_advisories,)
def advisories_count(self) -> int:
return 0
def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
current_year = datetime.now(tz=timezone.utc).year
urls = [
ZDI_RSS_YEAR_URL.format(year=year) for year in range(ZDI_START_YEAR, current_year + 1)
]
for url in urls:
self.log(f"Fetching ZDI RSS feed: {url}")
try:
response = fetch_response(url)
items = parse_rss_feed(response.text)
except Exception as e:
logger.error("Failed to fetch %s: %s", url, e)
continue
for item in items:
advisory = parse_advisory_data(item)
if advisory:
yield advisory
def parse_rss_feed(xml_text: str) -> list:
"""Parse ZDI RSS items from XML text; returns [] on malformed XML."""
try:
root = ElementTree.fromstring(xml_text)
except ElementTree.ParseError as e:
logger.error("Failed to parse RSS XML: %s", e)
return []
channel = root.find("channel")
if channel is None:
logger.error("RSS feed has no <channel> element")
return []
items = []
for item_el in channel.findall("item"):
items.append(
{
"title": (item_el.findtext("title") or "").strip(),
"link": (item_el.findtext("link") or "").strip(),
"description": (item_el.findtext("description") or "").strip(),
"pub_date": (item_el.findtext("pubDate") or "").strip(),
}
)
return items
def parse_advisory_data(item: dict):
"""Parse a ZDI RSS item; return None if no ZDI ID found in the link."""
link = item.get("link") or ""
title = item.get("title") or ""
description = item.get("description") or ""
pub_date_str = item.get("pub_date") or ""
match = ZDI_ID_RE.search(link)
if not match:
logger.error("Could not extract ZDI advisory ID from link: %r", link)
return None
advisory_id = match.group(0)
aliases = list(dict.fromkeys(find_all_cve(description)))
date_published = None
if pub_date_str:
date_published = dateparser.parse(pub_date_str)
if date_published is None:
logger.warning("Could not parse date %r for advisory %s", pub_date_str, advisory_id)
severities = []
cvss_match = CVSS_RE.search(description)
if cvss_match:
severities.append(VulnerabilitySeverity(system=GENERIC, value=cvss_match.group(1)))
references = []
if link:
references.append(ReferenceV2(url=link))
return AdvisoryDataV2(
advisory_id=advisory_id,
aliases=aliases,
summary=title,
affected_packages=[],
references=references,
date_published=date_published,
severities=severities,
url=link,
)