Skip to content

Commit 42c2d2e

Browse files
committed
feat(sbom): SP-4178 export acknowledgement as CycloneDX/SPDX annotations with timestamp and organization
1 parent f67c15b commit 42c2d2e

5 files changed

Lines changed: 258 additions & 93 deletions

File tree

src/scanoss/cyclonedx.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from cyclonedx.validation.json import JsonValidator
3333

3434
from . import __version__
35+
from .scanoss_settings import find_best_match
3536
from .scanossbase import ScanossBase
3637
from .spdxlite import SpdxLite
3738

@@ -42,13 +43,14 @@ class CycloneDx(ScanossBase):
4243
Handle all interaction with CycloneDX formatting
4344
"""
4445

45-
def __init__(self, debug: bool = False, output_file: str = None):
46+
def __init__(self, debug: bool = False, output_file: str = None, scanoss_settings=None):
4647
"""
4748
Initialise the CycloneDX class
4849
"""
4950
super().__init__(debug)
5051
self.output_file = output_file
5152
self.debug = debug
53+
self.scanoss_settings = scanoss_settings
5254
self._spdx = SpdxLite(debug=debug)
5355

5456
def parse(self, data: dict): # noqa: PLR0912, PLR0915
@@ -100,7 +102,7 @@ def parse(self, data: dict): # noqa: PLR0912, PLR0915
100102
fdl.append({'id': name})
101103
dc.append(name)
102104
fd['licenses'] = fdl
103-
fd['acknowledgement'] = deps.get('acknowledgement')
105+
fd['_file_path'] = f
104106
cdx[purl] = fd
105107
else:
106108
purls = d.get('purl')
@@ -159,7 +161,7 @@ def parse(self, data: dict): # noqa: PLR0912, PLR0915
159161
continue
160162
fdl.append({'id': name})
161163
fd['licenses'] = fdl
162-
fd['acknowledgement'] = d.get('acknowledgement')
164+
fd['_file_path'] = f
163165
cdx[purl] = fd
164166
# self.print_stderr(f'VD: {vdx}')
165167
# self.print_stderr(f'CDX: {cdx}')
@@ -202,13 +204,12 @@ def produce_from_json(self, data: dict, output_file: str = None) -> tuple[bool,
202204
self.print_msg('Warning: Empty scan results - generating minimal CycloneDX SBOM with no components.')
203205
self._spdx.load_license_data() # Load SPDX license name data for later reference
204206
#
205-
# Using CDX version 1.4: https://cyclonedx.org/docs/1.4/json/
207+
# Using CDX version 1.5: https://cyclonedx.org/docs/1.5/json/
206208
# Validate using: https://github.com/CycloneDX/cyclonedx-cli
207-
# cyclonedx-cli validate --input-format json --input-version v1_4 --fail-on-errors --input-file cdx.json
208209
#
209210
data = {
210211
'bomFormat': 'CycloneDX',
211-
'specVersion': '1.4',
212+
'specVersion': '1.5',
212213
'serialNumber': f'urn:uuid:{uuid.uuid4()}',
213214
'version': 1,
214215
'metadata': {
@@ -255,11 +256,35 @@ def produce_from_json(self, data: dict, output_file: str = None) -> tuple[bool,
255256
cpe = comp.get('cpe', '')
256257
if cpe and cpe != '':
257258
c_data['cpe'] = cpe
258-
acknowledgement = comp.get('acknowledgement')
259-
if acknowledgement:
260-
c_data['properties'] = [{'name': 'scanoss:acknowledgement', 'value': acknowledgement}]
261259
data['components'].append(c_data)
262260
# End for loop
261+
# Build annotations from BOM rules via ScanossSettings
262+
annotations = []
263+
if self.scanoss_settings:
264+
all_entries = (self.scanoss_settings.get_bom_include()
265+
+ self.scanoss_settings.get_bom_replace())
266+
entries_with_ack = [e for e in all_entries if e.acknowledgement]
267+
if entries_with_ack:
268+
org = self.scanoss_settings.get_organization()
269+
for purl in cdx:
270+
comp = cdx.get(purl)
271+
file_path = comp.get('_file_path', '')
272+
match = find_best_match(file_path, [purl], entries_with_ack)
273+
if match:
274+
ts = match.timestamp
275+
if not ts:
276+
self.print_stderr(
277+
f'Warning: No timestamp for annotation on {purl}, using current time'
278+
)
279+
ts = data['metadata']['timestamp']
280+
annotations.append({
281+
'subjects': [purl],
282+
'text': match.acknowledgement,
283+
'timestamp': ts,
284+
'annotator': {'organization': {'name': org}},
285+
})
286+
if annotations:
287+
data['annotations'] = annotations
263288
if vdx:
264289
for vuln_id in vdx:
265290
vulns = vdx.get(vuln_id)

src/scanoss/scanner.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -612,10 +612,10 @@ def __finish_scan_threaded(self, file_map: Optional[Dict[Any, Any]] = None) -> b
612612
if self.output_format == 'plain':
613613
self.__log_result(json.dumps(results, indent=2, sort_keys=True))
614614
elif self.output_format == 'cyclonedx':
615-
cdx = CycloneDx(self.debug, self.scan_output)
615+
cdx = CycloneDx(self.debug, self.scan_output, scanoss_settings=self.scanoss_settings)
616616
success, _ = cdx.produce_from_json(results)
617617
elif self.output_format == 'spdxlite':
618-
spdxlite = SpdxLite(self.debug, self.scan_output)
618+
spdxlite = SpdxLite(self.debug, self.scan_output, scanoss_settings=self.scanoss_settings)
619619
success = spdxlite.produce_from_json(results)
620620
elif self.output_format == 'csv':
621621
csvo = CsvOutput(self.debug, self.scan_output)
@@ -1050,10 +1050,10 @@ def scan_wfp(self, wfp: str) -> bool:
10501050
if self.output_format == 'plain':
10511051
self.__log_result(raw_output)
10521052
elif self.output_format == 'cyclonedx':
1053-
cdx = CycloneDx(self.debug, self.scan_output)
1053+
cdx = CycloneDx(self.debug, self.scan_output, scanoss_settings=self.scanoss_settings)
10541054
cdx.produce_from_str(raw_output)
10551055
elif self.output_format == 'spdxlite':
1056-
spdxlite = SpdxLite(self.debug, self.scan_output)
1056+
spdxlite = SpdxLite(self.debug, self.scan_output, scanoss_settings=self.scanoss_settings)
10571057
success = spdxlite.produce_from_str(raw_output)
10581058
elif self.output_format == 'csv':
10591059
csvo = CsvOutput(self.debug, self.scan_output)

src/scanoss/scanoss_settings.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class BomEntry:
4646
path: Optional[str] = None
4747
comment: Optional[str] = None
4848
acknowledgement: Optional[str] = None
49+
timestamp: Optional[str] = None
4950

5051
@classmethod
5152
def from_dict(cls, data: dict) -> 'BomEntry':
@@ -56,6 +57,7 @@ def from_dict(cls, data: dict) -> 'BomEntry':
5657
path=path,
5758
comment=data.get('comment'),
5859
acknowledgement=data.get('acknowledgement'),
60+
timestamp=data.get('timestamp'),
5961
)
6062

6163
def matches_path(self, result_path: str) -> bool:
@@ -112,6 +114,7 @@ def from_dict(cls, data: dict) -> 'ReplaceRule':
112114
path=path,
113115
comment=data.get('comment'),
114116
acknowledgement=data.get('acknowledgement'),
117+
timestamp=data.get('timestamp'),
115118
replace_with=data.get('replace_with'),
116119
license=data.get('license'),
117120
)
@@ -321,6 +324,10 @@ def _get_bom(self):
321324
return []
322325
return self.data.get('bom', {})
323326

327+
def get_organization(self) -> str:
    """
    Return the organization name from the ``self`` section of the settings data.

    Returns:
        The value stored under ``self.organization``, or ``'unspecified'`` when
        the ``self`` section is absent, is not a mapping (e.g. ``null``), or the
        organization value is missing or empty.
    """
    section = self.data.get('self')
    if not isinstance(section, dict):
        # dict.get's default only covers a *missing* key; an explicit null (or any
        # other non-mapping value) would otherwise fail on the chained .get().
        return 'unspecified'
    return section.get('organization') or 'unspecified'
330+
324331
def get_bom_include(self) -> List[BomEntry]:
325332
"""
326333
Get the list of components to include in the scan

