Skip to content

Commit c69e9dc

Browse files
committed
Move get crypto algorithms to it's own subcommand
1 parent f3bdb41 commit c69e9dc

File tree

4 files changed

+226
-48
lines changed

4 files changed

+226
-48
lines changed

src/scanoss/cli.py

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import pypac
3333

34+
from scanoss.cryptography import Cryptography, create_cryptography_config_from_args
3435
from scanoss.scanners.container_scanner import (
3536
DEFAULT_SYFT_COMMAND,
3637
DEFAULT_SYFT_TIMEOUT,
@@ -1432,22 +1433,20 @@ def crypto_algorithms(parser, args):
14321433
if args.ca_cert and not os.path.exists(args.ca_cert):
14331434
print_stderr(f'Error: Certificate file does not exist: {args.ca_cert}.')
14341435
sys.exit(1)
1435-
pac_file = get_pac_file(args.pac)
14361436

1437-
comps = Components(
1438-
debug=args.debug,
1439-
trace=args.trace,
1440-
quiet=args.quiet,
1441-
grpc_url=args.api2url,
1442-
api_key=args.key,
1443-
ca_cert=args.ca_cert,
1444-
proxy=args.proxy,
1445-
grpc_proxy=args.grpc_proxy,
1446-
pac=pac_file,
1447-
timeout=args.timeout,
1448-
req_headers=process_req_headers(args.header),
1449-
)
1450-
if not comps.get_crypto_details(args.input, args.purl, args.output):
1437+
try:
1438+
config = create_cryptography_config_from_args(args)
1439+
grpc_config = create_grpc_config_from_args(args)
1440+
client = ScanossGrpc(**asdict(grpc_config))
1441+
1442+
# TODO: Add PAC file support
1443+
# pac_file = get_pac_file(config.pac)
1444+
1445+
cryptography = Cryptography(config=config, client=client)
1446+
cryptography.get_algorithms()
1447+
cryptography.present(output_file=args.output)
1448+
except Exception as e:
1449+
print_stderr(f'ERROR: {e}')
14511450
sys.exit(1)
14521451

14531452

src/scanoss/cryptography.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import json
2+
from dataclasses import dataclass
3+
from typing import Dict, Optional
4+
5+
from scanoss.scanossbase import ScanossBase
6+
from scanoss.scanossgrpc import ScanossGrpc
7+
from scanoss.utils.abstract_presenter import AbstractPresenter
8+
from scanoss.utils.file import validate_json_file
9+
10+
11+
class ScanossCryptographyError(Exception):
12+
pass
13+
14+
15+
@dataclass
16+
class CryptographyConfig:
17+
debug: bool = False
18+
trace: bool = False
19+
quiet: bool = False
20+
get_range: bool = False
21+
purl: str = None
22+
input_file: str = None
23+
output_file: str = None
24+
header: str = None
25+
26+
27+
def create_cryptography_config_from_args(args) -> CryptographyConfig:
28+
return CryptographyConfig(
29+
debug=getattr(args, 'debug', None),
30+
trace=getattr(args, 'trace', None),
31+
quiet=getattr(args, 'quiet', None),
32+
get_range=getattr(args, 'range', None),
33+
purl=getattr(args, 'purl', None),
34+
input_file=getattr(args, 'input', None),
35+
output_file=getattr(args, 'output', None),
36+
header=getattr(args, 'header', None),
37+
)
38+
39+
40+
class Cryptography:
41+
"""
42+
Cryptography Class
43+
44+
This class is used to decorate purls with cryptography information.
45+
"""
46+
47+
def __init__(
48+
self,
49+
config: CryptographyConfig,
50+
client: ScanossGrpc,
51+
):
52+
"""
53+
Initialize the Cryptography.
54+
55+
Args:
56+
config (CryptographyConfig): Configuration parameters for the cryptography.
57+
client (ScanossGrpc): gRPC client for communicating with the scanning service.
58+
"""
59+
self.base = ScanossBase(
60+
debug=config.debug,
61+
trace=config.trace,
62+
quiet=config.quiet,
63+
)
64+
self.presenter = CryptographyPresenter(
65+
self,
66+
debug=config.debug,
67+
trace=config.trace,
68+
quiet=config.quiet,
69+
)
70+
71+
self.client = client
72+
self.config = config
73+
self.purls_request = self._build_purls_request()
74+
self.results = None
75+
76+
def get_algorithms(self) -> Optional[Dict]:
77+
"""
78+
Get the cryptographic algorithms for the provided purl or input file.
79+
80+
Returns:
81+
Optional[Dict]: The folder hash response from the gRPC client, or None if an error occurs.
82+
"""
83+
84+
try:
85+
self.base.print_stderr(
86+
f'Getting cryptographic algorithms for {", ".join([p["purl"] for p in self.purls_request["purls"]])}'
87+
)
88+
if self.config.get_range:
89+
response = self.client.get_crypto_algorithms_in_range_for_purl(self.purls_request)
90+
else:
91+
response = self.client.get_crypto_algorithms_for_purl(self.purls_request)
92+
if response:
93+
self.results = response
94+
except Exception as e:
95+
raise ScanossCryptographyError(f'Problem with purl input file. {e}')
96+
97+
return self.results
98+
99+
def _build_purls_request(
100+
self,
101+
) -> Optional[dict]:
102+
"""
103+
Load the specified purls from a JSON file or a list of PURLs and return a dictionary
104+
105+
Args:
106+
json_file (Optional[str], optional): The JSON file containing the PURLs. Defaults to None.
107+
purls (Optional[List[str]], optional): The list of PURLs. Defaults to None.
108+
109+
Returns:
110+
Optional[dict]: The dictionary containing the PURLs
111+
"""
112+
if self.config.input_file:
113+
input_file_validation = validate_json_file(self.config.input_file)
114+
if not input_file_validation.is_valid:
115+
self.base.print_stderr(
116+
f'ERROR: The supplied input file "{self.config.input_file}" was not found or is empty.'
117+
)
118+
raise Exception(f'Problem with purl input file. {input_file_validation.error}')
119+
# Validate the input file is in PurlRequest format
120+
if (
121+
not isinstance(input_file_validation.data, dict)
122+
or 'purls' not in input_file_validation.data
123+
or not isinstance(input_file_validation.data['purls'], list)
124+
or not all(isinstance(p, dict) and 'purl' in p for p in input_file_validation.data['purls'])
125+
):
126+
raise Exception('The supplied input file is not in the correct PurlRequest format.')
127+
return input_file_validation.data
128+
if self.config.purl:
129+
return {'purls': [{'purl': p} for p in self.config.purl]}
130+
return None
131+
132+
def present(self, output_format: str = None, output_file: str = None):
133+
"""Present the results in the selected format"""
134+
self.presenter.present(output_format=output_format, output_file=output_file)
135+
136+
137+
class CryptographyPresenter(AbstractPresenter):
138+
"""
139+
Cryptography presenter class
140+
Handles the presentation of the cryptography results
141+
"""
142+
143+
def __init__(self, cryptography: Cryptography, **kwargs):
144+
super().__init__(**kwargs)
145+
self.cryptography = cryptography
146+
147+
def _format_json_output(self) -> str:
148+
"""
149+
Format the scan output data into a JSON object
150+
151+
Returns:
152+
str: The formatted JSON string
153+
"""
154+
return json.dumps(self.cryptography.results, indent=2)
155+
156+
def _format_plain_output(self) -> str:
157+
"""
158+
Format the scan output data into a plain text string
159+
"""
160+
return (
161+
json.dumps(self.cryptography.results, indent=2)
162+
if isinstance(self.cryptography.results, dict)
163+
else str(self.cryptography.results)
164+
)
165+
166+
def _format_cyclonedx_output(self) -> str:
167+
raise NotImplementedError('CycloneDX output is not implemented')
168+
169+
def _format_spdxlite_output(self) -> str:
170+
raise NotImplementedError('SPDXlite output is not implemented')
171+
172+
def _format_csv_output(self) -> str:
173+
raise NotImplementedError('CSV output is not implemented')
174+
175+
def _format_raw_output(self) -> str:
176+
raise NotImplementedError('Raw output is not implemented')

src/scanoss/scanossgrpc.py

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
CompVersionResponse,
5555
)
5656
from .api.components.v2.scanoss_components_pb2_grpc import ComponentsStub
57-
from .api.cryptography.v2.scanoss_cryptography_pb2 import AlgorithmResponse
5857
from .api.cryptography.v2.scanoss_cryptography_pb2_grpc import CryptographyStub
5958
from .api.dependencies.v2.scanoss_dependencies_pb2 import DependencyRequest
6059
from .api.dependencies.v2.scanoss_dependencies_pb2_grpc import DependenciesStub
@@ -312,36 +311,6 @@ def process_file(file):
312311
merged_response['status'] = response['status']
313312
return merged_response
314313

