# --- minecode/collectors/hex.py (new file) ---
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging

import requests
from packageurl import PackageURL

from minecode import priority_router
from minecode.miners.hex import build_packages
from packagedb.models import PackageContentType

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)


def get_hex_package_json(name):
    """
    Return the metadata JSON for the package `name` from the hex.pm API, or
    None when the metadata cannot be fetched or decoded.

    Example: https://hex.pm/api/packages/phoenix
    """
    url = f"https://hex.pm/api/packages/{name}"

    # NOTE(review): no timeout is passed, so a stalled connection blocks the
    # caller. Adding one means updating the test that asserts the exact
    # arguments of this call.
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as err:
        logger.error("HTTP error occurred: %s", err)
    except (requests.exceptions.RequestException, ValueError) as err:
        # Connection failures, timeouts and undecodable bodies are reported
        # the same way as HTTP errors: log and return None.
        logger.error("Failed to fetch hex.pm metadata from %s: %s", url, err)
    return None


def map_hex_package(package_url, pipelines, priority=0):
    """
    Add a hex `package_url` to the PackageDB.

    Fetch the package metadata from hex.pm, build packages from it with
    `build_packages`, store each one with `merge_or_create_package` and add
    every stored package to the scan queue with `pipelines` and `priority`.

    Return an error message when the package metadata is unavailable or when
    storing a package reports an error (processing stops at the first such
    error). Otherwise return the falsy error value of the last merge, or None
    when no package was built.
    """
    # Imported inside the function so the names are looked up at call time;
    # the tests patch them on minecode.model_utils.
    # Presumably this also avoids an import cycle - TODO confirm.
    from minecode.model_utils import add_package_to_scan_queue, merge_or_create_package

    package_json = get_hex_package_json(name=package_url.name)
    if not package_json:
        error = f"Package does not exist on hex.pm: {package_url}"
        logger.error(error)
        return error

    error = None
    for package in build_packages(metadata_dict=package_json, purl=package_url):
        package.extra_data["package_content"] = PackageContentType.SOURCE_ARCHIVE
        db_package, _, _, error = merge_or_create_package(package, visit_level=0)
        if error:
            break
        if db_package:
            add_package_to_scan_queue(package=db_package, pipelines=pipelines, priority=priority)

    return error


@priority_router.route("pkg:hex/.*")
def process_request(purl_str, **kwargs):
    """
    Process `priority_resource_uri` containing a hex Package URL (PURL).

    Recognized keyword arguments are `addon_pipelines` (extra pipelines
    appended to DEFAULT_PIPELINES) and `priority` (defaults to 0).

    Return the error message from `map_hex_package`, or None when there is no
    error.
    """
    from minecode.model_utils import DEFAULT_PIPELINES

    # "or ()" also covers an explicit addon_pipelines=None, which would
    # otherwise make tuple() raise a TypeError.
    addon_pipelines = kwargs.get("addon_pipelines") or ()
    pipelines = DEFAULT_PIPELINES + tuple(addon_pipelines)
    priority = kwargs.get("priority", 0)

    package_url = PackageURL.from_string(purl_str)
    error_msg = map_hex_package(package_url, pipelines, priority)

    # Any falsy "no error" value (None or "") is reported as None.
    return error_msg or None


# --- minecode/collectors/pub.py (index 2190944a..b8a1f84e) ---
# The same change set removes a leftover debug ``print(db_package)`` from
# ``map_pub_package``, just before its ``if db_package:`` check.


# --- minecode/miners/hex.py (new file) ---
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
#

import logging

import requests
from packagedcode import models as scan_models
from packageurl import PackageURL
from packageurl.contrib.purl2url import build_hex_download_url

logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Seconds to wait for hex.pm before giving up on a single request.
REQUEST_TIMEOUT = 30