src/scanoss/spdxlite.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from packageurl import PackageURL
3535

3636
from . import __version__
37+
from .scanoss_settings import find_best_match
3738

3839

3940
class SpdxLite:
@@ -42,12 +43,13 @@ class SpdxLite:
4243
Handle all interaction with SPDX Lite formatting
4344
"""
4445

45-
def __init__(self, debug: bool = False, output_file: str = None):
46+
def __init__(self, debug: bool = False, output_file: str = None, scanoss_settings=None):
4647
"""
4748
Initialise the SpdxLite class
4849
"""
4950
self.output_file = output_file
5051
self.debug = debug
52+
self.scanoss_settings = scanoss_settings
5153
self._spdx_licenses = {} # Used to lookup for valid SPDX license identifiers
5254
self._spdx_lic_names = {} # Used to look for SPDX license identifiers by name
5355

@@ -136,7 +138,9 @@ def _process_dependency_entry(self, file_path: str, entry: dict, summary: dict):
136138
if not self._is_valid_purl(file_path, dep, purl, summary):
137139
continue
138140
# Modifying the summary dictionary directly as it's passed by reference
139-
summary[purl] = self._create_dependency_summary(dep)
141+
dep_summary = self._create_dependency_summary(dep)
142+
dep_summary['_file_path'] = file_path
143+
summary[purl] = dep_summary
140144

141145
def _process_file_entry(self, file_path: str, entry: dict, summary: dict):
142146
"""
@@ -156,7 +160,9 @@ def _process_file_entry(self, file_path: str, entry: dict, summary: dict):
156160
if not self._is_valid_purl(file_path, entry, purl, summary):
157161
return
158162

