Uh oh!
There was an error while loading. Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork33.7k
Description
Bug report
Bug description:
I ran into a data corruption bug that seems to be triggered by interleaving reads/seeks from different files inside of an uncompressed zip file. As far as I can tell from the docs, this is allowed by zipfile. It works correctly in Python 3.7 and 3.9, but fails in 3.12.
I'm attaching a somewhat convoluted testcase (still working on a simpler one). It parses a dBase IV database by reading records from a .dbf file, and for each record, reading a corresponding record from a .dbt file.
When run using Python 3.9, you will see a bunch of data printed out. When run using Python 3.12, you will get an exception ValueError: Invalid dBase IV block: b'PK\x03\x04\n\x00\x00\x00'. That block does not appear in the input file at all. (Though, when tested with a larger input, I got a block of bytes that appeared in the wrong file.)
For some context, here is a workaround I used in my project: I changed it to read the .dbf file first, then the .dbt.
Testcase:
#!/usr/bin/env python3
"""Reproducer: interleaved reads/seeks on two members of one zip archive.

Opens ``notams.dbf`` and ``notams.dbt`` from ``notams.zip`` at the same time
and alternates between reading a record from the first and seeking/reading a
block in the second.  The interleaving is deliberate: it is the access pattern
that triggers the reported corruption, so do not reorder the reads.
"""
import datetime
import pathlib
import struct
import zipfile
from dataclasses import dataclass
from typing import Any, BinaryIO, List, Tuple

# The input archive is expected to sit next to this script.
ZIP_PATH = pathlib.Path(__file__).parent / 'notams.zip'


@dataclass
class DbfHeader:
    """Fixed-size header found at the start of the .dbf file."""

    SIZE = 32      # number of bytes consumed by from_bytes()
    VERSION = 3    # only value of (info & 0x3) that is accepted

    info: int
    last_update: datetime.date
    num_records: int
    header_bytes: int
    record_bytes: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a 32-byte little-endian header.

        Raises:
            ValueError: if the low two bits of the first byte are not VERSION.
            struct.error: if *data* is not exactly 32 bytes long.
        """
        (info, year, month, day,
         num_records, header_bytes, record_bytes) = struct.unpack(
            '<4BIHH20x', data)
        version = info & 0x3
        if version != cls.VERSION:
            raise ValueError(f"Unsupported DBF version: {version}")
        # The year is stored in a single byte as an offset from 1900.
        return cls(info, datetime.date(year + 1900, month, day),
                   num_records, header_bytes, record_bytes)


@dataclass
class DbfField:
    """One 32-byte field descriptor following the .dbf header."""

    SIZE = 32

    name: str
    type: str
    length: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a field descriptor: NUL-padded name, type code, length."""
        name, typ, length = struct.unpack('<11sc4xB15x', data)
        return cls(name.rstrip(b'\x00').decode(), typ.decode(), length)


class DbfFile:
    """Sequential reader for the .dbf header and records."""

    @classmethod
    def read_header(cls, fd: BinaryIO) -> Tuple[DbfHeader, List[DbfField]]:
        """Read the header and all field descriptors from the current position.

        Raises:
            ValueError: if the descriptor array is not followed by ``0x0D``.
        """
        header = DbfHeader.from_bytes(fd.read(DbfHeader.SIZE))
        # header_bytes covers the 32-byte header, the descriptors and the
        # one-byte terminator, hence the 33 subtracted here.
        num_fields = (header.header_bytes - 33) // 32
        fields = [DbfField.from_bytes(fd.read(DbfField.SIZE))
                  for _ in range(num_fields)]
        if fd.read(1) != b'\x0D':
            raise ValueError("Missing array terminator")
        return header, fields

    @classmethod
    def read_record(cls, fd: BinaryIO, fields: List[DbfField]) -> List[Any]:
        """Read one record and convert each field according to its type code.

        ``C`` yields the space-stripped text, ``D`` a ``datetime.date`` or
        ``None`` when blank, ``L`` a bool or ``None`` for ``?``, and ``M`` /
        ``N`` an int or ``None`` when blank.

        Raises:
            ValueError: on an unknown type code or a malformed ``L`` value.
        """
        # Skip the record's leading byte (presumably the deletion flag --
        # confirm against the DBF specification).
        fd.read(1)
        values = []
        for field in fields:
            data = fd.read(field.length).decode('latin-1').strip(' ')
            if field.type == 'C':
                value = data
            elif field.type == 'D':
                s = data.strip(' ')
                if s:
                    value = datetime.datetime.strptime(data, '%Y%m%d').date()
                else:
                    value = None
            elif field.type == 'L':
                if len(data) != 1:
                    raise ValueError(f"Incorrect length: {data!r}")
                if data in 'YyTt':
                    value = True
                elif data in 'NnFf':
                    value = False
                elif data == '?':
                    value = None
                else:
                    raise ValueError(f"Incorrect boolean: {data!r}")
            elif field.type in ('M', 'N'):
                value = int(data) if data else None
            else:
                raise ValueError(f"Unsupported field: {field.type}")
            values.append(value)
        return values


