Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions securesystemslib/gpg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
import functools
import logging
import os
import shlex
import subprocess # nosec

from securesystemslib import process
from typing import List

log = logging.getLogger(__name__)

GPG_TIMEOUT = 10


@functools.lru_cache(maxsize=3)
def is_available_gnupg(gnupg: str) -> bool:
def is_available_gnupg(gnupg: str, timeout=GPG_TIMEOUT) -> bool:
"""Returns whether gnupg points to a gpg binary."""
gpg_version_cmd = gnupg + " --version"
gpg_version_cmd = shlex.split(f"{gnupg} --version")
try:
process.run(gpg_version_cmd, stdout=process.PIPE, stderr=process.PIPE)
subprocess.run( # nosec
gpg_version_cmd,
capture_output=True,
timeout=timeout,
check=True,
)
return True
except (OSError, subprocess.TimeoutExpired):
return False
Expand Down Expand Up @@ -61,9 +68,9 @@ def have_gpg() -> bool:
return bool(gpg_command())


def gpg_version_command() -> str:
def gpg_version_command() -> List[str]:
"""Returns the command to get the current GPG version."""
return f"{gpg_command()} --version"
return shlex.split(f"{gpg_command()} --version")


FULLY_SUPPORTED_MIN_VERSION = "2.1.0"
Expand All @@ -73,16 +80,16 @@ def gpg_version_command() -> str:
)


