1313from unittest .mock import patch
1414
1515from django .test import TestCase
16+ from django .utils import timezone
1617from packageurl import PackageURL
1718from univers .version_range import VersionRange
1819
1920from vulnerabilities .importer import AdvisoryDataV2
2021from vulnerabilities .importer import AffectedPackageV2
22+ from vulnerabilities .importer import PackageCommitPatchData
2123from vulnerabilities .models import AdvisoryV2
24+ from vulnerabilities .models import ImpactedPackage
2225from vulnerabilities .models import PackageV2
2326from vulnerabilities .pipelines .v2_improvers .unfurl_version_range import UnfurlVersionRangePipeline
27+ from vulnerabilities .pipelines .v2_improvers .unfurl_version_range import impacted_package_qs
2428from vulnerabilities .pipes .advisory import insert_advisory_v2
2529from vulnerabilities .tests .pipelines import TestLogger
2630
2731
2832class TestUnfurlVersionRangePipeline (TestCase ):
2933 def setUp (self ):
3034 self .logger = TestLogger ()
31- advisory1 = AdvisoryDataV2 (
35+ self . advisory1 = AdvisoryDataV2 (
3236 summary = "Test advisory" ,
3337 aliases = ["CVE-2025-0001" ],
3438 references = [],
@@ -48,14 +52,54 @@ def setUp(self):
4852 date_published = datetime .now () - timedelta (days = 10 ),
4953 url = "https://example.com/advisory" ,
5054 )
51- insert_advisory_v2 (
52- advisory = advisory1 ,
53- pipeline_id = "test_pipeline_v2" ,
54- logger = self .logger .write ,
55+
56+ self .advisory2 = AdvisoryDataV2 (
57+ summary = "Test advisory" ,
58+ aliases = ["CVE-2025-0001" ],
59+ references = [],
60+ severities = [],
61+ weaknesses = [],
62+ affected_packages = [
63+ AffectedPackageV2 (
64+ package = PackageURL .from_string ("pkg:npm/foobar" ),
65+ affected_version_range = VersionRange .from_string ("vers:npm/>3.2.1|<4.0.0" ),
66+ fixed_version_range = VersionRange .from_string ("vers:npm/4.0.0" ),
67+ introduced_by_commit_patches = [],
68+ fixed_by_commit_patches = [],
69+ ),
70+ AffectedPackageV2 (
71+ package = PackageURL .from_string ("pkg:npm/foobar" ),
72+ affected_version_range = VersionRange .from_string ("vers:npm/>4.2.1|<5.0.0" ),
73+ fixed_version_range = VersionRange .from_string ("vers:npm/5.0.0" ),
74+ introduced_by_commit_patches = [],
75+ fixed_by_commit_patches = [],
76+ ),
77+ AffectedPackageV2 (
78+ package = PackageURL .from_string ("pkg:npm/foobar" ),
79+ affected_version_range = None ,
80+ fixed_version_range = None ,
81+ introduced_by_commit_patches = [],
82+ fixed_by_commit_patches = [
83+ PackageCommitPatchData (
84+ vcs_url = "https://foobar.vcs/" ,
85+ commit_hash = "982f801f" ,
86+ ),
87+ ],
88+ ),
89+ ],
90+ patches = [],
91+ advisory_id = "GHSA-1234" ,
92+ date_published = datetime .now () - timedelta (days = 10 ),
93+ url = "https://example.com/advisory" ,
5594 )
5695
5796 @patch ("vulnerabilities.pipelines.v2_improvers.unfurl_version_range.get_purl_versions" )
5897 def test_affecting_version_range_unfurl (self , mock_fetch ):
98+ insert_advisory_v2 (
99+ advisory = self .advisory1 ,
100+ pipeline_id = "test_pipeline_v2" ,
101+ logger = self .logger .write ,
102+ )
59103 self .assertEqual (1 , PackageV2 .objects .count ())
60104 mock_fetch .return_value = {"3.4.1" , "3.9.0" , "2.1.0" , "4.0.0" , "4.1.0" }
61105 pipeline = UnfurlVersionRangePipeline ()
@@ -67,3 +111,51 @@ def test_affecting_version_range_unfurl(self, mock_fetch):
67111 self .assertEqual (3 , PackageV2 .objects .count ())
68112 self .assertEqual (1 , impact .fixed_by_packages .count ())
69113 self .assertEqual (2 , impact .affecting_packages .count ())
114+
115+ def test_impacted_package_qs_dont_process_empty_vers (self ):
116+ insert_advisory_v2 (
117+ advisory = self .advisory2 ,
118+ pipeline_id = "test_pipeline_v2" ,
119+ logger = self .logger .write ,
120+ )
121+
122+ self .assertEqual (3 , ImpactedPackage .objects .count ())
123+ self .assertEqual (2 , impacted_package_qs ().count ())
124+
125+ def test_impacted_package_qs_dont_process_empty_vers (self ):
126+ insert_advisory_v2 (
127+ advisory = self .advisory2 ,
128+ pipeline_id = "test_pipeline_v2" ,
129+ logger = self .logger .write ,
130+ )
131+ impact = ImpactedPackage .objects .filter (affecting_vers__isnull = False ).first ()
132+ impact .last_range_unfurl_at = timezone .now ()
133+ impact .save ()
134+
135+ self .assertEqual (1 , impacted_package_qs ().count ())
136+
137+ def test_impacted_package_qs_prioritize_never_unfurled_impact_first (self ):
138+ insert_advisory_v2 (
139+ advisory = self .advisory2 ,
140+ pipeline_id = "test_pipeline_v2" ,
141+ logger = self .logger .write ,
142+ )
143+ impact = ImpactedPackage .objects .filter (affecting_vers__isnull = False ).first ()
144+ impact .last_range_unfurl_at = timezone .now () - timedelta (days = 4 )
145+ impact .save ()
146+
147+ self .assertEqual (2 , impacted_package_qs ().count ())
148+ first_impact_to_process = impacted_package_qs ().first ()
149+ self .assertEqual (None , first_impact_to_process .last_range_unfurl_at )
150+
151+ def test_impacted_package_reunfurl_vers (self ):
152+ insert_advisory_v2 (
153+ advisory = self .advisory2 ,
154+ pipeline_id = "test_pipeline_v2" ,
155+ logger = self .logger .write ,
156+ )
157+ impact = ImpactedPackage .objects .filter (affecting_vers__isnull = False ).first ()
158+ impact .last_range_unfurl_at = timezone .now ()
159+ impact .save ()
160+
161+ self .assertEqual (1 , impacted_package_qs ().count ())
0 commit comments