def build_single_package(version_info, package_name, version, metadata_dict):
    """
    Return a PackageData object for a single hex.pm release.

    `version_info` is the release-level JSON, as served at
    https://hex.pm/api/packages/<name>/releases/<version>.
    `metadata_dict` is the package-level JSON, as served at
    https://hex.pm/api/packages/<name>; it supplies the description, licenses
    and owners shared by all releases.
    """
    # "or" guards against explicit nulls in the API payload, which
    # dict.get(key, default) would pass through unchanged.
    meta = metadata_dict.get("meta") or {}
    owners = metadata_dict.get("owners") or []

    parties = [
        scan_models.Party(name=owner.get("username"), role="owner", email=owner.get("email"))
        for owner in owners
    ]

    purl = PackageURL(type="hex", name=package_name, version=version)

    # Prefer the release's own publication timestamp. The package-level
    # "inserted_at" is when the package was first created, so it is only a
    # fallback.
    release_date = version_info.get("inserted_at") or metadata_dict.get("inserted_at")

    package = scan_models.PackageData(
        type="hex",
        name=package_name,
        version=version,
        description=meta.get("description"),
        homepage_url=version_info.get("html_url"),
        download_url=build_hex_download_url(str(purl)),
        parties=parties,
        sha256=version_info.get("checksum"),
        api_data_url=f"https://hex.pm/api/packages/{package_name}/releases/{version}",
        release_date=release_date,
        # NOTE(review): the raw hex.pm license names are stored in
        # `license_detections`. This looks like it should be
        # `extracted_license_statement` - confirm against the PackageData
        # model, and update the test that pins this field, before changing.
        license_detections=meta.get("licenses"),
    )
    package.datasource_id = "hex_api_metadata"
    package.set_purl(purl)
    return package


def _build_release_package(url, package_name, version, metadata_dict):
    """
    Return a PackageData for the release whose JSON lives at `url`, or None
    when that JSON cannot be fetched or turned into a package.
    """
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
        # Without this check, the JSON body of a 404 for an unknown release
        # would be turned into a package.
        response.raise_for_status()
        version_info = response.json()
        return build_single_package(
            version_info=version_info,
            package_name=package_name,
            version=version,
            metadata_dict=metadata_dict,
        )
    except Exception:
        # Best effort: one bad release must not abort the others. The
        # failure is logged with its traceback rather than dropped silently.
        logger.exception("Failed to fetch or parse version info from %s", url)
        return None


def build_packages(metadata_dict, purl):
    """
    Yield PackageData objects built from hex.pm package metadata.

    `metadata_dict` is the package-level JSON from hex.pm and `purl` a
    PackageURL or a PURL string. If `purl` has a version, only that release is
    fetched and yielded. Otherwise every release listed under "releases" in
    `metadata_dict` is fetched. Releases that cannot be fetched or built are
    logged and skipped.
    """
    if isinstance(purl, str):
        purl = PackageURL.from_string(purl)

    package_name = purl.name

    if purl.version:
        releases = [
            {
                "version": purl.version,
                "url": f"https://hex.pm/api/packages/{package_name}/releases/{purl.version}",
            }
        ]
    else:
        releases = metadata_dict.get("releases") or []

    for release in releases:
        package = _build_release_package(
            url=release.get("url"),
            package_name=package_name,
            version=release.get("version"),
            metadata_dict=metadata_dict,
        )
        # Yield outside any try block so that exceptions raised by the
        # consumer are never swallowed here.
        if package is not None:
            yield package


# --- minecode/tests/collectors/test_hex.py (new file) ---
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from unittest.mock import MagicMock, patch

import pytest
import requests
from packageurl import PackageURL

import minecode.collectors.hex as hex_collector
import minecode.miners.hex as hex_miner

RELEASE_URL = "https://hex.pm/api/packages/phoenix/releases/{version}"


@pytest.fixture
def package_url():
    """Return a versioned hex PURL shared by several tests."""
    return PackageURL.from_string("pkg:hex/phoenix@1.7.11")


