-
-
Notifications
You must be signed in to change notification settings - Fork 303
Expand file tree
/
Copy pathenhance_with_exploitdb.py
More file actions
167 lines (136 loc) · 5.63 KB
/
enhance_with_exploitdb.py
File metadata and controls
167 lines (136 loc) · 5.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import csv
import io
import logging
from traceback import format_exc as traceback_format_exc
import requests
from aboutcode.pipeline import LoopProgress
from dateutil import parser as dateparser
from django.db import DataError
from vulnerabilities.models import Alias
from vulnerabilities.models import Exploit
from vulnerabilities.models import VulnerabilityReference
from vulnerabilities.models import VulnerabilityRelatedReference
from vulnerabilities.pipelines import VulnerableCodePipeline
class ExploitDBImproverPipeline(VulnerableCodePipeline):
    """
    Fetch ExploitDB data, iterate over it to find the vulnerability with
    the specified alias, and create or update the ref and ref-type accordingly.
    """

    pipeline_id = "enhance_with_exploitdb"
    spdx_license_expression = "GPL-2.0"

    @classmethod
    def steps(cls):
        """Return the pipeline steps: fetch the CSV, then store the exploits."""
        return (
            cls.fetch_exploits,
            cls.add_exploit,
        )

    def fetch_exploits(self):
        """
        Download the Exploit-DB ``files_exploits.csv`` index and keep its text
        on ``self.exploit_data`` as an in-memory stream.

        Any ``requests`` failure (HTTP error status, connection problem or
        timeout) is logged at ERROR level and re-raised.
        """
        exploit_db_url = (
            "https://gitlab.com/exploit-database/exploitdb/-/raw/main/files_exploits.csv"
        )
        # requests waits forever by default; bound the connect/read waits so a
        # stalled server cannot hang the whole pipeline.
        timeout_seconds = 60
        self.log(f"Fetching {exploit_db_url}")
        try:
            response = requests.get(exploit_db_url, timeout=timeout_seconds)
            response.raise_for_status()
        except requests.exceptions.RequestException as http_err:
            # RequestException is the base of HTTPError, ConnectionError and
            # Timeout, so every fetch failure is logged before propagating.
            self.log(
                f"Failed to fetch the Exploit-DB Exploits: {exploit_db_url} with error {http_err!r}:\n{traceback_format_exc()}",
                level=logging.ERROR,
            )
            raise
        self.exploit_data = io.StringIO(response.text)

    def add_exploit(self):
        """
        Parse the fetched CSV and store an exploit for each row whose aliases
        match a known vulnerability, logging progress and the final count.
        """
        csvreader = csv.DictReader(self.exploit_data)
        # Materialized up front because the progress reporter needs the total.
        raw_data = list(csvreader)
        fetched_exploit_count = len(raw_data)
        vulnerability_exploit_count = 0
        self.log(f"Enhancing the vulnerability with {fetched_exploit_count:,d} exploit records")
        progress = LoopProgress(total_iterations=fetched_exploit_count, logger=self.log)
        for row in progress.iter(raw_data):
            vulnerability_exploit_count += add_vulnerability_exploit(row, self.log)
        self.log(
            f"Successfully added {vulnerability_exploit_count:,d} exploit-db vulnerability exploit"
        )
