importarrayimportloggingimportposixpathimportwarningsfromcollections.abcimportMutableMappingfromfunctoolsimportcached_propertyfromfsspec.coreimporturl_to_fslogger=logging.getLogger("fsspec.mapping")[docs]classFSMap(MutableMapping):"""Wrap a FileSystem instance as a mutable wrapping. The keys of the mapping become files under the given root, and the values (which must be bytes) the contents of those files. Parameters ---------- root: string prefix for all the files fs: FileSystem instance check: bool (=True) performs a touch at the location, to check for write access. Examples -------- >>> fs = FileSystem(**parameters) # doctest: +SKIP >>> d = FSMap('my-data/path/', fs) # doctest: +SKIP or, more likely >>> d = fs.get_mapper('my-data/path/') >>> d['loc1'] = b'Hello World' # doctest: +SKIP >>> list(d.keys()) # doctest: +SKIP ['loc1'] >>> d['loc1'] # doctest: +SKIP b'Hello World' """def__init__(self,root,fs,check=False,create=False,missing_exceptions=None):self.fs=fsself.root=fs._strip_protocol(root)self._root_key_to_str=fs._strip_protocol(posixpath.join(root,"x"))[:-1]ifmissing_exceptionsisNone:missing_exceptions=(FileNotFoundError,IsADirectoryError,NotADirectoryError,)self.missing_exceptions=missing_exceptionsself.check=checkself.create=createifcreate:ifnotself.fs.exists(root):self.fs.mkdir(root)ifcheck:ifnotself.fs.exists(root):raiseValueError(f"Path{root} does not exist. Create "f" with the ``create=True`` keyword")self.fs.touch(root+"/a")self.fs.rm(root+"/a")@cached_propertydefdirfs(self):"""dirfs instance that can be used with the same keys as the mapper"""from.implementations.dirfsimportDirFileSystemreturnDirFileSystem(path=self._root_key_to_str,fs=self.fs)[docs]defclear(self):"""Remove all keys below root - empties out mapping"""logger.info("Clear mapping at%s",self.root)try:self.fs.rm(self.root,True)self.fs.mkdir(self.root)except:# noqa: E722pass [docs]defgetitems(self,keys,on_error="raise"):"""Fetch multiple items from the store If the backend is async-able, this might proceed concurrently Parameters ---------- keys: list(str) They keys to be fetched on_error : "raise", "omit", "return" If raise, an underlying exception will be raised (converted to KeyError if the type is in self.missing_exceptions); if omit, keys with exception will simply not be included in the output; if "return", all keys are included in the output, but the value will be bytes or an exception instance. Returns ------- dict(key, bytes|exception) """keys2=[self._key_to_str(k)forkinkeys]oe=on_errorifon_error=="raise"else"return"try:out=self.fs.cat(keys2,on_error=oe)ifisinstance(out,bytes):out={keys2[0]:out}exceptself.missing_exceptionsase:raiseKeyErrorfromeout={k:(KeyError()ifisinstance(v,self.missing_exceptions)elsev)fork,vinout.items()}return{key:out[k2]ifon_error=="raise"elseout.get(k2,KeyError(k2))forkey,k2inzip(keys,keys2)ifon_error=="return"ornotisinstance(out[k2],BaseException)} [docs]defsetitems(self,values_dict):"""Set the values of multiple items in the store Parameters ---------- values_dict: dict(str, bytes) """values={self._key_to_str(k):maybe_convert(v)fork,vinvalues_dict.items()}self.fs.pipe(values) [docs]defdelitems(self,keys):"""Remove multiple keys from the store"""self.fs.rm([self._key_to_str(k)forkinkeys]) def_key_to_str(self,key):"""Generate full path for the key"""ifnotisinstance(key,str):# raise TypeError("key must be of type `str`, got `{type(key).__name__}`"warnings.warn("from fsspec 2023.5 onward FSMap non-str keys will raise TypeError",DeprecationWarning,)ifisinstance(key,list):key=tuple(key)key=str(key)returnf"{self._root_key_to_str}{key}".rstrip("/")def_str_to_key(self,s):"""Strip path of to leave key name"""returns[len(self.root):].lstrip("/")def__getitem__(self,key,default=None):"""Retrieve data"""k=self._key_to_str(key)try:result=self.fs.cat(k)exceptself.missing_exceptionsasexc:ifdefaultisnotNone:returndefaultraiseKeyError(key)fromexcreturnresult[docs]defpop(self,key,default=None):"""Pop data"""result=self.__getitem__(key,default)try:delself[key]exceptKeyError:passreturnresult def__setitem__(self,key,value):"""Store value in key"""key=self._key_to_str(key)self.fs.mkdirs(self.fs._parent(key),exist_ok=True)self.fs.pipe_file(key,maybe_convert(value))def__iter__(self):return(self._str_to_key(x)forxinself.fs.find(self.root))def__len__(self):returnlen(self.fs.find(self.root))def__delitem__(self,key):"""Remove key"""try:self.fs.rm(self._key_to_str(key))exceptExceptionasexc:raiseKeyErrorfromexcdef__contains__(self,key):"""Does key exist in mapping?"""path=self._key_to_str(key)returnself.fs.isfile(path)def__reduce__(self):returnFSMap,(self.root,self.fs,False,False,self.missing_exceptions) defmaybe_convert(value):ifisinstance(value,array.array)orhasattr(value,"__array__"):# bytes-like thingsifhasattr(value,"dtype")andvalue.dtype.kindin"Mm":# The buffer interface doesn't support datetime64/timdelta64 numpy# arraysvalue=value.view("int64")value=bytes(memoryview(value))returnvalue[docs]defget_mapper(url="",check=False,create=False,missing_exceptions=None,alternate_root=None,**kwargs,):"""Create key-value interface for given URL and options The URL will be of the form "protocol://location" and point to the root of the mapper required. All keys will be file-names below this location, and their values the contents of each key. Also accepts compound URLs like zip::s3://bucket/file.zip , see ``fsspec.open``. Parameters ---------- url: str Root URL of mapping check: bool Whether to attempt to read from the location before instantiation, to check that the mapping does exist create: bool Whether to make the directory corresponding to the root before instantiating missing_exceptions: None or tuple If given, these exception types will be regarded as missing keys and return KeyError when trying to read data. By default, you get (FileNotFoundError, IsADirectoryError, NotADirectoryError) alternate_root: None or str In cases of complex URLs, the parser may fail to pick the correct part for the mapper root, so this arg can override Returns ------- ``FSMap`` instance, the dict-like key-value store. """# Removing protocol here - could defer to each open() on the backendfs,urlpath=url_to_fs(url,**kwargs)root=alternate_rootifalternate_rootisnotNoneelseurlpathreturnFSMap(root,fs,check,create,missing_exceptions=missing_exceptions)