@@ -69,13 +69,47 @@ def collect_dists(packages_dir: Path) -> list[Path]:
6969return dist_paths
7070
7171
72- def attest_dist (dist_path :Path ,signer :Signer )-> None :
72+ def assert_attestations_do_not_pre_exist (
73+ dist_to_attestation_map :dict [Path ,Path ],
74+ )-> None :
75+ existing_attestations = {
76+ f'*{ dist !s} ->{ dist_attestation !s} '
77+ for dist ,dist_attestation in dist_to_attestation_map .items ()
78+ if dist_attestation .exists ()
79+ }
80+ if not existing_attestations :
81+ return
82+
83+ existing_attestations_list = '\n ' .join (map (str ,existing_attestations ))
84+ error_message = (
85+ 'The following distributions already have publish attestations:'
86+ f'{ existing_attestations_list } ' ,
87+ )
88+ die (error_message )
89+
90+
91+ def compose_attestation_mapping (dist_paths :list [Path ])-> dict [Path ,Path ]:
92+ dist_to_attestation_map = {
93+ dist_path :dist_path .with_suffix (
94+ f'{ dist_path .suffix } .publish.attestation' ,
95+ )
96+ for dist_path in dist_paths
97+ }
98+
7399# We are the publishing step, so there should be no pre-existing publish
74100# attestation. The presence of one indicates user confusion.
75- attestation_path = Path ( f' { dist_path } . publish.attestation' )
76- if attestation_path . exists ():
77- die ( f' { dist_path } already has a publish attestation: { attestation_path } ' )
101+ # Make sure there's no publish attestations on disk.
102+ # We do this up-front to prevent partial signing.
103+ assert_attestations_do_not_pre_exist ( dist_to_attestation_map )
78104
105+ return dist_to_attestation_map
106+
107+
108+ def attest_dist (
109+ dist_path :Path ,
110+ attestation_path :Path ,
111+ signer :Signer ,
112+ )-> None :
79113dist = Distribution .from_file (dist_path )
80114attestation = Attestation .sign (signer ,dist )
81115
@@ -92,7 +126,9 @@ def get_identity_token() -> IdentityToken:
92126
93127
94128def main ()-> None :
95- packages_dir = Path (sys .argv [1 ])
129+ dist_to_attestation_map = compose_attestation_mapping (
130+ collect_dists (Path (sys .argv [1 ])),
131+ )
96132
97133try :
98134identity = get_identity_token ()
@@ -103,12 +139,10 @@ def main() -> None:
103139# since permissions can't be to blame at this stage.
104140die (_TOKEN_RETRIEVAL_FAILED_MESSAGE .format (identity_error = identity_error ))
105141
106- dist_paths = collect_dists (packages_dir )
107-
108- with SigningContext .production ().signer (identity ,cache = True )as s :
109- debug (f'attesting to dists:{ dist_paths } ' )
110- for dist_path in dist_paths :
111- attest_dist (dist_path ,s )
142+ with SigningContext .production ().signer (identity ,cache = True )as signer :
143+ debug (f'attesting to dists:{ dist_to_attestation_map .keys ()} ' )
144+ for dist_path ,attestation_path in dist_to_attestation_map .items ():
145+ attest_dist (dist_path ,attestation_path ,signer )
112146
113147
114148if __name__ == '__main__' :