Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions minecode/collectors/hex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
import requests
from packageurl import PackageURL

from minecode.miners.hex import build_packages
from minecode import priority_router
from packagedb.models import PackageContentType

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# A dedicated stream handler is attached and the level is forced to INFO at
# import time, so INFO-and-above records from this module reach the handler
# without any application-level logging configuration.
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)


def get_hex_package_json(name):
    """
    Fetch the package-level metadata of the hex.pm package `name` and return
    it as decoded JSON.
    Example endpoint: https://hex.pm/api/packages/phoenix

    Return None, after logging the failure, when the response carries an
    HTTP error status. Other request failures are not handled here and
    propagate to the caller.

    NOTE(review): the request is sent without a timeout.
    """
    api_url = f"https://hex.pm/api/packages/{name}"

    try:
        resp = requests.get(api_url)
        resp.raise_for_status()
        payload = resp.json()
    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred: {http_err}")
        payload = None

    return payload


def map_hex_package(package_url, pipelines, priority=0):
    """
    Collect the hex.pm metadata for `package_url`, merge or create the
    resulting packages in the PackageDB, and queue every stored package for
    scanning with `pipelines` at `priority`.

    Return an error message when the package metadata cannot be obtained or
    when storing a package reports an error; processing stops at the first
    such error. Otherwise return the (falsy) error value of the last stored
    package, or None when no package was built.
    """
    # Imported lazily inside the function, as in the original code.
    from minecode.model_utils import add_package_to_scan_queue
    from minecode.model_utils import merge_or_create_package

    metadata = get_hex_package_json(name=package_url.name)
    if not metadata:
        error = f"Package does not exist on hex.pm: {package_url}"
        logger.error(error)
        return error

    error = None
    for package in build_packages(metadata_dict=metadata, purl=package_url):
        package.extra_data["package_content"] = PackageContentType.SOURCE_ARCHIVE
        stored_package, _, _, error = merge_or_create_package(package, visit_level=0)
        if error:
            # Stop at the first failure and report it to the caller.
            return error
        if stored_package:
            add_package_to_scan_queue(
                package=stored_package, pipelines=pipelines, priority=priority
            )

    return error


@priority_router.route("pkg:hex/.*")
def process_request(purl_str, **kwargs):
    """
    Handle a priority request for the hex Package URL string `purl_str`.

    Optional keyword arguments:
    - `addon_pipelines`: extra pipelines appended to DEFAULT_PIPELINES.
    - `priority`: priority passed on to the scan queue (default 0).

    Return the error message reported while mapping the package, or None.
    """
    from minecode.model_utils import DEFAULT_PIPELINES

    pipelines = DEFAULT_PIPELINES + tuple(kwargs.get("addon_pipelines", []))
    priority = kwargs.get("priority", 0)

    error_msg = map_hex_package(PackageURL.from_string(purl_str), pipelines, priority)
    return error_msg if error_msg else None
1 change: 0 additions & 1 deletion minecode/collectors/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def map_pub_package(package_url, pipelines, priority=0):
db_package, _, _, error = merge_or_create_package(package, visit_level=0)
if error:
break
print(db_package)
if db_package:
add_package_to_scan_queue(package=db_package, pipelines=pipelines, priority=priority)

Expand Down
100 changes: 100 additions & 0 deletions minecode/miners/hex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
#

from packageurl import PackageURL
from packagedcode import models as scan_models
import requests
from packageurl.contrib.purl2url import build_hex_download_url

import logging

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# A dedicated stream handler is attached and the level is forced to INFO at
# import time, so INFO-and-above records from this module reach the handler
# without any application-level logging configuration.
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.setLevel(logging.INFO)


