Source code for debsbom.securityscan.writer

# Copyright (C) 2026 Siemens
#
# SPDX-License-Identifier: MIT

from abc import abstractmethod
from collections import defaultdict
import dataclasses
from datetime import datetime, timezone
from importlib.metadata import version, metadata
import json
import os
from pathlib import Path
import sys

from ..tracepath.walker import GraphWalker
from ..dpkg.package import BinaryPackage, SourcePackage, Package, filter_binaries
from .scanner import CveEntry, CveStatus, CveUrgency, ScanResultItem

_CHECKSUM_TO_VEX_HASH = {
    "MD5SUM": "md5",
    "SHA1SUM": "sha1",
    "SHA256SUM": "sha-256",
    "SHA512SUM": "sha-512",
}

SARIF_SCHEMA_URL = "https://json.schemastore.org/sarif-2.1.0.json"
VEX_CONTEXT = "https://openvex.dev/ns/v0.2.0"
VEX_SCHEMA_ID = "https://openvex.dev/docs/public/vex-adc52fe6c8d2ba0feee7f4343f9b40c90e8cdb077817f880a6650502aece82bc"


[docs] class ScanResultWriter: """ Emit the security scan results in the specified format. All instances should be used with a context manager. """ @classmethod def create( cls, format: str, sdo_url: str, bdo_url: str, packages: list[Package] | None = None, graph_walker: GraphWalker | None = None, author: str | None = None, input_filename: Path | None = None, product: str | None = None, file=sys.stdout, ) -> "ScanResultWriter": match format.lower(): case "text": return ScanResultTextWriter( packages=[], sdo_url=sdo_url, bdo_url=bdo_url, file=file ) case "json": return ScanResultJsonWriter( packages=[], sdo_url=sdo_url, bdo_url=bdo_url, graph_walker=graph_walker, file=file, ) case "sarif": return ScanResultSarifWriter( packages=packages or [], sdo_url=sdo_url, bdo_url=bdo_url, path=input_filename, file=file, ) case "vex": return ScanResultVexWriter( packages=packages or [], sdo_url=sdo_url, bdo_url=bdo_url, author=author, product=product, file=file, ) case _: raise RuntimeError(f'No formatter for "{format}"') def __init__(self, packages: list[Package], sdo_url: str, bdo_url: str, file=sys.stdout): self.sdo_url = sdo_url self.bdo_url = bdo_url self.out = file # compute source -> binary relations as vulns are filed against src packages # but systems have binary packages installed binaries = list(filter_binaries(packages)) self._name_to_pkg_map: dict[str, BinaryPackage] = {p.name: p for p in binaries} # note, that the source packages are only stubs with an equal hash / purl self._built_using_map: dict[SourcePackage, list[BinaryPackage]] = defaultdict(list) for bp in binaries: for dep in bp.built_using: self._built_using_map[SourcePackage(dep.name, dep.version[1])].append(bp) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False def close(self) -> None: pass
[docs] def affected_binaries(self, src_pkg: SourcePackage) -> list[BinaryPackage]: """Return binary packages built from or built-using the given source package.""" candidates = set(map(lambda name: self._name_to_pkg_map.get(name), src_pkg.binaries)) return list(candidates | set(self._built_using_map.get(src_pkg, [])))
@abstractmethod def write(self, v: ScanResultItem) -> None: raise NotImplementedError()
[docs] class ScanResultTextWriter(ScanResultWriter): def write(self, r: ScanResultItem) -> None: if not r.affected: return v = r.vulnerability if self.sdo_url: print( f"{r.package} {v.cve} {v.status} ({v.fixed_version or 'no version'}) {v.urgency} {self.sdo_url}/{v.cve}", file=self.out, ) else: print( f"{r.package} {v.cve} {v.status} ({v.fixed_version or 'no version'}) {v.urgency}", file=self.out, )
[docs] class ScanResultJsonWriter(ScanResultWriter): def __init__(self, graph_walker: GraphWalker | None = None, **args): super().__init__(**args) self.graph_walker = graph_walker def write(self, r: ScanResultItem) -> None: if not r.affected: return v = r.vulnerability data = { "package": str(r.package), "purl": str(r.package.purl()), "vulnerability": { "id": v.cve, "status": str(v.status), "urgency": str(v.urgency), "tracker": f"{self.sdo_url}/{v.cve}", }, } if v.fixed_version: data["vulnerability"]["fixed-in"] = v.fixed_version data["vulnerability"]["desc"] = v.description if v.debianbug: data["vulnerability"]["debianbug"] = v.debianbug data["vulnerability"]["bugreport"] = f"{self.bdo_url}?bug={v.debianbug}" if v.nodsa: data["vulnerability"]["nodsa"] = v.nodsa if self.graph_walker: allShortest = self.graph_walker.all_shortest(r.package.purl()) data["pathsToRoot"] = { "allShortest": [[dataclasses.asdict(_s) for _s in s] for s in allShortest] } json.dump(data, self.out) self.out.write("\n")
[docs] class ScanResultSarifWriter(ScanResultWriter): def __init__(self, path: Path, **args): super().__init__(**args) self.frame = self._create_skeleton() self.path = path def close(self) -> None: json.dump(self.frame, self.out) self.out.write("\n") @classmethod def _create_skeleton(cls) -> dict: def get_project_url(): url_node = metadata("debsbom").get("Project-URL") if not url_node: return None return url_node.split(",")[-1].strip() return { "version": "2.1.0", "$schema": SARIF_SCHEMA_URL, "runs": [ { "tool": { "driver": { "name": "debsbom", "version": version("debsbom"), "informationUri": get_project_url(), "rules": [], }, }, "results": [], } ], } def write(self, r: ScanResultItem) -> None: if not r.affected: return v = r.vulnerability rule_id = f"{v.cve}-{r.package.name}" rule = { "id": rule_id, "name": "OsPackageVulnerability", "shortDescription": { "text": f"{v.cve} {v.urgency} vulnerability for {r.package.name} package" }, "help": { "text": f"Vulnerability {v.cve}\n" f"Severity: {v.urgency}\n" f"Package: {r.package.name}\n" f"Version: {r.package.version}\n" f"Fix Version: {v.fixed_version or '(unfixed)'}\n" f"Link: [{v.cve}]({self.sdo_url}/{v.cve})" }, "properties": { "tracker": f"{self.sdo_url}/{v.cve}", }, } if v.description: rule["fullDescription"] = {"text": v.description} if v.debianbug: rule["properties"]["debianbug"] = v.debianbug rule["properties"]["bugreport"] = f"{self.bdo_url}?bug={v.debianbug}" if v.fixed_version: rule["properties"]["fixVersion"] = v.fixed_version self.frame["runs"][0]["tool"]["driver"]["rules"].append(rule) for bin_pkg in self.affected_binaries(r.package): result = { "ruleId": rule_id, "level": "warning" if v.urgency == CveUrgency.HIGH else "note", "message": { "text": f"The SBOM reports {bin_pkg.name} at version {bin_pkg.version} which is a vulnerable deb package affected by {v.cve}", }, "locations": [ { "logicalLocations": [ { "name": str(bin_pkg), "fullyQualifiedName": str(bin_pkg.purl()), }, ], }, ], "properties": { "PURL": str(bin_pkg.purl()), }, } if self.path: result["locations"][0]["physicalLocation"] = { "artifactLocation": {"uri": f"file://{self.path.resolve()}"}, } self.frame["runs"][0]["results"].append(result)
[docs] class ScanResultVexWriter(ScanResultWriter): def __init__(self, author, product, **args): super().__init__(**args) sde = os.environ.get("SOURCE_DATE_EPOCH") if sde: self.ts = datetime.fromtimestamp(float(sde)) else: self.ts = datetime.now(timezone.utc) if not author: raise RuntimeError("No author information provided (needed for VEX)") self.author = author self.product = product self.frame = self._create_skeleton() def close(self) -> None: json.dump(self.frame, self.out) self.out.write("\n") def _create_skeleton(self) -> dict: return { "@context": VEX_CONTEXT, "@id": VEX_SCHEMA_ID, "author": self.author, "timestamp": self.ts.isoformat(), "version": 1, "tooling": "debsbom {}".format(version("debsbom")), "statements": [], } def _vuln_to_vex(self, r: ScanResultItem) -> dict: def _get_status(v: CveEntry, affected): if not affected: return "not_affected" elif v.status == CveStatus.UNDETERMINED: return "under_investigation" else: return "affected" v = r.vulnerability status = _get_status(v, r.affected) purl = str(r.package.purl()) component = { "@id": purl, "identifiers": { "purl": purl, }, } if r.package.checksums: component["hashes"] = { _CHECKSUM_TO_VEX_HASH[algo.name]: value for algo, value in r.package.checksums.items() if algo.name in _CHECKSUM_TO_VEX_HASH } components = [component] for bin_pkg in self.affected_binaries(r.package): bin_purl = str(bin_pkg.purl()) bin_comp = { "@id": bin_purl, "identifiers": { "purl": bin_purl, }, } if bin_pkg.checksums: bin_comp["hashes"] = { _CHECKSUM_TO_VEX_HASH[algo.name]: value for algo, value in bin_pkg.checksums.items() if algo.name in _CHECKSUM_TO_VEX_HASH } components.append(bin_comp) vex = { "vulnerability": { "@id": f"{self.sdo_url}/{v.cve}", "name": v.cve, }, "status": status, } if self.product: vex["products"] = [ { "@id": self.product, "subcomponents": components, } ] else: vex["products"] = components if v.description: vex["vulnerability"]["description"] = v.description if status == "not_affected": # The Debian tracker does not distinguish between fixed due to inline mitigations # and not affected because the code is not in use. Once upstream gives more # precise information, we can optimize this. # Ref: https://salsa.debian.org/security-tracker-team/security-tracker/-/issues/38 vex["justification"] = "inline_mitigations_already_exist" if status == "affected": if v.status == CveStatus.RESOLVED: vex["action_statement"] = f"update package to {v.fixed_version}" elif v.nodsa: vex["action_statement"] = f"assess relevance (no DSA: {v.nodsa})" else: vex["action_statement"] = "apply inline mitigations" return vex def write(self, r: ScanResultItem) -> None: self.frame["statements"].append(self._vuln_to_vex(r))