| # Copyright 2023 The Chromium Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Helper functions for dealing with .zip files.""" |
| |
| import os |
| import pathlib |
| import posixpath |
| import stat |
| import time |
| import zipfile |
| |
# Size in bytes of the fixed-length portion of a zip local file header. The
# variable-length file name and extra field follow it.
_FIXED_ZIP_HEADER_LEN = 30


def _set_alignment(zip_obj, zip_info, alignment):
  """Sets a ZipInfo's extra field such that the file will be aligned.

  The padding is derived from the current position of |zip_obj|'s underlying
  file object, so call this immediately before writing |zip_info|. Any
  existing |zip_info.extra| value is replaced.

  Args:
    zip_obj: The ZipFile object that is being written.
    zip_info: The ZipInfo object about to be written.
    alignment: The amount of alignment (e.g. 4, or 4*1024).
  """
  # The header stores the *encoded* file name, which is longer than the str
  # when the name is not pure ASCII. zipfile writes names as ASCII when
  # possible and as UTF-8 otherwise, so the UTF-8 length is right in both
  # cases.
  name_len = len(zip_info.filename.encode('utf-8'))
  header_size = _FIXED_ZIP_HEADER_LEN + name_len
  pos = zip_obj.fp.tell() + header_size
  padding_needed = (alignment - (pos % alignment)) % alignment

  # Python writes |extra| to both the local file header and the central
  # directory's file header. Android's zipalign tool writes only to the
  # local file header, so there is more overhead in using Python to align.
  zip_info.extra = b'\0' * padding_needed
| |
| |
def _hermetic_date_time(timestamp=None):
  """Returns a (year, month, day, hour, minute, second) tuple.

  Args:
    timestamp: Seconds since the epoch, converted using UTC. When falsy (None
      or 0), the fixed date 2001-01-01 00:00:00 is returned instead.
  """
  if timestamp:
    # A struct_time starts with exactly the six fields wanted here.
    return time.gmtime(timestamp)[:6]
  return (2001, 1, 1, 0, 0, 0)
| |
| |
def _zip_has_entry(zip_file, name):
  """Returns whether |zip_file| already contains an entry called |name|."""
  # getinfo() is a single lookup by name, whereas namelist() builds a new
  # list of every entry name on each call, which makes adding many files
  # quadratic.
  try:
    zip_file.getinfo(name)
  except KeyError:
    return False
  return True


def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        alignment=None,
                        timestamp=None):
  """Adds a file to the given ZipFile with a hard-coded modified time.

  If |src_path| is a symlink, the entry is marked as a symlink and stores the
  link target instead of the file contents. Otherwise the executable bits of
  |src_path| are copied onto the entry.

  Args:
    zip_file: ZipFile instance to add the file to.
    zip_path: Destination path within the zip file (or ZipInfo instance).
      Must be a relative, normalized, forward-slash path that is not already
      present in |zip_file|. When a ZipInfo is given, its date_time is
      overwritten (and its extra field too, if |alignment| is set).
    src_path: Path of the source file. Mutually exclusive with |data|.
    data: File data as a string.
    compress: Whether to enable compression. Default is taken from ZipFile
      constructor. Data shorter than 16 is always stored uncompressed.
    alignment: If set, align the data of the entry to this many bytes.
    timestamp: The last modification date and time for the archive member.

  Raises:
    AssertionError: If not exactly one of |src_path| and |data| is given, or
      if |zip_path| is invalid or a duplicate.
  """
  assert (src_path is None) != (data is None), (
      '|src_path| and |data| are mutually exclusive.')
  if isinstance(zip_path, zipfile.ZipInfo):
    zipinfo = zip_path
    zip_path = zipinfo.filename
  else:
    zipinfo = zipfile.ZipInfo(filename=zip_path)
    zipinfo.external_attr = 0o644 << 16

  zipinfo.date_time = _hermetic_date_time(timestamp)

  if alignment:
    _set_alignment(zip_file, zipinfo, alignment)

  # Filenames can contain backslashes, but it is more likely that we've
  # forgotten to use forward slashes as a directory separator.
  assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
  assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
  assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
  assert posixpath.normpath(zip_path) == zip_path, (
      f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')
  assert not _zip_has_entry(zip_file, zip_path), (
      'Tried to add a duplicate zip entry: ' + zip_path)

  if src_path and os.path.islink(src_path):
    zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
    zip_file.writestr(zipinfo, os.readlink(src_path))
    return

  if src_path:
    # Maintain the executable bit.
    st = os.stat(src_path)
    for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH):
      if st.st_mode & mode:
        zipinfo.external_attr |= mode << 16

    with open(src_path, 'rb') as f:
      data = f.read()

  # zipfile will deflate even when it makes the file bigger. To avoid
  # growing files, disable compression at an arbitrary cut off point.
  if len(data) < 16:
    compress = False

  # None converts to ZIP_STORED, when passed explicitly rather than the
  # default passed to the ZipFile constructor.
  compress_type = zip_file.compression
  if compress is not None:
    compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
  zip_file.writestr(zipinfo, data, compress_type)