def gpg_sign_command(keyarg: str, homearg: str) -> str:
def gpg_sign_command(keyarg: str, homearg: str) -> List[str]:
"""Returns the command to use GPG to sign STDIN."""
return (
return shlex.split(
f"{gpg_command()} --detach-sign --digest-algo SHA256 {keyarg} {homearg}"
)


def gpg_export_pubkey_command(homearg: str, keyid: str):
def gpg_export_pubkey_command(homearg: str, keyid: str) -> List[str]:
"""Returns the GPG command to export a public key."""
return f"{gpg_command()} {homearg} --export {keyid}"
return shlex.split(f"{gpg_command()} {homearg} --export {keyid}")


# See RFC4880 section 4.3. Packet Tags for a list of all packet types The
Expand Down
30 changes: 21 additions & 9 deletions securesystemslib/gpg/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
verifying signatures.
"""
import logging
import subprocess # nosec
import time

from securesystemslib import exceptions, formats, process
from securesystemslib import exceptions, formats
from securesystemslib.gpg.common import (
get_pubkey_bundle,
parse_signature_packet,
)
from securesystemslib.gpg.constants import (
FULLY_SUPPORTED_MIN_VERSION,
GPG_TIMEOUT,
NO_GPG_MSG,
SHA256,
gpg_export_pubkey_command,
Expand All @@ -40,7 +42,7 @@
NO_CRYPTO_MSG = "GPG support requires the cryptography library"


def create_signature(content, keyid=None, homedir=None):
def create_signature(content, keyid=None, homedir=None, timeout=GPG_TIMEOUT):
"""
<Purpose>
Calls the gpg command line utility to sign the passed content with the key
Expand All @@ -66,6 +68,9 @@ def create_signature(content, keyid=None, homedir=None):
homedir: (optional)
Path to the gpg keyring. If not passed the default keyring is used.

timeout (optional):
gpg command timeout in seconds. Default is 10.

<Exceptions>
securesystemslib.exceptions.FormatError:
If the keyid was passed and does not match
Expand Down Expand Up @@ -121,12 +126,12 @@ def create_signature(content, keyid=None, homedir=None):

command = gpg_sign_command(keyarg=keyarg, homearg=homearg)

gpg_process = process.run(
gpg_process = subprocess.run( # nosec
command,
input=content,
check=False,
stdout=process.PIPE,
stderr=process.PIPE,
capture_output=True,
timeout=timeout,
)

# TODO: It's suggested to take a look at `--status-fd` for proper error
Expand Down Expand Up @@ -261,13 +266,14 @@ def verify_signature(signature_object, pubkey_info, content):
)


def export_pubkey(keyid, homedir=None):
def export_pubkey(keyid, homedir=None, timeout=GPG_TIMEOUT):
"""Exports a public key from a GnuPG keyring.

Arguments:
keyid: An OpenPGP keyid in KEYID_SCHEMA format.
homedir (optional): A path to the GnuPG home directory. If not set the
default GnuPG home directory is used.
timeout (optional): gpg command timeout in seconds. Default is 10.

Raises:
ValueError: Keyid is not a string.
Expand Down Expand Up @@ -307,21 +313,27 @@ def export_pubkey(keyid, homedir=None):
# TODO: Consider adopting command error handling from `create_signature`
# above, e.g. in a common 'run gpg command' utility function
command = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
gpg_process = process.run(command, stdout=process.PIPE, stderr=process.PIPE)
gpg_process = subprocess.run( # nosec
command,
capture_output=True,
timeout=timeout,
check=True,
)

key_packet = gpg_process.stdout
key_bundle = get_pubkey_bundle(key_packet, keyid)

return key_bundle


def export_pubkeys(keyids, homedir=None):
def export_pubkeys(keyids, homedir=None, timeout=GPG_TIMEOUT):
"""Exports multiple public keys from a GnuPG keyring.

Arguments:
keyids: A list of OpenPGP keyids in KEYID_SCHEMA format.
homedir (optional): A path to the GnuPG home directory. If not set the
default GnuPG home directory is used.
timeout (optional): gpg command timeout in seconds. Default is 10.

Raises:
TypeError: Keyids is not iterable.
Expand All @@ -341,7 +353,7 @@ def export_pubkeys(keyids, homedir=None):
"""
public_key_dict = {}
for gpg_keyid in keyids:
public_key = export_pubkey(gpg_keyid, homedir=homedir)
public_key = export_pubkey(gpg_keyid, homedir=homedir, timeout=timeout)
keyid = public_key["keyid"]
public_key_dict[keyid] = public_key

Expand Down
17 changes: 14 additions & 3 deletions tests/test_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import os
import shutil
import subprocess # nosec
import tempfile
import unittest

Expand All @@ -33,7 +34,6 @@
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization

from securesystemslib import process
from securesystemslib.formats import ANY_PUBKEY_DICT_SCHEMA, GPG_PUBKEY_SCHEMA
from securesystemslib.gpg.common import (
_assign_certified_key_info,
Expand All @@ -44,6 +44,7 @@
parse_signature_packet,
)
from securesystemslib.gpg.constants import (
GPG_TIMEOUT,
PACKET_TYPE_PRIMARY_KEY,
PACKET_TYPE_SUB_KEY,
PACKET_TYPE_USER_ATTR,
Expand Down Expand Up @@ -218,14 +219,24 @@ def setUpClass(self): # pylint: disable=bad-classmethod-argument
# erroneous gpg data in tests below.
keyid = "F557D0FF451DEF45372591429EA70BD13D883381"
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE)
proc = subprocess.run(
cmd,
capture_output=True,
timeout=GPG_TIMEOUT,
check=True,
)
self.raw_key_data = proc.stdout
self.raw_key_bundle = parse_pubkey_bundle(self.raw_key_data)

# Export pubkey bundle with expired key for key expiration tests
keyid = "E8AC80C924116DABB51D4B987CB07D6D2C199C7C"
cmd = gpg_export_pubkey_command(keyid=keyid, homearg=homearg)
proc = process.run(cmd, stdout=process.PIPE, stderr=process.PIPE)
proc = subprocess.run(
cmd,
capture_output=True,
timeout=GPG_TIMEOUT,
check=True,
)
self.raw_expired_key_bundle = parse_pubkey_bundle(proc.stdout)

def test_parse_pubkey_payload_errors(self):
Expand Down