diff --git a/src/scanoss/cli.py b/src/scanoss/cli.py index ab0f2afa..08ddd6b3 100644 --- a/src/scanoss/cli.py +++ b/src/scanoss/cli.py @@ -29,7 +29,9 @@ from .scanner import Scanner +from .scanoss_settings import ScanossSettings from .scancodedeps import ScancodeDeps +from .scanner import FAST_WINNOWING, Scanner from .scantype import ScanType from .filecount import FileCount from .cyclonedx import CycloneDx @@ -102,6 +104,11 @@ def setup_args() -> None: help='Scancode command and path if required (optional - default scancode).') p_scan.add_argument('--sc-timeout', type=int, default=600, help='Timeout (in seconds) for scancode to complete (optional - default 600)') + p_scan.add_argument( + '--settings', + type=str, + help='Settings file to use for scanning (optional - default scanoss.json)', + ) # Sub-command: fingerprint p_wfp = subparsers.add_parser('fingerprint', aliases=['fp', 'wfp'], @@ -489,42 +496,70 @@ def scan(parser, args): args: Namespace Parsed arguments """ - if not args.scan_dir and not args.wfp and not args.stdin and not args.dep and not args.files: - print_stderr('Please specify a file/folder, files (--files), fingerprint (--wfp), dependency (--dep), or STDIN (--stdin)') + if ( + not args.scan_dir + and not args.wfp + and not args.stdin + and not args.dep + and not args.files + ): + print_stderr( + 'Please specify a file/folder, files (--files), fingerprint (--wfp), dependency (--dep), or STDIN (--stdin)' + ) parser.parse_args([args.subparser, '-h']) exit(1) if args.pac and args.proxy: print_stderr('Please specify one of --proxy or --pac, not both') parser.parse_args([args.subparser, '-h']) exit(1) - scan_type: str = None - sbom_path: str = None + + if args.identify and args.settings: + print_stderr(f'ERROR: Cannot specify both --identify and --settings options.') + exit(1) + + def is_valid_file(file_path: str) -> bool: + if not os.path.exists(file_path) or not os.path.isfile(file_path): + print_stderr(f'Specified file does not exist or is not a file: {file_path}') + return False + if not Scanner.valid_json_file(file_path): + return False + return True + + scan_settings = ScanossSettings( + debug=args.debug, trace=args.trace, quiet=args.quiet + ) + if args.identify: - sbom_path = args.identify - scan_type = 'identify' - if not os.path.exists(sbom_path) or not os.path.isfile(sbom_path): - print_stderr(f'Specified --identify file does not exist or is not a file: {sbom_path}') - exit(1) - if not Scanner.valid_json_file(sbom_path): # Make sure it's a valid JSON file + if not is_valid_file(args.identify) or args.ignore: exit(1) - if args.ignore: - print_stderr(f'Warning: Specified --identify and --ignore options. Skipping ignore.') + scan_settings.load_json_file(args.identify).set_file_type( + 'legacy' + ).set_scan_type('identify') elif args.ignore: - sbom_path = args.ignore - scan_type = 'blacklist' - if not os.path.exists(sbom_path) or not os.path.isfile(sbom_path): - print_stderr(f'Specified --ignore file does not exist or is not a file: {sbom_path}') + if not is_valid_file(args.ignore): exit(1) - if not Scanner.valid_json_file(sbom_path): # Make sure it's a valid JSON file + scan_settings.load_json_file(args.ignore).set_file_type('legacy').set_scan_type( + 'blacklist' + ) + elif args.settings: + if not is_valid_file(args.settings): exit(1) + scan_settings.load_json_file(args.settings).set_file_type('new').set_scan_type( + 'identify' + ) + if args.dep: if not os.path.exists(args.dep) or not os.path.isfile(args.dep): - print_stderr(f'Specified --dep file does not exist or is not a file: {args.dep}') + print_stderr( + f'Specified --dep file does not exist or is not a file: {args.dep}' + ) exit(1) if not Scanner.valid_json_file(args.dep): # Make sure it's a valid JSON file exit(1) if args.strip_hpsm and not args.hpsm and not args.quiet: - print_stderr(f'Warning: --strip-hpsm option supplied without enabling HPSM (--hpsm). Ignoring.') + print_stderr( + f'Warning: --strip-hpsm option supplied without enabling HPSM (--hpsm). Ignoring.' + ) scan_output: str = None if args.output: @@ -563,37 +598,72 @@ def scan(parser, args): print_stderr(f'Using flags {flags}...') elif not args.quiet: if args.timeout < 5: - print_stderr(f'POST timeout (--timeout) too small: {args.timeout}. Reverting to default.') + print_stderr( + f'POST timeout (--timeout) too small: {args.timeout}. Reverting to default.' + ) if args.retry < 0: - print_stderr(f'POST retry (--retry) too small: {args.retry}. Reverting to default.') + print_stderr( + f'POST retry (--retry) too small: {args.retry}. Reverting to default.' + ) - if not os.access(os.getcwd(), os.W_OK): # Make sure the current directory is writable. If not disable saving WFP + if not os.access( + os.getcwd(), os.W_OK + ): # Make sure the current directory is writable. If not disable saving WFP print_stderr(f'Warning: Current directory is not writable: {os.getcwd()}') args.no_wfp_output = True if args.ca_cert and not os.path.exists(args.ca_cert): print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.') exit(1) pac_file = get_pac_file(args.pac) - scan_options = get_scan_options(args) # Figure out what scanning options we have - - scanner = Scanner(debug=args.debug, trace=args.trace, quiet=args.quiet, api_key=args.key, url=args.apiurl, - sbom_path=sbom_path, scan_type=scan_type, scan_output=scan_output, output_format=output_format, - flags=flags, nb_threads=args.threads, post_size=args.post_size, - timeout=args.timeout, no_wfp_file=args.no_wfp_output, all_extensions=args.all_extensions, - all_folders=args.all_folders, hidden_files_folders=args.all_hidden, - scan_options=scan_options, sc_timeout=args.sc_timeout, sc_command=args.sc_command, - grpc_url=args.api2url, obfuscate=args.obfuscate, - ignore_cert_errors=args.ignore_cert_errors, proxy=args.proxy, grpc_proxy=args.grpc_proxy, - pac=pac_file, ca_cert=args.ca_cert, retry=args.retry, hpsm=args.hpsm, - skip_size=args.skip_size, skip_extensions=args.skip_extension, skip_folders=args.skip_folder, - skip_md5_ids=args.skip_md5, strip_hpsm_ids=args.strip_hpsm, strip_snippet_ids=args.strip_snippet - ) + scan_options = get_scan_options(args) # Figure out what scanning options we have + + scanner = Scanner( + debug=args.debug, + trace=args.trace, + quiet=args.quiet, + api_key=args.key, + url=args.apiurl, + scan_output=scan_output, + output_format=output_format, + flags=flags, + nb_threads=args.threads, + post_size=args.post_size, + timeout=args.timeout, + no_wfp_file=args.no_wfp_output, + all_extensions=args.all_extensions, + all_folders=args.all_folders, + hidden_files_folders=args.all_hidden, + scan_options=scan_options, + sc_timeout=args.sc_timeout, + sc_command=args.sc_command, + grpc_url=args.api2url, + obfuscate=args.obfuscate, + ignore_cert_errors=args.ignore_cert_errors, + proxy=args.proxy, + grpc_proxy=args.grpc_proxy, + pac=pac_file, + ca_cert=args.ca_cert, + retry=args.retry, + hpsm=args.hpsm, + skip_size=args.skip_size, + skip_extensions=args.skip_extension, + skip_folders=args.skip_folder, + skip_md5_ids=args.skip_md5, + strip_hpsm_ids=args.strip_hpsm, + strip_snippet_ids=args.strip_snippet, + scan_settings=scan_settings + ) + if args.wfp: if not scanner.is_file_or_snippet_scan(): - print_stderr(f'Error: Cannot specify WFP scanning if file/snippet options are disabled ({scan_options})') + print_stderr( + f'Error: Cannot specify WFP scanning if file/snippet options are disabled ({scan_options})' + ) exit(1) if scanner.is_dependency_scan() and not args.dep: - print_stderr(f'Error: Cannot specify WFP & Dependency scanning without a dependency file (--dep)') + print_stderr( + f'Error: Cannot specify WFP & Dependency scanning without a dependency file (--dep)' + ) exit(1) scanner.scan_wfp_with_options(args.wfp, args.dep) elif args.stdin: @@ -601,26 +671,40 @@ def scan(parser, args): if not scanner.scan_contents(args.stdin, contents): exit(1) elif args.files: - if not scanner.scan_files_with_options(args.files, args.dep, scanner.winnowing.file_map): + if not scanner.scan_files_with_options( + args.files, args.dep, scanner.winnowing.file_map + ): exit(1) elif args.scan_dir: if not os.path.exists(args.scan_dir): - print_stderr(f'Error: File or folder specified does not exist: {args.scan_dir}.') + print_stderr( + f'Error: File or folder specified does not exist: {args.scan_dir}.' + ) exit(1) if os.path.isdir(args.scan_dir): - if not scanner.scan_folder_with_options(args.scan_dir, args.dep, scanner.winnowing.file_map): + if not scanner.scan_folder_with_options( + args.scan_dir, args.dep, scanner.winnowing.file_map + ): exit(1) elif os.path.isfile(args.scan_dir): - if not scanner.scan_file_with_options(args.scan_dir, args.dep, scanner.winnowing.file_map): + if not scanner.scan_file_with_options( + args.scan_dir, args.dep, scanner.winnowing.file_map + ): exit(1) else: - print_stderr(f'Error: Path specified is neither a file or a folder: {args.scan_dir}.') + print_stderr( + f'Error: Path specified is neither a file or a folder: {args.scan_dir}.' + ) exit(1) elif args.dep: if not args.dependencies_only: - print_stderr(f'Error: No file or folder specified to scan. Please add --dependencies-only to decorate dependency file only.') + print_stderr( + f'Error: No file or folder specified to scan. Please add --dependencies-only to decorate dependency file only.' + ) exit(1) - if not scanner.scan_folder_with_options(".", args.dep, scanner.winnowing.file_map): + if not scanner.scan_folder_with_options( + ".", args.dep, scanner.winnowing.file_map + ): exit(1) else: print_stderr('No action found to process') @@ -707,10 +791,11 @@ def utils_cert_download(_, args): :param _: ignore/unused :param args: Parsed arguments """ - from urllib.parse import urlparse import socket - from OpenSSL import SSL, crypto import traceback + from urllib.parse import urlparse + + from OpenSSL import SSL, crypto file = sys.stdout if args.output: diff --git a/src/scanoss/results.py b/src/scanoss/results.py index dc790149..7174f53a 100644 --- a/src/scanoss/results.py +++ b/src/scanoss/results.py @@ -1,7 +1,7 @@ """ SPDX-License-Identifier: MIT - Copyright (c) 2023, SCANOSS + Copyright (c) 2024, SCANOSS Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/scanoss/scanner.py b/src/scanoss/scanner.py index 42574574..a0cec4ba 100644 --- a/src/scanoss/scanner.py +++ b/src/scanoss/scanner.py @@ -41,11 +41,13 @@ from .scanossgrpc import ScanossGrpc from .scantype import ScanType from .scanossbase import ScanossBase +from .scanoss_settings import ScanossSettings +from .scanpostprocessor import ScanPostProcessor from . import __version__ FAST_WINNOWING = False try: - from scanoss_winnowing.winnowing import Winnowing + from .winnowing import Winnowing FAST_WINNOWING = True except ModuleNotFoundError or ImportError: @@ -95,17 +97,18 @@ class Scanner(ScanossBase): def __init__(self, wfp: str = None, scan_output: str = None, output_format: str = 'plain', debug: bool = False, trace: bool = False, quiet: bool = False, api_key: str = None, url: str = None, - sbom_path: str = None, scan_type: str = None, flags: str = None, nb_threads: int = 5, + flags: str = None, nb_threads: int = 5, post_size: int = 32, timeout: int = 180, no_wfp_file: bool = False, all_extensions: bool = False, all_folders: bool = False, hidden_files_folders: bool = False, scan_options: int = 7, sc_timeout: int = 600, sc_command: str = None, grpc_url: str = None, obfuscate: bool = False, ignore_cert_errors: bool = False, proxy: str = None, grpc_proxy: str = None, ca_cert: str = None, pac: PACFile = None, retry: int = 5, hpsm: bool = False, skip_size: int = 0, skip_extensions=None, skip_folders=None, - strip_hpsm_ids=None, strip_snippet_ids=None, skip_md5_ids=None + strip_hpsm_ids=None, strip_snippet_ids=None, skip_md5_ids=None, + scan_settings: ScanossSettings = None ): """ - Initialise scanning class, including Winnowing, ScanossApi and ThreadedScanning + Initialise scanning class, including Winnowing, ScanossApi, ThreadedScanning """ super().__init__(debug, trace, quiet) if skip_folders is None: @@ -133,7 +136,7 @@ def __init__(self, wfp: str = None, scan_output: str = None, output_format: str skip_md5_ids=skip_md5_ids ) self.scanoss_api = ScanossApi(debug=debug, trace=trace, quiet=quiet, api_key=api_key, url=url, - sbom_path=sbom_path, scan_type=scan_type, flags=flags, timeout=timeout, + flags=flags, timeout=timeout, ver_details=ver_details, ignore_cert_errors=ignore_cert_errors, proxy=proxy, ca_cert=ca_cert, pac=pac, retry=retry ) @@ -157,6 +160,16 @@ def __init__(self, wfp: str = None, scan_output: str = None, output_format: str if skip_extensions: # Append extra file extensions to skip self.skip_extensions.extend(skip_extensions) + if scan_settings: + self.scan_settings = scan_settings + self.post_processor = ScanPostProcessor(scan_settings, debug=debug, trace=trace, quiet=quiet) + self._maybe_set_api_sbom() + + def _maybe_set_api_sbom(self): + sbom = self.scan_settings.get_sbom() + if sbom: + self.scanoss_api.set_sbom(sbom) + def __filter_files(self, files: list) -> list: """ Filter which files should be considered for processing @@ -524,35 +537,24 @@ def __finish_scan_threaded(self, file_map: dict = None) -> bool: raw_output += ",\n \"%s\":[%s]" % (file, json.dumps(dep_file, indent=2)) # End for loop raw_output += "\n}" - parsed_json = None try: - parsed_json = json.loads(raw_output) + raw_results = json.loads(raw_output) except Exception as e: - self.print_stderr(f'Warning: Problem decoding parsed json: {e}') + raise Exception(f'ERROR: Problem decoding parsed json: {e}') + + results = self.post_processor.load_results(raw_results).post_process() if self.output_format == 'plain': - if parsed_json: - self.__log_result(json.dumps(parsed_json, indent=2, sort_keys=True)) - else: - self.__log_result(raw_output) + self.__log_result(json.dumps(results, indent=2, sort_keys=True)) elif self.output_format == 'cyclonedx': cdx = CycloneDx(self.debug, self.scan_output) - if parsed_json: - success = cdx.produce_from_json(parsed_json) - else: - success = cdx.produce_from_str(raw_output) + success = cdx.produce_from_json(results) elif self.output_format == 'spdxlite': spdxlite = SpdxLite(self.debug, self.scan_output) - if parsed_json: - success = spdxlite.produce_from_json(parsed_json) - else: - success = spdxlite.produce_from_str(raw_output) + success = spdxlite.produce_from_json(results) elif self.output_format == 'csv': csvo = CsvOutput(self.debug, self.scan_output) - if parsed_json: - success = csvo.produce_from_json(parsed_json) - else: - success = csvo.produce_from_str(raw_output) + success = csvo.produce_from_json(results) else: self.print_stderr(f'ERROR: Unknown output format: {self.output_format}') success = False @@ -713,7 +715,7 @@ def scan_files(self, files: []) -> bool: else: Scanner.print_stderr(f'Warning: No files found to scan from: {filtered_files}') return success - + def scan_files_with_options(self, files: [], deps_file: str = None, file_map: dict = None) -> bool: """ Scan the given list of files for whatever scaning options that have been configured diff --git a/src/scanoss/scanoss_settings.py b/src/scanoss/scanoss_settings.py new file mode 100644 index 00000000..f2c0ad6d --- /dev/null +++ b/src/scanoss/scanoss_settings.py @@ -0,0 +1,198 @@ +""" + SPDX-License-Identifier: MIT + + Copyright (c) 2024, SCANOSS + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the 'Software'), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" + +import json +import os + +from .scanossbase import ScanossBase + + +class ScanossSettings(ScanossBase): + """Handles the loading and parsing of the SCANOSS settings file""" + + def __init__( + self, + debug: bool = False, + trace: bool = False, + quiet: bool = False, + filepath: str = None, + ): + """ + Args: + debug (bool, optional): Debug. Defaults to False. + trace (bool, optional): Trace. Defaults to False. + quiet (bool, optional): Quiet. Defaults to False. + filepath (str, optional): Path to settings file. Defaults to None. + """ + + super().__init__(debug, trace, quiet) + self.data = {} + self.settings_file_type = None + self.scan_type = None + + if filepath: + self.load_json_file(filepath) + + def load_json_file(self, filepath: str): + """Load the scan settings file + + Args: + filepath (str): Path to the SCANOSS settings file + """ + file = f"{os.getcwd()}/{filepath}" + + if not os.path.exists(file): + self.print_stderr(f"Scan settings file not found: {file}") + self.data = {} + + with open(file, "r") as jsonfile: + self.print_stderr(f"Loading scan settings from: {file}") + try: + self.data = json.load(jsonfile) + except Exception as e: + self.print_stderr(f"ERROR: Problem parsing input JSON: {e}") + return self + + def set_file_type(self, file_type: str): + """Set the file type in order to support both legacy SBOM.json and new scanoss.json files + + Args: + file_type (str): 'legacy' or 'new' + + Raises: + Exception: Invalid scan settings file, missing "components" or "bom" + """ + self.settings_file_type = file_type + if not self._is_valid_sbom_file: + raise Exception( + 'Invalid scan settings file, missing "components" or "bom")' + ) + return self + + def set_scan_type(self, scan_type: str): + """Set the scan type to support legacy SBOM.json files + + Args: + scan_type (str): 'identify' or 'exclude' + """ + self.scan_type = scan_type + return self + + def _is_valid_sbom_file(self): + """Check if the scan settings file is valid + + Returns: + bool: True if the file is valid, False otherwise + """ + if not self.data.get("components") or not self.data.get("bom"): + return False + return True + + def _get_bom(self): + """Get the Billing of Materials from the settings file + + Returns: + dict: If using scanoss.json + list: If using SBOM.json + """ + if self.settings_file_type == "legacy": + return self.data.get("components", []) + return self.data.get("bom", {}) + + def get_bom_include(self): + """Get the list of components to include in the scan + + Returns: + list: List of components to include in the scan + """ + if self.settings_file_type == "legacy": + return self._get_bom() + return self._get_bom().get("include", []) + + def get_bom_remove(self): + """Get the list of components to remove from the scan + + Returns: + list: List of components to remove from the scan + """ + if self.settings_file_type == "legacy": + return self._get_bom() + return self._get_bom().get("remove", []) + + def get_sbom(self): + """Get the SBOM to be sent to the SCANOSS API + + Returns: + dict: SBOM + """ + if not self.data: + return None + return { + "scan_type": self.scan_type, + "assets": json.dumps(self._get_sbom_assets()), + } + + def _get_sbom_assets(self): + """Get the SBOM assets + + Returns: + list: List of SBOM assets + """ + if self.scan_type == "identify": + return self.normalize_bom_entries(self.get_bom_include()) + return self.normalize_bom_entries(self.get_bom_remove()) + + @staticmethod + def normalize_bom_entries(bom_entries): + """Normalize the BOM entries + + Args: + bom_entries (dict): BOM entries + + Returns: + list: Normalized BOM entries + """ + normalized_bom_entries = [] + for entry in bom_entries: + normalized_bom_entries.append( + { + "purl": entry.get("purl", ""), + } + ) + return normalized_bom_entries + + def get_bom_remove_for_filtering(self): + """Get the list of files and purls to remove from the scan + + Returns: + (list[str], list[str]): List of files and list of purls to remove from the scan + """ + entries = self.get_bom_remove() + files = [ + entry.get("path") for entry in entries if entry.get("path") is not None + ] + purls = [ + entry.get("purl") for entry in entries if entry.get("purl") is not None + ] + return files, purls diff --git a/src/scanoss/scanossapi.py b/src/scanoss/scanossapi.py index 02b151af..3a0643d6 100644 --- a/src/scanoss/scanossapi.py +++ b/src/scanoss/scanossapi.py @@ -34,6 +34,7 @@ from pypac import PACSession from pypac.parser import PACFile from urllib3.exceptions import InsecureRequestWarning + from .scanossbase import ScanossBase from . import __version__ @@ -50,14 +51,12 @@ class ScanossApi(ScanossBase): Currently support posting scan requests to the SCANOSS streaming API """ - def __init__(self, scan_type: str = None, sbom_path: str = None, scan_format: str = None, flags: str = None, + def __init__(self, scan_format: str = None, flags: str = None, url: str = None, api_key: str = None, debug: bool = False, trace: bool = False, quiet: bool = False, timeout: int = 180, ver_details: str = None, ignore_cert_errors: bool = False, proxy: str = None, ca_cert: str = None, pac: PACFile = None, retry: int = 5): """ Initialise the SCANOSS API - :param scan_type: Scan type (default identify) - :param sbom_path: Input SBOM file to match scan type (default None) :param scan_format: Scan format (default plain) :param flags: Scanning flags (default None) :param url: API URL (default https://api.osskb.org/scan/direct) @@ -77,9 +76,8 @@ def __init__(self, scan_type: str = None, sbom_path: str = None, scan_format: st self.api_key = api_key if api_key else SCANOSS_API_KEY if self.api_key and not url and not os.environ.get("SCANOSS_SCAN_URL"): self.url = DEFAULT_URL2 # API key specific and no alternative URL, so use the default premium - self.scan_type = scan_type + self.sbom = None self.scan_format = scan_format if scan_format else 'plain' - self.sbom_path = sbom_path self.flags = flags self.timeout = timeout if timeout > 5 else 180 self.retry_limit = retry if retry >= 0 else 5 @@ -92,8 +90,6 @@ def __init__(self, scan_type: str = None, sbom_path: str = None, scan_format: st self.headers['x-api-key'] = self.api_key self.headers['User-Agent'] = f'scanoss-py/{__version__}' self.headers['user-agent'] = f'scanoss-py/{__version__}' - self.sbom = None - self.load_sbom() # Load an input SBOM if one is specified if self.trace: logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) http_client.HTTPConnection.debuglevel = 1 @@ -115,17 +111,6 @@ def __init__(self, scan_type: str = None, sbom_path: str = None, scan_format: st if self. proxies: self.session.proxies = self.proxies - def load_sbom(self): - """ - Load the input SBOM if one exists - """ - if self.sbom_path: - if not self.scan_type: - self.scan_type = 'identify' # Default to identify SBOM type if it's not set - self.print_debug(f'Loading {self.scan_type} SBOM {self.sbom_path}...') - with open(self.sbom_path) as f: - self.sbom = f.read() - def scan(self, wfp: str, context: str = None, scan_id: int = None): """ Scan the specified WFP and return the JSON object @@ -137,14 +122,15 @@ def scan(self, wfp: str, context: str = None, scan_id: int = None): request_id = str(uuid.uuid4()) form_data = {} if self.sbom: - form_data['type'] = self.scan_type - form_data['assets'] = self.sbom + form_data['type'] = self.sbom.get("scan_type") + form_data['assets'] = self.sbom.get("assets") if self.scan_format: form_data['format'] = self.scan_format if self.flags: form_data['flags'] = self.flags if context: form_data['context'] = context + scan_files = {'file': ("%s.wfp" % request_id, wfp)} headers = self.headers headers['x-request-id'] = request_id # send a unique request id for each post @@ -242,6 +228,10 @@ def save_bad_req_wfp(self, scan_files, request_id, scan_id): except Exception as ee: self.print_stderr(f'Warning: Issue writing bad request file - {bad_req_file} ({ee.__class__.__name__}):' f' {ee}') + + def set_sbom(self, sbom): + self.sbom = sbom + return self # # End of ScanossApi Class diff --git a/src/scanoss/scanpostprocessor.py b/src/scanoss/scanpostprocessor.py new file mode 100644 index 00000000..8c5c88ff --- /dev/null +++ b/src/scanoss/scanpostprocessor.py @@ -0,0 +1,104 @@ +""" + SPDX-License-Identifier: MIT + + Copyright (c) 2024, SCANOSS + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" + +from .scanoss_settings import ScanossSettings +from .scanossbase import ScanossBase + + +class ScanPostProcessor(ScanossBase): + """Handles post-processing of the scan results""" + + def __init__( + self, + scan_settings: ScanossSettings, + debug: bool = False, + trace: bool = False, + quiet: bool = False, + results: dict = None, + ): + """ + Args: + scan_settings (ScanossSettings): Scan settings object + debug (bool, optional): Debug mode. Defaults to False. + trace (bool, optional): Traces. Defaults to False. + quiet (bool, optional): Quiet mode. Defaults to False. + results (dict | str, optional): Results to be processed. Defaults to None. + """ + super().__init__(debug, trace, quiet) + self.scan_settings = scan_settings + self.results = results + + def load_results(self, raw_results: dict): + """Load the raw results + + Args: + raw_results (dict): Raw scan results + """ + self.results = raw_results + return self + + def post_process(self): + """Post-process the scan results + + Returns: + dict: Processed results + """ + self.remove_dismissed_files() + return self.results + + def remove_dismissed_files(self): + """Remove dismissed files in SCANOSS settings file from the results""" + to_remove_files, to_remove_purls = ( + self.scan_settings.get_bom_remove_for_filtering() + ) + + if not to_remove_files and not to_remove_purls: + return + + self.filter_files(to_remove_files, to_remove_purls) + return self + + def filter_files(self, files: list, purls: list): + """Filter files based on the provided list of files and purls + + Args: + files (list): List of files to be filtered + purls (list): List of purls to be filtered + """ + filtered_results = {} + + for file_name in self.results: + file = self.results.get(file_name) + file = file[0] if isinstance(file, list) else file + + identified_purls = file.get("purl") + if identified_purls and any(purl in purls for purl in identified_purls): + continue + elif file_name in files: + continue + + filtered_results[file_name] = file + + self.results = filtered_results + return self diff --git a/tests/scanpostprocessor-test.py b/tests/scanpostprocessor-test.py new file mode 100644 index 00000000..58f8d72e --- /dev/null +++ b/tests/scanpostprocessor-test.py @@ -0,0 +1,60 @@ +""" + SPDX-License-Identifier: MIT + + Copyright (c) 2024, SCANOSS + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" +import unittest + +from src.scanoss.scanoss_settings import ScanossSettings +from src.scanoss.scanpostprocessor import ScanPostProcessor + + +class MyTestCase(unittest.TestCase): + """ + Unit test cases for Scan Post-Processing + """ + + def test_remove_files(self): + """ + Should remove files by path from the scan results + """ + scan_settings = ScanossSettings(filepath="data/scanoss.json") + post_processor = ScanPostProcessor(scan_settings) + results = { + "scanoss_settings.py": [ + { + "purl": ["pkg:github/scanoss/scanoss.py"], + } + ], + "test_file_path.go": [ + { + "purl": ["pkg:github/scanoss/scanoss.lui"], + } + ] + } + processed_results = post_processor.load_results(results).post_process() + + self.assertEqual(len(processed_results), 0) + self.assertEqual(processed_results, {}) + + +if __name__ == '__main__': + unittest.main()