Skip to content

Commit 5c8f008

Browse files
committed
Add Curl Live V2 Importer Pipeline #1918
* Add Curl Live V2 Importer * Add tests for the Curl Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail <[email protected]>
1 parent dcb0511 commit 5c8f008

File tree

2 files changed

+184
-0
lines changed

2 files changed

+184
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
6+
import logging
7+
from typing import Iterable
8+
9+
from packageurl import PackageURL
10+
from univers.versions import SemverVersion
11+
12+
from vulnerabilities.importer import AdvisoryData
13+
from vulnerabilities.pipelines.v2_importers.curl_importer import CurlImporterPipeline
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class CurlLiveImporterPipeline(CurlImporterPipeline):
19+
"""
20+
Pipeline-based importer for curl advisories from curl.se for a single PURL.
21+
"""
22+
23+
pipeline_id = "curl_live_importer_v2"
24+
supported_types = ["generic"]
25+
26+
@classmethod
27+
def steps(cls):
28+
return (
29+
cls.get_purl_inputs,
30+
cls.collect_and_store_advisories,
31+
)
32+
33+
def get_purl_inputs(self):
34+
purl = self.inputs["purl"]
35+
if not purl:
36+
raise ValueError("PURL is required for CurlLiveImporterPipeline")
37+
38+
if isinstance(purl, str):
39+
purl = PackageURL.from_string(purl)
40+
41+
if not isinstance(purl, PackageURL):
42+
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")
43+
44+
if purl.type not in self.supported_types:
45+
raise ValueError(
46+
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
47+
)
48+
49+
if purl.name != "curl" or purl.namespace != "curl.se":
50+
raise ValueError(f"PURL: {purl!s} is expected to be for curl")
51+
52+
if not purl.version:
53+
raise ValueError(f"PURL: {purl!s} is expected to have a version")
54+
55+
self.purl = purl
56+
57+
def collect_advisories(self) -> Iterable[AdvisoryData]:
58+
for advisory in super().collect_advisories():
59+
if self._advisory_affects_purl(advisory):
60+
yield advisory
61+
62+
def _advisory_affects_purl(self, advisory: AdvisoryData) -> bool:
63+
for affected_package in advisory.affected_packages:
64+
if affected_package.package.name != "curl":
65+
continue
66+
67+
if affected_package.affected_version_range:
68+
try:
69+
purl_version = SemverVersion(self.purl.version)
70+
71+
if purl_version not in affected_package.affected_version_range:
72+
continue
73+
except Exception as e:
74+
logger.error(f"Error checking version {self.purl.version}: {e}")
75+
continue
76+
77+
return True
78+
79+
return False
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from datetime import datetime
11+
from datetime import timezone
12+
from unittest.mock import patch
13+
14+
import pytest
15+
from packageurl import PackageURL
16+
from univers.versions import SemverVersion
17+
from univers.version_range import GenericVersionRange, VersionConstraint
18+
19+
from vulnerabilities.importer import AffectedPackageV2
20+
from vulnerabilities.pipelines.v2_importers.curl_live_importer import CurlLiveImporterPipeline
21+
22+
SAMPLE_CURL_ADVISORY = {
23+
"aliases": ["CVE-2024-12345"],
24+
"id": "CVE-2024-12345",
25+
"summary": "Sample vulnerability in curl",
26+
"published": "2024-06-30T08:00:00.00Z",
27+
"affected": [
28+
{
29+
"ranges": [{"type": "SEMVER", "events": [{"introduced": "8.6.0"}, {"fixed": "8.7.0"}]}],
30+
"versions": ["8.6.0"],
31+
}
32+
],
33+
"database_specific": {
34+
"package": "curl",
35+
"URL": "https://curl.se/docs/CVE-2024-12345.json",
36+
"www": "https://curl.se/docs/CVE-2024-12345.html",
37+
"issue": "https://hackerone.com/reports/1111111",
38+
"severity": "High",
39+
"CWE": {
40+
"id": "CWE-119",
41+
"desc": "Improper restriction of operations within bounds of a memory buffer",
42+
},
43+
},
44+
}
45+
46+
47+
@pytest.fixture
48+
def pipeline():
49+
return CurlLiveImporterPipeline()
50+
51+
52+
@patch("vulnerabilities.pipelines.v2_importers.curl_importer.fetch_response")
53+
def test_live_importer_valid_version(mock_fetch, pipeline):
54+
mock_fetch.return_value.json.return_value = [SAMPLE_CURL_ADVISORY]
55+
pipeline.inputs = {"purl": "pkg:generic/curl.se/[email protected]"}
56+
57+
pipeline.get_purl_inputs()
58+
advisories = list(pipeline.collect_advisories())
59+
60+
assert len(advisories) == 1
61+
advisory = advisories[0]
62+
63+
assert advisory.advisory_id == "CVE-2024-12345"
64+
assert advisory.aliases == []
65+
assert advisory.summary == "Sample vulnerability in curl"
66+
assert advisory.date_published == datetime(2024, 6, 30, 8, 0, tzinfo=timezone.utc)
67+
assert advisory.url == "https://curl.se/docs/CVE-2024-12345.json"
68+
assert advisory.weaknesses == [119]
69+
70+
# Affected package check
71+
pkg = advisory.affected_packages[0]
72+
assert isinstance(pkg, AffectedPackageV2)
73+
assert pkg.package == PackageURL(type="generic", namespace="curl.se", name="curl")
74+
assert "8.7.0" in str(pkg.fixed_version_range)
75+
assert "8.6.0" in str(pkg.affected_version_range)
76+
77+
# References
78+
urls = [ref.url for ref in advisory.references_v2]
79+
assert "https://curl.se/docs/CVE-2024-12345.html" in urls
80+
assert "https://hackerone.com/reports/1111111" in urls
81+
82+
# Severity
83+
severity = advisory.severities[0]
84+
assert severity.value == "High"
85+
86+
87+
@patch("vulnerabilities.pipelines.v2_importers.curl_importer.fetch_response")
88+
def test_live_importer_invalid_version(mock_fetch, pipeline):
89+
mock_fetch.return_value.json.return_value = [SAMPLE_CURL_ADVISORY]
90+
pipeline.inputs = {"purl": "pkg:generic/curl.se/[email protected]"}
91+
92+
pipeline.get_purl_inputs()
93+
advisories = list(pipeline.collect_advisories())
94+
95+
assert len(advisories) == 0
96+
97+
98+
def test_invalid_purl(pipeline):
99+
pipeline.inputs = {"purl": "pkg:generic/invalid_namespace/curl@invalid_version"}
100+
with pytest.raises(ValueError):
101+
pipeline.get_purl_inputs()
102+
103+
pipeline.inputs = {"purl": "pkg:generic/curl.se/[email protected]"}
104+
with pytest.raises(ValueError):
105+
pipeline.get_purl_inputs()

0 commit comments

Comments
 (0)