# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from collections.abc import Sequence
import os

from pyarrow.pandas_compat import _pandas_api  # noqa
from pyarrow.lib import (Codec, Table,  # noqa
                         concat_tables, schema)
import pyarrow.lib as ext
from pyarrow import _feather
from pyarrow._feather import FeatherError  # noqa: F401


class FeatherDataset:
    """
    Encapsulates details of reading a list of Feather files.

    Parameters
    ----------
    path_or_paths : List[str]
        A list of file names
    validate_schema : bool, default True
        Check that individual file schemas are all the same / compatible
    """

    def __init__(self, path_or_paths, validate_schema=True):
        self.paths = path_or_paths
        self.validate_schema = validate_schema

    def read_table(self, columns=None):
        """
        Read multiple Feather files as a single pyarrow.Table.

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the file

        Returns
        -------
        pyarrow.Table
            Content of the file as a table (of columns)
        """
        # Read the first file eagerly; its schema is the reference that all
        # subsequent files are validated against.
        _fil = read_table(self.paths[0], columns=columns)
        self._tables = [_fil]
        self.schema = _fil.schema

        for path in self.paths[1:]:
            table = read_table(path, columns=columns)
            if self.validate_schema:
                self.validate_schemas(path, table)
            self._tables.append(table)
        return concat_tables(self._tables)

    def validate_schemas(self, piece, table):
        """Raise ValueError if *table*'s schema differs from the first file's."""
        if not self.schema.equals(table.schema):
            raise ValueError(f'Schema in {piece} was different.\n'
                             f'{self.schema}\n\nvs\n\n{table.schema}')

    def read_pandas(self, columns=None, use_threads=True):
        """
        Read multiple Feather files as a single pandas DataFrame.

        Parameters
        ----------
        columns : List[str]
            Names of columns to read from the file
        use_threads : bool, default True
            Use multiple threads when converting to pandas

        Returns
        -------
        pandas.DataFrame
            Content of the file as a pandas DataFrame (of columns)
        """
        return self.read_table(columns=columns).to_pandas(
            use_threads=use_threads)


def check_chunked_overflow(name, col):
    """
    Raise ValueError if *col* was chunked during conversion to Arrow.

    Feather V1 cannot represent chunked columns; binary/string columns become
    chunked when they exceed the 2GB single-buffer capacity.
    """
    if col.num_chunks == 1:
        return

    if col.type in (ext.binary(), ext.string()):
        raise ValueError(f"Column '{name}' exceeds 2GB maximum capacity of "
                         "a Feather binary column. This restriction may be "
                         "lifted in the future")
    else:
        # TODO(wesm): Not sure when else this might be reached
        raise ValueError(f"Column '{name}' of type {col.type} was chunked on "
                         "conversion to Arrow and cannot be currently written "
                         "to Feather format")


_FEATHER_SUPPORTED_CODECS = {'lz4', 'zstd', 'uncompressed'}