def add_vulnerability_exploit(row, logger):
    """
    Create or update the "Exploit-DB" ``Exploit`` record, and its references,
    for every vulnerability that matches one of the aliases listed in ``row``.

    ``row`` is a mapping for one CSV record. The keys read here are ``codes``
    (a ";"-separated list of aliases), ``date_added``, ``date_published``,
    ``date_updated``, ``source_url``, ``file``, ``description``, ``verified``,
    ``type`` and ``platform``. ``logger`` is a callable taking a message and
    an optional ``level`` keyword.

    Return 1 when an exploit was stored for at least one vulnerability,
    otherwise 0 (no aliases, no matching vulnerability, or every save failed).
    """
    codes = row["codes"] or ""
    # Drop surrounding whitespace and empty fragments (e.g. from a trailing
    # ";") so they never reach the alias lookup.
    stripped_codes = [code.strip() for code in codes.split(";")]
    aliases = [alias for alias in stripped_codes if alias]
    if not aliases:
        return 0

    vulnerabilities = (
        Alias.objects.filter(alias__in=aliases, vulnerability__isnull=False)
        .values_list("vulnerability_id", flat=True)
        .distinct()
    )
    if not vulnerabilities:
        logger(f"No vulnerability found for aliases {aliases}")
        return 0

    # The dates are the same for every vulnerability; parse them once.
    date_added = parse_date(row["date_added"])
    source_date_published = parse_date(row["date_published"])
    source_date_updated = parse_date(row["date_updated"])

    stored = 0
    for vulnerability in vulnerabilities:
        add_exploit_references(row["codes"], row["source_url"], row["file"], vulnerability, logger)
        try:
            Exploit.objects.update_or_create(
                vulnerability_id=vulnerability,
                data_source="Exploit-DB",
                defaults={
                    "date_added": date_added,
                    "description": row["description"],
                    "known_ransomware_campaign_use": row["verified"],
                    "source_date_published": source_date_published,
                    "exploit_type": row["type"],
                    "platform": row["platform"],
                    "source_date_updated": source_date_updated,
                    "source_url": row["source_url"],
                },
            )
        except DataError as e:
            logger(
                f"Failed to Create the Vulnerability Exploit-DB with error {e!r}:\n{traceback_format_exc()}",
                level=logging.ERROR,
            )
        else:
            # Only count the row once a save has actually succeeded, so the
            # caller's "added" total does not include failed rows.
            stored = 1
    return stored
def add_exploit_references(ref_id, direct_url, path, vul_id, logger):
    """
    Create or update the exploit references for one Exploit-DB entry and link
    them to the vulnerability with id ``vul_id``.

    Up to two references are handled: the GitLab blob URL built from ``path``
    and ``direct_url``; an empty or missing value skips that reference.
    ``ref_id`` is stored as the reference id, truncated to 200 characters.
    ``logger`` is a callable taking a message and an optional ``level``
    keyword; a ``DataError`` is logged at ERROR level and not raised.
    """
    # Presumably the column width of ``reference_id`` -- confirm against the model.
    MAX_REF_LEN = 200
    safe_ref_id = ref_id[:MAX_REF_LEN] if ref_id else ref_id

    urls = []
    # Without a file path the blob URL would only point at the repository
    # root, which is not a reference to any exploit.
    if path:
        urls.append(f"https://gitlab.com/exploit-database/exploitdb/-/blob/main/{path}")
    if direct_url:
        urls.append(direct_url)

    for url in urls:
        try:
            ref, _created = VulnerabilityReference.objects.update_or_create(
                url=url,
                defaults={
                    "reference_id": safe_ref_id,
                    "reference_type": VulnerabilityReference.EXPLOIT,
                },
            )
            # Link even when the reference already existed: the same URL is
            # shared by every vulnerability matching this exploit, so linking
            # only on creation would leave all but the first one unlinked.
            # get_or_create keeps this safe to repeat.
            VulnerabilityRelatedReference.objects.get_or_create(
                vulnerability_id=vul_id,
                reference=ref,
            )
        except DataError as e:
            logger(
                f"Failed to Create the Vulnerability Reference For Exploit-DB with error {e!r}:\n{traceback_format_exc()}",
                level=logging.ERROR,
            )
def parse_date(date_string):
    """
    Return ``date_string`` reformatted as a "YYYY-MM-DD" string.

    Return None when ``date_string`` is empty, or when parsing fails; a
    parsing failure is reported through ``logging.error``.
    """
    if not date_string:
        return None

    try:
        parsed = dateparser.parse(date_string)
        return parsed.date().strftime("%Y-%m-%d")
    except Exception as e:
        logging.error(
            f"Error while parsing ExploitDB date '{date_string}' with error {e!r}:\n{traceback_format_exc()}"
        )
    return None