315-
def get_crypto_json(self, purls: dict) -> dict:
316-
"""
317-
Client function to call the rpc for Cryptography GetAlgorithms
318-
:param purls: Message to send to the service
319-
:return: Server response or None
320-
"""
321-
if not purls:
322-
self.print_stderr('ERROR: No message supplied to send to gRPC service.')
323-
return None
324-
request_id = str(uuid.uuid4())
325-
resp: AlgorithmResponse
326-
try:
327-
request = ParseDict(purls, PurlRequest()) # Parse the JSON/Dict into the purl request object
328-
metadata = self.metadata[:]
329-
metadata.append(('x-request-id', request_id)) # Set a Request ID
330-
self.print_debug(f'Sending crypto data for decoration (rqId: {request_id})...')
331-
resp = self.crypto_stub.GetAlgorithms(request, metadata=metadata, timeout=self.timeout)
332-
except Exception as e:
333-
self.print_stderr(
334-
f'ERROR: {e.__class__.__name__} Problem encountered sending gRPC message (rqId: {request_id}): {e}'
335-
)
336-
else:
337-
if resp:
338-
if not self._check_status_response(resp.status, request_id):
339-
return None
340-
resp_dict = MessageToDict(resp, preserving_proto_field_name=True) # Convert gRPC response to a dict
341-
del resp_dict['status']
342-
return resp_dict
343-
return None
344-
345314
def get_vulnerabilities_json(self, purls: dict) -> dict:
346315
"""
347316
Client function to call the rpc for Vulnerability GetVulnerabilities
@@ -607,6 +576,40 @@ def get_provenance_origin(self, request: Dict) -> Optional[Dict]:
607576
'Sending data for provenance origin decoration (rqId: {rqId})...',
608577
)
609578

579+
def get_crypto_algorithms_for_purl(self, request: Dict) -> Optional[Dict]:
580+
"""
581+
Client function to call the rpc for GetAlgorithms for a list of purls
582+
583+
Args:
584+
request (Dict): PurlRequest
585+
586+
Returns:
587+
Optional[Dict]: AlgorithmResponse, or None if the request was not successfull
588+
"""
589+
return self._call_rpc(
590+
self.crypto_stub.GetAlgorithms,
591+
request,
592+
PurlRequest,
593+
'Sending data for cryptographic algorithms decoration (rqId: {rqId})...',
594+
)
595+
596+
def get_crypto_algorithms_in_range_for_purl(self, request: Dict) -> Optional[Dict]:
597+
"""
598+
Client function to call the rpc for GetAlgorithmsInRange for a list of purls
599+
600+
Args:
601+
request (Dict): PurlRequest
602+
603+
Returns:
604+
Optional[Dict]: AlgorithmsInRangeResponse, or None if the request was not successfull
605+
"""
606+
return self._call_rpc(
607+
self.crypto_stub.GetAlgorithmsInRange,
608+
request,
609+
PurlRequest,
610+
'Sending data for cryptographic algorithms in range decoration (rqId: {rqId})...',
611+
)
612+
610613
def load_generic_headers(self):
611614
"""
612615
Adds custom headers from req_headers to metadata.

src/scanoss/utils/file.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ def validate_json_file(json_file_path: str) -> JsonValidation:
4949
json_file_path (str): The JSON file to validate
5050
5151
Returns:
52-
Tuple[bool, str]: A tuple containing a boolean indicating if the file is valid and a message
53-
"""
52+
JsonValidation: A JsonValidation object containing a boolean indicating if the file is valid, the data, error, and error code
53+
""" # noqa: E501
5454
if not json_file_path:
5555
return JsonValidation(is_valid=False, error='No JSON file specified')
5656
if not os.path.isfile(json_file_path):

0 commit comments

Comments
 (0)