diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 82ee4525a..4777e6206 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -57,6 +57,7 @@ from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2 from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2 from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2 +from vulnerabilities.pipelines.v2_importers import pysec_live_importer as pysec_live_importer_v2 from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2 from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2 from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2 @@ -117,3 +118,9 @@ oss_fuzz.OSSFuzzImporter, ] ) + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + pysec_live_importer_v2.PySecLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/pysec_live_importer.py b/vulnerabilities/pipelines/v2_importers/pysec_live_importer.py new file mode 100644 index 000000000..c5fbeee7b --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/pysec_live_importer.py @@ -0,0 +1,122 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# + +from io import BytesIO +from typing import Iterable +from zipfile import ZipFile + +from packageurl import PackageURL +from univers.versions import PypiVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.pysec_importer import PyPIImporterPipeline + + +class PySecLiveImporterPipeline(PyPIImporterPipeline): + """ + PySec Live Importer Pipeline + + Collect advisories from OSV PyPI zip for a single PURL. + """ + + pipeline_id = "pysec_live_importer_v2" + supported_types = ["pypi"] + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.fetch_zip, + cls.collect_and_store_advisories, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for PySecLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + def _is_version_affected(self, advisory_dict, version): + affected = advisory_dict.get("affected", []) + try: + v = PypiVersion(version) + except Exception: + return False + for entry in affected: + ranges = entry.get("ranges", []) + for r in ranges: + events = r.get("events", []) + introduced = None + fixed = None + for event in events: + if "introduced" in event: + introduced = event["introduced"] + if "fixed" in event: + fixed = event["fixed"] + try: + if introduced: + introduced_v = PypiVersion(introduced) + if v < introduced_v: + continue + if fixed: + fixed_v = PypiVersion(fixed) + if v >= fixed_v: + continue + if introduced: + introduced_v = PypiVersion(introduced) + if (not fixed or v < PypiVersion(fixed)) and v >= introduced_v: + return True + except Exception: + continue + return False + + def collect_advisories(self) -> Iterable[AdvisoryData]: + from vulnerabilities.importers.osv import parse_advisory_data_v2 + + with ZipFile(BytesIO(self.advisory_zip)) as zip_file: + for file_name in zip_file.namelist(): + if not file_name.startswith("PYSEC-"): + continue + with zip_file.open(file_name) as f: + import json + + advisory_dict = json.load(f) + + affected = advisory_dict.get("affected", []) + found = False + for entry in affected: + pkg = entry.get("package", {}) + if pkg.get("name") == self.purl.name: + found = True + break + if not found: + continue + if not self._is_version_affected(advisory_dict, self.purl.version): + continue + + f.seek(0) + advisory_text = f.read().decode("utf-8") + yield parse_advisory_data_v2( + raw_data=advisory_dict, + supported_ecosystems=["pypi"], + advisory_url=self.url, + advisory_text=advisory_text, + ) diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_pysec_live_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_pysec_live_importer_v2.py new file mode 100644 index 000000000..45f295a51 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_pysec_live_importer_v2.py @@ -0,0 +1,141 @@ +import json +from io import BytesIO +from unittest.mock import patch +from zipfile import ZipFile + +import pytest +from packageurl import PackageURL + +from vulnerabilities.importer import AdvisoryData + + +@pytest.fixture +def mock_zip_data(): + # Create a zip with two advisories for the same package with different versions + zip_buffer = BytesIO() + with ZipFile(zip_buffer, mode="w") as zip_file: + advisory1 = { + "advisory_id": "PYSEC-1001", + "summary": "Vuln in foo", + "affected": [ + { + "package": {"name": "foo", "ecosystem": "PyPI"}, + "ranges": [ + { + "type": "ECOSYSTEM", + "events": [{"introduced": "1.0.0"}, {"fixed": "2.0.0"}], + } + ], + } + ], + } + advisory2 = { + "advisory_id": "PYSEC-1002", + "summary": "Vuln in foo, later version", + "affected": [ + { + "package": {"name": "foo", "ecosystem": "PyPI"}, + "ranges": [ + { + "type": "ECOSYSTEM", + "events": [{"introduced": "2.5.0"}, {"fixed": "3.0.0"}], + } + ], + } + ], + } + advisory3 = { + "advisory_id": "PYSEC-2000", + "summary": "Vuln in bar", + "affected": [ + { + "package": {"name": "bar", "ecosystem": "PyPI"}, + "ranges": [ + { + "type": "ECOSYSTEM", + "events": [{"introduced": "0.1.0"}, {"fixed": "0.2.0"}], + } + ], + } + ], + } + zip_file.writestr("PYSEC-1001.json", json.dumps(advisory1)) + zip_file.writestr("PYSEC-1002.json", json.dumps(advisory2)) + zip_file.writestr("PYSEC-2000.json", json.dumps(advisory3)) + zip_buffer.seek(0) + return zip_buffer + + +def test_package_with_version_affected(mock_zip_data): + from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline + + purl = PackageURL(type="pypi", name="foo", version="1.5.0") + + with patch("requests.get") as mock_get: + mock_get.return_value.content = mock_zip_data.read() + + with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse: + + def parse_side_effect(raw_data, supported_ecosystems, advisory_url, advisory_text): + return AdvisoryData( + advisory_id=raw_data["advisory_id"], + summary=raw_data["summary"], + references_v2=[{"url": advisory_url}], + affected_packages=[], + weaknesses=[], + url=advisory_url, + ) + + mock_parse.side_effect = parse_side_effect + + pipeline = PySecLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() + pipeline.fetch_zip() + advisories = list(pipeline.collect_advisories()) + + # Only PYSEC-1001 should match + assert len(advisories) == 1 + assert advisories[0].advisory_id == "PYSEC-1001" + + +def test_package_with_version_not_affected(mock_zip_data): + from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline + + purl = PackageURL(type="pypi", name="foo", version="2.2.0") + + with patch("requests.get") as mock_get: + mock_get.return_value.content = mock_zip_data.read() + + with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse: + mock_parse.return_value = AdvisoryData( + advisory_id="PYSEC-1002", + summary="Vuln in foo, later version", + references_v2=[{"url": "dummy"}], + affected_packages=[], + weaknesses=[], + url="dummy", + ) + + pipeline = PySecLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() + pipeline.fetch_zip() + advisories = list(pipeline.collect_advisories()) + + # No advisories should match + assert len(advisories) == 0 + + +def test_nonexistent_package(mock_zip_data): + from vulnerabilities.pipelines.v2_importers.pysec_live_importer import PySecLiveImporterPipeline + + purl = PackageURL(type="pypi", name="baz", version="1.0.0") + + with patch("requests.get") as mock_get: + mock_get.return_value.content = mock_zip_data.read() + + pipeline = PySecLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() + pipeline.fetch_zip() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0