def write_feather(df, dest, compression=None, compression_level=None,
                  chunksize=None, version=2):
    """
    Write a pandas.DataFrame to Feather format.

    Parameters
    ----------
    df : pandas.DataFrame or pyarrow.Table
        Data to write out as Feather format.
    dest : str
        Local destination path.
    compression : string, default None
        Can be one of {"zstd", "lz4", "uncompressed"}. The default of None uses
        LZ4 for V2 files if it is available, otherwise uncompressed.
    compression_level : int, default None
        Use a compression level particular to the chosen compressor. If None
        use the default compression level
    chunksize : int, default None
        For V2 files, the internal maximum size of Arrow RecordBatch chunks
        when writing the Arrow IPC file format. None means use the default,
        which is currently 64K
    version : int, default 2
        Feather file version. Version 2 is the current. Version 1 is the more
        limited legacy format

    Raises
    ------
    ValueError
        If *version* is not 1 or 2, or if a V1-incompatible option
        (compression, chunksize, duplicate or chunked columns) is requested.
    """
    if _pandas_api.have_pandas:
        # Densify legacy sparse frames before conversion (older pandas only).
        if (_pandas_api.has_sparse and
                isinstance(df, _pandas_api.pd.SparseDataFrame)):
            df = df.to_dense()

    if _pandas_api.is_data_frame(df):
        # Feather v1 creates a new column in the resultant Table to
        # store index information if index type is not RangeIndex
        if version == 1:
            preserve_index = False
        elif version == 2:
            preserve_index = None
        else:
            raise ValueError("Version value should either be 1 or 2")

        table = Table.from_pandas(df, preserve_index=preserve_index)

        if version == 1:
            # Version 1 does not support chunking
            for i, name in enumerate(table.schema.names):
                col = table[i]
                check_chunked_overflow(name, col)
    else:
        table = df

    if version == 1:
        if len(table.column_names) > len(set(table.column_names)):
            raise ValueError("cannot serialize duplicate column names")

        if compression is not None:
            raise ValueError("Feather V1 files do not support compression "
                             "option")

        if chunksize is not None:
            raise ValueError("Feather V1 files do not support chunksize "
                             "option")
    else:
        if compression is None and Codec.is_available('lz4_frame'):
            compression = 'lz4'
        elif (compression is not None and
              compression not in _FEATHER_SUPPORTED_CODECS):
            raise ValueError(f'compression="{compression}" not supported, '
                             f'must be one of {_FEATHER_SUPPORTED_CODECS}')

    try:
        _feather.write_feather(table, dest, compression=compression,
                               compression_level=compression_level,
                               chunksize=chunksize, version=version)
    except Exception:
        # Best effort: do not leave a partially-written file behind.
        if isinstance(dest, str):
            try:
                os.remove(dest)
            except os.error:
                pass
        raise


def read_feather(source, columns=None, use_threads=True,
                 memory_map=False, **kwargs):
    """
    Read a pandas.DataFrame from Feather format. To read as pyarrow.Table use
    feather.read_table.

    Parameters
    ----------
    source : str file path, or file-like object
        You can use MemoryMappedFile as source, for explicitly use memory map.
    columns : sequence, optional
        Only read a specific set of columns. If not provided, all columns are
        read.
    use_threads : bool, default True
        Whether to parallelize reading using multiple threads. If false the
        restriction is used in the conversion to Pandas as well as in the
        reading from Feather format.
    memory_map : boolean, default False
        Use memory mapping when opening file on disk, when source is a str.
    **kwargs
        Additional keyword arguments passed on to `pyarrow.Table.to_pandas`.

    Returns
    -------
    df : pandas.DataFrame
        The contents of the Feather file as a pandas.DataFrame
    """
    return (read_table(
        source, columns=columns, memory_map=memory_map,
        use_threads=use_threads).to_pandas(use_threads=use_threads, **kwargs))


def read_table(source, columns=None, memory_map=False, use_threads=True):
    """
    Read a pyarrow.Table from Feather format.

    Parameters
    ----------
    source : str file path, or file-like object
        You can use MemoryMappedFile as source, for explicitly use memory map.
    columns : sequence, optional
        Only read a specific set of columns. If not provided, all columns are
        read.
    memory_map : boolean, default False
        Use memory mapping when opening file on disk, when source is a str
    use_threads : bool, default True
        Whether to parallelize reading using multiple threads.

    Returns
    -------
    table : pyarrow.Table
        The contents of the Feather file as a pyarrow.Table

    Raises
    ------
    TypeError
        If *columns* is not a sequence, or mixes indices and names.
    """
    reader = _feather.FeatherReader(
        source, use_memory_map=memory_map, use_threads=use_threads)

    if columns is None:
        return reader.read()

    if not isinstance(columns, Sequence):
        raise TypeError("Columns must be a sequence but, got {}"
                        .format(type(columns).__name__))

    # Columns must be uniformly integer indices or uniformly string names.
    column_types = [type(column) for column in columns]
    if all(map(lambda t: t == int, column_types)):
        table = reader.read_indices(columns)
    elif all(map(lambda t: t == str, column_types)):
        table = reader.read_names(columns)
    else:
        column_type_names = [t.__name__ for t in column_types]
        raise TypeError("Columns must be indices or names. "
                        f"Got columns {columns} of types {column_type_names}")

    # Feather v1 already respects the column selection
    if reader.version < 3:
        return table
    # Feather v2 reads with sorted / deduplicated selection
    elif sorted(set(columns)) == columns:
        return table
    else:
        # follow exact order / selection of names
        return table.select(columns)