def build_single_package(version_info, package_name, version, metadata_dict):
    """
    Return a PackageData object for a single release of a hex.pm package.

    `version_info` is the decoded JSON of one release
    (https://hex.pm/api/packages/<name>/releases/<version>) and
    `metadata_dict` is the decoded package-level JSON
    (https://hex.pm/api/packages/<name>). `package_name` and `version`
    identify the release and are used to build its Package URL.
    """
    # `or` guards against keys that are present but null in the API payload.
    meta = metadata_dict.get("meta") or {}
    owners = metadata_dict.get("owners") or []

    parties = [
        scan_models.Party(name=owner.get("username"), role="owner", email=owner.get("email"))
        for owner in owners
    ]

    purl = PackageURL(type="hex", name=package_name, version=version)

    # Prefer the release's own timestamp; the package-level `inserted_at` is
    # the same for every release and is only used as a fallback.
    release_date = version_info.get("inserted_at") or metadata_dict.get("inserted_at")

    package = scan_models.PackageData(
        type="hex",
        name=package_name,
        version=version,
        description=meta.get("description"),
        homepage_url=version_info.get("html_url"),
        download_url=build_hex_download_url(str(purl)),
        parties=parties,
        sha256=version_info.get("checksum"),
        api_data_url=f"https://hex.pm/api/packages/{package_name}/releases/{version}",
        release_date=release_date,
        # NOTE(review): the raw hex.pm license names are stored in
        # `license_detections`; `extracted_license_statement` looks like the
        # more appropriate PackageData field. Kept unchanged because existing
        # callers and tests read `license_detections` - confirm before moving.
        license_detections=meta.get("licenses"),
    )
    package.datasource_id = "hex_api_metadata"
    package.set_purl(purl)
    return package


# Seconds to wait for hex.pm before giving up on a single release request.
_HEX_REQUEST_TIMEOUT = 30


def _release_api_url(package_name, version):
    """Return the hex.pm API URL describing one release of a package."""
    return f"https://hex.pm/api/packages/{package_name}/releases/{version}"


def _build_release_package(url, package_name, version, metadata_dict):
    """
    Return a PackageData for the release described at `url`, or None (after
    logging) when the release metadata cannot be fetched, parsed or mapped.
    """
    try:
        response = requests.get(url, timeout=_HEX_REQUEST_TIMEOUT)
        # Without this check an error response whose body happens to be valid
        # JSON would be turned into a package with bogus data.
        response.raise_for_status()
        return build_single_package(
            version_info=response.json(),
            package_name=package_name,
            version=version,
            metadata_dict=metadata_dict,
        )
    except Exception:
        # Best effort: one broken release must not abort the other releases.
        logger.error(f"Failed to fetch or parse version info from {url}")
        return None


def build_packages(metadata_dict, purl):
    """
    Yield PackageData objects built from hex.pm metadata.

    `metadata_dict` is the decoded package-level JSON from the hex.pm API and
    `purl` is a PackageURL object or a Package URL string.

    If `purl` has a version, yield at most one package for that release.
    Otherwise yield one package for each entry of `metadata_dict["releases"]`.
    Releases whose metadata cannot be fetched or parsed are logged and
    skipped.
    """
    if isinstance(purl, str):
        purl = PackageURL.from_string(purl)

    package_name = purl.name

    if purl.version:
        releases = [
            {"version": purl.version, "url": _release_api_url(package_name, purl.version)}
        ]
    else:
        # `or []` also covers a "releases" key that is present but null.
        releases = metadata_dict.get("releases") or []

    for release in releases:
        version = release.get("version")
        url = release.get("url") or _release_api_url(package_name, version)
        package = _build_release_package(
            url=url,
            package_name=package_name,
            version=version,
            metadata_dict=metadata_dict,
        )
        if package is not None:
            yield package
132 changes: 132 additions & 0 deletions minecode/tests/collectors/test_hex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import pytest
from unittest.mock import patch, MagicMock

from packageurl import PackageURL
import requests

import minecode.collectors.hex as hex_collector
import minecode.miners.hex as hex_miner


@pytest.fixture
def package_url():
    """Return a versioned hex PackageURL (phoenix 1.7.11) shared by the tests."""
    # The `name@version` part had been mangled by e-mail obfuscation; a purl
    # separates the name from the version with "@".
    return PackageURL.from_string("pkg:hex/phoenix@1.7.11")