159-
summary[purl] = self._create_file_summary(entry)
163+
file_summary = self._create_file_summary(entry)
164+
file_summary['_file_path'] = file_path
165+
summary[purl] = file_summary
160166

161167
def _is_valid_purl(self, file_path: str, entry: dict, purl: str, summary: dict) -> bool:
162168
"""
@@ -199,7 +205,6 @@ def _create_dependency_summary(self, dep: dict) -> dict:
199205
for field in ['component', 'version', 'url']:
200206
summary[field] = dep.get(field, '')
201207
summary['licenses'] = self._process_licenses(dep.get('licenses'))
202-
summary['acknowledgement'] = dep.get('acknowledgement')
203208
return summary
204209

205210
def _create_file_summary(self, entry: dict) -> dict:
@@ -220,7 +225,6 @@ def _create_file_summary(self, entry: dict) -> dict:
220225
for field in fields:
221226
summary[field] = entry.get(field)
222227
summary['licenses'] = self._process_licenses(entry.get('licenses'))
223-
summary['acknowledgement'] = entry.get('acknowledgement')
224228
return summary
225229

226230
def _process_licenses(self, licenses: list) -> list:
@@ -293,6 +297,7 @@ def produce_from_json(self, data: json, output_file: str = None) -> bool:
293297
self.load_license_data()
294298
spdx_document = self._create_base_document(raw_data)
295299
self._process_packages(raw_data, spdx_document)
300+
self._build_annotations(raw_data, spdx_document)
296301
return self._write_output(spdx_document, output_file)
297302

298303
def _create_base_document(self, raw_data: dict) -> dict:
@@ -392,6 +397,36 @@ def _process_packages(self, raw_data: dict, spdx_document: dict):
392397

393398
self._process_license_refs(lic_refs, spdx_document)
394399

400+
def _build_annotations(self, raw_data: dict, spdx_document: dict):
    """
    Add acknowledgement annotations to the SPDX document.

    Collects the BOM include and replace entries from the configured settings
    that carry an acknowledgement, asks ``find_best_match`` for a matching entry
    for every component, and records one annotation per match.

    Args:
        raw_data: Mapping of purl to a component summary dict. The
            ``'_file_path'`` key of each summary (default ``''``) is passed to
            the matcher.
        spdx_document: SPDX document dict, modified in place. It gains an
            ``'annotations'`` key only when at least one annotation was built.
            ``spdx_document['creationInfo']['created']`` is used as the
            annotation date when the matched entry has no timestamp.

    Returns:
        None. Does nothing when no settings are configured or no entry has an
        acknowledgement.
    """
    # NOTE(review): the emitted annotation dicts do not record which purl they
    # were built for - confirm consumers can still associate them with a package.
    settings = self.scanoss_settings
    if not settings:
        return

    acknowledged = [
        rule
        for rule in settings.get_bom_include() + settings.get_bom_replace()
        if rule.acknowledgement
    ]
    if not acknowledged:
        return

    # The annotator string is identical for every entry, so build it once.
    annotator = f'Organization: {settings.get_organization()}'
    collected = []
    for purl, comp in raw_data.items():
        rule = find_best_match(comp.get('_file_path', ''), [purl], acknowledged)
        if not rule:
            continue
        stamp = rule.timestamp
        if not stamp:
            # Warn first, then fall back to the document creation timestamp.
            self.print_stderr(f'Warning: No timestamp for annotation on {purl}, using current time')
            stamp = spdx_document['creationInfo']['created']
        collected.append(
            {
                'annotationDate': stamp,
                'annotationType': 'REVIEW',
                'annotator': annotator,
                'comment': rule.acknowledgement,
            }
        )

    if collected:
        spdx_document['annotations'] = collected
429+
395430
def _create_package_info(self, purl: str, comp: dict, lic_refs: set) -> dict:
396431
"""
397432
Create package information for SPDX document.
@@ -453,9 +488,6 @@ def _create_package_info(self, purl: str, comp: dict, lic_refs: set) -> dict:
453488
}
454489
],
455490
}
456-
acknowledgement = comp.get('acknowledgement')
457-
if acknowledgement:
458-
package_info['comment'] = acknowledgement
459491
return package_info
460492

461493
def _process_package_licenses(self, licenses: list, lic_refs: set) -> str:

0 commit comments

Comments
 (0)