feat/global concurrency limit #3547

Open
d-v-b wants to merge 9 commits into zarr-developers:main from d-v-b:feat/global-concurrency-limit
changes/3547.misc.md (1 change: 1 addition & 0 deletions)
@@ -0,0 +1 @@
Moved concurrency limits to a global per-event loop setting instead of per-array call.
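
In practice the limit becomes a single knob set through the existing `zarr.config` API. A minimal sketch of the intended usage (the `async.concurrency` key is the one this PR reads; the value 32 is purely illustrative):

```python
import zarr

# Set the limit once; it is read the first time a semaphore is created
# for an event loop.
zarr.config.set({"async.concurrency": 32})

# All subsequent zarr operations on the same event loop now share one
# 32-slot semaphore instead of each call constructing its own limit.
```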
src/zarr/abc/codec.py (4 changes: 0 additions & 4 deletions)
@@ -9,7 +9,6 @@
from zarr.abc.metadata import Metadata
from zarr.core.buffer import Buffer, NDBuffer
from zarr.core.common import NamedConfig, concurrent_map
from zarr.core.config import config

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable, Iterable
@@ -228,7 +227,6 @@ async def decode_partial(
        return await concurrent_map(
            list(batch_info),
            self._decode_partial_single,
            config.get("async.concurrency"),
        )
@@ -265,7 +263,6 @@ async def encode_partial(
        await concurrent_map(
            list(batch_info),
            self._encode_partial_single,
            config.get("async.concurrency"),
        )
@@ -467,7 +464,6 @@ async def _batching_helper(
    return await concurrent_map(
        list(batch_info),
        _noop_for_none(func),
        config.get("async.concurrency"),
    )
src/zarr/abc/store.py (4 changes: 1 addition & 3 deletions)
@@ -464,11 +464,9 @@ async def getsize_prefix(self, prefix: str) -> int:

        # avoid circular import
        from zarr.core.common import concurrent_map
        from zarr.core.config import config

        keys = [(x,) async for x in self.list_prefix(prefix)]
        limit = config.get("async.concurrency")
        sizes = await concurrent_map(keys, self.getsize, limit=limit)
        sizes = await concurrent_map(keys, self.getsize)
        return sum(sizes)
src/zarr/core/array.py (4 changes: 0 additions & 4 deletions)
@@ -22,7 +22,6 @@
import numpy as np
from typing_extensions import deprecated

import zarr
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec
from zarr.abc.numcodec import Numcodec, _is_numcodec
from zarr.codecs._v2 import V2Codec
@@ -1853,7 +1852,6 @@ async def _delete_key(key: str) -> None:
                for chunk_coords in old_chunk_coords.difference(new_chunk_coords)
            ],
            _delete_key,
            zarr_config.get("async.concurrency"),
        )

        # Write new metadata
@@ -4530,7 +4528,6 @@ async def _copy_array_region(
        await concurrent_map(
            [(region, data) for region in result._iter_shard_regions()],
            _copy_array_region,
            zarr.core.config.config.get("async.concurrency"),
        )
    else:

@@ -4541,7 +4538,6 @@ async def _copy_arraylike_region(chunk_coords: slice, _data: NDArrayLike) -> None
        await concurrent_map(
            [(region, data) for region in result._iter_shard_regions()],
            _copy_arraylike_region,
            zarr.core.config.config.get("async.concurrency"),
        )
    return result
src/zarr/core/codec_pipeline.py (5 changes: 0 additions & 5 deletions)
@@ -270,7 +270,6 @@ async def read_batch(
        chunk_bytes_batch = await concurrent_map(
            [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
            lambda byte_getter, prototype: byte_getter.get(prototype),
            config.get("async.concurrency"),
        )
        chunk_array_batch = await self.decode_batch(
            [
@@ -377,7 +376,6 @@ async def _read_key(
                for byte_setter, chunk_spec, chunk_selection, _, is_complete_chunk in batch_info
            ],
            _read_key,
            config.get("async.concurrency"),
        )
        chunk_array_decoded = await self.decode_batch(
            [
@@ -443,7 +441,6 @@ async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> None
                )
            ],
            _write_key,
            config.get("async.concurrency"),
        )

    async def decode(
@@ -476,7 +473,6 @@ async def read(
                for single_batch_info in batched(batch_info, self.batch_size)
            ],
            self.read_batch,
            config.get("async.concurrency"),
        )

    async def write(
@@ -491,7 +487,6 @@ async def write(
                for single_batch_info in batched(batch_info, self.batch_size)
            ],
            self.write_batch,
            config.get("async.concurrency"),
        )
src/zarr/core/common.py (115 changes: 114 additions & 1 deletion)
@@ -4,7 +4,9 @@
import functools
import math
import operator
import threading
import warnings
import weakref
from collections.abc import Iterable, Mapping, Sequence
from enum import Enum
from itertools import starmap
@@ -98,15 +100,126 @@ def ceildiv(a: float, b: float) -> int:
V = TypeVar("V")


# Global semaphore management for per-process concurrency limiting
# Use WeakKeyDictionary to automatically clean up semaphores when event loops are garbage collected
_global_semaphores: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, asyncio.Semaphore] = (
    weakref.WeakKeyDictionary()
)
# Use threading.Lock instead of asyncio.Lock to coordinate across event loops
_global_semaphore_lock = threading.Lock()


def get_global_semaphore() -> asyncio.Semaphore:
    """
    Get the global semaphore for the current event loop.

    This ensures that all concurrent operations across the process share the same
    concurrency limit, preventing excessive concurrent task creation when multiple
    arrays or operations are running simultaneously.

    The semaphore is lazily created per event loop and uses the configured
    `async.concurrency` value from zarr config. The semaphore is cached per event
    loop, so subsequent calls return the same semaphore instance.

    Note: Config changes after the first call will not affect the semaphore limit.
    To apply new config values, use :func:`reset_global_semaphores` to clear the cache.

    Returns
    -------
    asyncio.Semaphore
        The global semaphore for this event loop.

    Raises
    ------
    RuntimeError
        If called outside of an async context (no running event loop).

    See Also
    --------
    reset_global_semaphores : Clear the global semaphore cache
    """
    loop = asyncio.get_running_loop()

    # Acquire lock FIRST to prevent TOCTOU race condition
    with _global_semaphore_lock:
        if loop not in _global_semaphores:
            limit = zarr_config.get("async.concurrency")
            _global_semaphores[loop] = asyncio.Semaphore(limit)
        return _global_semaphores[loop]
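
A minimal sketch of the caching behavior this function promises, assuming the import path shown in this diff (`zarr.core.common` on the PR branch):

```python
import asyncio

from zarr.core.common import get_global_semaphore

async def main() -> None:
    a = get_global_semaphore()
    b = get_global_semaphore()
    # Cached per running event loop, so repeated calls return the same object.
    assert a is b

# Each asyncio.run() spins up a fresh event loop, which gets its own semaphore;
# the old loop's entry is dropped from the WeakKeyDictionary once the loop
# is garbage collected.
asyncio.run(main())
asyncio.run(main())
```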


def reset_global_semaphores() -> None:
    """
    Clear all cached global semaphores.

    This is useful when you want config changes to take effect, or for testing.
    The next call to :func:`get_global_semaphore` will create a new semaphore
    using the current configuration.

    Warning: This should only be called when no async operations are in progress,
    as it will invalidate all existing semaphore references.

    Examples
    --------
    >>> import zarr
    >>> zarr.config.set({"async.concurrency": 50})
    >>> reset_global_semaphores()  # Apply new config
    """
    with _global_semaphore_lock:
        _global_semaphores.clear()


async def concurrent_map(
    items: Iterable[T],
    func: Callable[..., Awaitable[V]],
    limit: int | None = None,
    *,
    use_global_semaphore: bool = True,
) -> list[V]:
    """
    Execute an async function concurrently over multiple items with concurrency limiting.

    Parameters
    ----------
    items : Iterable[T]
        Items to process, where each item is a tuple of arguments to pass to func.
    func : Callable[..., Awaitable[V]]
        Async function to execute for each item.
    limit : int | None, optional
        If provided and use_global_semaphore is False, creates a local semaphore
        with this limit. If None, no concurrency limiting is applied.
    use_global_semaphore : bool, default True
        If True, uses the global per-process semaphore for concurrency limiting,
        ensuring all concurrent operations share the same limit. If False, uses
        the `limit` parameter for local limiting (legacy behavior).

    Returns
    -------
    list[V]
        Results from executing func on all items.
    """
    if use_global_semaphore:
        if limit is not None:
            raise ValueError(
                "Cannot specify both use_global_semaphore=True and a limit value. "
                "Either use the global semaphore (use_global_semaphore=True, limit=None) "
                "or specify a local limit (use_global_semaphore=False, limit=<int>)."
            )
        # Use the global semaphore for process-wide concurrency limiting
        sem = get_global_semaphore()

        async def run(item: tuple[Any]) -> V:
            async with sem:
                return await func(*item)

        return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])

    elif limit is None:
        # No concurrency limiting
        return await asyncio.gather(*list(starmap(func, items)))

    else:
        # Legacy mode: create local semaphore with specified limit
        sem = asyncio.Semaphore(limit)

        async def run(item: tuple[Any]) -> V:
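
A short usage sketch of the new signature, under the same assumptions (PR branch installed; `fetch` is a stand-in coroutine, not zarr API):

```python
import asyncio

from zarr.core.common import concurrent_map

async def fetch(i: int) -> int:
    await asyncio.sleep(0)  # stand-in for real I/O
    return i * 2

async def main() -> None:
    items = [(i,) for i in range(10)]  # each item is a tuple of args for fetch
    # Default path: every call shares the global per-event-loop semaphore.
    shared = await concurrent_map(items, fetch)
    # Legacy path: opt out of the global semaphore and use a local limit.
    local = await concurrent_map(items, fetch, limit=4, use_global_semaphore=False)
    # Note: passing a limit while use_global_semaphore=True raises ValueError.
    assert shared == local == [i * 2 for i in range(10)]

asyncio.run(main())
```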
src/zarr/core/group.py (10 changes: 5 additions & 5 deletions)
@@ -44,6 +44,7 @@
    NodeType,
    ShapeLike,
    ZarrFormat,
    get_global_semaphore,
    parse_shapelike,
)
from zarr.core.config import config
@@ -1441,8 +1442,8 @@ async def _members(
            )

            raise ValueError(msg)
        # enforce a concurrency limit by passing a semaphore to all the recursive functions
        semaphore = asyncio.Semaphore(config.get("async.concurrency"))
        # Use global semaphore for process-wide concurrency limiting
        semaphore = get_global_semaphore()
        async for member in _iter_members_deep(
            self,
            max_depth=max_depth,
@@ -3338,9 +3339,8 @@ async def create_nodes(
        The created nodes in the order they are created.
    """

    # Note: the only way to alter this value is via the config. If that's undesirable for some reason,
    # then we should consider adding a keyword argument this this function
    semaphore = asyncio.Semaphore(config.get("async.concurrency"))
    # Use global semaphore for process-wide concurrency limiting
    semaphore = get_global_semaphore()
    create_tasks: list[Coroutine[None, None, str]] = []

    for key, value in nodes.items():
src/zarr/storage/_local.py (3 changes: 2 additions & 1 deletion)
@@ -217,7 +217,8 @@ async def get_partial_values(
            assert isinstance(key, str)
            path = self.root / key
            args.append((_get, path, prototype, byte_range))
        return await concurrent_map(args, asyncio.to_thread, limit=None)  # TODO: fix limit
        # Use global semaphore to limit concurrent thread spawning
        return await concurrent_map(args, asyncio.to_thread)

    async def set(self, key: str, value: Buffer) -> None:
        # docstring inherited
src/zarr/storage/_memory.py (3 changes: 2 additions & 1 deletion)
@@ -104,7 +104,8 @@ async def get_partial_values(
        async def _get(key: str, byte_range: ByteRequest | None) -> Buffer | None:
            return await self.get(key, prototype=prototype, byte_range=byte_range)

        return await concurrent_map(key_ranges, _get, limit=None)
        # In-memory operations are fast and don't benefit from concurrency limiting
        return await concurrent_map(key_ranges, _get, use_global_semaphore=False)

    async def exists(self, key: str) -> bool:
        # docstring inherited
src/zarr/storage/_obstore.py (8 changes: 4 additions & 4 deletions)
@@ -13,8 +13,7 @@
    Store,
    SuffixByteRequest,
)
from zarr.core.common import concurrent_map
from zarr.core.config import config
from zarr.core.common import concurrent_map, get_global_semaphore

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence
@@ -209,7 +208,6 @@ async def delete_dir(self, prefix: str) -> None:

        metas = await obs.list(self.store, prefix).collect_async()
        keys = [(m["path"],) for m in metas]
        await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))
        await concurrent_map(keys, self.delete)

    @property
    def supports_listing(self) -> bool:
@@ -485,7 +484,8 @@ async def _get_partial_values(
        else:
            raise ValueError(f"Unsupported range input: {byte_range}")

        semaphore = asyncio.Semaphore(config.get("async.concurrency"))
        # Use global semaphore for process-wide concurrency limiting
        semaphore = get_global_semaphore()

        futs: list[Coroutine[Any, Any, list[_Response]]] = []
        for path, bounded_ranges in per_file_bounded_requests.items():