Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The `Distribution` type and APIs have been added, allowing a user to supply
a pre-computed digest instead of performing I/O
([#34](https://github.com/trailofbits/pypi-attestations/pull/34))

### Changed

- `sign` and `verify` no longer perform I/O
([#34](https://github.com/trailofbits/pypi-attestations/pull/34))


### Fixed

- `verify`: catch another leaky error case
([#32](https://github.com/trailofbits/pypi-attestations/pull/32))


## [0.0.8]

### Fixed
Expand Down
20 changes: 15 additions & 5 deletions src/pypi_attestations/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sigstore.verify import Verifier, policy

from pypi_attestations import Attestation, AttestationError, VerificationError, __version__
from pypi_attestations._impl import Distribution

if typing.TYPE_CHECKING:
from collections.abc import Iterable
Expand Down Expand Up @@ -183,15 +184,19 @@ def _sign(args: argparse.Namespace) -> None:
for file_path in args.files:
_logger.debug(f"Signing {file_path}")

signature_path = Path(f"{file_path}.publish.attestation")
try:
attestation = Attestation.sign(signer, file_path)
dist = Distribution.from_file(file_path)
except ValidationError as e:
_die(f"Invalid Python package distribution: {e}")

try:
attestation = Attestation.sign(signer, dist)
except AttestationError as e:
_die(f"Failed to sign: {e}")

_logger.debug("Attestation saved for %s saved in %s", file_path, signature_path)

signature_path = Path(f"{file_path}.publish.attestation")
signature_path.write_text(attestation.model_dump_json())
_logger.debug("Attestation for %s saved in %s", file_path, signature_path)


def _inspect(args: argparse.Namespace) -> None:
Expand Down Expand Up @@ -267,7 +272,12 @@ def _verify(args: argparse.Namespace) -> None:
_die(f"Invalid attestation ({file_path}): {validation_error}")

try:
attestation.verify(verifier, pol, file_path)
dist = Distribution.from_file(file_path)
except ValidationError as e:
_die(f"Invalid Python package distribution: {e}")

try:
attestation.verify(verifier, pol, dist)
except VerificationError as verification_error:
_logger.error("Verification failed for %s: %s", file_path, verification_error)
continue
Expand Down
83 changes: 42 additions & 41 deletions src/pypi_attestations/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,8 @@
from annotated_types import MinLen # noqa: TCH002
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from packaging.utils import (
InvalidSdistFilename,
InvalidWheelFilename,
parse_sdist_filename,
parse_wheel_filename,
)
from pydantic import Base64Bytes, BaseModel
from packaging.utils import parse_sdist_filename, parse_wheel_filename
from pydantic import Base64Bytes, BaseModel, field_validator
from pydantic_core import ValidationError
from sigstore._utils import _sha256_streaming
from sigstore.dsse import Envelope as DsseEnvelope
Expand All @@ -38,6 +33,34 @@
from sigstore.verify.policy import VerificationPolicy # pragma: no cover


class Distribution(BaseModel):
"""Represents a Python package distribution.

A distribution is identified by its (sdist or wheel) filename, which
provides the package name and version (at a minimum) plus a SHA-256
digest, which uniquely identifies its contents.
"""

name: str
digest: str

@field_validator("name")
@classmethod
def _validate_name(cls, v: str) -> str:
return _ultranormalize_dist_filename(v)

@classmethod
def from_file(cls, dist: Path) -> Distribution:
"""Construct a `Distribution` from the given path."""
name = dist.name
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
digest = _sha256_streaming(io).hex()

return cls(name=name, digest=digest)


class AttestationType(str, Enum):
"""Attestation types known to PyPI."""

Expand Down Expand Up @@ -98,32 +121,19 @@ class Attestation(BaseModel):
"""

@classmethod
def sign(cls, signer: Signer, dist: Path) -> Attestation:
"""Create an envelope, with signature, from a distribution file.
def sign(cls, signer: Signer, dist: Distribution) -> Attestation:
"""Create an envelope, with signature, from the given Python distribution.

On failure, raises `AttestationError`.
"""
try:
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
digest = _sha256_streaming(io).hex()
except OSError as e:
raise AttestationError(str(e))

try:
name = _ultranormalize_dist_filename(dist.name)
except (ValueError, InvalidWheelFilename, InvalidSdistFilename) as e:
raise AttestationError(str(e))

try:
stmt = (
_StatementBuilder()
.subjects(
[
_Subject(
name=name,
digest=_DigestSet(root={"sha256": digest}),
name=dist.name,
digest=_DigestSet(root={"sha256": dist.digest}),
)
]
)
Expand All @@ -144,19 +154,15 @@ def sign(cls, signer: Signer, dist: Path) -> Attestation:
raise AttestationError(str(e))

def verify(
self, verifier: Verifier, policy: VerificationPolicy, dist: Path
self,
verifier: Verifier,
policy: VerificationPolicy,
dist: Distribution,
) -> tuple[str, dict[str, Any] | None]:
"""Verify against an existing Python artifact.

Returns a tuple of the in-toto predicate type and optional deserialized JSON predicate.
"""Verify against an existing Python distribution.

On failure, raises an appropriate subclass of `AttestationError`.
"""
with dist.open(mode="rb", buffering=0) as io:
# Replace this with `hashlib.file_digest()` once
# our minimum supported Python is >=3.11
expected_digest = _sha256_streaming(io).hex()

bundle = self.to_bundle()
try:
type_, payload = verifier.verify_dsse(bundle, policy)
Expand Down Expand Up @@ -184,18 +190,13 @@ def verify(
except ValueError as e:
raise VerificationError(f"invalid subject: {str(e)}")

try:
normalized = _ultranormalize_dist_filename(dist.name)
except ValueError as e:
raise VerificationError(f"invalid distribution name: {str(e)}")

if subject_name != normalized:
if subject_name != dist.name:
raise VerificationError(
f"subject does not match distribution name: {subject_name} != {normalized}"
f"subject does not match distribution name: {subject_name} != {dist.name}"
)

digest = subject.digest.root.get("sha256")
if digest is None or digest != expected_digest:
if digest is None or digest != dist.digest:
raise VerificationError("subject does not match distribution digest")

try:
Expand Down
Loading