@dataclass
class DbtHeader:
    """Fixed-size header found at the start of the .dbt file."""

    SIZE = 512

    next_free_block: int
    dbf_filename: str
    reserved: int
    block_length: int

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a 512-byte little-endian header.

        The file name is decoded as latin-1 without stripping NUL padding.
        """
        next_free_block, dbf_filename, reserved, block_length = struct.unpack(
            '<I4x8sIH490x', data)
        return cls(next_free_block, dbf_filename.decode('latin-1'),
                   reserved, block_length)


class DbtFile:
    """Random-access reader for blocks in the .dbt file."""

    DBT3_BLOCK_SIZE = 512
    # Every block read by read_record() must start with these four bytes.
    DBT4_BLOCK_START = b'\xFF\xFF\x08\x00'

    @classmethod
    def read_header(cls, fd: BinaryIO) -> DbtHeader:
        """Seek to the start of *fd* and parse the header found there."""
        fd.seek(0)
        block = fd.read(DbtHeader.SIZE)
        return DbtHeader.from_bytes(block)

    @classmethod
    def read_record(cls, fd: BinaryIO, header: DbtHeader, idx: int) -> str:
        """Return the text of block *idx*, decoded as latin-1.

        Seeks to ``header.block_length * idx`` and reads an 8-byte prefix:
        the 4-byte marker followed by a little-endian length.

        Raises:
            ValueError: if the block does not start with DBT4_BLOCK_START.
        """
        fd.seek(header.block_length * idx)
        block_start = fd.read(8)
        if block_start[0:4] != cls.DBT4_BLOCK_START:
            raise ValueError(f"Invalid dBase IV block: {block_start!r}")
        # The stored length includes the 8 prefix bytes already consumed.
        length = int.from_bytes(block_start[4:8], 'little')
        data = fd.read(length - len(block_start))
        return data.decode('latin-1')


def main():
    """Print every .dbf record followed by the .dbt block it refers to."""
    with zipfile.ZipFile(ZIP_PATH) as z:
        # Both members stay open at once; reads on dbf_in alternate with
        # seek+read on dbt_in.  This interleaving is the scenario under test.
        with z.open('notams.dbf') as dbf_in, z.open('notams.dbt') as dbt_in:
            dbf_header, dbf_fields = DbfFile.read_header(dbf_in)
            dbt_header = DbtFile.read_header(dbt_in)
            for _ in range(dbf_header.num_records):
                record = DbfFile.read_record(dbf_in, dbf_fields)
                print(record)
                # The fourth field of each record is used as the block index.
                memo = DbtFile.read_record(dbt_in, dbt_header, record[3])
                print(memo)


if __name__ == '__main__':
    main()
Input file:
notams.zip
CPython versions tested on:
3.9, 3.12
Operating systems tested on:
Linux
Linked PRs
Metadata
Metadata
Assignees
Labels
Projects
Status