Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/scanoss/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,117 @@ def scan_file(self, file: str) -> bool:
success = False
return success

def scan_files(self, files: list[str]) -> bool:
"""
Scan the specified list of files, producing fingerprints, send to the SCANOSS API and return results
Please note that by providing an explicit list you bypass any exclusions that may be defined on the scanner
:param files: list[str]
List of filenames to scan
:return True if successful, False otherwise
"""
success = True
if not files:
raise Exception(f"ERROR: Please provide a non-empty list of filenames to scan")
self.print_msg(f'Scanning {len(files)} files...')
spinner = None
if not self.quiet and self.isatty:
spinner = Spinner('Fingerprinting ')
save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan
wfp_list = []
scan_block = ''
scan_size = 0
queue_size = 0
file_count = 0 # count all files fingerprinted
wfp_file_count = 0 # count number of files in each queue post
scan_started = False
for file in files:
if self.threaded_scan and self.threaded_scan.stop_scanning():
self.print_stderr('Warning: Aborting fingerprinting as the scanning service is not available.')
break
f_size = 0
try:
f_size = os.stat(file).st_size
except Exception as e:
self.print_trace(
f'Ignoring missing symlink file: {file} ({e})') # Can fail if there is a broken symlink
if f_size > 0: # Ignore broken links and empty files
self.print_trace(f'Fingerprinting {file}...')
if spinner:
spinner.next()
wfp = self.winnowing.wfp_for_file(file, file)
if wfp is None or wfp == '':
self.print_stderr(f'Warning: No WFP returned for {path}')
continue
if save_wfps_for_print:
wfp_list.append(wfp)
file_count += 1
if self.threaded_scan:
wfp_size = len(wfp.encode("utf-8"))
# If the WFP is bigger than the max post size and we already have something stored in the scan block, add it to the queue
if scan_block != '' and (wfp_size + scan_size) >= self.max_post_size:
self.threaded_scan.queue_add(scan_block)
queue_size += 1
scan_block = ''
wfp_file_count = 0
scan_block += wfp
scan_size = len(scan_block.encode("utf-8"))
wfp_file_count += 1
# If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue
if wfp_file_count > self.post_file_count or scan_size >= self.max_post_size:
self.threaded_scan.queue_add(scan_block)
queue_size += 1
scan_block = ''
wfp_file_count = 0
if not scan_started and queue_size > self.nb_threads: # Start scanning if we have something to do
scan_started = True
if not self.threaded_scan.run(wait=False):
self.print_stderr(
f'Warning: Some errors encounted while scanning. Results might be incomplete.')
success = False
# End for loop
if self.threaded_scan and scan_block != '':
self.threaded_scan.queue_add(scan_block) # Make sure all files have been submitted
if spinner:
spinner.finish()

if file_count > 0:
if save_wfps_for_print: # Write a WFP file if no threading is requested
self.print_debug(f'Writing fingerprints to {self.wfp}')
with open(self.wfp, 'w') as f:
f.write(''.join(wfp_list))
else:
self.print_debug(f'Skipping writing WFP file {self.wfp}')
if self.threaded_scan:
success = self.__run_scan_threaded(scan_started, file_count)
else:
Scanner.print_stderr(f'Warning: No files found to scan in folder: {scan_dir}')
return success

def scan_files_with_options(self, files: list[str], deps_file: str = None, file_map: dict = None) -> bool:
"""
Scan the given folder for whatever scaning options that have been configured
:param files: list of files to scan
:param deps_file: pre-parsed dependency file to decorate
:param file_map: mapping of obfuscated files back into originals
:return: True if successful, False otherwise
"""
success = True
if not files:
raise Exception(f"ERROR: Please specify a list of files to scan")
if not self.is_file_or_snippet_scan():
raise Exception(f"ERROR: file or snippet scan options have to be set to scan files: {files}")
if self.is_dependency_scan():
raise Exception(f"ERROR: The dependency scan option is currently not supported when scanning a list of files")
if self.scan_output:
self.print_msg(f'Writing results to {self.scan_output}...')
if self.is_file_or_snippet_scan():
if not self.scan_files(files):
success = False
if self.threaded_scan:
if not self.__finish_scan_threaded(file_map):
success = False
return success

def scan_contents(self, filename: str, contents: bytes) -> bool:
"""
Scan the given contents as a file
Expand Down