def test_get_hex_package_json_success():
    """The decoded JSON body is returned and the package URL is requested."""
    with patch("minecode.collectors.hex.requests.get") as mock_get:
        mock_get.return_value.raise_for_status.return_value = None
        mock_get.return_value.json.return_value = {"name": "phoenix"}

        result = hex_collector.get_hex_package_json("phoenix")

    assert result == {"name": "phoenix"}
    mock_get.assert_called_once_with("https://hex.pm/api/packages/phoenix")


def test_get_hex_package_json_http_error():
    """An HTTP error is logged and reported as None instead of raising."""
    with patch("minecode.collectors.hex.requests.get") as mock_get:
        mock_get.return_value.raise_for_status.side_effect = requests.exceptions.HTTPError(
            "404 Not Found"
        )

        result = hex_collector.get_hex_package_json("badpkg")

    assert result is None


def test_map_hex_package_success(package_url):
    """A built package is tagged, stored and added to the scan queue."""
    metadata = {"meta": {}, "releases": []}
    mock_package = MagicMock()
    # A real dict lets the test inspect what the collector stored in it.
    mock_package.extra_data = {}

    with (
        patch("minecode.collectors.hex.get_hex_package_json") as mock_get_json,
        patch("minecode.collectors.hex.build_packages") as mock_build,
        patch("minecode.model_utils.merge_or_create_package") as mock_merge,
        patch("minecode.model_utils.add_package_to_scan_queue") as mock_add,
    ):
        mock_get_json.return_value = metadata
        mock_build.return_value = [mock_package]
        mock_merge.return_value = ("db_package", None, None, None)

        error = hex_collector.map_hex_package(package_url, pipelines=["p1"], priority=2)

    assert error is None
    mock_get_json.assert_called_once_with(name="phoenix")
    mock_build.assert_called_once_with(metadata_dict=metadata, purl=package_url)
    mock_merge.assert_called_once_with(mock_package, visit_level=0)
    assert mock_package.extra_data == {
        "package_content": hex_collector.PackageContentType.SOURCE_ARCHIVE
    }
    mock_add.assert_called_once_with(package="db_package", pipelines=["p1"], priority=2)


def test_map_hex_package_not_found(package_url):
    """Missing metadata yields an error message and builds no package."""
    with (
        patch("minecode.collectors.hex.get_hex_package_json") as mock_get_json,
        patch("minecode.collectors.hex.build_packages") as mock_build,
    ):
        mock_get_json.return_value = None

        error = hex_collector.map_hex_package(package_url, pipelines=[])

    assert "Package does not exist" in error
    mock_build.assert_not_called()


def test_map_hex_package_stops_on_merge_error(package_url):
    """The first merge error is returned and nothing is queued for scanning."""
    with (
        patch("minecode.collectors.hex.get_hex_package_json") as mock_get_json,
        patch("minecode.collectors.hex.build_packages") as mock_build,
        patch("minecode.model_utils.merge_or_create_package") as mock_merge,
        patch("minecode.model_utils.add_package_to_scan_queue") as mock_add,
    ):
        mock_get_json.return_value = {"meta": {}, "releases": []}
        mock_build.return_value = [MagicMock(), MagicMock()]
        mock_merge.return_value = (None, None, None, "merge failed")

        error = hex_collector.map_hex_package(package_url, pipelines=["p1"])

    assert error == "merge failed"
    mock_merge.assert_called_once()
    mock_add.assert_not_called()


def test_build_single_package_creates_package():
    """Release-level and package-level fields are mapped onto the package."""
    version_info = {
        "html_url": "https://hex.pm/packages/phoenix/1.7.11",
        "checksum": "deadbeef",
    }
    metadata_dict = {
        "meta": {"description": "test desc", "licenses": ["MIT"]},
        "owners": [{"username": "joe", "email": "joe@example.com"}],
        "inserted_at": "2024-01-01T00:00:00Z",
    }

    pkg = hex_miner.build_single_package(
        version_info=version_info,
        package_name="phoenix",
        version="1.7.11",
        metadata_dict=metadata_dict,
    )

    assert pkg.name == "phoenix"
    assert pkg.version == "1.7.11"
    assert pkg.description == "test desc"
    assert pkg.license_detections == ["MIT"]
    assert pkg.homepage_url == "https://hex.pm/packages/phoenix/1.7.11"
    assert pkg.api_data_url == RELEASE_URL.format(version="1.7.11")
    assert pkg.datasource_id == "hex_api_metadata"
    assert pkg.sha256 == "deadbeef"

    owner = pkg.parties[0]
    assert owner.name == "joe"
    assert owner.role == "owner"
    assert owner.email == "joe@example.com"