| |
| |
def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     path_transform=None,
                     compress=None,
                     zip_prefix_path=None,
                     timestamp=None):
  """Creates a zip file from a list of files.

  Entries are written in order of their zip path. If |output| is not already
  a ZipFile, one is opened for writing and closed again before returning; a
  ZipFile passed in by the caller is left open.

  Args:
    inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
    output: Path, fileobj, or ZipFile instance to add files to.
    base_dir: Prefix to strip from inputs. Defaults to the current directory.
    path_transform: Called for each entry path (after |zip_prefix_path| has
        been prepended). Returns a new zip path, or None to skip the file.
    compress: Whether to compress
    zip_prefix_path: Path prepended to file path in zip file.
    timestamp: Unix timestamp to use for files in the archive.
  """
  strip_prefix = '.' if base_dir is None else base_dir
  # Zip files always use / as path separator.
  needs_posix_conversion = os.path.sep != posixpath.sep

  def to_pair(entry):
    # Tuples are taken as-is; bare paths get a zip path relative to the base.
    if not isinstance(entry, str):
      return entry
    rel_path = os.path.relpath(entry, strip_prefix)
    if needs_posix_conversion:
      rel_path = str(pathlib.Path(rel_path).as_posix())
    return (rel_path, entry)

  # Sort by zip path to ensure stable zip ordering.
  pairs = sorted((to_pair(entry) for entry in inputs),
                 key=lambda pair: pair[0])

  owns_zip = not isinstance(output, zipfile.ZipFile)
  out_zip = zipfile.ZipFile(output, 'w') if owns_zip else output

  try:
    for zip_path, fs_path in pairs:
      if zip_prefix_path:
        zip_path = posixpath.join(zip_prefix_path, zip_path)
      if path_transform:
        zip_path = path_transform(zip_path)
        if zip_path is None:
          continue
      add_to_zip_hermetic(out_zip,
                          zip_path,
                          src_path=fs_path,
                          compress=compress,
                          timestamp=timestamp)
  finally:
    # Only close what this function opened.
    if owns_zip:
      out_zip.close()
| |
| |
def zip_directory(output, base_dir, **kwargs):
  """Zips all files in the given directory.

  Only files are collected; directories themselves get no entry.

  Args:
    output: Passed through to add_files_to_zip().
    base_dir: Directory to walk recursively. Also passed as |base_dir| to
        add_files_to_zip().
    **kwargs: Extra keyword arguments forwarded to add_files_to_zip().
  """
  found = [
      os.path.join(dir_path, file_name)
      for dir_path, _, file_names in os.walk(base_dir)
      for file_name in file_names
  ]
  add_files_to_zip(found, output, base_dir=base_dir, **kwargs)
| |
| |
def merge_zips(output, input_zips, path_transform=None, compress=None):
  """Combines all files from |input_zips| into |output|.

  Directory entries are skipped. An entry whose destination name already
  exists (in |output| or from an earlier input) is skipped when its CRC
  matches, and is an error otherwise.

  Args:
    output: Path (str) or ZipFile instance to add files to. A path is opened
      for writing and closed before returning; a ZipFile is left open.
    input_zips: Iterable of paths to zip files to merge.
    path_transform: Called for each entry path. Returns a new zip path, or None
      to skip the file.
    compress: Overrides compression setting from origin zip entries.

  Raises:
    Exception: If the same destination name appears with differing contents.
  """
  assert not isinstance(input_zips, str)  # Easy mistake to make.
  owns_zip = not isinstance(output, zipfile.ZipFile)
  if owns_zip:
    assert isinstance(output, str), 'Was: ' + repr(output)
    out_zip = zipfile.ZipFile(output, 'w')
    out_filename = output
  else:
    out_zip = output
    out_filename = output.filename

  # Include paths in the existing zip here to avoid adding duplicate files.
  crc_by_name = {}
  for existing in out_zip.infolist():
    crc_by_name[existing.filename] = (out_filename, existing.CRC)

  try:
    for in_file in input_zips:
      with zipfile.ZipFile(in_file, 'r') as in_zip:
        for info in in_zip.infolist():
          src_name = info.filename
          # Ignore directories.
          if src_name[-1] == '/':
            continue
          dst_name = path_transform(src_name) if path_transform else src_name
          if dst_name is None:
            continue

          data = in_zip.read(info)

          # If there's a duplicate file, ensure contents is the same and skip
          # adding it multiple times.
          previous = crc_by_name.get(dst_name)
          if previous is not None:
            orig_filename, orig_crc = previous
            if zipfile.crc32(data) == orig_crc:
              continue
            msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
            raise Exception(msg)

          # Without an override, keep whatever the origin entry used.
          if compress is None:
            compress_entry = info.compress_type != zipfile.ZIP_STORED
          else:
            compress_entry = compress
          add_to_zip_hermetic(out_zip,
                              dst_name,
                              data=data,
                              compress=compress_entry)
          crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
  finally:
    # Only close what this function opened.
    if owns_zip:
      out_zip.close()