diff --git a/Dockerfile b/Dockerfile index 937fb31d3b..37b3e5f87e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,23 +26,17 @@ LABEL org.opencontainers.image.source="https://github.com/aboutcode-org/scancode LABEL org.opencontainers.image.description="ScanCode.io" LABEL org.opencontainers.image.licenses="Apache-2.0" -# Set default values for APP_UID and APP_GID at build-time -ARG APP_UID=1000 -ARG APP_GID=1000 - -ENV APP_NAME=scancodeio -ENV APP_USER=app -ENV APP_UID=${APP_UID} -ENV APP_GID=${APP_GID} -ENV APP_DIR=/opt/$APP_NAME -ENV VENV_LOCATION=/opt/$APP_NAME/.venv +ENV APP_NAME scancodeio +ENV APP_USER app +ENV APP_DIR /opt/$APP_NAME +ENV VENV_LOCATION /opt/$APP_NAME/.venv # Force Python unbuffered stdout and stderr (they are flushed to terminal immediately) -ENV PYTHONUNBUFFERED=1 +ENV PYTHONUNBUFFERED 1 # Do not write Python .pyc files -ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONDONTWRITEBYTECODE 1 # Add the app dir in the Python path for entry points availability -ENV PYTHONPATH=$PYTHONPATH:$APP_DIR +ENV PYTHONPATH $PYTHONPATH:$APP_DIR # OS requirements as per # https://scancode-toolkit.readthedocs.io/en/latest/getting-started/install.html @@ -70,24 +64,27 @@ RUN apt-get update \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* -# Create the APP_USER group, user, and directory with specific UID and GID -RUN groupadd --gid $APP_GID --system $APP_USER \ - && useradd --uid $APP_UID --gid $APP_GID --home-dir $APP_DIR --system --create-home $APP_USER \ - && chown $APP_USER:$APP_USER $APP_DIR \ - && mkdir -p /var/$APP_NAME \ +# Create the APP_USER group and user +RUN addgroup --system $APP_USER \ + && adduser --system --group --home=$APP_DIR $APP_USER \ + && chown $APP_USER:$APP_USER $APP_DIR + +# Create the /var/APP_NAME directory with proper permission for APP_USER +RUN mkdir -p /var/$APP_NAME \ && chown $APP_USER:$APP_USER /var/$APP_NAME # Setup the work directory and the user as APP_USER for the remaining stages WORKDIR $APP_DIR USER $APP_USER 
-# Create static/ and workspace/ directories -RUN mkdir -p /var/$APP_NAME/static/ /var/$APP_NAME/workspace/ - # Create the virtualenv RUN python -m venv $VENV_LOCATION # Enable the virtualenv, similar effect as "source activate" -ENV PATH=$VENV_LOCATION/bin:$PATH +ENV PATH $VENV_LOCATION/bin:$PATH + +# Create static/ and workspace/ directories +RUN mkdir -p /var/$APP_NAME/static/ \ + && mkdir -p /var/$APP_NAME/workspace/ # Install the dependencies before the codebase COPY for proper Docker layer caching COPY --chown=$APP_USER:$APP_USER pyproject.toml $APP_DIR/ @@ -95,3 +92,4 @@ RUN pip install --no-cache-dir . # Copy the codebase and set the proper permissions for the APP_USER COPY --chown=$APP_USER:$APP_USER . $APP_DIR + diff --git a/scancodeio/settings.py b/scancodeio/settings.py index 1d0310a11b..15e52a4440 100644 --- a/scancodeio/settings.py +++ b/scancodeio/settings.py @@ -1,456 +1,488 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# http://nexb.com and https://github.com/aboutcode-org/scancode.io -# The ScanCode.io software is licensed under the Apache License version 2.0. -# Data generated with ScanCode.io is provided as-is without warranties. -# ScanCode is a trademark of nexB Inc. -# -# You may not use this software except in compliance with the License. -# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# -# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND, either express or implied. No content created from -# ScanCode.io should be considered or used as legal advice. Consult an Attorney -# for any legal advice. 
-# -# ScanCode.io is a free software code scanning tool from nexB Inc. and others. -# Visit https://github.com/aboutcode-org/scancode.io for support and download. - -import sys -import tempfile -from pathlib import Path - -import environ - -PROJECT_DIR = environ.Path(__file__) - 1 -ROOT_DIR = PROJECT_DIR - 1 - -# True if running tests through `./manage test` -IS_TESTS = "test" in sys.argv - -# Environment - -ENV_FILE = "/etc/scancodeio/.env" -if not Path(ENV_FILE).exists(): - ENV_FILE = ROOT_DIR(".env") - -# Do not use local .env environment when running the tests. -if IS_TESTS: - ENV_FILE = None - -env = environ.Env() -environ.Env.read_env(ENV_FILE) - -# Security - -SECRET_KEY = env.str("SECRET_KEY", default="") - -ALLOWED_HOSTS = env.list( - "ALLOWED_HOSTS", - default=[".localhost", "127.0.0.1", "[::1]", "host.docker.internal", "172.17.0.1"], -) - -CSRF_TRUSTED_ORIGINS = env.list("CSRF_TRUSTED_ORIGINS", default=[]) - -# SECURITY WARNING: don't run with debug turned on in production -DEBUG = env.bool("SCANCODEIO_DEBUG", default=False) - -SCANCODEIO_REQUIRE_AUTHENTICATION = env.bool( - "SCANCODEIO_REQUIRE_AUTHENTICATION", default=False -) - -SCANCODEIO_ENABLE_ADMIN_SITE = env.bool("SCANCODEIO_ENABLE_ADMIN_SITE", default=False) - -SECURE_CONTENT_TYPE_NOSNIFF = env.bool("SECURE_CONTENT_TYPE_NOSNIFF", default=True) - -X_FRAME_OPTIONS = env.str("X_FRAME_OPTIONS", default="DENY") - -SESSION_COOKIE_SECURE = env.bool("SESSION_COOKIE_SECURE", default=True) - -CSRF_COOKIE_SECURE = env.bool("CSRF_COOKIE_SECURE", default=True) - -# ``security.W004`` SECURE_HSTS_SECONDS and ``security.W008`` SECURE_SSL_REDIRECT -# are handled by the web server. 
-SILENCED_SYSTEM_CHECKS = ["security.W004", "security.W008"] - -# ScanCode.io - -SCANCODEIO_WORKSPACE_LOCATION = env.str("SCANCODEIO_WORKSPACE_LOCATION", default="var") - -SCANCODEIO_CONFIG_DIR = env.str("SCANCODEIO_CONFIG_DIR", default=".scancode") - -SCANCODEIO_CONFIG_FILE = env.str( - "SCANCODEIO_CONFIG_FILE", default="scancode-config.yml" -) - -SCANCODEIO_LOG_LEVEL = env.str("SCANCODEIO_LOG_LEVEL", "INFO") - -# Set the number of parallel processes to use for ScanCode related scan execution. -# If the SCANCODEIO_PROCESSES argument is not set, defaults to an optimal number of CPUs -# available on the machine. -SCANCODEIO_PROCESSES = env.int("SCANCODEIO_PROCESSES", default=None) - -SCANCODEIO_POLICIES_FILE = env.str("SCANCODEIO_POLICIES_FILE", default="policies.yml") - -# This setting defines the additional locations ScanCode.io will search for pipelines. -# This should be set to a list of strings that contain full paths to your additional -# pipelines directories. -SCANCODEIO_PIPELINES_DIRS = env.list("SCANCODEIO_PIPELINES_DIRS", default=[]) - -# Maximum time allowed for a pipeline to complete. -SCANCODEIO_TASK_TIMEOUT = env.str("SCANCODEIO_TASK_TIMEOUT", default="24h") - -# Default to 2 minutes. -SCANCODEIO_SCAN_FILE_TIMEOUT = env.int("SCANCODEIO_SCAN_FILE_TIMEOUT", default=120) - -# Default to None which scans all files -SCANCODEIO_SCAN_MAX_FILE_SIZE = env.int("SCANCODEIO_SCAN_MAX_FILE_SIZE", default=None) - -# List views pagination, controls the number of items displayed per page. -# Syntax in .env: SCANCODEIO_PAGINATE_BY=project=10,project_error=10 -SCANCODEIO_PAGINATE_BY = env.dict( - "SCANCODEIO_PAGINATE_BY", - default={ - "project": 20, - "error": 50, - "resource": 100, - "package": 100, - "dependency": 100, - "license": 100, - "relation": 100, - }, -) - -# Default limit for "most common" entries in QuerySets. 
-SCANCODEIO_MOST_COMMON_LIMIT = env.int("SCANCODEIO_MOST_COMMON_LIMIT", default=7) - -# The base URL (e.g., https://hostname/) of this application instance. -# Required for generating URLs to reference objects within the app, -# such as in webhook notifications. -SCANCODEIO_SITE_URL = env.str("SCANCODEIO_SITE_URL", default="") - -# Fetch authentication credentials - -# SCANCODEIO_FETCH_BASIC_AUTH="host=user,password;" -SCANCODEIO_FETCH_BASIC_AUTH = env.dict( - "SCANCODEIO_FETCH_BASIC_AUTH", - cast={"value": tuple}, - default={}, -) - -# SCANCODEIO_FETCH_DIGEST_AUTH="host=user,password;" -SCANCODEIO_FETCH_DIGEST_AUTH = env.dict( - "SCANCODEIO_FETCH_DIGEST_AUTH", - cast={"value": tuple}, - default={}, -) - -# SCANCODEIO_FETCH_HEADERS="host=Header1=value,Header2=value;" -SCANCODEIO_FETCH_HEADERS = {} -FETCH_HEADERS_STR = env.str("SCANCODEIO_FETCH_HEADERS", default="") -for entry in FETCH_HEADERS_STR.split(";"): - if entry.strip(): - host, headers = entry.split("=", 1) - SCANCODEIO_FETCH_HEADERS[host] = env.parse_value(headers, cast=dict) - -# SCANCODEIO_NETRC_LOCATION="~/.netrc" -SCANCODEIO_NETRC_LOCATION = env.str("SCANCODEIO_NETRC_LOCATION", default="") -if SCANCODEIO_NETRC_LOCATION: - # Propagate the location to the environ for `requests.utils.get_netrc_auth` - env.ENVIRON["NETRC"] = SCANCODEIO_NETRC_LOCATION - -# SCANCODEIO_SKOPEO_CREDENTIALS="host1=user:password,host2=user:password" -SCANCODEIO_SKOPEO_CREDENTIALS = env.dict("SCANCODEIO_SKOPEO_CREDENTIALS", default={}) - -# SCANCODEIO_SKOPEO_AUTHFILE_LOCATION="/path/to/auth.json" -SCANCODEIO_SKOPEO_AUTHFILE_LOCATION = env.str( - "SCANCODEIO_SKOPEO_AUTHFILE_LOCATION", default="" -) - -# This webhook will be added as WebhookSubscription for each new project. 
-# SCANCODEIO_GLOBAL_WEBHOOK=target_url=https://webhook.url,trigger_on_each_run=False,include_summary=True,include_results=False -SCANCODEIO_GLOBAL_WEBHOOK = env.dict("SCANCODEIO_GLOBAL_WEBHOOK", default={}) - -# Application definition - -INSTALLED_APPS = [ - # Local apps - # Must come before Third-party apps for proper templates override - "scanpipe", - # Django built-in - "django.contrib.auth", - "django.contrib.contenttypes", - "django.contrib.sessions", - "django.contrib.messages", - "django.contrib.staticfiles", - "django.contrib.admin", - "django.contrib.humanize", - # Third-party apps - "crispy_forms", - "crispy_bootstrap3", # required for the djangorestframework browsable API - "django_filters", - "rest_framework", - "rest_framework.authtoken", - "django_rq", - "django_probes", - "taggit", -] - -MIDDLEWARE = [ - "django.middleware.security.SecurityMiddleware", - "django.contrib.sessions.middleware.SessionMiddleware", - "django.middleware.common.CommonMiddleware", - "django.middleware.csrf.CsrfViewMiddleware", - "django.contrib.auth.middleware.AuthenticationMiddleware", - "django.contrib.messages.middleware.MessageMiddleware", - "django.middleware.clickjacking.XFrameOptionsMiddleware", - "scancodeio.middleware.TimezoneMiddleware", -] - -ROOT_URLCONF = "scancodeio.urls" - -WSGI_APPLICATION = "scancodeio.wsgi.application" - -SECURE_PROXY_SSL_HEADER = env.tuple( - "SECURE_PROXY_SSL_HEADER", default=("HTTP_X_FORWARDED_PROTO", "https") -) - -# Database - -DATABASES = { - "default": { - "ENGINE": env.str("SCANCODEIO_DB_ENGINE", "django.db.backends.postgresql"), - "HOST": env.str("SCANCODEIO_DB_HOST", "localhost"), - "NAME": env.str("SCANCODEIO_DB_NAME", "scancodeio"), - "USER": env.str("SCANCODEIO_DB_USER", "scancodeio"), - "PASSWORD": env.str("SCANCODEIO_DB_PASSWORD", "scancodeio"), - "PORT": env.str("SCANCODEIO_DB_PORT", "5432"), - "ATOMIC_REQUESTS": True, - } -} - -DEFAULT_AUTO_FIELD = "django.db.models.AutoField" - -# Forms and filters - 
-FILTERS_EMPTY_CHOICE_LABEL = env.str("FILTERS_EMPTY_CHOICE_LABEL", default="All") - -# Templates - -TEMPLATES = [ - { - "BACKEND": "django.template.backends.django.DjangoTemplates", - "APP_DIRS": True, - "OPTIONS": { - "debug": DEBUG, - "context_processors": [ - "django.contrib.auth.context_processors.auth", - "django.contrib.messages.context_processors.messages", - "django.template.context_processors.request", - "scancodeio.context_processors.versions", - ], - }, - }, -] - -# Login - -LOGIN_REDIRECT_URL = "project_list" - -# Passwords - -AUTH_PASSWORD_VALIDATORS = [ - { - "NAME": ( - "django.contrib.auth.password_validation.UserAttributeSimilarityValidator" - ), - }, - { - "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", - "OPTIONS": { - "min_length": env.int("SCANCODEIO_PASSWORD_MIN_LENGTH", default=12), - }, - }, - { - "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", - }, - { - "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", - }, -] - -# Testing - -if IS_TESTS: - from django.core.management.utils import get_random_secret_key - - SECRET_KEY = get_random_secret_key() - # Do not pollute the workspace while running the tests. - SCANCODEIO_WORKSPACE_LOCATION = tempfile.mkdtemp() - SCANCODEIO_REQUIRE_AUTHENTICATION = True - SCANCODEIO_SCAN_FILE_TIMEOUT = 120 - SCANCODEIO_POLICIES_FILE = None - # The default password hasher is rather slow by design. - # Using a faster hashing algorithm in the testing context to speed up the run. 
- PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"] - -# Debug toolbar - -DEBUG_TOOLBAR = env.bool("SCANCODEIO_DEBUG_TOOLBAR", default=False) -if DEBUG and DEBUG_TOOLBAR: - INSTALLED_APPS.append("debug_toolbar") - MIDDLEWARE.append("debug_toolbar.middleware.DebugToolbarMiddleware") - INTERNAL_IPS = ["127.0.0.1"] - -# Logging - -LOGGING = { - "version": 1, - "disable_existing_loggers": False, - "formatters": { - "simple": { - "format": "{levelname} {message}", - "style": "{", - }, - }, - "handlers": { - "null": { - "class": "logging.NullHandler", - }, - "console": { - "class": "logging.StreamHandler", - "formatter": "simple", - }, - }, - "loggers": { - "scanpipe": { - "handlers": ["null"] if IS_TESTS else ["console"], - "level": SCANCODEIO_LOG_LEVEL, - "propagate": False, - }, - "django": { - "handlers": ["null"] if IS_TESTS else ["console"], - "propagate": False, - }, - # Set SCANCODEIO_LOG_LEVEL=DEBUG to display all SQL queries in the console. - "django.db.backends": { - "level": SCANCODEIO_LOG_LEVEL, - }, - }, -} - -# Instead of sending out real emails the console backend just writes the emails -# that would be sent to the standard output. 
-EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend" - -# Internationalization - -LANGUAGE_CODE = "en-us" - -FORMAT_MODULE_PATH = ["scancodeio.formats"] - -TIME_ZONE = env.str("TIME_ZONE", default="UTC") - -USE_I18N = True - -USE_TZ = True - -# Static files (CSS, JavaScript, Images) - -STATIC_URL = "/static/" - -STATIC_ROOT = env.str("STATIC_ROOT", default="/var/scancodeio/static/") - -STATICFILES_DIRS = [ - PROJECT_DIR("static"), -] - -# Third-party apps - -CRISPY_TEMPLATE_PACK = "bootstrap3" - -# Job Queue - -RQ_QUEUES = { - "default": { - "HOST": env.str("SCANCODEIO_RQ_REDIS_HOST", default="localhost"), - "PORT": env.str("SCANCODEIO_RQ_REDIS_PORT", default="6379"), - "DB": env.int("SCANCODEIO_RQ_REDIS_DB", default=0), - "USERNAME": env.str("SCANCODEIO_RQ_REDIS_USERNAME", default=None), - "PASSWORD": env.str("SCANCODEIO_RQ_REDIS_PASSWORD", default=""), - "DEFAULT_TIMEOUT": env.int("SCANCODEIO_RQ_REDIS_DEFAULT_TIMEOUT", default=360), - # Enable SSL for Redis connections when deploying ScanCode.io in environments - # where Redis is hosted on a separate system (e.g., cloud deployment or remote - # Redis server) to secure data in transit. 
- "SSL": env.bool("SCANCODEIO_RQ_REDIS_SSL", default=False), - }, -} - -SCANCODEIO_ASYNC = env.bool("SCANCODEIO_ASYNC", default=False) -if not SCANCODEIO_ASYNC: - for queue_config in RQ_QUEUES.values(): - queue_config["ASYNC"] = False - -# ClamAV virus scan -CLAMD_USE_TCP = env.bool("CLAMD_USE_TCP", default=True) -CLAMD_TCP_ADDR = env.str("CLAMD_TCP_ADDR", default="clamav") - -# Django restframework - -REST_FRAMEWORK = { - "DEFAULT_AUTHENTICATION_CLASSES": ( - "rest_framework.authentication.TokenAuthentication", - ), - "DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",), - "DEFAULT_RENDERER_CLASSES": ( - "rest_framework.renderers.JSONRenderer", - "rest_framework.renderers.BrowsableAPIRenderer", - "rest_framework.renderers.AdminRenderer", - ), - "DEFAULT_FILTER_BACKENDS": ( - "django_filters.rest_framework.DjangoFilterBackend", - "rest_framework.filters.SearchFilter", - ), - "DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.PageNumberPagination", - "PAGE_SIZE": env.int("SCANCODEIO_REST_API_PAGE_SIZE", default=50), - "UPLOADED_FILES_USE_URL": False, -} - -if not SCANCODEIO_REQUIRE_AUTHENTICATION: - REST_FRAMEWORK["DEFAULT_PERMISSION_CLASSES"] = ( - "rest_framework.permissions.AllowAny", - ) - -# VulnerableCode integration - -VULNERABLECODE_URL = env.str("VULNERABLECODE_URL", default="").rstrip("/") -VULNERABLECODE_USER = env.str("VULNERABLECODE_USER", default="") -VULNERABLECODE_PASSWORD = env.str("VULNERABLECODE_PASSWORD", default="") -VULNERABLECODE_API_KEY = env.str("VULNERABLECODE_API_KEY", default="") - -# PurlDB integration - -PURLDB_URL = env.str("PURLDB_URL", default="").rstrip("/") -PURLDB_USER = env.str("PURLDB_USER", default="") -PURLDB_PASSWORD = env.str("PURLDB_PASSWORD", default="") -PURLDB_API_KEY = env.str("PURLDB_API_KEY", default="") - -# MatchCode.io integration - -MATCHCODEIO_URL = env.str("MATCHCODEIO_URL", default="").rstrip("/") -MATCHCODEIO_USER = env.str("MATCHCODEIO_USER", default="") -MATCHCODEIO_PASSWORD = 
env.str("MATCHCODEIO_PASSWORD", default="") -MATCHCODEIO_API_KEY = env.str("MATCHCODEIO_API_KEY", default="") - -# FederatedCode integration - -FEDERATEDCODE_GIT_ACCOUNT_URL = env.str( - "FEDERATEDCODE_GIT_ACCOUNT_URL", default="" -).rstrip("/") -FEDERATEDCODE_GIT_SERVICE_TOKEN = env.str("FEDERATEDCODE_GIT_SERVICE_TOKEN", default="") -FEDERATEDCODE_GIT_SERVICE_NAME = env.str("FEDERATEDCODE_GIT_SERVICE_NAME", default="") -FEDERATEDCODE_GIT_SERVICE_EMAIL = env.str("FEDERATEDCODE_GIT_SERVICE_EMAIL", default="") +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/aboutcode-org/scancode.io for support and download. 
+ +import logging +import sys +import tempfile +from pathlib import Path + +import environ + +from scanpipe.archiving import LocalFilesystemProvider + +PROJECT_DIR = environ.Path(__file__) - 1 +ROOT_DIR = PROJECT_DIR - 1 + +# True if running tests through `./manage test` +IS_TESTS = "test" in sys.argv + +# Environment + +ENV_FILE = "/etc/scancodeio/.env" +if not Path(ENV_FILE).exists(): + ENV_FILE = ROOT_DIR(".env") + +# Do not use local .env environment when running the tests. +if IS_TESTS: + ENV_FILE = None + +env = environ.Env() +environ.Env.read_env(ENV_FILE) + +# Security + +SECRET_KEY = env.str("SECRET_KEY", default="") + +ALLOWED_HOSTS = env.list( + "ALLOWED_HOSTS", + default=[".localhost", "127.0.0.1", "[::1]", "host.docker.internal", "172.17.0.1"], +) + +CSRF_TRUSTED_ORIGINS = env.list("CSRF_TRUSTED_ORIGINS", default=[]) + +# SECURITY WARNING: don't run with debug turned on in production +DEBUG = env.bool("SCANCODEIO_DEBUG", default=False) + +SCANCODEIO_REQUIRE_AUTHENTICATION = env.bool( + "SCANCODEIO_REQUIRE_AUTHENTICATION", default=False +) + +SCANCODEIO_ENABLE_ADMIN_SITE = env.bool("SCANCODEIO_ENABLE_ADMIN_SITE", default=False) + +SECURE_CONTENT_TYPE_NOSNIFF = env.bool("SECURE_CONTENT_TYPE_NOSNIFF", default=True) + +X_FRAME_OPTIONS = env.str("X_FRAME_OPTIONS", default="DENY") + +SESSION_COOKIE_SECURE = env.bool("SESSION_COOKIE_SECURE", default=True) + +CSRF_COOKIE_SECURE = env.bool("CSRF_COOKIE_SECURE", default=True) + +# ``security.W004`` SECURE_HSTS_SECONDS and ``security.W008`` SECURE_SSL_REDIRECT +# are handled by the web server. 
+SILENCED_SYSTEM_CHECKS = ["security.W004", "security.W008"] + +# ScanCode.io + +SCANCODEIO_WORKSPACE_LOCATION = env.str("SCANCODEIO_WORKSPACE_LOCATION", default="var") + +SCANCODEIO_CONFIG_DIR = env.str("SCANCODEIO_CONFIG_DIR", default=".scancode") + +SCANCODEIO_CONFIG_FILE = env.str( + "SCANCODEIO_CONFIG_FILE", default="scancode-config.yml" +) + +SCANCODEIO_LOG_LEVEL = env.str("SCANCODEIO_LOG_LEVEL", "INFO") + +# Set the number of parallel processes to use for ScanCode related scan execution. +# If the SCANCODEIO_PROCESSES argument is not set, defaults to an optimal number of CPUs +# available on the machine. +SCANCODEIO_PROCESSES = env.int("SCANCODEIO_PROCESSES", default=None) + +SCANCODEIO_POLICIES_FILE = env.str("SCANCODEIO_POLICIES_FILE", default="policies.yml") + +# This setting defines the additional locations ScanCode.io will search for pipelines. +# This should be set to a list of strings that contain full paths to your additional +# pipelines directories. +SCANCODEIO_PIPELINES_DIRS = env.list("SCANCODEIO_PIPELINES_DIRS", default=[]) + +# Maximum time allowed for a pipeline to complete. +SCANCODEIO_TASK_TIMEOUT = env.str("SCANCODEIO_TASK_TIMEOUT", default="24h") + +# Default to 2 minutes. +SCANCODEIO_SCAN_FILE_TIMEOUT = env.int("SCANCODEIO_SCAN_FILE_TIMEOUT", default=120) + +# Default to None which scans all files +SCANCODEIO_SCAN_MAX_FILE_SIZE = env.int("SCANCODEIO_SCAN_MAX_FILE_SIZE", default=None) + +# List views pagination, controls the number of items displayed per page. +# Syntax in .env: SCANCODEIO_PAGINATE_BY=project=10,project_error=10 +SCANCODEIO_PAGINATE_BY = env.dict( + "SCANCODEIO_PAGINATE_BY", + default={ + "project": 20, + "error": 50, + "resource": 100, + "package": 100, + "dependency": 100, + "license": 100, + "relation": 100, + }, +) + +# Default limit for "most common" entries in QuerySets. 
+SCANCODEIO_MOST_COMMON_LIMIT = env.int("SCANCODEIO_MOST_COMMON_LIMIT", default=7) + +# The base URL (e.g., https://hostname/) of this application instance. +# Required for generating URLs to reference objects within the app, +# such as in webhook notifications. +SCANCODEIO_SITE_URL = env.str("SCANCODEIO_SITE_URL", default="") + +# Fetch authentication credentials + +# SCANCODEIO_FETCH_BASIC_AUTH="host=user,password;" +SCANCODEIO_FETCH_BASIC_AUTH = env.dict( + "SCANCODEIO_FETCH_BASIC_AUTH", + cast={"value": tuple}, + default={}, +) + +# SCANCODEIO_FETCH_DIGEST_AUTH="host=user,password;" +SCANCODEIO_FETCH_DIGEST_AUTH = env.dict( + "SCANCODEIO_FETCH_DIGEST_AUTH", + cast={"value": tuple}, + default={}, +) + +# SCANCODEIO_FETCH_HEADERS="host=Header1=value,Header2=value;" +SCANCODEIO_FETCH_HEADERS = {} +FETCH_HEADERS_STR = env.str("SCANCODEIO_FETCH_HEADERS", default="") +for entry in FETCH_HEADERS_STR.split(";"): + if entry.strip(): + host, headers = entry.split("=", 1) + SCANCODEIO_FETCH_HEADERS[host] = env.parse_value(headers, cast=dict) + +# SCANCODEIO_NETRC_LOCATION="~/.netrc" +SCANCODEIO_NETRC_LOCATION = env.str("SCANCODEIO_NETRC_LOCATION", default="") +if SCANCODEIO_NETRC_LOCATION: + # Propagate the location to the environ for `requests.utils.get_netrc_auth` + env.ENVIRON["NETRC"] = SCANCODEIO_NETRC_LOCATION + +# SCANCODEIO_SKOPEO_CREDENTIALS="host1=user:password,host2=user:password" +SCANCODEIO_SKOPEO_CREDENTIALS = env.dict("SCANCODEIO_SKOPEO_CREDENTIALS", default={}) + +# SCANCODEIO_SKOPEO_AUTHFILE_LOCATION="/path/to/auth.json" +SCANCODEIO_SKOPEO_AUTHFILE_LOCATION = env.str( + "SCANCODEIO_SKOPEO_AUTHFILE_LOCATION", default="" +) + +# This webhook will be added as WebhookSubscription for each new project. 
+# SCANCODEIO_GLOBAL_WEBHOOK=target_url=https://webhook.url,trigger_on_each_run=False,include_summary=True,include_results=False +SCANCODEIO_GLOBAL_WEBHOOK = env.dict("SCANCODEIO_GLOBAL_WEBHOOK", default={}) + +# Application definition + +INSTALLED_APPS = [ + # Local apps + # Must come before Third-party apps for proper templates override + "scanpipe", + # Django built-in + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.messages", + "django.contrib.staticfiles", + "django.contrib.admin", + "django.contrib.humanize", + # Third-party apps + "crispy_forms", + "crispy_bootstrap3", # required for the djangorestframework browsable API + "django_filters", + "rest_framework", + "rest_framework.authtoken", + "django_rq", + "django_probes", + "taggit", +] + +MIDDLEWARE = [ + "django.middleware.security.SecurityMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.middleware.common.CommonMiddleware", + "django.middleware.csrf.CsrfViewMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "django.middleware.clickjacking.XFrameOptionsMiddleware", + "scancodeio.middleware.TimezoneMiddleware", +] + +ROOT_URLCONF = "scancodeio.urls" + +WSGI_APPLICATION = "scancodeio.wsgi.application" + +SECURE_PROXY_SSL_HEADER = env.tuple( + "SECURE_PROXY_SSL_HEADER", default=("HTTP_X_FORWARDED_PROTO", "https") +) + +# Database + +DATABASES = { + "default": { + "ENGINE": env.str("SCANCODEIO_DB_ENGINE", "django.db.backends.postgresql"), + "HOST": env.str("SCANCODEIO_DB_HOST", "localhost"), + "NAME": env.str("SCANCODEIO_DB_NAME", "scancodeio"), + "USER": env.str("SCANCODEIO_DB_USER", "scancodeio"), + "PASSWORD": env.str("SCANCODEIO_DB_PASSWORD", "scancodeio"), + "PORT": env.str("SCANCODEIO_DB_PORT", "5432"), + "ATOMIC_REQUESTS": True, + } +} + +DEFAULT_AUTO_FIELD = "django.db.models.AutoField" + +# Forms and filters + 
+FILTERS_EMPTY_CHOICE_LABEL = env.str("FILTERS_EMPTY_CHOICE_LABEL", default="All") + +# Templates + +TEMPLATES = [ + { + "BACKEND": "django.template.backends.django.DjangoTemplates", + "APP_DIRS": True, + "OPTIONS": { + "debug": DEBUG, + "context_processors": [ + "django.contrib.auth.context_processors.auth", + "django.contrib.messages.context_processors.messages", + "django.template.context_processors.request", + "scancodeio.context_processors.versions", + ], + }, + }, +] + +# Login + +LOGIN_REDIRECT_URL = "project_list" + +# Passwords + +AUTH_PASSWORD_VALIDATORS = [ + { + "NAME": ( + "django.contrib.auth.password_validation.UserAttributeSimilarityValidator" + ), + }, + { + "NAME": "django.contrib.auth.password_validation.MinimumLengthValidator", + "OPTIONS": { + "min_length": env.int("SCANCODEIO_PASSWORD_MIN_LENGTH", default=12), + }, + }, + { + "NAME": "django.contrib.auth.password_validation.CommonPasswordValidator", + }, + { + "NAME": "django.contrib.auth.password_validation.NumericPasswordValidator", + }, +] + +# Testing + +if IS_TESTS: + from django.core.management.utils import get_random_secret_key + + SECRET_KEY = get_random_secret_key() + # Do not pollute the workspace while running the tests. + SCANCODEIO_WORKSPACE_LOCATION = tempfile.mkdtemp() + SCANCODEIO_REQUIRE_AUTHENTICATION = True + SCANCODEIO_SCAN_FILE_TIMEOUT = 120 + SCANCODEIO_POLICIES_FILE = None + # The default password hasher is rather slow by design. + # Using a faster hashing algorithm in the testing context to speed up the run. 
+ PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"] + +# Debug toolbar + +DEBUG_TOOLBAR = env.bool("SCANCODEIO_DEBUG_TOOLBAR", default=False) +if DEBUG and DEBUG_TOOLBAR: + INSTALLED_APPS.append("debug_toolbar") + MIDDLEWARE.append("debug_toolbar.middleware.DebugToolbarMiddleware") + INTERNAL_IPS = ["127.0.0.1"] + +# Logging + +LOGGING = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "simple": { + "format": "{levelname} {message}", + "style": "{", + }, + }, + "handlers": { + "null": { + "class": "logging.NullHandler", + }, + "console": { + "class": "logging.StreamHandler", + "formatter": "simple", + }, + }, + "loggers": { + "scanpipe": { + "handlers": ["null"] if IS_TESTS else ["console"], + "level": SCANCODEIO_LOG_LEVEL, + "propagate": False, + }, + "django": { + "handlers": ["null"] if IS_TESTS else ["console"], + "propagate": False, + }, + # Set SCANCODEIO_LOG_LEVEL=DEBUG to display all SQL queries in the console. + "django.db.backends": { + "level": SCANCODEIO_LOG_LEVEL, + }, + }, +} + +# Instead of sending out real emails the console backend just writes the emails +# that would be sent to the standard output. 
+EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend" + +# Internationalization + +LANGUAGE_CODE = "en-us" + +FORMAT_MODULE_PATH = ["scancodeio.formats"] + +TIME_ZONE = env.str("TIME_ZONE", default="UTC") + +USE_I18N = True + +USE_TZ = True + +# Static files (CSS, JavaScript, Images) + +STATIC_URL = "/static/" + +STATIC_ROOT = env.str("STATIC_ROOT", default="/var/scancodeio/static/") + +STATICFILES_DIRS = [ + PROJECT_DIR("static"), +] + +# Third-party apps + +CRISPY_TEMPLATE_PACK = "bootstrap3" + +# Centralized archive directory for all projects +CENTRAL_ARCHIVE_PATH = env.str( + "CENTRAL_ARCHIVE_PATH", default="/var/scancodeio/archives" +) + +# localstorage configuration +DOWNLOAD_ARCHIVING_PROVIDER = env.str( + "DOWNLOAD_ARCHIVING_PROVIDER", default="localstorage" +) + +# For local storage, we would store the root path in that setting +DOWNLOAD_ARCHIVING_PROVIDER_CONFIGURATION = env.dict( + "DOWNLOAD_ARCHIVING_PROVIDER_CONFIGURATION", default=None +) + +# Initialize the DownloadStore for local storage + +download_store = None +logger = logging.getLogger(__name__) +if DOWNLOAD_ARCHIVING_PROVIDER == "localstorage": + config = DOWNLOAD_ARCHIVING_PROVIDER_CONFIGURATION or {} + root_path = Path(config.get("root_path", CENTRAL_ARCHIVE_PATH)) + try: + download_store = LocalFilesystemProvider(root_path=root_path) + except Exception as e: + logger.error(f"Failed to initialize LocalFilesystemProvider: {e}") +else: + logger.error(f"Unknown DOWNLOAD_ARCHIVING_PROVIDER: {DOWNLOAD_ARCHIVING_PROVIDER}") + +# Job Queue + +RQ_QUEUES = { + "default": { + "HOST": env.str("SCANCODEIO_RQ_REDIS_HOST", default="localhost"), + "PORT": env.str("SCANCODEIO_RQ_REDIS_PORT", default="6379"), + "DB": env.int("SCANCODEIO_RQ_REDIS_DB", default=0), + "USERNAME": env.str("SCANCODEIO_RQ_REDIS_USERNAME", default=None), + "PASSWORD": env.str("SCANCODEIO_RQ_REDIS_PASSWORD", default=""), + "DEFAULT_TIMEOUT": env.int("SCANCODEIO_RQ_REDIS_DEFAULT_TIMEOUT", default=360), + # Enable SSL for 
Redis connections when deploying ScanCode.io in environments + # where Redis is hosted on a separate system (e.g., cloud deployment or remote + # Redis server) to secure data in transit. + "SSL": env.bool("SCANCODEIO_RQ_REDIS_SSL", default=False), + }, +} + +SCANCODEIO_ASYNC = env.bool("SCANCODEIO_ASYNC", default=False) +if not SCANCODEIO_ASYNC: + for queue_config in RQ_QUEUES.values(): + queue_config["ASYNC"] = False + +# ClamAV virus scan +CLAMD_USE_TCP = env.bool("CLAMD_USE_TCP", default=True) +CLAMD_TCP_ADDR = env.str("CLAMD_TCP_ADDR", default="clamav") + +# Django restframework + +REST_FRAMEWORK = { + "DEFAULT_AUTHENTICATION_CLASSES": ( + "rest_framework.authentication.TokenAuthentication", + ), + "DEFAULT_PERMISSION_CLASSES": ("rest_framework.permissions.IsAuthenticated",), + "DEFAULT_RENDERER_CLASSES": ( + "rest_framework.renderers.JSONRenderer", + "rest_framework.renderers.BrowsableAPIRenderer", + "rest_framework.renderers.AdminRenderer", + ), + "DEFAULT_FILTER_BACKENDS": ( + "django_filters.rest_framework.DjangoFilterBackend", + "rest_framework.filters.SearchFilter", + ), + "DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.PageNumberPagination", + "PAGE_SIZE": env.int("SCANCODEIO_REST_API_PAGE_SIZE", default=50), + "UPLOADED_FILES_USE_URL": False, +} + +if not SCANCODEIO_REQUIRE_AUTHENTICATION: + REST_FRAMEWORK["DEFAULT_PERMISSION_CLASSES"] = ( + "rest_framework.permissions.AllowAny", + ) + +# VulnerableCode integration + +VULNERABLECODE_URL = env.str("VULNERABLECODE_URL", default="").rstrip("/") +VULNERABLECODE_USER = env.str("VULNERABLECODE_USER", default="") +VULNERABLECODE_PASSWORD = env.str("VULNERABLECODE_PASSWORD", default="") +VULNERABLECODE_API_KEY = env.str("VULNERABLECODE_API_KEY", default="") + +# PurlDB integration + +PURLDB_URL = env.str("PURLDB_URL", default="").rstrip("/") +PURLDB_USER = env.str("PURLDB_USER", default="") +PURLDB_PASSWORD = env.str("PURLDB_PASSWORD", default="") +PURLDB_API_KEY = env.str("PURLDB_API_KEY", default="") + 
+# MatchCode.io integration + +MATCHCODEIO_URL = env.str("MATCHCODEIO_URL", default="").rstrip("/") +MATCHCODEIO_USER = env.str("MATCHCODEIO_USER", default="") +MATCHCODEIO_PASSWORD = env.str("MATCHCODEIO_PASSWORD", default="") +MATCHCODEIO_API_KEY = env.str("MATCHCODEIO_API_KEY", default="") + +# FederatedCode integration + +FEDERATEDCODE_GIT_ACCOUNT_URL = env.str( + "FEDERATEDCODE_GIT_ACCOUNT_URL", default="" +).rstrip("/") +FEDERATEDCODE_GIT_SERVICE_TOKEN = env.str("FEDERATEDCODE_GIT_SERVICE_TOKEN", default="") +FEDERATEDCODE_GIT_SERVICE_NAME = env.str("FEDERATEDCODE_GIT_SERVICE_NAME", default="") +FEDERATEDCODE_GIT_SERVICE_EMAIL = env.str("FEDERATEDCODE_GIT_SERVICE_EMAIL", default="") diff --git a/scanpipe/archiving.py b/scanpipe/archiving.py new file mode 100644 index 0000000000..3f3d66e2e8 --- /dev/null +++ b/scanpipe/archiving.py @@ -0,0 +1,185 @@ +# scanpipe/archiving.py +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. 
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

import hashlib
import json
import logging
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
from pathlib import Path

logger = logging.getLogger(__name__)


@dataclass
class Download:
    """Metadata record describing one stored download."""

    # Hexadecimal SHA256 digest of the stored content.
    sha256: str
    # Date of the download, kept as the string provided by the caller.
    download_date: str
    # URL the content was fetched from.
    download_url: str
    # File name associated with the download.
    filename: str


class DownloadStore(ABC):
    """
    Abstract interface for a store of downloaded content addressed by SHA256.

    Concrete stores implement ``list``, ``get``, ``put``, ``find`` and
    ``_get_content_path``. The hashing and metadata helpers are shared.
    """

    def _compute_sha256(self, content: bytes) -> str:
        """Compute SHA256 hash for content."""
        return hashlib.sha256(content).hexdigest()

    def _compute_origin_hash(
        self, filename: str, download_date: str, download_url: str
    ) -> str:
        """Compute a hash for the metadata to name the origin JSON file."""
        # NOTE: the three fields are concatenated without a separator.
        to_hash = f"{filename}{download_date}{download_url}".encode()
        return hashlib.sha256(to_hash).hexdigest()

    def _build_metadata(
        self, sha256: str, filename: str, download_date: str, download_url: str
    ) -> dict:
        """Build metadata dictionary for JSON storage."""
        return {
            "sha256": sha256,
            "filename": filename,
            "download_date": download_date,
            "download_url": download_url,
        }

    @abstractmethod
    def _get_content_path(self, sha256: str) -> str:
        """Get the storage path/key for the content based on SHA256."""
        pass

    @abstractmethod
    def list(self):
        """Return an iterable of all stored downloads."""
        pass

    @abstractmethod
    def get(self, sha256_checksum: str):
        """Return a Download object for this checksum or None."""
        pass

    @abstractmethod
    def put(self, content: bytes, download_url: str, download_date: str, filename: str):
        """
        Store content with its metadata. Return a Download object on success.
        Raise an exception on error.
        """
        pass

    @abstractmethod
    def find(
        self, download_url: str = None, filename: str = None, download_date: str = None
    ):
        """Return a Download object matching the metadata or None."""
        pass


class LocalFilesystemProvider(DownloadStore):
    """
    Store downloads on the local filesystem below ``root_path``.

    Each content lives in a directory derived from its SHA256 and holds a
    ``content`` file plus one ``origin-<hash>.json`` metadata file per origin.
    """

    def __init__(self, root_path: Path):
        # Accept a plain string as well as a Path: every other method joins
        # path segments with the ``/`` operator.
        self.root_path = Path(root_path)

    def _get_content_path(self, sha256: str) -> Path:
        """Create a nested path like 59/4c/67/... based on the SHA256 hash."""
        return self.root_path / sha256[:2] / sha256[2:4] / sha256[4:]

    def _load_download(self, origin_path, label=None):
        """
        Return the Download stored in the ``origin_path`` JSON file.
        Log the error and return None when the file cannot be read or parsed.
        """
        try:
            with open(origin_path, encoding="utf-8") as f:
                data = json.load(f)
            return Download(**data)
        except Exception as e:
            # Best effort: an unreadable origin file must not break a listing.
            logger.error("Error reading %s: %s", label or origin_path, e)
            return None

    @staticmethod
    def _remove_partial_file(path):
        """Remove a file left behind by a failed write, ignoring errors."""
        try:
            path.unlink(missing_ok=True)
        except OSError:
            logger.warning("Could not remove partial file %s", path)

    def list(self):
        """Return a list of all stored downloads that can be read."""
        downloads = []
        for content_file in self.root_path.rglob("content"):
            for origin_path in content_file.parent.glob("origin-*.json"):
                download = self._load_download(origin_path)
                if download is not None:
                    downloads.append(download)
        return downloads

    def get(self, sha256_checksum: str):
        """
        Retrieve a Download object for the given SHA256 hash.
        Return None when nothing readable is stored for this checksum.
        """
        content_path = self._get_content_path(sha256_checksum)
        if not content_path.exists():
            return None

        label = f"origin file for {sha256_checksum}"
        # Sorted for a deterministic result; keep looking when one origin
        # file is unreadable instead of giving up on the first one.
        for origin_path in sorted(content_path.glob("origin-*.json")):
            download = self._load_download(origin_path, label=label)
            if download is not None:
                return download
        return None

    def put(self, content: bytes, download_url: str, download_date: str, filename: str):
        """
        Store the content and its metadata and return the resulting Download.

        The content file is written only when it does not exist yet.
        Raise an Exception when a write fails or when an origin file for the
        same filename, date and URL already exists.
        """
        sha256 = self._compute_sha256(content)
        content_path = self._get_content_path(sha256)
        content_path.mkdir(parents=True, exist_ok=True)

        content_file = content_path / "content"
        if not content_file.exists():
            try:
                content_file.write_bytes(content)
            except Exception as e:
                # A truncated "content" file would be taken for valid content
                # by the existence check above on the next call.
                self._remove_partial_file(content_file)
                raise Exception(
                    f"Failed to write content to {content_file}: {e}"
                ) from e

        origin_hash = self._compute_origin_hash(filename, download_date, download_url)
        origin_filename = f"origin-{origin_hash}.json"
        origin_path = content_path / origin_filename
        if origin_path.exists():
            raise Exception(f"Origin {origin_filename} already exists")

        metadata = self._build_metadata(sha256, filename, download_date, download_url)
        try:
            with open(origin_path, "w", encoding="utf-8") as f:
                json.dump(metadata, f, indent=2)
        except Exception as e:
            # Do not leave a half-written origin file that would block a retry.
            self._remove_partial_file(origin_path)
            raise Exception(f"Failed to write metadata to {origin_path}: {e}") from e

        return Download(**metadata)

    def find(
        self, download_url: str = None, filename: str = None, download_date: str = None
    ):
        """
        Find a download based on metadata.

        Return the first Download whose stored metadata equals every provided
        criterion, or None when there is no match or no criterion is given.
        """
        if not (download_url or filename or download_date):
            return None

        for origin_path in self.root_path.rglob("origin-*.json"):
            download = self._load_download(origin_path)
            if download is None:
                continue
            # The URL is stored under "download_url", see ``_build_metadata``.
            if download_url is not None and download.download_url != download_url:
                continue
            if filename is not None and download.filename != filename:
                continue
            if download_date is not None and download.download_date != download_date:
                continue
            return download
        return None
-# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# -# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND, either express or implied. No content created from -# ScanCode.io should be considered or used as legal advice. Consult an Attorney -# for any legal advice. -# -# ScanCode.io is a free software code scanning tool from nexB Inc. and others. -# Visit https://github.com/aboutcode-org/scancode.io for support and download. - -import inspect -import logging -import traceback -from contextlib import contextmanager -from functools import wraps -from pathlib import Path - -import bleach -from markdown_it import MarkdownIt -from pyinstrument import Profiler - -from aboutcode.pipeline import BasePipeline - -logger = logging.getLogger(__name__) - - -class InputFilesError(Exception): - """InputFile is missing or cannot be downloaded.""" - - def __init__(self, error_tracebacks): - self.error_tracebacks = error_tracebacks - super().__init__(self._generate_message()) - - def _generate_message(self): - message = "InputFilesError encountered with the following issues:\n" - for index, (error, tb) in enumerate(self.error_tracebacks, start=1): - message += f"\nError {index}: {str(error)}\n\n{tb}" - return message - - -def convert_markdown_to_html(markdown_text): - """Convert Markdown text to sanitized HTML.""" - # Using the "js-default" for safety. - html_content = MarkdownIt("js-default").renderInline(markdown_text) - # Sanitize HTML using bleach. 
- sanitized_html = bleach.clean(html_content) - return sanitized_html - - -class CommonStepsMixin: - """Common steps available on all project pipelines.""" - - def flag_empty_files(self): - """Flag empty files.""" - from scanpipe.pipes import flag - - flag.flag_empty_files(self.project) - - def flag_ignored_resources(self): - """Flag ignored resources based on Project ``ignored_patterns`` setting.""" - from scanpipe.pipes import flag - - ignored_patterns = self.env.get("ignored_patterns", []) - - if isinstance(ignored_patterns, str): - ignored_patterns = ignored_patterns.splitlines() - ignored_patterns.extend(flag.DEFAULT_IGNORED_PATTERNS) - - flag.flag_ignored_patterns( - codebaseresources=self.project.codebaseresources.no_status(), - patterns=ignored_patterns, - ) - - def extract_archive(self, location, target): - """Extract archive at `location` to `target`. Save errors as messages.""" - from scanpipe.pipes import scancode - - extract_errors = scancode.extract_archive(location, target) - - for resource_location, errors in extract_errors.items(): - resource_path = Path(resource_location) - - if resource_path.is_relative_to(self.project.codebase_path): - resource_path = resource_path.relative_to(self.project.codebase_path) - details = {"resource_path": str(resource_path)} - elif resource_path.is_relative_to(self.project.input_path): - resource_path = resource_path.relative_to(self.project.input_path) - details = {"path": f"input/{str(resource_path)}"} - else: - details = {"filename": str(resource_path.name)} - - self.project.add_error( - description="\n".join(errors), - model="extract_archive", - details=details, - ) - - def extract_archives(self, location=None): - """Extract archives located in the codebase/ directory with extractcode.""" - from scanpipe.pipes import scancode - - if not location: - location = self.project.codebase_path - - extract_errors = scancode.extract_archives(location=location, recurse=True) - - for resource_path, errors in 
extract_errors.items(): - self.project.add_error( - description="\n".join(errors), - model="extract_archives", - details={"resource_path": resource_path}, - ) - - # Reload the project env post-extraction as the scancode-config.yml file - # may be located in one of the extracted archives. - self.env = self.project.get_env() - - def download_missing_inputs(self): - """ - Download any InputSource missing on disk. - Raise an error if any of the uploaded files is not available or not reachable. - """ - error_tracebacks = [] - - for input_source in self.project.inputsources.all(): - if input_source.exists(): - continue - - if input_source.is_uploaded: - msg = f"Uploaded file {input_source} not available." - self.log(msg) - error_tracebacks.append((msg, "No traceback available.")) - continue - - self.log(f"Fetching input from {input_source.download_url}") - try: - input_source.fetch() - except Exception as error: - traceback_str = traceback.format_exc() - logger.error(traceback_str) - self.log(f"{input_source.download_url} could not be fetched.") - error_tracebacks.append((str(error), traceback_str)) - - if error_tracebacks: - raise InputFilesError(error_tracebacks) - - -class ProjectPipeline(CommonStepsMixin, BasePipeline): - """Main class for all project related pipelines including common steps methods.""" - - # Flag specifying whether to download missing inputs as an initial step. - download_inputs = True - - # Optional URL that targets a view of the results relative to this Pipeline. - # This URL may contain dictionary-style string formatting, which will be - # interpolated against the project's field attributes. - # For example, you could use results_url="/project/{slug}/packages/?filter=value" - # to target the Package list view with an active filtering. 
- results_url = "" - - def __init__(self, run_instance): - """Load the Pipeline execution context from a Run database object.""" - self.run = run_instance - self.project = run_instance.project - self.env = self.project.get_env() - - self.pipeline_class = run_instance.pipeline_class - self.pipeline_name = run_instance.pipeline_name - - self.selected_groups = run_instance.selected_groups or [] - self.selected_steps = run_instance.selected_steps or [] - - self.ecosystem_config = None - - @classmethod - def get_initial_steps(cls): - """Add the ``download_inputs`` step as an initial step if enabled.""" - if cls.download_inputs: - return (cls.download_missing_inputs,) - - @classmethod - def get_info(cls, as_html=False): - """Add the option to render the values as HTML.""" - info = super().get_info() - - if as_html: - info["summary"] = convert_markdown_to_html(info["summary"]) - info["description"] = convert_markdown_to_html(info["description"]) - for step in info["steps"]: - step["doc"] = convert_markdown_to_html(step["doc"]) - - return info - - def append_to_log(self, message): - self.run.append_to_log(message) - - def set_current_step(self, message): - self.run.set_current_step(message) - - def add_error(self, exception, resource=None): - """Create a ``ProjectMessage`` ERROR record on the current `project`.""" - self.project.add_error( - model=self.pipeline_name, - exception=exception, - object_instance=resource, - ) - - @contextmanager - def save_errors(self, *exceptions, **kwargs): - """ - Context manager to save specified exceptions as ``ProjectMessage`` in the - database. 
- - - Example in a Pipeline step:: - - with self.save_errors(rootfs.DistroNotFound): - rootfs.scan_rootfs_for_system_packages(self.project, rfs) - - - Example when iterating over resources:: - - for resource in self.project.codebaseresources.all(): - with self.save_errors(Exception, resource=resource): - analyse(resource) - """ - try: - yield - except exceptions as error: - self.add_error(exception=error, **kwargs) - - -class Pipeline(ProjectPipeline): - """Alias for the ProjectPipeline class.""" - - pass - - -def is_pipeline(obj): - """ - Return True if the `obj` is a subclass of `Pipeline` except for the - `Pipeline` class itself. - """ - return inspect.isclass(obj) and issubclass(obj, Pipeline) and obj is not Pipeline - - -def profile(step): - """ - Profile a Pipeline step and save the results as HTML file in the project output - directory. - - Usage: - @profile - def step(self): - pass - """ - - @wraps(step) - def wrapper(*arg, **kwargs): - pipeline_instance = arg[0] - project = pipeline_instance.project - - with Profiler() as profiler: - result = step(*arg, **kwargs) - - output_file = project.get_output_file_path("profile", "html") - output_file.write_text(profiler.output_html()) - - pipeline_instance.log(f"Profiling results at {output_file.resolve()}") - - return result - - return wrapper +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. 
class InputFilesError(Exception):
    """
    InputFile is missing or cannot be downloaded.

    ``error_tracebacks`` is a sequence of ``(error, traceback_text)`` pairs.
    It is kept on the instance and rendered into the exception message.
    """

    def __init__(self, error_tracebacks):
        self.error_tracebacks = error_tracebacks
        super().__init__(self._generate_message())

    def _generate_message(self):
        """Return the message listing each numbered error with its traceback."""
        header = "InputFilesError encountered with the following issues:\n"
        # Build all the entries then join once rather than growing a string
        # with ``+=`` on each iteration.
        entries = [
            f"\nError {index}: {error!s}\n\n{tb}"
            for index, (error, tb) in enumerate(self.error_tracebacks, start=1)
        ]
        return header + "".join(entries)
def flag_ignored_resources(self):
    """
    Flag ignored resources based on Project ``ignored_patterns`` setting.

    The patterns are read from ``self.env``; a string value is split into one
    pattern per line. ``flag.DEFAULT_IGNORED_PATTERNS`` are appended before
    flagging the project resources that have no status.
    """
    from scanpipe.pipes import flag

    # ``or []`` also covers a key that is present with an empty/None value.
    ignored_patterns = self.env.get("ignored_patterns") or []

    if isinstance(ignored_patterns, str):
        ignored_patterns = ignored_patterns.splitlines()

    # Build a new list: extending in place would modify the list object held
    # by ``self.env`` and append the defaults again on every call.
    patterns = [*ignored_patterns, *flag.DEFAULT_IGNORED_PATTERNS]

    flag.flag_ignored_patterns(
        codebaseresources=self.project.codebaseresources.no_status(),
        patterns=patterns,
    )
def download_missing_inputs(self):
    """
    Download any InputSource missing on disk.
    Raise an error if any of the uploaded files is not available or not reachable.

    A missing input with a download URL is first looked up in the central
    archive directory, under ``<sha256 of the URL>/<filename>``, and reused
    when present. Otherwise it is fetched. All failures are collected and
    raised together as a single ``InputFilesError``.
    """
    error_tracebacks = []

    # NOTE(review): no definition of CENTRAL_ARCHIVE_PATH is visible in this
    # change; the reuse lookup is skipped when the setting is missing or empty
    # instead of failing with an AttributeError. Confirm against the settings.
    central_archive_path = getattr(settings, "CENTRAL_ARCHIVE_PATH", None)

    for input_source in self.project.inputsources.all():
        if input_source.exists():
            continue

        if input_source.is_uploaded:
            msg = f"Uploaded file {input_source} not available."
            self.log(msg)
            error_tracebacks.append((msg, "No traceback available."))
            continue

        download_url = input_source.download_url
        if not download_url:
            # Nothing to fetch from: keep going, but leave a trace in the run
            # log rather than skipping silently.
            self.log(f"Input {input_source} has no download URL, skipping.")
            continue

        if central_archive_path:
            url_hash = hashlib.sha256(download_url.encode()).hexdigest()
            # NOTE(review): ``Path(download_url).name`` keeps any query string
            # of the URL; left unchanged since the code writing to the archive
            # is not visible here and must use the same naming.
            filename = (
                input_source.filename
                or Path(download_url).name
                or f"{url_hash}.archive"
            )
            archive_path = Path(central_archive_path) / url_hash / filename

            if archive_path.exists():
                self.log(f"Reusing existing archive at {archive_path}")
                input_source.file_path = str(archive_path)
                input_source.save()
                continue

        self.log(f"Fetching input from {download_url}")
        try:
            input_source.fetch()
        except Exception as error:
            # Collect every failure so that all of them are reported at once.
            traceback_str = traceback.format_exc()
            logger.error(traceback_str)
            self.log(f"{download_url} could not be fetched.")
            error_tracebacks.append((str(error), traceback_str))

    if error_tracebacks:
        raise InputFilesError(error_tracebacks)
+ # This URL may contain dictionary-style string formatting, which will be + # interpolated against the project's field attributes. + # For example, you could use results_url="/project/{slug}/packages/?filter=value" + # to target the Package list view with an active filtering. + results_url = "" + + def __init__(self, run_instance): + """Load the Pipeline execution context from a Run database object.""" + self.run = run_instance + self.project = run_instance.project + self.env = self.project.get_env() + + self.pipeline_class = run_instance.pipeline_class + self.pipeline_name = run_instance.pipeline_name + + self.selected_groups = run_instance.selected_groups or [] + self.selected_steps = run_instance.selected_steps or [] + + self.ecosystem_config = None + + @classmethod + def get_initial_steps(cls): + """Add the ``download_inputs`` step as an initial step if enabled.""" + steps = [] + if cls.download_inputs: + steps.append(cls.download_missing_inputs) + return tuple(steps) + + @classmethod + def get_info(cls, as_html=False): + """Add the option to render the values as HTML.""" + info = super().get_info() + + if as_html: + info["summary"] = convert_markdown_to_html(info["summary"]) + info["description"] = convert_markdown_to_html(info["description"]) + for step in info["steps"]: + step["doc"] = convert_markdown_to_html(step["doc"]) + + return info + + def append_to_log(self, message): + self.run.append_to_log(message) + + def set_current_step(self, message): + self.run.set_current_step(message) + + def add_error(self, exception, resource=None): + """Create a ``ProjectMessage`` ERROR record on the current `project`.""" + self.project.add_error( + model=self.pipeline_name, + exception=exception, + object_instance=resource, + ) + + @contextmanager + def save_errors(self, *exceptions, **kwargs): + """ + Context manager to save specified exceptions as ``ProjectMessage`` in the + database. 
+ + - Example in a Pipeline step:: + + with self.save_errors(rootfs.DistroNotFound): + rootfs.scan_rootfs_for_system_packages(self.project, rfs) + + - Example when iterating over resources:: + + for resource in self.project.codebaseresources.all(): + with self.save_errors(Exception, resource=resource): + analyse(resource) + """ + try: + yield + except exceptions as error: + self.add_error(exception=error, **kwargs) + + +class Pipeline(ProjectPipeline): + """Alias for the ProjectPipeline class.""" + + pass + + +def is_pipeline(obj): + """ + Return True if the `obj` is a subclass of `Pipeline` except for the + `Pipeline` class itself. + """ + return inspect.isclass(obj) and issubclass(obj, Pipeline) and obj is not Pipeline + + +def profile(step): + """ + Profile a Pipeline step and save the results as HTML file in the project output + directory. + + Usage: + @profile + def step(self): + pass + """ + + @wraps(step) + def wrapper(*arg, **kwargs): + pipeline_instance = arg[0] + project = pipeline_instance.project + + with Profiler() as profiler: + result = step(*arg, **kwargs) + + output_file = project.get_output_file_path("profile", "html") + output_file.write_text(profiler.output_html()) + + pipeline_instance.log(f"Profiling results at {output_file.resolve()}") + + return result + + return wrapper diff --git a/scanpipe/pipes/input.py b/scanpipe/pipes/input.py index 58ec2e5c96..a7f0edee9c 100644 --- a/scanpipe/pipes/input.py +++ b/scanpipe/pipes/input.py @@ -1,239 +1,298 @@ -# SPDX-License-Identifier: Apache-2.0 -# -# http://nexb.com and https://github.com/aboutcode-org/scancode.io -# The ScanCode.io software is licensed under the Apache License version 2.0. -# Data generated with ScanCode.io is provided as-is without warranties. -# ScanCode is a trademark of nexB Inc. -# -# You may not use this software except in compliance with the License. 
-# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -# CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# -# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND, either express or implied. No content created from -# ScanCode.io should be considered or used as legal advice. Consult an Attorney -# for any legal advice. -# -# ScanCode.io is a free software code scanning tool from nexB Inc. and others. -# Visit https://github.com/aboutcode-org/scancode.io for support and download. - -import os -import shutil -from pathlib import Path - -from django.core.exceptions import FieldDoesNotExist -from django.core.validators import EMPTY_VALUES -from django.db import models - -import openpyxl -from typecode.contenttype import get_type - -from scanpipe import pipes -from scanpipe.models import CodebaseRelation -from scanpipe.models import CodebaseResource -from scanpipe.models import DiscoveredDependency -from scanpipe.models import DiscoveredLicense -from scanpipe.models import DiscoveredPackage -from scanpipe.pipes import scancode -from scanpipe.pipes.output import mappings_key_by_fieldname - - -def copy_input(input_location, dest_path): - """Copy the ``input_location`` (file or directory) to the ``dest_path``.""" - input_path = Path(input_location) - destination_dir = Path(dest_path) - destination = destination_dir / input_path.name - - if input_path.is_dir(): - shutil.copytree(input_location, destination) - else: - if not os.path.exists(destination_dir): - os.makedirs(destination_dir) - shutil.copyfile(input_location, destination) - - return destination - - -def copy_inputs(input_locations, dest_path): - """Copy the 
provided ``input_locations`` to the ``dest_path``.""" - for input_location in input_locations: - copy_input(input_location, dest_path) - - -def move_input(input_location, dest_path): - """Move the provided ``input_location`` to the ``dest_path``.""" - destination = dest_path / Path(input_location).name - return shutil.move(input_location, destination) - - -def move_inputs(inputs, dest_path): - """Move the provided ``inputs`` to the ``dest_path``.""" - for input_location in inputs: - move_input(input_location, dest_path) - - -def get_tool_name_from_scan_headers(scan_data): - """Return the ``tool_name`` of the first header in the provided ``scan_data``.""" - if headers := scan_data.get("headers", []): - first_header = headers[0] - tool_name = first_header.get("tool_name", "") - return tool_name - - -def get_extra_data_from_scan_headers(scan_data): - """Return the ``extra_data`` of the first header in the provided ``scan_data``.""" - if headers := scan_data.get("headers", []): - first_header = headers[0] - if extra_data := first_header.get("extra_data"): - return extra_data - - -def is_archive(location): - """Return True if the file at ``location`` is an archive.""" - return get_type(location).is_archive - - -def load_inventory_from_toolkit_scan(project, input_location): - """ - Create license detections, packages, dependencies, and resources - loaded from the ScanCode-toolkit scan results located at ``input_location``. 
- """ - scanned_codebase = scancode.get_virtual_codebase(project, input_location) - scancode.create_discovered_licenses(project, scanned_codebase) - scancode.create_discovered_packages(project, scanned_codebase) - scancode.create_codebase_resources(project, scanned_codebase) - scancode.create_discovered_dependencies( - project, scanned_codebase, strip_datafile_path_root=True - ) - scancode.load_todo_issues(project, scanned_codebase) - - -def load_inventory_from_scanpipe(project, scan_data, extra_data_prefix=None): - """ - Create packages, dependencies, license detections, resources, and relations - loaded from a ScanCode.io JSON output provided as ``scan_data``. - - An ``extra_data_prefix`` can be provided in case multiple input files are loaded - into the same project. The prefix is usually the filename of the input. - """ - for detection_data in scan_data.get("license_detections", []): - pipes.update_or_create_license_detection(project, detection_data) - - for package_data in scan_data.get("packages", []): - pipes.update_or_create_package(project, package_data) - - for resource_data in scan_data.get("files", []): - pipes.update_or_create_resource(project, resource_data) - - for dependency_data in scan_data.get("dependencies", []): - pipes.update_or_create_dependency(project, dependency_data) - - for relation_data in scan_data.get("relations", []): - pipes.get_or_create_relation(project, relation_data) - - if extra_data := get_extra_data_from_scan_headers(scan_data): - if extra_data_prefix: - extra_data = {extra_data_prefix: extra_data} - project.update_extra_data(extra_data) - - -model_to_object_maker_func = { - DiscoveredPackage: pipes.update_or_create_package, - DiscoveredDependency: pipes.update_or_create_dependency, - DiscoveredLicense: pipes.update_or_create_license_detection, - CodebaseResource: pipes.update_or_create_resource, - CodebaseRelation: pipes.get_or_create_relation, -} - -worksheet_name_to_model = { - "PACKAGES": DiscoveredPackage, - 
"LICENSE_DETECTIONS": DiscoveredLicense, - "RESOURCES": CodebaseResource, - "DEPENDENCIES": DiscoveredDependency, - "RELATIONS": CodebaseRelation, -} - - -def get_worksheet_data(worksheet): - """Return the data from provided ``worksheet`` as a list of dict.""" - try: - header = [cell.value for cell in next(worksheet.rows)] - except StopIteration: - return {} - - worksheet_data = [ - dict(zip(header, row)) - for row in worksheet.iter_rows(min_row=2, values_only=True) - ] - return worksheet_data - - -def clean_xlsx_field_value(model_class, field_name, value): - """Clean the ``value`` for compatibility with the database ``model_class``.""" - if value in EMPTY_VALUES: - return - - if field_name == "for_packages": - return value.splitlines() - - elif field_name in ["purl", "for_package_uid", "datafile_path"]: - return value - - try: - field = model_class._meta.get_field(field_name) - except FieldDoesNotExist: - return - - if dict_key := mappings_key_by_fieldname.get(field_name): - return [{dict_key: entry} for entry in value.splitlines()] - - elif isinstance(field, models.JSONField): - if field.default is list: - return value.splitlines() - elif field.default is dict: - return # dict stored as JSON are not supported - - return value - - -def clean_xlsx_data_to_model_data(model_class, xlsx_data): - """Clean the ``xlsx_data`` for compatibility with the database ``model_class``.""" - cleaned_data = {} - - for field_name, value in xlsx_data.items(): - if cleaned_value := clean_xlsx_field_value(model_class, field_name, value): - cleaned_data[field_name] = cleaned_value - - return cleaned_data - - -def load_inventory_from_xlsx(project, input_location, extra_data_prefix=None): - """ - Create packages, dependencies, resources, and relations loaded from XLSX file - located at ``input_location``. - - An ``extra_data_prefix`` can be provided in case multiple input files are loaded - into the same project. The prefix is usually the filename of the input. 
- """ - workbook = openpyxl.load_workbook(input_location, read_only=True, data_only=True) - - for worksheet_name, model_class in worksheet_name_to_model.items(): - if worksheet_name not in workbook: - continue - - worksheet_data = get_worksheet_data(worksheet=workbook[worksheet_name]) - for row_data in worksheet_data: - object_maker_func = model_to_object_maker_func.get(model_class) - cleaned_data = clean_xlsx_data_to_model_data(model_class, row_data) - if cleaned_data: - object_maker_func(project, cleaned_data) - - if "LAYERS" in workbook: - layers_data = get_worksheet_data(worksheet=workbook["LAYERS"]) - extra_data = {"layers": layers_data} - if extra_data_prefix: - extra_data = {extra_data_prefix: extra_data} - project.update_extra_data(extra_data) +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/aboutcode-org/scancode.io for support and download. 
+ +import logging +import os +import shutil +from pathlib import Path + +from django.core.exceptions import FieldDoesNotExist +from django.core.validators import EMPTY_VALUES +from django.db import models + +import openpyxl +import requests +from typecode.contenttype import get_type + +from scanpipe import pipes +from scanpipe.models import CodebaseRelation +from scanpipe.models import CodebaseResource +from scanpipe.models import DiscoveredDependency +from scanpipe.models import DiscoveredLicense +from scanpipe.models import DiscoveredPackage +from scanpipe.models import InputSource +from scanpipe.pipes import scancode +from scanpipe.pipes.output import mappings_key_by_fieldname + +logger = logging.getLogger(__name__) + + +def copy_input(input_location, dest_path): + """Copy the ``input_location`` (file or directory) to the ``dest_path``.""" + input_path = Path(input_location) + destination_dir = Path(dest_path) + destination = destination_dir / input_path.name + + if input_path.is_dir(): + shutil.copytree(input_location, destination) + else: + if not os.path.exists(destination_dir): + os.makedirs(destination_dir) + shutil.copyfile(input_location, destination) + + return destination + + +def copy_inputs(input_locations, dest_path): + """Copy the provided ``input_locations`` to the ``dest_path``.""" + for input_location in input_locations: + copy_input(input_location, dest_path) + + +def move_input(input_location, dest_path): + """Move the provided ``input_location`` to the ``dest_path``.""" + destination = dest_path / Path(input_location).name + return shutil.move(input_location, destination) + + +def move_inputs(inputs, dest_path): + """Move the provided ``inputs`` to the ``dest_path``.""" + for input_location in inputs: + move_input(input_location, dest_path) + + +def get_tool_name_from_scan_headers(scan_data): + """Return the ``tool_name`` of the first header in the provided ``scan_data``.""" + if headers := scan_data.get("headers", []): + first_header = 
headers[0] + tool_name = first_header.get("tool_name", "") + return tool_name + + +def get_extra_data_from_scan_headers(scan_data): + """Return the ``extra_data`` of the first header in the provided ``scan_data``.""" + if headers := scan_data.get("headers", []): + first_header = headers[0] + if extra_data := first_header.get("extra_data"): + return extra_data + + +def is_archive(location): + """Return True if the file at ``location`` is an archive.""" + return get_type(location).is_archive + + +def load_inventory_from_toolkit_scan(project, input_location): + """ + Create license detections, packages, dependencies, and resources + loaded from the ScanCode-toolkit scan results located at ``input_location``. + """ + scanned_codebase = scancode.get_virtual_codebase(project, input_location) + scancode.create_discovered_licenses(project, scanned_codebase) + scancode.create_discovered_packages(project, scanned_codebase) + scancode.create_codebase_resources(project, scanned_codebase) + scancode.create_discovered_dependencies( + project, scanned_codebase, strip_datafile_path_root=True + ) + scancode.load_todo_issues(project, scanned_codebase) + + +def load_inventory_from_scanpipe(project, scan_data, extra_data_prefix=None): + """ + Create packages, dependencies, license detections, resources, and relations + loaded from a ScanCode.io JSON output provided as ``scan_data``. + + An ``extra_data_prefix`` can be provided in case multiple input files are loaded + into the same project. The prefix is usually the filename of the input. 
+ """ + for detection_data in scan_data.get("license_detections", []): + pipes.update_or_create_license_detection(project, detection_data) + + for package_data in scan_data.get("packages", []): + pipes.update_or_create_package(project, package_data) + + for resource_data in scan_data.get("files", []): + pipes.update_or_create_resource(project, resource_data) + + for dependency_data in scan_data.get("dependencies", []): + pipes.update_or_create_dependency(project, dependency_data) + + for relation_data in scan_data.get("relations", []): + pipes.get_or_create_relation(project, relation_data) + + if extra_data := get_extra_data_from_scan_headers(scan_data): + if extra_data_prefix: + extra_data = {extra_data_prefix: extra_data} + project.update_extra_data(extra_data) + + +model_to_object_maker_func = { + DiscoveredPackage: pipes.update_or_create_package, + DiscoveredDependency: pipes.update_or_create_dependency, + DiscoveredLicense: pipes.update_or_create_license_detection, + CodebaseResource: pipes.update_or_create_resource, + CodebaseRelation: pipes.get_or_create_relation, +} + +worksheet_name_to_model = { + "PACKAGES": DiscoveredPackage, + "LICENSE_DETECTIONS": DiscoveredLicense, + "RESOURCES": CodebaseResource, + "DEPENDENCIES": DiscoveredDependency, + "RELATIONS": CodebaseRelation, +} + + +def get_worksheet_data(worksheet): + """Return the data from provided ``worksheet`` as a list of dict.""" + try: + header = [cell.value for cell in next(worksheet.rows)] + except StopIteration: + return {} + + worksheet_data = [ + dict(zip(header, row)) + for row in worksheet.iter_rows(min_row=2, values_only=True) + ] + return worksheet_data + + +def clean_xlsx_field_value(model_class, field_name, value): + """Clean the ``value`` for compatibility with the database ``model_class``.""" + if value in EMPTY_VALUES: + return + + if field_name == "for_packages": + return value.splitlines() + + elif field_name in ["purl", "for_package_uid", "datafile_path"]: + return value + + try: + 
field = model_class._meta.get_field(field_name) + except FieldDoesNotExist: + return + + if dict_key := mappings_key_by_fieldname.get(field_name): + return [{dict_key: entry} for entry in value.splitlines()] + + elif isinstance(field, models.JSONField): + if field.default is list: + return value.splitlines() + elif field.default is dict: + return # dict stored as JSON are not supported + + return value + + +def clean_xlsx_data_to_model_data(model_class, xlsx_data): + """Clean the ``xlsx_data`` for compatibility with the database ``model_class``.""" + cleaned_data = {} + + for field_name, value in xlsx_data.items(): + if cleaned_value := clean_xlsx_field_value(model_class, field_name, value): + cleaned_data[field_name] = cleaned_value + + return cleaned_data + + +def load_inventory_from_xlsx(project, input_location, extra_data_prefix=None): + """ + Create packages, dependencies, resources, and relations loaded from XLSX file + located at ``input_location``. + + An ``extra_data_prefix`` can be provided in case multiple input files are loaded + into the same project. The prefix is usually the filename of the input. 
+ """ + workbook = openpyxl.load_workbook(input_location, read_only=True, data_only=True) + + for worksheet_name, model_class in worksheet_name_to_model.items(): + if worksheet_name not in workbook: + continue + + worksheet_data = get_worksheet_data(worksheet=workbook[worksheet_name]) + for row_data in worksheet_data: + object_maker_func = model_to_object_maker_func.get(model_class) + cleaned_data = clean_xlsx_data_to_model_data(model_class, row_data) + if cleaned_data: + object_maker_func(project, cleaned_data) + + if "LAYERS" in workbook: + layers_data = get_worksheet_data(worksheet=workbook["LAYERS"]) + extra_data = {"layers": layers_data} + if extra_data_prefix: + extra_data = {extra_data_prefix: extra_data} + project.update_extra_data(extra_data) + + +def add_input_from_url(project, url, filename=None): + """ + Download the file from the provided ``url`` and add it as an InputSource for the + specified ``project``. Optionally, specify a ``filename`` for the downloaded file. + If archiving is enabled, store the content in the DownloadStore and save metadata. + """ + try: + response = requests.get(url, stream=True, timeout=30) + response.raise_for_status() + content = response.content + except requests.RequestException as e: + logger.error(f"Failed to download {url}: {e}") + raise + + filename = filename or url.split("/")[-1] or "downloaded_file" + input_path = project.input_path / filename + + try: + input_path.parent.mkdir(parents=True, exist_ok=True) + with open(input_path, "wb") as f: + f.write(content) + InputSource.objects.create( + project=project, + filename=filename, + download_url=url, + is_uploaded=False, + ) + except Exception as e: + logger.error(f"Failed to save {filename} to {input_path}: {e}") + raise + + +def add_input_from_upload(project, uploaded_file): + """ + Add an uploaded file as an InputSource for the specified ``project``. + If archiving is enabled, store the content in the DownloadStore and save metadata. 
+ """ + content = uploaded_file.read() + filename = uploaded_file.name + input_path = project.input_path / filename + try: + input_path.parent.mkdir(parents=True, exist_ok=True) + with open(input_path, "wb") as f: + f.write(content) + InputSource.objects.create( + project=project, + filename=filename, + is_uploaded=True, + ) + except Exception as e: + logger.error(f"Failed to save {filename} to {input_path}: {e}") + raise diff --git a/scanpipe/tests/data/test-downloads/sample.tar.gz b/scanpipe/tests/data/test-downloads/sample.tar.gz new file mode 100644 index 0000000000..e83f605c86 Binary files /dev/null and b/scanpipe/tests/data/test-downloads/sample.tar.gz differ diff --git a/scanpipe/tests/test_archiving.py b/scanpipe/tests/test_archiving.py new file mode 100644 index 0000000000..0da1a236b5 --- /dev/null +++ b/scanpipe/tests/test_archiving.py @@ -0,0 +1,86 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. 
and others. +# Visit https://github.com/aboutcode-org/scancode.io for support and download. + + +import hashlib +from pathlib import Path + +from django.test import TestCase + +from scanpipe.archiving import LocalFilesystemProvider +from scanpipe.tests import make_project + + +class TestArchiving(TestCase): + def setUp(self): + self.project = make_project() + self.root_path = Path(__file__).parent / "data" / "test_downloads" + self.store = LocalFilesystemProvider(root_path=self.root_path) + self.test_content = b"test content" + self.test_url = "https://files.pythonhosted.org/packages/sample.tar.gz" + self.test_filename = "sample.tar.gz" + + def tearDown(self): + if self.root_path.exists(): + import shutil + + shutil.rmtree(self.root_path) + + def test_local_filesystem_provider_put_get(self): + download = self.store.put( + content=self.test_content, + download_url=self.test_url, + download_date="2025-08-21T09:00:00", + filename=self.test_filename, + ) + sha256 = hashlib.sha256(self.test_content).hexdigest() + self.assertEqual(download.sha256, sha256) + self.assertEqual(download.download_url, self.test_url) + self.assertEqual(download.filename, self.test_filename) + self.assertEqual(download.download_date, "2025-08-21T09:00:00") + content_path = ( + self.root_path / sha256[:2] / sha256[2:4] / sha256[4:] / "content" + ) + self.assertTrue(content_path.exists()) + with open(content_path, "rb") as f: + self.assertEqual(f.read(), self.test_content) + + retrieved = self.store.get(sha256) + self.assertEqual(retrieved.sha256, sha256) + self.assertEqual(retrieved.download_url, self.test_url) + self.assertEqual(retrieved.filename, self.test_filename) + + def test_local_filesystem_provider_deduplication(self): + download1 = self.store.put( + content=self.test_content, + download_url=self.test_url, + download_date="2025-08-21T09:00:00", + filename=self.test_filename, + ) + download2 = self.store.put( + content=self.test_content, + 
download_url="https://files.pythonhosted.org/packages/another.tar.gz", + download_date="2025-08-21T10:00:00", + filename="another.tar.gz", + ) + self.assertEqual(download1.sha256, download2.sha256) + self.assertEqual(download1.download_url, self.test_url) diff --git a/scanpipe/tests/test_input.py b/scanpipe/tests/test_input.py new file mode 100644 index 0000000000..539474a87c --- /dev/null +++ b/scanpipe/tests/test_input.py @@ -0,0 +1,94 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# http://nexb.com and https://github.com/aboutcode-org/scancode.io +# The ScanCode.io software is licensed under the Apache License version 2.0. +# Data generated with ScanCode.io is provided as-is without warranties. +# ScanCode is a trademark of nexB Inc. +# +# You may not use this software except in compliance with the License. +# You may obtain a copy of the License at: +# http://apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +# CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# +# Data Generated with ScanCode.io is provided on an +# "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. No content created from +# ScanCode.io should be considered or used as legal advice. Consult an Attorney +# for any legal advice. +# +# ScanCode.io is a free software code scanning tool from nexB Inc. and others. +# Visit https://github.com/aboutcode-org/scancode.io for support and download. 
+ + +from pathlib import Path +from unittest.mock import Mock +from unittest.mock import patch + +from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import TestCase + +from scanpipe.models import InputSource +from scanpipe.pipes.input import add_input_from_upload +from scanpipe.pipes.input import add_input_from_url +from scanpipe.tests import make_project + + +class TestInput(TestCase): + def setUp(self): + self.project = make_project() + self.test_filename = "sample.tar.gz" + self.test_data_path = ( + Path(__file__).parent / "data" / "test-downloads" / self.test_filename + ) + with open(self.test_data_path, "rb") as f: + self.test_content = f.read() + + @patch("requests.get") + def test_add_input_from_url(self, mock_get): + test_url = "https://example.com/test.tar.gz" + mock_response = Mock() + mock_response.content = self.test_content + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + add_input_from_url(self.project, test_url, filename=self.test_filename) + input_source = InputSource.objects.get(project=self.project) + self.assertEqual(input_source.download_url, test_url) + self.assertEqual(input_source.filename, self.test_filename) + self.assertFalse(input_source.is_uploaded) + self.assertTrue((self.project.input_path / self.test_filename).exists()) + + def test_add_input_from_upload(self): + uploaded_file = SimpleUploadedFile(self.test_filename, self.test_content) + add_input_from_upload(self.project, uploaded_file) + input_source = InputSource.objects.get(project=self.project) + self.assertEqual(input_source.filename, self.test_filename) + self.assertEqual(input_source.download_url, "") + self.assertTrue(input_source.is_uploaded) + self.assertTrue((self.project.input_path / self.test_filename).exists()) + + @patch("requests.get") + def test_add_input_from_url_fallback(self, mock_get): + test_url = "https://example.com/test.tar.gz" + mock_response = Mock() + mock_response.content = 
self.test_content + mock_response.raise_for_status.return_value = None + mock_get.return_value = mock_response + add_input_from_url(self.project, test_url, filename=self.test_filename) + input_source = InputSource.objects.get(project=self.project) + self.assertEqual(input_source.download_url, test_url) + self.assertEqual(input_source.filename, self.test_filename) + self.assertFalse(input_source.is_uploaded) + self.assertTrue((self.project.input_path / self.test_filename).exists()) + + def test_add_input_from_upload_fallback(self): + uploaded_file = SimpleUploadedFile(self.test_filename, self.test_content) + add_input_from_upload(self.project, uploaded_file) + input_source = InputSource.objects.get(project=self.project) + self.assertEqual(input_source.filename, self.test_filename) + self.assertEqual(input_source.download_url, "") + self.assertTrue(input_source.is_uploaded) + self.assertTrue((self.project.input_path / self.test_filename).exists()) diff --git a/scanpipe/tests/test_pipelines.py b/scanpipe/tests/test_pipelines.py index 3acfcf28f3..03dd1ff1f2 100644 --- a/scanpipe/tests/test_pipelines.py +++ b/scanpipe/tests/test_pipelines.py @@ -285,7 +285,8 @@ def mock_make_to_path(**kwargs): self.assertEqual("scancode.io.git", input_source.filename) self.assertTrue(input_source.exists()) - def test_scanpipe_pipeline_class_save_errors_context_manager(self): + @mock.patch("requests.get") + def test_scanpipe_pipeline_class_save_errors_context_manager(self, *args, **kwargs): project1 = make_project() run = project1.add_pipeline("do_nothing") pipeline = run.make_pipeline_instance() @@ -1388,10 +1389,9 @@ def test_scanpipe_fetch_scores_pipeline_integration(self, mock_is_available): "scoring_tool_documentation_url": "https://github.com/[trunc...]", "score_date": "2025-07-24T18:50:16Z", } - with mock.patch("scorecode.ossf_scorecard.fetch_scorecard_info") as fetch: + with mock.patch("scorecode.ossf_scorecard.fetch_scorecard") as fetch: fetch.return_value = 
PackageScore(**package_score_data) - exitcode, out = pipeline.execute() - + exitcode, out = pipeline.execute() self.assertEqual(0, exitcode, msg=out) package1.refresh_from_db() @@ -2001,28 +2001,3 @@ def test_scanpipe_enrich_with_purldb_pipeline_integration( run.refresh_from_db() self.assertIn("pkg:npm/csvtojson@2.0.10 ['release_date'", run.log) self.assertIn("1 discovered package enriched with the PurlDB.", run.log) - - def test_scanpipe_benchmark_purls_pipeline_integration(self): - project1 = make_project(name="Analysis") - - file_location = self.data / "benchmark" / "scancodeio_alpine_3.22.1.cdx.json" - project1.copy_input_from(file_location) - file_location = self.data / "benchmark" / "alpine-3.22.1-expected-purls.txt" - project1.copy_input_from(file_location) - - run = project1.add_pipeline(pipeline_name="load_sbom") - pipeline = run.make_pipeline_instance() - pipeline.execute() - self.assertEqual(2, project1.codebaseresources.count()) - self.assertEqual(16, project1.discoveredpackages.count()) - - run = project1.add_pipeline(pipeline_name="benchmark_purls") - pipeline = run.make_pipeline_instance() - exitcode, out = pipeline.execute() - self.assertEqual(0, exitcode, msg=out) - - result_file = project1.get_latest_output( - filename="benchmark_purls", extension="txt" - ) - expected_file = self.data / "benchmark" / "alpine-3.22.1-expected-benchmark.txt" - self.assertEqual(expected_file.read_text(), result_file.read_text())