Skip to content

Commit d420b97

Browse files
committed
refactor: process never unfurled version ranges first
- re-unfurl version ranges every 2 days
- run unfurl pipeline every 2 hours

Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent 974bbb5 commit d420b97

4 files changed

Lines changed: 107 additions & 14 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Generated by Django 5.2.11 on 2026-04-06 20:51
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
    """
    Add the ``last_range_unfurl_at`` timestamp to ``impactedpackage`` and the
    ``run_priority`` choice field to ``pipelineschedule``.
    """

    dependencies = [
        ("vulnerabilities", "0119_remove_advisoryset_identifiers_and_more"),
    ]

    operations = [
        # Nullable, indexed timestamp of the last vers range unfurl.
        migrations.AddField(
            model_name="impactedpackage",
            name="last_range_unfurl_at",
            field=models.DateTimeField(
                null=True,
                blank=True,
                db_index=True,
                help_text="Timestamp of the last vers range unfurl.",
            ),
        ),
        # Pipeline execution priority: 1 is "high", 2 is "default".
        migrations.AddField(
            model_name="pipelineschedule",
            name="run_priority",
            field=models.IntegerField(
                default=2,
                choices=[(1, "high"), (2, "default")],
                help_text="Select the pipeline execution priority",
            ),
        ),
    ]

vulnerabilities/models.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3252,6 +3252,13 @@ class ImpactedPackage(models.Model):
32523252
help_text="Timestamp indicating when this impact was added.",
32533253
)
32543254

3255+
last_range_unfurl_at = models.DateTimeField(
3256+
blank=True,
3257+
null=True,
3258+
db_index=True,
3259+
help_text="Timestamp of the last vers range unfurl.",
3260+
)
3261+
32553262
def to_dict(self):
32563263
from vulnerabilities.utils import purl_to_dict
32573264

vulnerabilities/pipelines/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from vulnerabilities.improver import MAX_CONFIDENCE
2525
from vulnerabilities.models import Advisory
2626
from vulnerabilities.models import PipelineRun
27+
from vulnerabilities.models import PipelineSchedule
2728
from vulnerabilities.pipes.advisory import import_advisory
2829
from vulnerabilities.pipes.advisory import insert_advisory
2930
from vulnerabilities.pipes.advisory import insert_advisory_v2
@@ -144,6 +145,9 @@ class VulnerableCodePipeline(PipelineDefinition, BasePipelineRun):
144145
# When set to true pipeline is run only once.
145146
# To rerun onetime pipeline reset is_active field to True via migration.
146147
run_once = False
148+
# Interval between runs, in hours.
149+
run_interval = 24
150+
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT
147151

