Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: Special-case suffix requests in obstore backend to support Azure#2994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Conversation

@kylebarron
Copy link
Contributor

Fix for issue raised here:pydata/xarray#10228

The obstore backend performs suffix requests to read sharded Zarr files. Azure does not support suffix requests and the underlyingobject_store Rust codeimmediately errors if a suffix request is performed against Azure.

The workaround probably shouldn't go inobject_store orobstore, because we don't want to silently perform two requests when the user asks for one. It's better for the user of those libraries to know that they have to opt-in to two requests on Azure. So I think it makes sense to have this workaround go here.

Note that this extra head request could be avoided if Zarr metadata already stored the length of each file, but I don't think that's true.

TODO:

  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented indocs/user-guide/*.rst
  • Changes documented as a new file inchanges/
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

maxrjones, TomNicholas, and joshmoore reacted with heart emoji
@kylebarronkylebarron changed the titlefix: Special-case suffix requests in obstore backendfix: Special-case suffix requests in obstore backend to support AzureApr 17, 2025
@github-actionsgithub-actionsbot added the needs release notesAutomatically applied to PRs which haven't added release notes labelApr 17, 2025
@d-v-b
Copy link
Contributor

I'm fine merging this without tests, because I think writing tests for this would be kind of cumbersome in our test suite as it is today (let me know if I'm wrong here). Longer term we should figure out how to make it easy to test this kind of thing.

kylebarron reacted with thumbs up emoji

@kylebarron
Copy link
ContributorAuthor

Indeed I'm not entirely sure how to test it. We'd need to at least mock a backend that doesn't support suffix requests. Maybe that wouldn't be too hard to do?

@TomAugspurger
Copy link
Contributor

There's adocker container for the Azurite storage emulator that supports most non-auth things.

However, I think since the test suite doesn't already use docker it's probably not worth setting up just for this.

d-v-b reacted with thumbs up emoji

Copy link
Contributor

@d-v-bd-v-b left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I can't test this myself, but I trust that this fix resolves the linked issue. If that turns out to be wrong, then it's all the more reason to beef up our remote store testing infrastructure in a separate PR.

kylebarron reacted with thumbs up emoji
@kylebarron
Copy link
ContributorAuthor

FWIW I didn't test this either. Maybe@lsim-aegeri can test from this branch?

@lsim-aegeri
Copy link

FWIW I didn't test this either. Maybe@lsim-aegeri can test from this branch?

I will test later today!

@lsim-aegeri
Copy link

I'm afraid I'm still getting the error. I double checked the file you modified in my .venv and confirmed I am using a version with the changes you made.

Current traceback:

---------------------------------------------------------------------------NotSupportedErrorTraceback (mostrecentcalllast)CellIn[46],line5148ds_n=xr.open_zarr(objstore_xr,consolidated=False)50# However, I get the error when loading the chunks into memory--->51ds_n.compute()File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/dataset.py:714,inDataset.compute(self,**kwargs)690"""Manually trigger loading and/or computation of this dataset's data    691 from disk or a remote source into memory and return a new dataset.    692 Unlike load, the original dataset is left unaltered.   (...)    711 dask.compute    712 """713new=self.copy(deep=False)-->714returnnew.load(**kwargs)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/dataset.py:541,inDataset.load(self,**kwargs)538chunkmanager=get_chunked_array_type(*lazy_data.values())540# evaluate all the chunked arrays simultaneously-->541evaluated_data:tuple[np.ndarray[Any,Any], ...]=chunkmanager.compute(542*lazy_data.values(),**kwargs543 )545fork,datainzip(lazy_data,evaluated_data,strict=False):546self.variables[k].data=dataFile~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:85,inDaskManager.compute(self,*data,**kwargs)80defcompute(81self,*data:Any,**kwargs:Any82 )->tuple[np.ndarray[Any,_DType_co], ...]:83fromdask.arrayimportcompute--->85returncompute(*data,**kwargs)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/dask/base.py:656,incompute(traverse,optimize_graph,scheduler,get,*args,**kwargs)653postcomputes.append(x.__dask_postcompute__())655withshorten_traceback():-->656results=schedule(dsk,keys,**kwargs)658returnrepack([f(r,*a)forr, (f,a)inzip(results,postcomputes)])File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:574,inImplicitToExplicitIndexingAdapter.__array__(self,dtype,copy)570def__array__(571self,dtype:np.typing.DTypeLike=None,/,*,copy:bool|None=None572 )->np.ndarray:573ifVersion(np.__version__)>=Version("2.0.0"):-->574returnnp.asarray(self.get_duck_array(),dtype=dtype,copy=copy)575else:576returnnp.asarray(self.get_duck_array(),dtype=dtype)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:579,inImplicitToExplicitIndexingAdapter.get_duck_array(self)578defget_duck_array(self):-->579returnself.array.get_duck_array()File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:790,inCopyOnWriteArray.get_duck_array(self)789defget_duck_array(self):-->790returnself.array.get_duck_array()File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:653,inLazilyIndexedArray.get_duck_array(self)649array=apply_indexer(self.array,self.key)650else:651# If the array is not an ExplicitlyIndexedNDArrayMixin,652# it may wrap a BackendArray so use its __getitem__-->653array=self.array[self.key]655# self.array[self.key] is now a numpy array when656# self.array is a BackendArray subclass657# and self.key is BasicIndexer((slice(None, None, None),))658# so we need the explicit check for ExplicitlyIndexed659ifisinstance(array,ExplicitlyIndexed):File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/backends/zarr.py:223,inZarrArrayWrapper.__getitem__(self,key)221elifisinstance(key,indexing.OuterIndexer):222method=self._oindex-->223returnindexing.explicit_indexing_adapter(224key,array.shape,indexing.IndexingSupport.VECTORIZED,method225 )File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/core/indexing.py:1014,inexplicit_indexing_adapter(key,shape,indexing_support,raw_indexing_method)992"""Support explicit indexing by delegating to a raw indexing method.    993    994 Outer and/or vectorized indexers are supported by indexing a second time   (...)   1011 Indexing result, in the form of a duck numpy-array.   1012 """1013raw_key,numpy_indices=decompose_indexer(key,shape,indexing_support)->1014result=raw_indexing_method(raw_key.tuple)1015ifnumpy_indices.tuple:1016# index the loaded duck array1017indexable=as_indexable(result)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/xarray/backends/zarr.py:213,inZarrArrayWrapper._getitem(self,key)212def_getitem(self,key):-->213returnself._array[key]File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/array.py:2430,inArray.__getitem__(self,selection)2428returnself.vindex[cast(CoordinateSelection|MaskSelection,selection)]2429elifis_pure_orthogonal_indexing(pure_selection,self.ndim):->2430returnself.get_orthogonal_selection(pure_selection,fields=fields)2431else:2432returnself.get_basic_selection(cast(BasicSelection,pure_selection),fields=fields)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/_compat.py:43,in_deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args,**kwargs)41extra_args=len(args)-len(all_args)42ifextra_args<=0:--->43returnf(*args,**kwargs)45# extra_args > 046args_msg= [47f"{name}={arg}"48forname,arginzip(kwonly_args[:extra_args],args[-extra_args:],strict=False)49 ]File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/array.py:2872,inArray.get_orthogonal_selection(self,selection,out,fields,prototype)2870prototype=default_buffer_prototype()2871indexer=OrthogonalIndexer(selection,self.shape,self.metadata.chunk_grid)->2872returnsync(2873self._async_array._get_selection(2874indexer=indexer,out=out,fields=fields,prototype=prototype2875     )2876 )File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/sync.py:163,insync(coro,loop,timeout)160return_result=next(iter(finished)).result()162ifisinstance(return_result,BaseException):-->163raisereturn_result164else:165returnreturn_resultFile~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/sync.py:119,in_runner(coro)114"""    115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an    116 exception, the exception will be returned.    117 """118try:-->119returnawaitcoro120exceptExceptionasex:121returnexFile~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/array.py:1289,inAsyncArray._get_selection(self,indexer,prototype,out,fields)1286_config=replace(_config,order=self.metadata.order)1288# reading chunks and decoding them->1289awaitself.codec_pipeline.read(1290         [1291             (1292self.store_path/self.metadata.encode_chunk_key(chunk_coords),1293self.metadata.get_chunk_spec(chunk_coords,_config,prototype=prototype),1294chunk_selection,1295out_selection,1296is_complete_chunk,1297             )1298forchunk_coords,chunk_selection,out_selection,is_complete_chunkinindexer1299         ],1300out_buffer,1301drop_axes=indexer.drop_axes,1302     )1303ifisinstance(indexer,BasicIndexer)andindexer.shape== ():1304returnout_buffer.as_scalar()File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:464,inBatchedCodecPipeline.read(self,batch_info,out,drop_axes)458asyncdefread(459self,460batch_info:Iterable[tuple[ByteGetter,ArraySpec,SelectorTuple,SelectorTuple,bool]],461out:NDBuffer,462drop_axes:tuple[int, ...]= (),463 )->None:-->464awaitconcurrent_map(465         [466             (single_batch_info,out,drop_axes)467forsingle_batch_infoinbatched(batch_info,self.batch_size)468         ],469self.read_batch,470config.get("async.concurrency"),471     )File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:68,inconcurrent_map(items,func,limit)65asyncwithsem:66returnawaitfunc(*item)--->68returnawaitasyncio.gather(*[asyncio.ensure_future(run(item))foriteminitems])File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:66,inconcurrent_map.<locals>.run(item)64asyncdefrun(item:tuple[Any])->V:65asyncwithsem:--->66returnawaitfunc(*item)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:251,inBatchedCodecPipeline.read_batch(self,batch_info,out,drop_axes)244asyncdefread_batch(245self,246batch_info:Iterable[tuple[ByteGetter,ArraySpec,SelectorTuple,SelectorTuple,bool]],247out:NDBuffer,248drop_axes:tuple[int, ...]= (),249 )->None:250ifself.supports_partial_decode:-->251chunk_array_batch=awaitself.decode_partial_batch(252             [253                 (byte_getter,chunk_selection,chunk_spec)254forbyte_getter,chunk_spec,chunk_selection,*_inbatch_info255             ]256         )257forchunk_array, (_,chunk_spec,_,out_selection,_)inzip(258chunk_array_batch,batch_info,strict=False259         ):260ifchunk_arrayisnotNone:File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/codec_pipeline.py:207,inBatchedCodecPipeline.decode_partial_batch(self,batch_info)205assertself.supports_partial_decode206assertisinstance(self.array_bytes_codec,ArrayBytesCodecPartialDecodeMixin)-->207returnawaitself.array_bytes_codec.decode_partial(batch_info)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/abc/codec.py:198,inArrayBytesCodecPartialDecodeMixin.decode_partial(self,batch_info)178asyncdefdecode_partial(179self,180batch_info:Iterable[tuple[ByteGetter,SelectorTuple,ArraySpec]],181 )->Iterable[NDBuffer|None]:182"""Partially decodes a batch of chunks.    183     This method determines parts of a chunk from the slice selection,    184     fetches these parts from the store (via ByteGetter) and decodes them.   (...)    196     Iterable[NDBuffer | None]    197     """-->198returnawaitconcurrent_map(199list(batch_info),200self._decode_partial_single,201config.get("async.concurrency"),202     )File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:68,inconcurrent_map(items,func,limit)65asyncwithsem:66returnawaitfunc(*item)--->68returnawaitasyncio.gather(*[asyncio.ensure_future(run(item))foriteminitems])File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/core/common.py:66,inconcurrent_map.<locals>.run(item)64asyncdefrun(item:tuple[Any])->V:65asyncwithsem:--->66returnawaitfunc(*item)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/codecs/sharding.py:506,inShardingCodec._decode_partial_single(self,byte_getter,selection,shard_spec)503shard_dict=shard_dict_maybe504else:505# read some chunks within the shard-->506shard_index=awaitself._load_shard_index_maybe(byte_getter,chunks_per_shard)507ifshard_indexisNone:508returnNoneFile~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/codecs/sharding.py:718,inShardingCodec._load_shard_index_maybe(self,byte_getter,chunks_per_shard)713index_bytes=awaitbyte_getter.get(714prototype=numpy_buffer_prototype(),715byte_range=RangeByteRequest(0,shard_index_size),716     )717else:-->718index_bytes=awaitbyte_getter.get(719prototype=numpy_buffer_prototype(),byte_range=SuffixByteRequest(shard_index_size)720     )721ifindex_bytesisnotNone:722returnawaitself._decode_shard_index(index_bytes,chunks_per_shard)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/storage/_common.py:124,inStorePath.get(self,prototype,byte_range)122ifprototypeisNone:123prototype=default_buffer_prototype()-->124returnawaitself.store.get(self.path,prototype=prototype,byte_range=byte_range)File~/code/lsimpfendoerfer/xr-test/.venv/lib/python3.11/site-packages/zarr/storage/_obstore.py:109,inObjectStore.get(self,key,prototype,byte_range)107returnprototype.buffer.from_bytes(awaitresp.bytes_async())# type: ignore[arg-type]108elifisinstance(byte_range,SuffixByteRequest):-->109resp=awaitobs.get_async(110self.store,key,options={"range": {"suffix":byte_range.suffix}}111     )112returnprototype.buffer.from_bytes(awaitresp.bytes_async())# type: ignore[arg-type]113else:NotSupportedError:Operationnotsupported:AzuredoesnotsupportsuffixrangerequestsDebugsource:NotSupported {source:"Azure does not support suffix range requests",}

Code I'm running:

importxarrayasxrimportnumpyasnpimportzarrfromzarr.storageimportObjectStorefromobstore.storeimportAzureStoreobjstore=ObjectStore(store=AzureStore(container_name=CONTAINER,prefix="xr-test/test_shards.zarr-v3",account_name=ACCOUNT,sas_key=SAS,    ))# Reading sharded array with zarr-python works as expectedroot=zarr.create_group(store=objstore,zarr_format=3,overwrite=True)z1=root.create_array(name='foo',shape=(10000,10000),shards=(2000,2000),chunks=(1000,1000),dtype='int32')z1[:]=np.random.randint(0,100,size=(10000,10000))root_read=zarr.open_group(store=objstore,zarr_format=3,mode='r')root_read['foo'][:]# Writing to xarray with shards also worksds=xr.Dataset(    {"foo":xr.DataArray(root_read['foo'][:],dims=['x','y'])},)objstore_xr=ObjectStore(store=AzureStore(container_name=CONTAINER,prefix="xr-test/test_shards_xr.zarr-v3",account_name=ACCOUNT,sas_key=SAS,    ))ds.to_zarr(objstore_xr,mode='w',consolidated=False,zarr_format=3,encoding={'foo': {'chunks': (1000,1000),'shards': (2000,2000)}})# Opening the dataset also works as expectedds_n=xr.open_zarr(objstore_xr,consolidated=False)# However, I get the error when loading the chunks into memoryds_n.compute()

@kylebarron
Copy link
ContributorAuthor

Oh, indeed, this PR so far only fixes themulti-fetch API inget_partial_values and didn't fix the main API inget. Give me a sec

@kylebarron
Copy link
ContributorAuthor

@lsim-aegeri can you try once more?

@lsim-aegeri
Copy link

It worked this time, thank you so much for fixing this! Do you know if this will be included in the next pypi release?

Also, I'm pretty sure this is also an issue when reading a sharded zarr with fsspec. Would you feel comfortable fixing that as well? I'll be fine usingobstore but it might save someone else some trouble.

kylebarron reacted with heart emoji

@kylebarron
Copy link
ContributorAuthor

It worked this time, thank you so much for fixing this! Do you know if this will be included in the next pypi release?

I'm not a primary Zarr maintainer, so I can't say for sure, but I think it's likely. TheObjectStore class isn't in a pypi zarr release yet, is it? I thought it was going to be released in zarr 3.1?

Also, I'm pretty sure this is also an issue when reading a sharded zarr with fsspec. Would you feel comfortable fixing that as well? I'll be fine usingobstore but it might save someone else some trouble.

Can you create a new issue for that? I'm not familiar with the fsspec backend myself.

@TomAugspurgerTomAugspurger merged commita0761ac intozarr-developers:mainApr 21, 2025
28 of 30 checks passed
@TomAugspurger
Copy link
Contributor

Thanks@kylebarron, and thanks for testing@lsim-aegeri.

Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@TomAugspurgerTomAugspurgerTomAugspurger approved these changes

@d-v-bd-v-bd-v-b approved these changes

Assignees

No one assigned

Labels

needs release notesAutomatically applied to PRs which haven't added release notes

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

4 participants

@kylebarron@d-v-b@TomAugspurger@lsim-aegeri

[8]ページ先頭

©2009-2025 Movatter.jp