Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 132 additions & 47 deletions src/scanoss/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@


from .scanner import Scanner
from .scanoss_settings import ScanossSettings
from .scancodedeps import ScancodeDeps
from .scanner import FAST_WINNOWING, Scanner
from .scantype import ScanType
from .filecount import FileCount
from .cyclonedx import CycloneDx
Expand Down Expand Up @@ -102,6 +104,11 @@ def setup_args() -> None:
help='Scancode command and path if required (optional - default scancode).')
p_scan.add_argument('--sc-timeout', type=int, default=600,
help='Timeout (in seconds) for scancode to complete (optional - default 600)')
p_scan.add_argument(
'--settings',
type=str,
help='Settings file to use for scanning (optional - default scanoss.json)',
)

# Sub-command: fingerprint
p_wfp = subparsers.add_parser('fingerprint', aliases=['fp', 'wfp'],
Expand Down Expand Up @@ -489,42 +496,70 @@ def scan(parser, args):
args: Namespace
Parsed arguments
"""
if not args.scan_dir and not args.wfp and not args.stdin and not args.dep and not args.files:
print_stderr('Please specify a file/folder, files (--files), fingerprint (--wfp), dependency (--dep), or STDIN (--stdin)')
if (
not args.scan_dir
and not args.wfp
and not args.stdin
and not args.dep
and not args.files
):
print_stderr(
'Please specify a file/folder, files (--files), fingerprint (--wfp), dependency (--dep), or STDIN (--stdin)'
)
parser.parse_args([args.subparser, '-h'])
exit(1)
if args.pac and args.proxy:
print_stderr('Please specify one of --proxy or --pac, not both')
parser.parse_args([args.subparser, '-h'])
exit(1)
scan_type: str = None
sbom_path: str = None

if args.identify and args.settings:
print_stderr(f'ERROR: Cannot specify both --identify and --settings options.')
exit(1)

def is_valid_file(file_path: str) -> bool:
if not os.path.exists(file_path) or not os.path.isfile(file_path):
print_stderr(f'Specified file does not exist or is not a file: {file_path}')
return False
if not Scanner.valid_json_file(file_path):
return False
return True

scan_settings = ScanossSettings(
debug=args.debug, trace=args.trace, quiet=args.quiet
)

if args.identify:
sbom_path = args.identify
scan_type = 'identify'
if not os.path.exists(sbom_path) or not os.path.isfile(sbom_path):
print_stderr(f'Specified --identify file does not exist or is not a file: {sbom_path}')
exit(1)
if not Scanner.valid_json_file(sbom_path): # Make sure it's a valid JSON file
if not is_valid_file(args.identify) or args.ignore:
exit(1)
if args.ignore:
print_stderr(f'Warning: Specified --identify and --ignore options. Skipping ignore.')
scan_settings.load_json_file(args.identify).set_file_type(
'legacy'
).set_scan_type('identify')
elif args.ignore:
sbom_path = args.ignore
scan_type = 'blacklist'
if not os.path.exists(sbom_path) or not os.path.isfile(sbom_path):
print_stderr(f'Specified --ignore file does not exist or is not a file: {sbom_path}')
if not is_valid_file(args.ignore):
exit(1)
if not Scanner.valid_json_file(sbom_path): # Make sure it's a valid JSON file
scan_settings.load_json_file(args.ignore).set_file_type('legacy').set_scan_type(
'blacklist'
)
elif args.settings:
if not is_valid_file(args.settings):
exit(1)
scan_settings.load_json_file(args.settings).set_file_type('new').set_scan_type(
'identify'
)

if args.dep:
if not os.path.exists(args.dep) or not os.path.isfile(args.dep):
print_stderr(f'Specified --dep file does not exist or is not a file: {args.dep}')
print_stderr(
f'Specified --dep file does not exist or is not a file: {args.dep}'
)
exit(1)
if not Scanner.valid_json_file(args.dep): # Make sure it's a valid JSON file
exit(1)
if args.strip_hpsm and not args.hpsm and not args.quiet:
print_stderr(f'Warning: --strip-hpsm option supplied without enabling HPSM (--hpsm). Ignoring.')
print_stderr(
f'Warning: --strip-hpsm option supplied without enabling HPSM (--hpsm). Ignoring.'
)

scan_output: str = None
if args.output:
Expand Down Expand Up @@ -563,64 +598,113 @@ def scan(parser, args):
print_stderr(f'Using flags {flags}...')
elif not args.quiet:
if args.timeout < 5:
print_stderr(f'POST timeout (--timeout) too small: {args.timeout}. Reverting to default.')
print_stderr(
f'POST timeout (--timeout) too small: {args.timeout}. Reverting to default.'
)
if args.retry < 0:
print_stderr(f'POST retry (--retry) too small: {args.retry}. Reverting to default.')
print_stderr(
f'POST retry (--retry) too small: {args.retry}. Reverting to default.'
)

if not os.access(os.getcwd(), os.W_OK): # Make sure the current directory is writable. If not disable saving WFP
if not os.access(
os.getcwd(), os.W_OK
): # Make sure the current directory is writable. If not disable saving WFP
print_stderr(f'Warning: Current directory is not writable: {os.getcwd()}')
args.no_wfp_output = True
if args.ca_cert and not os.path.exists(args.ca_cert):
print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.')
exit(1)
pac_file = get_pac_file(args.pac)
scan_options = get_scan_options(args) # Figure out what scanning options we have

scanner = Scanner(debug=args.debug, trace=args.trace, quiet=args.quiet, api_key=args.key, url=args.apiurl,
sbom_path=sbom_path, scan_type=scan_type, scan_output=scan_output, output_format=output_format,
flags=flags, nb_threads=args.threads, post_size=args.post_size,
timeout=args.timeout, no_wfp_file=args.no_wfp_output, all_extensions=args.all_extensions,
all_folders=args.all_folders, hidden_files_folders=args.all_hidden,
scan_options=scan_options, sc_timeout=args.sc_timeout, sc_command=args.sc_command,
grpc_url=args.api2url, obfuscate=args.obfuscate,
ignore_cert_errors=args.ignore_cert_errors, proxy=args.proxy, grpc_proxy=args.grpc_proxy,
pac=pac_file, ca_cert=args.ca_cert, retry=args.retry, hpsm=args.hpsm,
skip_size=args.skip_size, skip_extensions=args.skip_extension, skip_folders=args.skip_folder,
skip_md5_ids=args.skip_md5, strip_hpsm_ids=args.strip_hpsm, strip_snippet_ids=args.strip_snippet
)
scan_options = get_scan_options(args) # Figure out what scanning options we have

scanner = Scanner(
debug=args.debug,
trace=args.trace,
quiet=args.quiet,
api_key=args.key,
url=args.apiurl,
scan_output=scan_output,
output_format=output_format,
flags=flags,
nb_threads=args.threads,
post_size=args.post_size,
timeout=args.timeout,
no_wfp_file=args.no_wfp_output,
all_extensions=args.all_extensions,
all_folders=args.all_folders,
hidden_files_folders=args.all_hidden,
scan_options=scan_options,
sc_timeout=args.sc_timeout,
sc_command=args.sc_command,
grpc_url=args.api2url,
obfuscate=args.obfuscate,
ignore_cert_errors=args.ignore_cert_errors,
proxy=args.proxy,
grpc_proxy=args.grpc_proxy,
pac=pac_file,
ca_cert=args.ca_cert,
retry=args.retry,
hpsm=args.hpsm,
skip_size=args.skip_size,
skip_extensions=args.skip_extension,
skip_folders=args.skip_folder,
skip_md5_ids=args.skip_md5,
strip_hpsm_ids=args.strip_hpsm,
strip_snippet_ids=args.strip_snippet,
scan_settings=scan_settings
)

if args.wfp:
if not scanner.is_file_or_snippet_scan():
print_stderr(f'Error: Cannot specify WFP scanning if file/snippet options are disabled ({scan_options})')
print_stderr(
f'Error: Cannot specify WFP scanning if file/snippet options are disabled ({scan_options})'
)
exit(1)
if scanner.is_dependency_scan() and not args.dep:
print_stderr(f'Error: Cannot specify WFP & Dependency scanning without a dependency file (--dep)')
print_stderr(
f'Error: Cannot specify WFP & Dependency scanning without a dependency file (--dep)'
)
exit(1)
scanner.scan_wfp_with_options(args.wfp, args.dep)
elif args.stdin:
contents = sys.stdin.buffer.read()
if not scanner.scan_contents(args.stdin, contents):
exit(1)
elif args.files:
if not scanner.scan_files_with_options(args.files, args.dep, scanner.winnowing.file_map):
if not scanner.scan_files_with_options(
args.files, args.dep, scanner.winnowing.file_map
):
exit(1)
elif args.scan_dir:
if not os.path.exists(args.scan_dir):
print_stderr(f'Error: File or folder specified does not exist: {args.scan_dir}.')
print_stderr(
f'Error: File or folder specified does not exist: {args.scan_dir}.'
)
exit(1)
if os.path.isdir(args.scan_dir):
if not scanner.scan_folder_with_options(args.scan_dir, args.dep, scanner.winnowing.file_map):
if not scanner.scan_folder_with_options(
args.scan_dir, args.dep, scanner.winnowing.file_map
):
exit(1)
elif os.path.isfile(args.scan_dir):
if not scanner.scan_file_with_options(args.scan_dir, args.dep, scanner.winnowing.file_map):
if not scanner.scan_file_with_options(
args.scan_dir, args.dep, scanner.winnowing.file_map
):
exit(1)
else:
print_stderr(f'Error: Path specified is neither a file or a folder: {args.scan_dir}.')
print_stderr(
f'Error: Path specified is neither a file or a folder: {args.scan_dir}.'
)
exit(1)
elif args.dep:
if not args.dependencies_only:
print_stderr(f'Error: No file or folder specified to scan. Please add --dependencies-only to decorate dependency file only.')
print_stderr(
f'Error: No file or folder specified to scan. Please add --dependencies-only to decorate dependency file only.'
)
exit(1)
if not scanner.scan_folder_with_options(".", args.dep, scanner.winnowing.file_map):
if not scanner.scan_folder_with_options(
".", args.dep, scanner.winnowing.file_map
):
exit(1)
else:
print_stderr('No action found to process')
Expand Down Expand Up @@ -707,10 +791,11 @@ def utils_cert_download(_, args):
:param _: ignore/unused
:param args: Parsed arguments
"""
from urllib.parse import urlparse
import socket
from OpenSSL import SSL, crypto
import traceback
from urllib.parse import urlparse

from OpenSSL import SSL, crypto

file = sys.stdout
if args.output:
Expand Down
2 changes: 1 addition & 1 deletion src/scanoss/results.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
SPDX-License-Identifier: MIT

Copyright (c) 2023, SCANOSS
Copyright (c) 2024, SCANOSS

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
52 changes: 27 additions & 25 deletions src/scanoss/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
from .scanossgrpc import ScanossGrpc
from .scantype import ScanType
from .scanossbase import ScanossBase
from .scanoss_settings import ScanossSettings
from .scanpostprocessor import ScanPostProcessor
from . import __version__

FAST_WINNOWING = False
try:
from scanoss_winnowing.winnowing import Winnowing
from .winnowing import Winnowing

FAST_WINNOWING = True
except ModuleNotFoundError or ImportError:
Expand Down Expand Up @@ -95,17 +97,18 @@ class Scanner(ScanossBase):

def __init__(self, wfp: str = None, scan_output: str = None, output_format: str = 'plain',
debug: bool = False, trace: bool = False, quiet: bool = False, api_key: str = None, url: str = None,
sbom_path: str = None, scan_type: str = None, flags: str = None, nb_threads: int = 5,
flags: str = None, nb_threads: int = 5,
post_size: int = 32, timeout: int = 180, no_wfp_file: bool = False,
all_extensions: bool = False, all_folders: bool = False, hidden_files_folders: bool = False,
scan_options: int = 7, sc_timeout: int = 600, sc_command: str = None, grpc_url: str = None,
obfuscate: bool = False, ignore_cert_errors: bool = False, proxy: str = None, grpc_proxy: str = None,
ca_cert: str = None, pac: PACFile = None, retry: int = 5, hpsm: bool = False,
skip_size: int = 0, skip_extensions=None, skip_folders=None,
strip_hpsm_ids=None, strip_snippet_ids=None, skip_md5_ids=None
strip_hpsm_ids=None, strip_snippet_ids=None, skip_md5_ids=None,
scan_settings: ScanossSettings = None
):
"""
Initialise scanning class, including Winnowing, ScanossApi and ThreadedScanning
Initialise scanning class, including Winnowing, ScanossApi, ThreadedScanning
"""
super().__init__(debug, trace, quiet)
if skip_folders is None:
Expand Down Expand Up @@ -133,7 +136,7 @@ def __init__(self, wfp: str = None, scan_output: str = None, output_format: str
skip_md5_ids=skip_md5_ids
)
self.scanoss_api = ScanossApi(debug=debug, trace=trace, quiet=quiet, api_key=api_key, url=url,
sbom_path=sbom_path, scan_type=scan_type, flags=flags, timeout=timeout,
flags=flags, timeout=timeout,
ver_details=ver_details, ignore_cert_errors=ignore_cert_errors,
proxy=proxy, ca_cert=ca_cert, pac=pac, retry=retry
)
Expand All @@ -157,6 +160,16 @@ def __init__(self, wfp: str = None, scan_output: str = None, output_format: str
if skip_extensions: # Append extra file extensions to skip
self.skip_extensions.extend(skip_extensions)

if scan_settings:
self.scan_settings = scan_settings
self.post_processor = ScanPostProcessor(scan_settings, debug=debug, trace=trace, quiet=quiet)
self._maybe_set_api_sbom()

def _maybe_set_api_sbom(self):
sbom = self.scan_settings.get_sbom()
if sbom:
self.scanoss_api.set_sbom(sbom)

def __filter_files(self, files: list) -> list:
"""
Filter which files should be considered for processing
Expand Down Expand Up @@ -524,35 +537,24 @@ def __finish_scan_threaded(self, file_map: dict = None) -> bool:
raw_output += ",\n \"%s\":[%s]" % (file, json.dumps(dep_file, indent=2))
# End for loop
raw_output += "\n}"
parsed_json = None
try:
parsed_json = json.loads(raw_output)
raw_results = json.loads(raw_output)
except Exception as e:
self.print_stderr(f'Warning: Problem decoding parsed json: {e}')
raise Exception(f'ERROR: Problem decoding parsed json: {e}')

results = self.post_processor.load_results(raw_results).post_process()

if self.output_format == 'plain':
if parsed_json:
self.__log_result(json.dumps(parsed_json, indent=2, sort_keys=True))
else:
self.__log_result(raw_output)
self.__log_result(json.dumps(results, indent=2, sort_keys=True))
elif self.output_format == 'cyclonedx':
cdx = CycloneDx(self.debug, self.scan_output)
if parsed_json:
success = cdx.produce_from_json(parsed_json)
else:
success = cdx.produce_from_str(raw_output)
success = cdx.produce_from_json(results)
elif self.output_format == 'spdxlite':
spdxlite = SpdxLite(self.debug, self.scan_output)
if parsed_json:
success = spdxlite.produce_from_json(parsed_json)
else:
success = spdxlite.produce_from_str(raw_output)
success = spdxlite.produce_from_json(results)
elif self.output_format == 'csv':
csvo = CsvOutput(self.debug, self.scan_output)
if parsed_json:
success = csvo.produce_from_json(parsed_json)
else:
success = csvo.produce_from_str(raw_output)
success = csvo.produce_from_json(results)
else:
self.print_stderr(f'ERROR: Unknown output format: {self.output_format}')
success = False
Expand Down Expand Up @@ -713,7 +715,7 @@ def scan_files(self, files: []) -> bool:
else:
Scanner.print_stderr(f'Warning: No files found to scan from: {filtered_files}')
return success

def scan_files_with_options(self, files: [], deps_file: str = None, file_map: dict = None) -> bool:
"""
Scan the given list of files for whatever scaning options that have been configured
Expand Down
Loading