Skip to content

💅 Bundle attestation existence check together #315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,47 @@ def collect_dists(packages_dir: Path) -> list[Path]:
return dist_paths


def attest_dist(dist_path: Path, signer: Signer) -> None:
def assert_attestations_do_not_pre_exist(
dist_to_attestation_map: dict[Path, Path],
) -> None:
existing_attestations = {
f'* {dist !s} -> {dist_attestation !s}'
for dist, dist_attestation in dist_to_attestation_map.items()
if dist_attestation.exists()
}
if not existing_attestations:
return

existing_attestations_list = '\n'.join(map(str, existing_attestations))
error_message = (
'The following distributions already have publish attestations:'
f'{existing_attestations_list}',
)
die(error_message)


def compose_attestation_mapping(dist_paths: list[Path]) -> dict[Path, Path]:
dist_to_attestation_map = {
dist_path: dist_path.with_suffix(
f'{dist_path.suffix}.publish.attestation',
)
for dist_path in dist_paths
}

# We are the publishing step, so there should be no pre-existing publish
# attestation. The presence of one indicates user confusion.
attestation_path = Path(f'{dist_path}.publish.attestation')
if attestation_path.exists():
die(f'{dist_path} already has a publish attestation: {attestation_path}')
# Make sure there's no publish attestations on disk.
# We do this up-front to prevent partial signing.
assert_attestations_do_not_pre_exist(dist_to_attestation_map)

return dist_to_attestation_map


def attest_dist(
dist_path: Path,
attestation_path: Path,
signer: Signer,
) -> None:
dist = Distribution.from_file(dist_path)
attestation = Attestation.sign(signer, dist)

Expand All @@ -92,7 +126,9 @@ def get_identity_token() -> IdentityToken:


def main() -> None:
packages_dir = Path(sys.argv[1])
dist_to_attestation_map = compose_attestation_mapping(
collect_dists(Path(sys.argv[1])),
)

try:
identity = get_identity_token()
Expand All @@ -103,12 +139,10 @@ def main() -> None:
# since permissions can't be to blame at this stage.
die(_TOKEN_RETRIEVAL_FAILED_MESSAGE.format(identity_error=identity_error))

dist_paths = collect_dists(packages_dir)

with SigningContext.production().signer(identity, cache=True) as s:
debug(f'attesting to dists: {dist_paths}')
for dist_path in dist_paths:
attest_dist(dist_path, s)
with SigningContext.production().signer(identity, cache=True) as signer:
debug(f'attesting to dists: {dist_to_attestation_map.keys()}')
for dist_path, attestation_path in dist_to_attestation_map.items():
attest_dist(dist_path, attestation_path, signer)


if __name__ == '__main__':
Expand Down
Loading