diff --git a/CHANGELOG.md b/CHANGELOG.md index 47faf79..d658ba4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- The `Distribution` type and APIs have been added, allowing a user to supply + a pre-computed digest instead of performing I/O + ([#34](https://github.com/trailofbits/pypi-attestations/pull/34)) + +### Changed + +- `sign` and `verify` no longer perform I/O + ([#34](https://github.com/trailofbits/pypi-attestations/pull/34)) + + +### Fixed + +- `verify`: catch another leaky error case + ([#32](https://github.com/trailofbits/pypi-attestations/pull/32)) + + ## [0.0.8] ### Fixed diff --git a/src/pypi_attestations/_cli.py b/src/pypi_attestations/_cli.py index ddff7e0..4fdc791 100644 --- a/src/pypi_attestations/_cli.py +++ b/src/pypi_attestations/_cli.py @@ -14,6 +14,7 @@ from sigstore.verify import Verifier, policy from pypi_attestations import Attestation, AttestationError, VerificationError, __version__ +from pypi_attestations._impl import Distribution if typing.TYPE_CHECKING: from collections.abc import Iterable @@ -183,15 +184,19 @@ def _sign(args: argparse.Namespace) -> None: for file_path in args.files: _logger.debug(f"Signing {file_path}") - signature_path = Path(f"{file_path}.publish.attestation") try: - attestation = Attestation.sign(signer, file_path) + dist = Distribution.from_file(file_path) + except ValidationError as e: + _die(f"Invalid Python package distribution: {e}") + + try: + attestation = Attestation.sign(signer, dist) except AttestationError as e: _die(f"Failed to sign: {e}") - _logger.debug("Attestation saved for %s saved in %s", file_path, signature_path) - + signature_path = Path(f"{file_path}.publish.attestation") signature_path.write_text(attestation.model_dump_json()) + _logger.debug("Attestation for %s saved in %s", file_path, signature_path) def _inspect(args: argparse.Namespace) -> None: @@ -267,7 +272,12 @@ def _verify(args: argparse.Namespace) -> None: _die(f"Invalid attestation ({file_path}): {validation_error}") try: - attestation.verify(verifier, pol, file_path) + dist = Distribution.from_file(file_path) + except ValidationError as e: + _die(f"Invalid Python package distribution: {e}") + + try: + attestation.verify(verifier, pol, dist) except VerificationError as verification_error: _logger.error("Verification failed for %s: %s", file_path, verification_error) continue diff --git a/src/pypi_attestations/_impl.py b/src/pypi_attestations/_impl.py index 2bf4882..d253898 100644 --- a/src/pypi_attestations/_impl.py +++ b/src/pypi_attestations/_impl.py @@ -13,13 +13,8 @@ from annotated_types import MinLen # noqa: TCH002 from cryptography import x509 from cryptography.hazmat.primitives import serialization -from packaging.utils import ( - InvalidSdistFilename, - InvalidWheelFilename, - parse_sdist_filename, - parse_wheel_filename, -) -from pydantic import Base64Bytes, BaseModel +from packaging.utils import parse_sdist_filename, parse_wheel_filename +from pydantic import Base64Bytes, BaseModel, field_validator from pydantic_core import ValidationError from sigstore._utils import _sha256_streaming from sigstore.dsse import Envelope as DsseEnvelope @@ -38,6 +33,34 @@ from sigstore.verify.policy import VerificationPolicy # pragma: no cover +class Distribution(BaseModel): + """Represents a Python package distribution. + + A distribution is identified by its (sdist or wheel) filename, which + provides the package name and version (at a minimum) plus a SHA-256 + digest, which uniquely identifies its contents. + """ + + name: str + digest: str + + @field_validator("name") + @classmethod + def _validate_name(cls, v: str) -> str: + return _ultranormalize_dist_filename(v) + + @classmethod + def from_file(cls, dist: Path) -> Distribution: + """Construct a `Distribution` from the given path.""" + name = dist.name + with dist.open(mode="rb", buffering=0) as io: + # Replace this with `hashlib.file_digest()` once + # our minimum supported Python is >=3.11 + digest = _sha256_streaming(io).hex() + + return cls(name=name, digest=digest) + + class AttestationType(str, Enum): """Attestation types known to PyPI.""" @@ -98,32 +121,19 @@ class Attestation(BaseModel): """ @classmethod - def sign(cls, signer: Signer, dist: Path) -> Attestation: - """Create an envelope, with signature, from a distribution file. + def sign(cls, signer: Signer, dist: Distribution) -> Attestation: + """Create an envelope, with signature, from the given Python distribution. On failure, raises `AttestationError`. """ - try: - with dist.open(mode="rb", buffering=0) as io: - # Replace this with `hashlib.file_digest()` once - # our minimum supported Python is >=3.11 - digest = _sha256_streaming(io).hex() - except OSError as e: - raise AttestationError(str(e)) - - try: - name = _ultranormalize_dist_filename(dist.name) - except (ValueError, InvalidWheelFilename, InvalidSdistFilename) as e: - raise AttestationError(str(e)) - try: stmt = ( _StatementBuilder() .subjects( [ _Subject( - name=name, - digest=_DigestSet(root={"sha256": digest}), + name=dist.name, + digest=_DigestSet(root={"sha256": dist.digest}), ) ] ) @@ -144,19 +154,15 @@ def sign(cls, signer: Signer, dist: Path) -> Attestation: raise AttestationError(str(e)) def verify( - self, verifier: Verifier, policy: VerificationPolicy, dist: Path + self, + verifier: Verifier, + policy: VerificationPolicy, + dist: Distribution, ) -> tuple[str, dict[str, Any] | None]: - """Verify against an existing Python artifact. - - Returns a tuple of the in-toto predicate type and optional deserialized JSON predicate. + """Verify against an existing Python distribution. On failure, raises an appropriate subclass of `AttestationError`. """ - with dist.open(mode="rb", buffering=0) as io: - # Replace this with `hashlib.file_digest()` once - # our minimum supported Python is >=3.11 - expected_digest = _sha256_streaming(io).hex() - bundle = self.to_bundle() try: type_, payload = verifier.verify_dsse(bundle, policy) @@ -184,18 +190,13 @@ def verify( except ValueError as e: raise VerificationError(f"invalid subject: {str(e)}") - try: - normalized = _ultranormalize_dist_filename(dist.name) - except ValueError as e: - raise VerificationError(f"invalid distribution name: {str(e)}") - - if subject_name != normalized: + if subject_name != dist.name: raise VerificationError( - f"subject does not match distribution name: {subject_name} != {normalized}" + f"subject does not match distribution name: {subject_name} != {dist.name}" ) digest = subject.digest.root.get("sha256") - if digest is None or digest != expected_digest: + if digest is None or digest != dist.digest: raise VerificationError("subject does not match distribution digest") try: diff --git a/test/test_impl.py b/test/test_impl.py index e91e6bd..e6c2180 100644 --- a/test/test_impl.py +++ b/test/test_impl.py @@ -1,12 +1,14 @@ """Internal implementation tests.""" import os +from hashlib import sha256 from pathlib import Path import pretend import pypi_attestations._impl as impl import pytest import sigstore +from pydantic import ValidationError from sigstore.dsse import _DigestSet, _StatementBuilder, _Subject from sigstore.models import Bundle from sigstore.oidc import IdentityToken @@ -20,13 +22,34 @@ _HERE = Path(__file__).parent _ASSETS = _HERE / "assets" -artifact_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl" -bundle_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl.sigstore" -attestation_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl.attestation" +dist_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl" +dist = impl.Distribution.from_file(dist_path) +dist_bundle_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl.sigstore" +dist_attestation_path = _ASSETS / "rfc8785-0.1.2-py3-none-any.whl.attestation" # produced by actions/attest@v1 -gh_signed_artifact_path = _ASSETS / "pypi_attestation_models-0.0.4a2.tar.gz" -gh_signed_bundle_path = _ASSETS / "pypi_attestation_models-0.0.4a2.tar.gz.sigstore" +gh_signed_dist_path = _ASSETS / "pypi_attestation_models-0.0.4a2.tar.gz" +gh_signed_dist = impl.Distribution.from_file(gh_signed_dist_path) +gh_signed_dist_bundle_path = _ASSETS / "pypi_attestation_models-0.0.4a2.tar.gz.sigstore" + + +class TestDistribution: + def test_from_file_nonexistent(self, tmp_path: Path) -> None: + nonexistent = tmp_path / "foo-1.2.3.tar.gz" + with pytest.raises(OSError): + impl.Distribution.from_file(nonexistent) + + def test_invalid_sdist_name(self) -> None: + with pytest.raises(ValidationError, match="Invalid sdist filename"): + impl.Distribution(name="invalid-name.tar.gz", digest=sha256(b"lol").hexdigest()) + + def test_invalid_wheel_name(self) -> None: + with pytest.raises(ValidationError, match="Invalid wheel filename"): + impl.Distribution(name="invalid-name.whl", digest=sha256(b"lol").hexdigest()) + + def test_invalid_unknown_dist(self) -> None: + with pytest.raises(ValidationError, match="unknown distribution format"): + impl.Distribution(name="complete.nonsense", digest=sha256(b"lol").hexdigest()) class TestAttestation: @@ -36,9 +59,9 @@ def test_roundtrip(self, id_token: IdentityToken) -> None: verifier = Verifier.staging() with sign_ctx.signer(id_token) as signer: - attestation = impl.Attestation.sign(signer, artifact_path) + attestation = impl.Attestation.sign(signer, dist) - attestation.verify(verifier, policy.UnsafeNoOp(), artifact_path) + attestation.verify(verifier, policy.UnsafeNoOp(), dist) # converting to a bundle and verifying as a bundle also works bundle = attestation.to_bundle() @@ -46,34 +69,7 @@ def test_roundtrip(self, id_token: IdentityToken) -> None: # converting back also works roundtripped_attestation = impl.Attestation.from_bundle(bundle) - roundtripped_attestation.verify(verifier, policy.UnsafeNoOp(), artifact_path) - - def test_sign_invalid_dist_filename(self, tmp_path: Path) -> None: - bad_dist = tmp_path / "invalid-name.tar.gz" - bad_dist.write_bytes(b"junk") - - with pytest.raises( - impl.AttestationError, - match=r"Invalid sdist filename \(invalid version\): invalid-name\.tar\.gz", - ): - impl.Attestation.sign(pretend.stub(), bad_dist) - - def test_sign_raises_attestation_exception(self, tmp_path: Path) -> None: - non_existing_file = tmp_path / "invalid-name.tar.gz" - with pytest.raises(impl.AttestationError, match="No such file"): - impl.Attestation.sign(pretend.stub(), non_existing_file) - - bad_wheel_filename = tmp_path / "invalid-name.whl" - bad_wheel_filename.write_bytes(b"junk") - - with pytest.raises(impl.AttestationError, match="Invalid wheel filename"): - impl.Attestation.sign(pretend.stub(), bad_wheel_filename) - - bad_sdist_filename = tmp_path / "invalid_name.tar.gz" - bad_sdist_filename.write_bytes(b"junk") - - with pytest.raises(impl.AttestationError, match="Invalid sdist filename"): - impl.Attestation.sign(pretend.stub(), bad_sdist_filename) + roundtripped_attestation.verify(verifier, policy.UnsafeNoOp(), dist) def test_wrong_predicate_raises_exception(self, monkeypatch: pytest.MonkeyPatch) -> None: def dummy_predicate(self_: _StatementBuilder, _: str) -> _StatementBuilder: @@ -83,7 +79,7 @@ def dummy_predicate(self_: _StatementBuilder, _: str) -> _StatementBuilder: monkeypatch.setattr(sigstore.dsse._StatementBuilder, "predicate_type", dummy_predicate) with pytest.raises(impl.AttestationError, match="invalid statement"): - impl.Attestation.sign(pretend.stub(), artifact_path) + impl.Attestation.sign(pretend.stub(), dist) @online def test_expired_certificate( @@ -97,7 +93,7 @@ def in_validity_period(_: IdentityToken) -> bool: sign_ctx = SigningContext.staging() with sign_ctx.signer(id_token, cache=False) as signer: with pytest.raises(impl.AttestationError): - impl.Attestation.sign(signer, artifact_path) + impl.Attestation.sign(signer, dist) @online def test_multiple_signatures( @@ -105,7 +101,7 @@ def test_multiple_signatures( ) -> None: def get_bundle(*_) -> Bundle: # noqa: ANN002 # Duplicate the signature to trigger a Conversion error - bundle = Bundle.from_json(gh_signed_bundle_path.read_bytes()) + bundle = Bundle.from_json(gh_signed_dist_bundle_path.read_bytes()) bundle._inner.dsse_envelope.signatures.append(bundle._inner.dsse_envelope.signatures[0]) return bundle @@ -115,7 +111,7 @@ def get_bundle(*_) -> Bundle: # noqa: ANN002 with pytest.raises(impl.AttestationError): with sign_ctx.signer(id_token) as signer: - impl.Attestation.sign(signer, artifact_path) + impl.Attestation.sign(signer, dist) def test_verify_github_attested(self) -> None: verifier = Verifier.production() @@ -128,10 +124,10 @@ def test_verify_github_attested(self) -> None: ] ) - bundle = Bundle.from_json(gh_signed_bundle_path.read_bytes()) + bundle = Bundle.from_json(gh_signed_dist_bundle_path.read_bytes()) attestation = impl.Attestation.from_bundle(bundle) - predicate_type, predicate = attestation.verify(verifier, pol, gh_signed_artifact_path) + predicate_type, predicate = attestation.verify(verifier, pol, gh_signed_dist) assert predicate_type == "https://docs.pypi.org/attestations/publish/v1" assert predicate == {} @@ -142,8 +138,8 @@ def test_verify(self) -> None: identity="william@yossarian.net", issuer="https://github.com/login/oauth" ) - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) - predicate_type, predicate = attestation.verify(verifier, pol, artifact_path) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) + predicate_type, predicate = attestation.verify(verifier, pol, dist) assert predicate_type == "https://docs.pypi.org/attestations/publish/v1" assert predicate is None @@ -159,16 +155,18 @@ def test_verify_digest_mismatch(self, tmp_path: Path) -> None: identity="william@yossarian.net", issuer="https://github.com/login/oauth" ) - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) - modified_artifact_path = tmp_path / artifact_path.name - modified_artifact_path.write_bytes(b"nothing") + modified_dist_path = tmp_path / dist_path.name + modified_dist_path.write_bytes(b"nothing") + + modified_dist = impl.Distribution.from_file(modified_dist_path) # attestation has the correct filename, but a mismatching digest. with pytest.raises( impl.VerificationError, match="subject does not match distribution digest" ): - attestation.verify(verifier, pol, modified_artifact_path) + attestation.verify(verifier, pol, modified_dist) def test_verify_filename_mismatch(self, tmp_path: Path) -> None: verifier = Verifier.staging() @@ -177,26 +175,28 @@ def test_verify_filename_mismatch(self, tmp_path: Path) -> None: identity="william@yossarian.net", issuer="https://github.com/login/oauth" ) - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) + + modified_dist_path = tmp_path / "wrong_name-0.1.2-py3-none-any.whl" + modified_dist_path.write_bytes(dist_path.read_bytes()) - modified_artifact_path = tmp_path / "wrong_name-0.1.2-py3-none-any.whl" - modified_artifact_path.write_bytes(artifact_path.read_bytes()) + different_name_dist = impl.Distribution.from_file(modified_dist_path) # attestation has the correct digest, but a mismatching filename. with pytest.raises( impl.VerificationError, match="subject does not match distribution name" ): - attestation.verify(verifier, pol, modified_artifact_path) + attestation.verify(verifier, pol, different_name_dist) def test_verify_policy_mismatch(self) -> None: verifier = Verifier.staging() # Wrong identity. pol = policy.Identity(identity="fake@example.com", issuer="https://github.com/login/oauth") - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match=r"Certificate's SANs do not match"): - attestation.verify(verifier, pol, artifact_path) + attestation.verify(verifier, pol, dist) def test_verify_wrong_envelope(self) -> None: verifier = pretend.stub( @@ -204,10 +204,10 @@ def test_verify_wrong_envelope(self) -> None: ) pol = pretend.stub() - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="expected JSON envelope, got fake-type"): - attestation.verify(verifier, pol, artifact_path) + attestation.verify(verifier, pol, dist) def test_verify_bad_payload(self) -> None: verifier = pretend.stub( @@ -217,10 +217,10 @@ def test_verify_bad_payload(self) -> None: ) pol = pretend.stub() - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="invalid statement"): - attestation.verify(verifier, pol, artifact_path) + attestation.verify(verifier, pol, dist) def test_verify_too_many_subjects(self) -> None: statement = ( @@ -246,10 +246,10 @@ def test_verify_too_many_subjects(self) -> None: ) pol = pretend.stub() - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="too many subjects in statement"): - attestation.verify(verifier, pol, artifact_path) + attestation.verify(verifier, pol, dist) def test_verify_subject_missing_name(self) -> None: statement = ( @@ -274,10 +274,10 @@ def test_verify_subject_missing_name(self) -> None: ) pol = pretend.stub() - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="invalid subject: missing name"): - attestation.verify(verifier, pol, artifact_path) + attestation.verify(verifier, pol, dist) def test_verify_subject_invalid_name(self) -> None: statement = ( @@ -305,45 +305,10 @@ def test_verify_subject_invalid_name(self) -> None: ) pol = pretend.stub() - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="invalid subject: Invalid wheel filename"): - attestation.verify(verifier, pol, artifact_path) - - def test_verify_distribution_invalid_name(self, tmp_path: Path) -> None: - statement = ( - _StatementBuilder() # noqa: SLF001 - .subjects( - [ - _Subject( - name=artifact_path.name, - digest=_DigestSet(root={"sha256": "abcd"}), - ), - ] - ) - .predicate_type("foo") - .build() - ._inner.model_dump_json() - ) - - verifier = pretend.stub( - verify_dsse=pretend.call_recorder( - lambda bundle, policy: ( - "application/vnd.in-toto+json", - statement.encode(), - ) - ) - ) - pol = pretend.stub() - - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) - bad_artifact = tmp_path / "bad.whl" - bad_artifact.write_bytes(artifact_path.read_bytes()) - - with pytest.raises( - impl.VerificationError, match="invalid distribution name: Invalid wheel filename" - ): - attestation.verify(verifier, pol, bad_artifact) + attestation.verify(verifier, pol, dist) def test_verify_unknown_attestation_type(self) -> None: statement = ( @@ -377,14 +342,14 @@ def test_verify_unknown_attestation_type(self) -> None: ) pol = pretend.stub() - attestation = impl.Attestation.model_validate_json(attestation_path.read_text()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_text()) with pytest.raises(impl.VerificationError, match="unknown attestation type: foo"): - attestation.verify(verifier, pol, artifact_path) + attestation.verify(verifier, pol, dist) def test_from_bundle_missing_signatures() -> None: - bundle = Bundle.from_json(bundle_path.read_bytes()) + bundle = Bundle.from_json(dist_bundle_path.read_bytes()) bundle._inner.dsse_envelope.signatures = [] # noqa: SLF001 with pytest.raises(impl.ConversionError, match="expected exactly one signature, got 0"): @@ -392,7 +357,7 @@ def test_from_bundle_missing_signatures() -> None: def test_to_bundle_invalid_cert() -> None: - attestation = impl.Attestation.model_validate_json(attestation_path.read_bytes()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_bytes()) attestation.verification_material.certificate = b"foo" with pytest.raises(impl.ConversionError, match="invalid X.509 certificate"): @@ -400,7 +365,7 @@ def test_to_bundle_invalid_cert() -> None: def test_to_bundle_invalid_tlog_entry() -> None: - attestation = impl.Attestation.model_validate_json(attestation_path.read_bytes()) + attestation = impl.Attestation.model_validate_json(dist_attestation_path.read_bytes()) attestation.verification_material.transparency_entries[0].clear() with pytest.raises(impl.ConversionError, match="invalid transparency log entry"):