diff --git a/securesystemslib/gpg/constants.py b/securesystemslib/gpg/constants.py index df023428..ea659747 100644 --- a/securesystemslib/gpg/constants.py +++ b/securesystemslib/gpg/constants.py @@ -18,19 +18,26 @@ import functools import logging import os +import shlex import subprocess # nosec - -from securesystemslib import process +from typing import List log = logging.getLogger(__name__) +GPG_TIMEOUT = 10 + @functools.lru_cache(maxsize=3) -def is_available_gnupg(gnupg: str) -> bool: +def is_available_gnupg(gnupg: str, timeout=GPG_TIMEOUT) -> bool: """Returns whether gnupg points to a gpg binary.""" - gpg_version_cmd = gnupg + " --version" + gpg_version_cmd = shlex.split(f"{gnupg} --version") try: - process.run(gpg_version_cmd, stdout=process.PIPE, stderr=process.PIPE) + subprocess.run( # nosec + gpg_version_cmd, + capture_output=True, + timeout=timeout, + check=True, + ) return True except (OSError, subprocess.TimeoutExpired): return False @@ -61,9 +68,9 @@ def have_gpg() -> bool: return bool(gpg_command()) -def gpg_version_command() -> str: +def gpg_version_command() -> List[str]: """Returns the command to get the current GPG version.""" - return f"{gpg_command()} --version" + return shlex.split(f"{gpg_command()} --version") FULLY_SUPPORTED_MIN_VERSION = "2.1.0" @@ -73,16 +80,16 @@ def gpg_version_command() -> str: ) -def gpg_sign_command(keyarg: str, homearg: str) -> str: +def gpg_sign_command(keyarg: str, homearg: str) -> List[str]: """Returns the command to use GPG to sign STDIN.""" - return ( + return shlex.split( f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}" ) -def gpg_export_pubkey_command(homearg: str, keyid: str): +def gpg_export_pubkey_command(homearg: str, keyid: str) -> List[str]: """Returns the GPG command to export a public key.""" - return f"{gpg_command()} {homearg} --export {keyid}" + return shlex.split(f"{gpg_command()} {homearg} --export {keyid}") # See RFC4880 section 4.3. Packet Tags for a list of all packet types The diff --git a/securesystemslib/gpg/functions.py b/securesystemslib/gpg/functions.py index c299bea1..1e6d32f8 100644 --- a/securesystemslib/gpg/functions.py +++ b/securesystemslib/gpg/functions.py @@ -16,15 +16,17 @@ verifying signatures. """ import logging +import subprocess # nosec import time -from securesystemslib import exceptions, formats, process +from securesystemslib import exceptions, formats from securesystemslib.gpg.common import ( get_pubkey_bundle, parse_signature_packet, ) from securesystemslib.gpg.constants import ( FULLY_SUPPORTED_MIN_VERSION, + GPG_TIMEOUT, NO_GPG_MSG, SHA256, gpg_export_pubkey_command, @@ -40,7 +42,7 @@ NO_CRYPTO_MSG = "GPG support requires the cryptography library" -def create_signature(content, keyid=None, homedir=None): +def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT): """ Calls the gpg command line utility to sign the passed content with the key @@ -66,6 +68,9 @@ def create_signature(content, keyid=None, homedir=None): homedir: (optional) Path to the gpg keyring. If not passed the default keyring is used. + timeout (optional): + gpg command timeout in seconds. Default is 10. + securesystemslib.exceptions.FormatError: If the keyid was passed and does not match @@ -121,12 +126,12 @@ def create_signature(content, keyid=None, homedir=None): command = gpg_sign_command(keyarg=keyarg, homearg=homearg) - gpg_process = process.run( + gpg_process = subprocess.run( # nosec command, input=content, check=False, - stdout=process.PIPE, - stderr=process.PIPE, + capture_output=True, + timeout=timeout, ) # TODO: It's suggested to take a look at `--status-fd` for proper error @@ -261,13 +266,14 @@ def verify_signature(signature_object, pubkey_info, content): ) -def export_pubkey(keyid, homedir=None): +def export_pubkey(keyid, homedir=None, timeout=GPG_TIMEOUT): """Exports a public key from a GnuPG keyring. Arguments: keyid: An OpenPGP keyid in KEYID_SCHEMA format. homedir (optional): A path to the GnuPG home directory. If not set the default GnuPG home directory is used. + timeout (optional): gpg command timeout in seconds. Default is 10. Raises: ValueError: Keyid is not a string. @@ -307,7 +313,12 @@ def export_pubkey(keyid, homedir=None): # TODO: Consider adopting command error handling from `create_signature` # above, e.g. in a common 'run gpg command' utility function command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) - gpg_process = process.run(command, stdout=process.PIPE, stderr=process.PIPE) + gpg_process = subprocess.run( # nosec + command, + capture_output=True, + timeout=timeout, + check=True, + ) key_packet = gpg_process.stdout key_bundle = get_pubkey_bundle(key_packet, keyid) @@ -315,13 +326,14 @@ def export_pubkey(keyid, homedir=None): return key_bundle -def export_pubkeys(keyids, homedir=None): +def export_pubkeys(keyids, homedir=None, timeout=GPG_TIMEOUT): """Exports multiple public keys from a GnuPG keyring. Arguments: keyids: A list of OpenPGP keyids in KEYID_SCHEMA format. homedir (optional): A path to the GnuPG home directory. If not set the default GnuPG home directory is used. + timeout (optional): gpg command timeout in seconds. Default is 10. Raises: TypeError: Keyids is not iterable. @@ -341,7 +353,7 @@ def export_pubkeys(keyids, homedir=None): """ public_key_dict = {} for gpg_keyid in keyids: - public_key = export_pubkey(gpg_keyid, homedir=homedir) + public_key = export_pubkey(gpg_keyid, homedir=homedir, timeout=timeout) keyid = public_key["keyid"] public_key_dict[keyid] = public_key diff --git a/tests/test_gpg.py b/tests/test_gpg.py index a65919d1..075f0a5a 100644 --- a/tests/test_gpg.py +++ b/tests/test_gpg.py @@ -21,6 +21,7 @@ import os import shutil +import subprocess # nosec import tempfile import unittest @@ -33,7 +34,6 @@ from cryptography.hazmat import backends from cryptography.hazmat.primitives import serialization -from securesystemslib import process from securesystemslib.formats import ANY_PUBKEY_DICT_SCHEMA, GPG_PUBKEY_SCHEMA from securesystemslib.gpg.common import ( _assign_certified_key_info, @@ -44,6 +44,7 @@ parse_signature_packet, ) from securesystemslib.gpg.constants import ( + GPG_TIMEOUT, PACKET_TYPE_PRIMARY_KEY, PACKET_TYPE_SUB_KEY, PACKET_TYPE_USER_ATTR, @@ -218,14 +219,24 @@ def setUpClass(self): # pylint: disable=bad-classmethod-argument # erroneous gpg data in tests below. keyid = "F557D0FF451DEF45372591429EA70BD13D883381" cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) - proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE) + proc = subprocess.run( + cmd, + capture_output=True, + timeout=GPG_TIMEOUT, + check=True, + ) self.raw_key_data = proc.stdout self.raw_key_bundle = parse_pubkey_bundle(self.raw_key_data) # Export pubkey bundle with expired key for key expiration tests keyid = "E8AC80C924116DABB51D4B987CB07D6D2C199C7C" cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg) - proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE) + proc = subprocess.run( + cmd, + capture_output=True, + timeout=GPG_TIMEOUT, + check=True, + ) self.raw_expired_key_bundle = parse_pubkey_bundle(proc.stdout) def test_parse_pubkey_payload_errors(self):