def test_get_hex_package_json_success():
    """A successful hex.pm response is returned as its decoded JSON body."""
    fake_response = MagicMock()
    fake_response.raise_for_status.return_value = None
    fake_response.json.return_value = {"name": "phoenix"}

    with patch("minecode.collectors.hex.requests.get", return_value=fake_response) as mock_get:
        fetched = hex_collector.get_hex_package_json("phoenix")

    assert fetched == {"name": "phoenix"}
    mock_get.assert_called_once_with("https://hex.pm/api/packages/phoenix")


def test_get_hex_package_json_http_error():
    """An HTTP error status makes the collector return None."""
    failing_response = MagicMock()
    failing_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
        "404 Not Found"
    )

    with patch("minecode.collectors.hex.requests.get", return_value=failing_response):
        fetched = hex_collector.get_hex_package_json("badpkg")

    assert fetched is None


def test_map_hex_package_success(package_url):
    """A package that is built and stored is queued for scanning, without error."""
    with (
        patch(
            "minecode.collectors.hex.get_hex_package_json",
            return_value={"meta": {}, "releases": []},
        ) as mock_get_json,
        patch("minecode.collectors.hex.build_packages", return_value=[MagicMock()]) as mock_build,
        patch(
            "minecode.model_utils.merge_or_create_package",
            return_value=("db_package", None, None, None),
        ) as mock_merge,
        patch("minecode.model_utils.add_package_to_scan_queue") as mock_add,
    ):
        error = hex_collector.map_hex_package(package_url, pipelines=["p1"], priority=2)

    assert error is None
    for called_once in (mock_get_json, mock_build, mock_merge):
        called_once.assert_called_once()
    mock_add.assert_called_once_with(package="db_package", pipelines=["p1"], priority=2)


def test_map_hex_package_not_found(package_url):
    """Missing package metadata is reported as a "does not exist" error."""
    with patch("minecode.collectors.hex.get_hex_package_json", return_value=None):
        message = hex_collector.map_hex_package(package_url, pipelines=[])

    assert "Package does not exist" in message


def test_build_single_package_creates_package():
    """Release and package metadata are mapped onto the PackageData fields."""
    version_info = {
        "html_url": "https://hex.pm/packages/phoenix/1.7.11",
        "checksum": "deadbeef",
    }
    metadata_dict = {
        "meta": {"description": "test desc", "licenses": ["MIT"]},
        # The address had been mangled by e-mail obfuscation; use a real
        # placeholder so the owner mapping can be asserted below.
        "owners": [{"username": "joe", "email": "joe@example.com"}],
        "inserted_at": "2024-01-01T00:00:00Z",
    }

    pkg = hex_miner.build_single_package(
        version_info=version_info,
        package_name="phoenix",
        version="1.7.11",
        metadata_dict=metadata_dict,
    )

    assert pkg.name == "phoenix"
    assert pkg.version == "1.7.11"
    assert pkg.description == "test desc"
    assert pkg.license_detections == ["MIT"]
    assert pkg.parties[0].name == "joe"
    assert pkg.parties[0].email == "joe@example.com"
    assert pkg.sha256 == "deadbeef"


def test_build_packages_with_version(package_url):
    """A versioned purl yields one package from a single release request."""
    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package", return_value="fake_package"),
    ):
        mock_get.return_value.json.return_value = {"html_url": "fake"}
        built = list(hex_miner.build_packages({"meta": {}}, package_url))

    assert built == ["fake_package"]
    mock_get.assert_called_once()


def test_build_packages_all_versions():
    """A purl without a version yields one package per listed release."""
    unversioned_purl = PackageURL(type="hex", name="phoenix")
    release = {
        "version": "1.0.0",
        "url": "https://hex.pm/api/packages/phoenix/releases/1.0.0",
    }
    metadata = {"releases": [release]}

    with (
        patch("minecode.miners.hex.requests.get") as mock_get,
        patch("minecode.miners.hex.build_single_package", return_value="fake_package"),
    ):
        mock_get.return_value.json.return_value = {"html_url": "fake"}
        built = list(hex_miner.build_packages(metadata, unversioned_purl))

    assert built == ["fake_package"]
    mock_get.assert_called_once()
Loading