def test_build_packages_with_version(package_url):
    """A versioned PURL fetches only that release and forwards its JSON."""
    metadata = {"meta": {}}

    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package") as mock_build,
    ):
        mock_get.return_value.json.return_value = {"html_url": "fake"}
        mock_build.return_value = "fake_package"

        results = list(hex_miner.build_packages(metadata, package_url))

    assert results == ["fake_package"]
    mock_get.assert_called_once()
    # Only the URL is checked, so extra request options do not break the test.
    assert mock_get.call_args.args[0] == RELEASE_URL.format(version="1.7.11")
    mock_build.assert_called_once_with(
        version_info={"html_url": "fake"},
        package_name="phoenix",
        version="1.7.11",
        metadata_dict=metadata,
    )


def test_build_packages_accepts_purl_string():
    """A PURL given as a string is parsed before use."""
    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package") as mock_build,
    ):
        mock_get.return_value.json.return_value = {"html_url": "fake"}
        mock_build.return_value = "fake_package"

        results = list(hex_miner.build_packages({"meta": {}}, "pkg:hex/phoenix@1.7.11"))

    assert results == ["fake_package"]
    assert mock_get.call_args.args[0] == RELEASE_URL.format(version="1.7.11")


def test_build_packages_with_version_fetch_failure(package_url):
    """A failed release request yields no package instead of raising."""
    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package") as mock_build,
    ):
        mock_get.side_effect = requests.exceptions.ConnectionError("boom")

        results = list(hex_miner.build_packages({"meta": {}}, package_url))

    assert results == []
    mock_build.assert_not_called()


def test_build_packages_all_versions():
    """Without a version, each release listed in the metadata is fetched."""
    purl = PackageURL(type="hex", name="phoenix")
    metadata = {
        "releases": [
            {"version": "1.0.0", "url": RELEASE_URL.format(version="1.0.0")}
        ]
    }

    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package") as mock_build,
    ):
        mock_get.return_value.json.return_value = {"html_url": "fake"}
        mock_build.return_value = "fake_package"

        results = list(hex_miner.build_packages(metadata, purl))

    assert results == ["fake_package"]
    mock_get.assert_called_once()
    assert mock_get.call_args.args[0] == RELEASE_URL.format(version="1.0.0")
    mock_build.assert_called_once_with(
        version_info={"html_url": "fake"},
        package_name="phoenix",
        version="1.0.0",
        metadata_dict=metadata,
    )


def test_build_packages_all_versions_skips_failed_release():
    """A release that cannot be fetched is skipped; later ones are still built."""
    purl = PackageURL(type="hex", name="phoenix")
    metadata = {
        "releases": [
            {"version": "1.0.0", "url": RELEASE_URL.format(version="1.0.0")},
            {"version": "1.1.0", "url": RELEASE_URL.format(version="1.1.0")},
        ]
    }
    ok_response = MagicMock()
    ok_response.json.return_value = {"html_url": "fake"}

    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package") as mock_build,
    ):
        mock_get.side_effect = [requests.exceptions.ConnectionError("boom"), ok_response]
        mock_build.return_value = "fake_package"

        results = list(hex_miner.build_packages(metadata, purl))

    assert results == ["fake_package"]
    assert mock_get.call_count == 2
    mock_build.assert_called_once_with(
        version_info={"html_url": "fake"},
        package_name="phoenix",
        version="1.1.0",
        metadata_dict=metadata,
    )