Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions vulnerabilities/pipelines/v2_importers/curl_live_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#

import logging
from typing import Iterable

from packageurl import PackageURL
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.curl_importer import CurlImporterPipeline

logger = logging.getLogger(__name__)


class CurlLiveImporterPipeline(CurlImporterPipeline):
"""
Pipeline-based importer for curl advisories from curl.se for a single PURL.
"""

pipeline_id = "curl_live_importer_v2"
supported_types = ["generic"]

@classmethod
def steps(cls):
return (
cls.get_purl_inputs,
cls.collect_and_store_advisories,
)

def get_purl_inputs(self):
purl = self.inputs["purl"]
if not purl:
raise ValueError("PURL is required for CurlLiveImporterPipeline")

if isinstance(purl, str):
purl = PackageURL.from_string(purl)

if not isinstance(purl, PackageURL):
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

if purl.type not in self.supported_types:
raise ValueError(
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
)

if purl.name != "curl" or purl.namespace != "curl.se":
raise ValueError(f"PURL: {purl!s} is expected to be for curl")

if not purl.version:
raise ValueError(f"PURL: {purl!s} is expected to have a version")

self.purl = purl

def collect_advisories(self) -> Iterable[AdvisoryData]:
for advisory in super().collect_advisories():
if self._advisory_affects_purl(advisory):
yield advisory

def _advisory_affects_purl(self, advisory: AdvisoryData) -> bool:
for affected_package in advisory.affected_packages:
if affected_package.package.name != "curl":
continue

if affected_package.affected_version_range:
try:
purl_version = SemverVersion(self.purl.version)

if purl_version not in affected_package.affected_version_range:
continue
except Exception as e:
logger.error(f"Error checking version {self.purl.version}: {e}")
continue

return True

return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from datetime import datetime
from datetime import timezone
from unittest.mock import patch

import pytest
from packageurl import PackageURL
from univers.version_range import GenericVersionRange
from univers.version_range import VersionConstraint
from univers.versions import SemverVersion

from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.pipelines.v2_importers.curl_live_importer import CurlLiveImporterPipeline

SAMPLE_CURL_ADVISORY = {
"aliases": ["CVE-2024-12345"],
"id": "CVE-2024-12345",
"summary": "Sample vulnerability in curl",
"published": "2024-06-30T08:00:00.00Z",
"affected": [
{
"ranges": [{"type": "SEMVER", "events": [{"introduced": "8.6.0"}, {"fixed": "8.7.0"}]}],
"versions": ["8.6.0"],
}
],
"database_specific": {
"package": "curl",
"URL": "https://curl.se/docs/CVE-2024-12345.json",
"www": "https://curl.se/docs/CVE-2024-12345.html",
"issue": "https://hackerone.com/reports/1111111",
"severity": "High",
"CWE": {
"id": "CWE-119",
"desc": "Improper restriction of operations within bounds of a memory buffer",
},
},
}


@pytest.fixture
def pipeline():
return CurlLiveImporterPipeline()


@patch("vulnerabilities.pipelines.v2_importers.curl_importer.fetch_response")
def test_live_importer_valid_version(mock_fetch, pipeline):
mock_fetch.return_value.json.return_value = [SAMPLE_CURL_ADVISORY]
pipeline.inputs = {"purl": "pkg:generic/curl.se/[email protected]"}

pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 1
advisory = advisories[0]

assert advisory.advisory_id == "CVE-2024-12345"
assert advisory.aliases == []
assert advisory.summary == "Sample vulnerability in curl"
assert advisory.date_published == datetime(2024, 6, 30, 8, 0, tzinfo=timezone.utc)
assert advisory.url == "https://curl.se/docs/CVE-2024-12345.json"
assert advisory.weaknesses == [119]

# Affected package check
pkg = advisory.affected_packages[0]
assert isinstance(pkg, AffectedPackageV2)
assert pkg.package == PackageURL(type="generic", namespace="curl.se", name="curl")
assert "8.7.0" in str(pkg.fixed_version_range)
assert "8.6.0" in str(pkg.affected_version_range)

# References
urls = [ref.url for ref in advisory.references_v2]
assert "https://curl.se/docs/CVE-2024-12345.html" in urls
assert "https://hackerone.com/reports/1111111" in urls

# Severity
severity = advisory.severities[0]
assert severity.value == "High"


@patch("vulnerabilities.pipelines.v2_importers.curl_importer.fetch_response")
def test_live_importer_invalid_version(mock_fetch, pipeline):
mock_fetch.return_value.json.return_value = [SAMPLE_CURL_ADVISORY]
pipeline.inputs = {"purl": "pkg:generic/curl.se/[email protected]"}

pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 0


def test_invalid_purl(pipeline):
pipeline.inputs = {"purl": "pkg:generic/invalid_namespace/curl@invalid_version"}
with pytest.raises(ValueError):
pipeline.get_purl_inputs()

pipeline.inputs = {"purl": "pkg:generic/curl.se/[email protected]"}
with pytest.raises(ValueError):
pipeline.get_purl_inputs()