# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
FileSystem abstraction to interact with various local and remote filesystems.
"""

from pyarrow.util import _is_path_like, _stringify_path

from pyarrow._fs import (  # noqa
    FileSelector,
    FileType,
    FileInfo,
    FileSystem,
    LocalFileSystem,
    SubTreeFileSystem,
    _MockFileSystem,
    FileSystemHandler,
    PyFileSystem,
    _copy_files,
    _copy_files_selector,
)

# For backward compatibility.
FileStats = FileInfo

# Names of optional filesystems whose extension modules were not built;
# __getattr__ below turns access to them into a helpful ImportError.
_not_imported = []

try:
    from pyarrow._azurefs import AzureFileSystem  # noqa
except ImportError:
    _not_imported.append("AzureFileSystem")

try:
    from pyarrow._hdfs import HadoopFileSystem  # noqa
except ImportError:
    _not_imported.append("HadoopFileSystem")

try:
    from pyarrow._gcsfs import GcsFileSystem  # noqa
except ImportError:
    _not_imported.append("GcsFileSystem")

try:
    from pyarrow._s3fs import (  # noqa
        AwsDefaultS3RetryStrategy,
        AwsStandardS3RetryStrategy,
        S3FileSystem,
        S3LogLevel,
        S3RetryStrategy,
        ensure_s3_initialized,
        finalize_s3,
        ensure_s3_finalized,
        initialize_s3,
        resolve_s3_region,
    )
except ImportError:
    _not_imported.append("S3FileSystem")
else:
    # GH-38364: we don't initialize S3 eagerly as that could lead
    # to crashes at shutdown even when S3 isn't used.
    # Instead, S3 is initialized lazily using `ensure_s3_initialized`
    # in assorted places.
    import atexit
    atexit.register(ensure_s3_finalized)


def __getattr__(name):
    # PEP 562 module-level __getattr__: raise a targeted ImportError for
    # optional filesystems that this pyarrow build does not include.
    if name in _not_imported:
        raise ImportError(
            "The pyarrow installation is not built with support for "
            f"'{name}'"
        )

    raise AttributeError(f"module 'pyarrow.fs' has no attribute '{name}'")


def _ensure_filesystem(filesystem, *, use_mmap=False):
    """
    Coerce *filesystem* into a pyarrow FileSystem instance.

    Accepts an existing FileSystem (returned as-is), a URI string (the
    path component, if any, becomes a SubTreeFileSystem prefix), or an
    fsspec-compatible filesystem (wrapped in PyFileSystem/FSSpecHandler).

    Raises
    ------
    ValueError
        If ``use_mmap`` is requested for a URI string, or the URI's path
        component does not point to a directory.
    TypeError
        If *filesystem* is of an unrecognized type.
    """
    if isinstance(filesystem, FileSystem):
        return filesystem
    elif isinstance(filesystem, str):
        # create a filesystem from a URI string, note that the `path` part
        # of the URI is treated as a prefix if specified, so the filesystem
        # is wrapped in a SubTreeFileSystem
        if use_mmap:
            raise ValueError(
                "Specifying to use memory mapping not supported for "
                "filesystem specified as an URI string"
            )
        fs, path = FileSystem.from_uri(filesystem)
        prefix = fs.normalize_path(path)
        if prefix:
            # validate that the prefix is pointing to a directory
            prefix_info = fs.get_file_info([prefix])[0]
            if prefix_info.type != FileType.Directory:
                raise ValueError(
                    "The path component of the filesystem URI must point to a "
                    f"directory but it has a type: `{prefix_info.type.name}`. "
                    f"The path component is `{prefix_info.path}` and the given "
                    f"filesystem URI is `{filesystem}`"
                )
            fs = SubTreeFileSystem(prefix, fs)
        return fs
    else:
        # handle fsspec-compatible filesystems
        try:
            import fsspec
        except ImportError:
            pass
        else:
            if isinstance(filesystem, fsspec.AbstractFileSystem):
                if type(filesystem).__name__ == 'LocalFileSystem':
                    # In case its a simple LocalFileSystem, use native arrow one
                    return LocalFileSystem(use_mmap=use_mmap)
                return PyFileSystem(FSSpecHandler(filesystem))

        raise TypeError(
            f"Unrecognized filesystem: {type(filesystem)}. `filesystem` "
            "argument must be a FileSystem instance or a valid file system URI"
        )
"""ifnot_is_path_like(path):iffilesystemisnotNone:raiseValueError("'filesystem' passed but the specified path is file-like, so"" there is nothing to open with 'filesystem'.")returnfilesystem,pathiffilesystemisnotNone:filesystem=_ensure_filesystem(filesystem,use_mmap=memory_map)ifisinstance(filesystem,LocalFileSystem):path=_stringify_path(path)elifnotisinstance(path,str):raiseTypeError("Expected string path; path-like objects are only allowed ""with a local filesystem")path=filesystem.normalize_path(path)returnfilesystem,pathpath=_stringify_path(path)# if filesystem is not given, try to automatically determine one# first check if the file exists as a local (relative) file path# if not then try to parse the path as an URIfilesystem=LocalFileSystem(use_mmap=memory_map)try:file_info=filesystem.get_file_info(path)exceptValueError:# ValueError means path is likely an URIfile_info=Noneexists_locally=Falseelse:exists_locally=(file_info.type!=FileType.NotFound)# if the file or directory doesn't exists locally, then assume that# the path is an URI describing the file system as wellifnotexists_locally:try:filesystem,path=FileSystem.from_uri(path)exceptValueErrorase:msg=str(e)if"empty scheme"inmsgor"Cannot parse URI"inmsg:# neither an URI nor a locally existing path, so assume that# local path was given and propagate a nicer file not found# error instead of a more confusing scheme parsing errorpasselse:raiseeelse:path=filesystem.normalize_path(path)returnfilesystem,path[docs]defcopy_files(source,destination,source_filesystem=None,destination_filesystem=None,*,chunk_size=1024*1024,use_threads=True):""" Copy files between FileSystems. This functions allows you to recursively copy directories of files from one file system to another, such as from S3 to your local machine. Parameters ---------- source : string Source file path or URI to a single file or directory. If a directory, files will be copied recursively from this path. destination : string Destination file path or URI. 
If `source` is a file, `destination` is also interpreted as the destination file (not directory). Directories will be created as necessary. source_filesystem : FileSystem, optional Source filesystem, needs to be specified if `source` is not a URI, otherwise inferred. destination_filesystem : FileSystem, optional Destination filesystem, needs to be specified if `destination` is not a URI, otherwise inferred. chunk_size : int, default 1MB The maximum size of block to read before flushing to the destination file. A larger chunk_size will use more memory while copying but may help accommodate high latency FileSystems. use_threads : bool, default True Whether to use multiple threads to accelerate copying. Examples -------- Inspect an S3 bucket's files: >>> s3, path = fs.FileSystem.from_uri( ... "s3://registry.opendata.aws/roda/ndjson/") >>> selector = fs.FileSelector(path) >>> s3.get_file_info(selector) [<FileInfo for 'registry.opendata.aws/roda/ndjson/index.ndjson':...] Copy one file from S3 bucket to a local directory: >>> fs.copy_files("s3://registry.opendata.aws/roda/ndjson/index.ndjson", ... f"file:///{local_path}/index_copy.ndjson") >>> fs.LocalFileSystem().get_file_info(str(local_path)+ ... '/index_copy.ndjson') <FileInfo for '.../index_copy.ndjson': type=FileType.File, size=...> Copy file using a FileSystem object: >>> fs.copy_files("registry.opendata.aws/roda/ndjson/index.ndjson", ... f"file:///{local_path}/index_copy.ndjson", ... 
source_filesystem=fs.S3FileSystem()) """source_fs,source_path=_resolve_filesystem_and_path(source,source_filesystem)destination_fs,destination_path=_resolve_filesystem_and_path(destination,destination_filesystem)file_info=source_fs.get_file_info(source_path)iffile_info.type==FileType.Directory:source_sel=FileSelector(source_path,recursive=True)_copy_files_selector(source_fs,source_sel,destination_fs,destination_path,chunk_size,use_threads)else:_copy_files(source_fs,source_path,destination_fs,destination_path,chunk_size,use_threads) [docs]classFSSpecHandler(FileSystemHandler):""" Handler for fsspec-based Python filesystems. https://filesystem-spec.readthedocs.io/en/latest/index.html Parameters ---------- fs : FSSpec-compliant filesystem instance Examples -------- >>> PyFileSystem(FSSpecHandler(fsspec_fs)) # doctest: +SKIP """[docs]def__init__(self,fs):self.fs=fs def__eq__(self,other):ifisinstance(other,FSSpecHandler):returnself.fs==other.fsreturnNotImplementeddef__ne__(self,other):ifisinstance(other,FSSpecHandler):returnself.fs!=other.fsreturnNotImplemented[docs]defget_type_name(self):protocol=self.fs.protocolifisinstance(protocol,list):protocol=protocol[0]returnf"fsspec+{protocol}" [docs]defnormalize_path(self,path):returnpath @staticmethoddef_create_file_info(path,info):size=info["size"]ifinfo["type"]=="file":ftype=FileType.Fileelifinfo["type"]=="directory":ftype=FileType.Directory# some fsspec filesystems include a file size for directoriessize=Noneelse:ftype=FileType.UnknownreturnFileInfo(path,ftype,size=size,mtime=info.get("mtime",None))[docs]defget_file_info(self,paths):infos=[]forpathinpaths:try:info=self.fs.info(path)exceptFileNotFoundError:infos.append(FileInfo(path,FileType.NotFound))else:infos.append(self._create_file_info(path,info))returninfos 
[docs]defget_file_info_selector(self,selector):ifnotself.fs.isdir(selector.base_dir):ifself.fs.exists(selector.base_dir):raiseNotADirectoryError(selector.base_dir)else:ifselector.allow_not_found:return[]else:raiseFileNotFoundError(selector.base_dir)ifselector.recursive:maxdepth=Noneelse:maxdepth=1infos=[]selected_files=self.fs.find(selector.base_dir,maxdepth=maxdepth,withdirs=True,detail=True)forpath,infoinselected_files.items():_path=path.strip("/")base_dir=selector.base_dir.strip("/")# Need to exclude base directory from selected files if present# (fsspec filesystems, see GH-37555)if_path!=base_dir:infos.append(self._create_file_info(path,info))returninfos [docs]defcreate_dir(self,path,recursive):# mkdir also raises FileNotFoundError when base directory is not foundtry:self.fs.mkdir(path,create_parents=recursive)exceptFileExistsError:pass [docs]defdelete_dir(self,path):self.fs.rm(path,recursive=True) def_delete_dir_contents(self,path,missing_dir_ok):try:subpaths=self.fs.listdir(path,detail=False)exceptFileNotFoundError:ifmissing_dir_ok:returnraiseforsubpathinsubpaths:ifself.fs.isdir(subpath):self.fs.rm(subpath,recursive=True)elifself.fs.isfile(subpath):self.fs.rm(subpath)[docs]defdelete_dir_contents(self,path,missing_dir_ok):ifpath.strip("/")=="":raiseValueError("delete_dir_contents called on path '",path,"'")self._delete_dir_contents(path,missing_dir_ok) [docs]defdelete_root_dir_contents(self):self._delete_dir_contents("/",False) [docs]defdelete_file(self,path):# fs.rm correctly raises IsADirectoryError when `path` is a directory# instead of a file and `recursive` is not set to Trueifnotself.fs.exists(path):raiseFileNotFoundError(path)self.fs.rm(path) [docs]defmove(self,src,dest):self.fs.mv(src,dest,recursive=True) [docs]defcopy_file(self,src,dest):# fs.copy correctly raises IsADirectoryError when `src` is a directory# instead of a fileself.fs.copy(src,dest) # TODO can we read/pass metadata (e.g. 
Content-Type) in the methods below?[docs]defopen_input_stream(self,path):frompyarrowimportPythonFileifnotself.fs.isfile(path):raiseFileNotFoundError(path)returnPythonFile(self.fs.open(path,mode="rb"),mode="r") [docs]defopen_input_file(self,path):frompyarrowimportPythonFileifnotself.fs.isfile(path):raiseFileNotFoundError(path)returnPythonFile(self.fs.open(path,mode="rb"),mode="r") [docs]defopen_output_stream(self,path,metadata):frompyarrowimportPythonFilereturnPythonFile(self.fs.open(path,mode="wb"),mode="w") [docs]defopen_append_stream(self,path,metadata):frompyarrowimportPythonFilereturnPythonFile(self.fs.open(path,mode="ab"),mode="w")