148152
def on_failure(self):
149153
"""
@@ -176,6 +180,9 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline):
176180
# When set to true pipeline is run only once.
177181
# To rerun onetime pipeline reset is_active field to True via migration.
178182
run_once = False
183+
# Interval between runs, in hours.
184+
run_interval = 24
185+
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT
179186

180187
@classmethod
181188
def steps(cls):
@@ -277,6 +284,9 @@ class VulnerableCodeBaseImporterPipelineV2(VulnerableCodePipeline):
277284
# When set to true pipeline is run only once.
278285
# To rerun onetime pipeline reset is_active field to True via migration.
279286
run_once = False
287+
# Interval between runs, in hours.
288+
run_interval = 24
289+
run_priority = PipelineSchedule.ExecutionPriority.DEFAULT
280290

281291
@classmethod
282292
def steps(cls):

vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@
88
#
99

1010
import logging
11+
from datetime import timedelta
1112
from traceback import format_exc as traceback_format_exc
1213

1314
from aboutcode.pipeline import LoopProgress
15+
from django.db.models import F
16+
from django.db.models import Q
17+
from django.utils import timezone
1418
from fetchcode.package_versions import SUPPORTED_ECOSYSTEMS as FETCHCODE_SUPPORTED_ECOSYSTEMS
1519
from packageurl import PackageURL
1620
from univers.version_range import RANGE_CLASS_BY_SCHEMES
@@ -19,29 +23,45 @@
1923
from vulnerabilities.models import ImpactedPackage
2024
from vulnerabilities.models import ImpactedPackageAffecting
2125
from vulnerabilities.models import PackageV2
26+
from vulnerabilities.models import PipelineSchedule
2227
from vulnerabilities.pipelines import VulnerableCodePipeline
2328
from vulnerabilities.pipes.fetchcode_utils import get_versions
2429
from vulnerabilities.utils import update_purl_version
2530

2631

2732
class UnfurlVersionRangePipeline(VulnerableCodePipeline):
33+
"""
34+
Unfurl affected version ranges by first processing those that have
35+
never been unfurled and then handling ranges that were last unfurled
36+
two or more days ago.
37+
"""
2838

2939
pipeline_id = "unfurl_version_range_v2"
3040

41+
run_interval = 2
42+
run_priority = PipelineSchedule.ExecutionPriority.HIGH
43+
44+
# Days elapsed before version range is re-unfurled
45+
reunfurl_after_days = 2
46+
3147
@classmethod
3248
def steps(cls):
3349
return (cls.unfurl_version_range,)
3450

3551
def unfurl_version_range(self):
36-
impacted_packages = ImpactedPackage.objects.all().order_by("-created_at")
37-
impacted_packages_count = impacted_packages.count()
38-
3952
processed_impacted_packages_count = 0
4053
processed_affected_packages_count = 0
4154
cached_versions = {}
55+
impacts_to_update = []
56+
update_batch_size = 5
57+
58+
impacted_packages = impacted_package_qs(cutoff_day=self.reunfurl_after_days)
59+
impacted_packages_count = impacted_packages.count()
4260
self.log(f"Unfurl affected vers range for {impacted_packages_count:,d} ImpactedPackage.")
61+
4362
progress = LoopProgress(total_iterations=impacted_packages_count, logger=self.log)
44-
for impact in progress.iter(impacted_packages):
63+
for impact in progress.iter(impacted_packages.iterator(chunk_size=5000)):
64+
impacts_to_update.append(impact.pk)
4565
purl = PackageURL.from_string(impact.base_purl)
4666
if not impact.affecting_vers or not any(
4767
c in impact.affecting_vers for c in ("<", ">", "!")
@@ -52,11 +72,10 @@ def unfurl_version_range(self):
5272
if purl.type not in RANGE_CLASS_BY_SCHEMES:
5373
continue
5474

55-
versions = get_purl_versions(purl, cached_versions) or []
75+
versions = get_purl_versions(purl, cached_versions, self.log) or []
5676
affected_purls = get_affected_purls(
5777
versions=versions,
58-
affecting_vers=impact.affecting_vers,
59-
base_purl=purl,
78+
impact=impact,
6079
logger=self.log,
6180
)
6281
if not affected_purls:
@@ -70,12 +89,21 @@ def unfurl_version_range(self):
7089
)
7190
processed_impacted_packages_count += 1
7291

92+
if len(impacts_to_update) > update_batch_size:
93+
ImpactedPackage.objects.filter(pk__in=impacts_to_update).update(
94+
last_range_unfurl_at=timezone.now()
95+
)
96+
impacts_to_update.clear()
97+
98+
ImpactedPackage.objects.filter(pk__in=impacts_to_update).update(
99+
last_range_unfurl_at=timezone.now()
100+
)
73101
self.log(f"Successfully processed {processed_impacted_packages_count:,d} ImpactedPackage.")
74102
self.log(f"{processed_affected_packages_count:,d} new Impact-Package relation created.")
75103

76104

77-
def get_affected_purls(versions, affecting_vers, base_purl, logger):
78-
affecting_version_range = VersionRange.from_string(affecting_vers)
105+
def get_affected_purls(versions, impact, logger):
106+
affecting_version_range = VersionRange.from_string(impact.affecting_vers)
79107
version_class = affecting_version_range.version_class
80108

81109
try:
@@ -84,7 +112,7 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger):
84112
versions = [version_class(v) for v in versions]
85113
except Exception as e:
86114
logger(
87-
f"Error while parsing versions for {base_purl!s}: {e!r} \n {traceback_format_exc()}",
115+
f"Error while parsing versions for {impact.base_purl!s}: {e!r} \n {traceback_format_exc()}",
88116
level=logging.ERROR,
89117
)
90118
return
@@ -95,21 +123,24 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger):
95123
if version in affecting_version_range:
96124
affected_purls.append(
97125
update_purl_version(
98-
purl=base_purl,
126+
purl=impact.base_purl,
99127
version=str(version),
100128
)
101129
)
102130
except Exception as e:
103131
logger(
104-
f"Error while checking {version!s} in {affecting_version_range!s}: {e!r} \n {traceback_format_exc()}",
132+
(
133+
f"Error while checking {version!s} in {affecting_version_range!s} for "
134+
f"advisory {impact.advisory.avid}: {e!r} \n {traceback_format_exc()}"
135+
),
105136
level=logging.ERROR,
106137
)
107138
return affected_purls
108139

109140

110-
def get_purl_versions(purl, cached_versions):
141+
def get_purl_versions(purl, cached_versions, logger):
    """
    Return the versions known for ``purl``, using ``cached_versions`` as a
    cache keyed by purl.

    On a cache miss the versions are fetched with ``get_versions`` and, when
    the result is not None, stored in ``cached_versions``. A None result is
    not cached, so the fetch is attempted again the next time the same
    ``purl`` is requested. Return an empty list when no versions are
    available.
    """
    if purl not in cached_versions:
        versions = get_versions(purl, logger)
        if versions is not None:
            cached_versions[purl] = versions
    return cached_versions.get(purl) or []
@@ -135,3 +166,16 @@ def bulk_create_with_m2m(purls, impact, relation, logger):
135166
return 0
136167

137168
return len(relations)
169+
170+
171+
def impacted_package_qs(cutoff_day=2):
    """
    Return a queryset of ImpactedPackage with a non-empty ``affecting_vers``
    that were either never unfurled or last unfurled at least ``cutoff_day``
    days ago. Never-unfurled rows come first, then the oldest unfurls.
    """
    unfurled_before = timezone.now() - timedelta(days=cutoff_day)

    never_unfurled = Q(last_range_unfurl_at__isnull=True)
    stale_unfurl = Q(last_range_unfurl_at__lte=unfurled_before)
    has_affecting_vers = Q(affecting_vers__isnull=False) & ~Q(affecting_vers="")

    # NULL timestamps sort first, followed by ascending unfurl time.
    unfurl_order = F("last_range_unfurl_at").asc(nulls_first=True)

    queryset = ImpactedPackage.objects.filter(
        (never_unfurled | stale_unfurl) & has_affecting_vers,
    )
    queryset = queryset.order_by(unfurl_order)
    # Load only the columns named here; other fields stay deferred.
    return queryset.only("pk", "affecting_vers", "advisory", "base_purl")

0 commit comments

Comments
 (0)