gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot (#134174)

Merged: kumaraditya303 merged 9 commits into python:main from bdraco:thread_to_asyncio_slow on May 18, 2025
Optimize concurrent.futures→asyncio state transfer with atomic snapshot
This PR significantly improves performance when transferring future state from `concurrent.futures.Future` to `asyncio.Future`, a common operation when dispatching executor jobs in asyncio applications.

The current `_copy_future_state` implementation requires multiple method calls, and therefore multiple lock acquisitions, to retrieve the source future's state:

1. `done()` - acquires the lock to check the state
2. `cancelled()` - acquires the lock again
3. `exception()` - acquires the lock to get the exception
4. `result()` - acquires the lock to get the result

Each method call involves thread synchronization overhead, making this operation a bottleneck for high-frequency executor dispatches.

Our use case involves dispatching a large number of small executor jobs from `asyncio` to a thread pool. These jobs typically involve `open` or `stat` on files that are already cached by the OS, so the actual I/O returns almost instantly. However, we still have to offload them to avoid blocking the event loop, since there's no reliable way to determine in advance whether a read will hit the cache. As a result, the majority of the overhead isn't from the I/O itself but from the cost of scheduling: most of the time is spent copying future state, which involves locking. This PR reduces that overhead, which has a meaningful impact at scale.

**Changes**

Add a new `_get_snapshot()` method to `concurrent.futures.Future` that atomically retrieves all state information in a single lock acquisition:

- Returns the tuple `(done, cancelled, result, exception)`
- Uses an optimized fast path for already-finished futures (no lock needed)
- Provides atomic state capture for all other states

The `_copy_future_state` function in `asyncio` now uses this snapshot method when available, falling back to the traditional approach for backwards compatibility.

**Benchmark results**

Benchmark results show dramatic improvements for the common case:

- **concurrent.futures→asyncio transfer: 4.12x faster**
- asyncio→asyncio transfer: slightly slower (1.05x) due to the `hasattr` check (I couldn't find any place where this direction actually occurs, though: `_chain_future` appears to be the only entry point to `_copy_future_state`, and it is always called with a `concurrent.futures.Future`)

This optimization particularly benefits applications that:

- Dispatch many small executor jobs (e.g., filesystem operations, DNS lookups)
- Use thread pools for I/O-bound operations in asyncio
- Have a high frequency of executor task completion

Summary of changes:

- Adds `_get_snapshot()` to `concurrent.futures.Future` for atomic state retrieval
- Updates `_copy_future_state()` to prefer the snapshot method when available
- Maintains full backwards compatibility with existing code
- Minimal code changes with a focused optimization

The benchmark results below show a consistent 4x+ speedup for the critical concurrent.futures→asyncio path.

```
=== 1. Benchmarking concurrent.futures -> asyncio ===
Running original...
concurrent_to_asyncio: Mean +- std dev: 986 ns +- 16 ns
Running optimized...
concurrent_to_asyncio: Mean +- std dev: 239 ns +- 4 ns

Comparison:
Mean +- std dev: [concurrent_original] 986 ns +- 16 ns -> [concurrent_optimized] 239 ns +- 4 ns: 4.12x faster

=== 2. Benchmarking asyncio -> asyncio ===
Running original...
asyncio_to_asyncio: Mean +- std dev: 221 ns +- 4 ns
Running optimized...
asyncio_to_asyncio: Mean +- std dev: 232 ns +- 4 ns

Comparison:
Mean +- std dev: [asyncio_original] 221 ns +- 4 ns -> [asyncio_optimized] 232 ns +- 4 ns: 1.05x slower

Cleaning up...
```

Benchmark script:

```python
import pyperf
import concurrent.futures
import asyncio
import subprocess
import os
import sys


def write_benchmark_scripts():
    """Write individual benchmark scripts for each scenario."""
    # Common helper code
    common_imports = '''
import pyperf
import concurrent.futures
import asyncio

def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return asyncio.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return asyncio.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return asyncio.InvalidStateError(*exc.args)
    else:
        return exc
'''

    # Optimization patch code
    optimization_patch = '''
FINISHED = concurrent.futures._base.FINISHED
CANCELLED = concurrent.futures._base.CANCELLED
CANCELLED_AND_NOTIFIED = concurrent.futures._base.CANCELLED_AND_NOTIFIED

def _get_snapshot_implementation(self):
    """Get a snapshot of the future's current state."""
    # Fast path: check if already finished without lock
    if self._state == FINISHED:
        return True, False, self._result, self._exception
    # Need lock for other states since they can change
    with self._condition:
        if self._state == FINISHED:
            return True, False, self._result, self._exception
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
            return True, True, None, None
        return False, False, None, None

concurrent.futures.Future._get_snapshot = _get_snapshot_implementation
'''

    # Original copy implementation
    original_copy = '''
def copy_future_original(source, dest):
    """Original implementation using individual method calls."""
    if dest.cancelled():
        return
    if hasattr(source, 'done'):
        assert source.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # Optimized copy implementation
    optimized_copy = '''
def copy_future_optimized(source, dest):
    """Optimized implementation using _get_snapshot when available."""
    if dest.cancelled():
        return
    # Use _get_snapshot for futures that support it
    if hasattr(source, '_get_snapshot'):
        done, cancelled, result, exception = source._get_snapshot()
        assert done
        if cancelled:
            dest.cancel()
        elif exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            dest.set_result(result)
        return
    # Traditional fallback for asyncio.Future
    if hasattr(source, 'done'):
        assert source.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''

    # 1. concurrent.futures -> asyncio (original)
    with open('bench_concurrent_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 2. concurrent.futures -> asyncio (optimized)
    with open('bench_concurrent_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

    # 3. asyncio -> asyncio (original)
    with open('bench_asyncio_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
loop = asyncio.new_event_loop()
source = asyncio.Future(loop=loop)
source.set_result(42)

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('asyncio_to_asyncio', task)
''')

    # 4. asyncio -> asyncio (optimized - should use fallback)
    with open('bench_asyncio_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
loop = asyncio.new_event_loop()
source = asyncio.Future(loop=loop)
source.set_result(42)

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('asyncio_to_asyncio', task)
''')


def run_benchmarks():
    """Run all benchmarks and compare results."""
    print("Writing benchmark scripts...")
    write_benchmark_scripts()

    # Clean up old results
    for f in ['concurrent_original.json', 'concurrent_optimized.json',
              'asyncio_original.json', 'asyncio_optimized.json']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== 1. Benchmarking concurrent.futures -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_original.py',
                    '-o', 'concurrent_original.json', '--quiet'])
    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_optimized.py',
                    '-o', 'concurrent_optimized.json', '--quiet'])
    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                    'concurrent_original.json', 'concurrent_optimized.json'])

    print("\n=== 2. Benchmarking asyncio -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_original.py',
                    '-o', 'asyncio_original.json', '--quiet'])
    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_asyncio_to_asyncio_optimized.py',
                    '-o', 'asyncio_optimized.json', '--quiet'])
    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                    'asyncio_original.json', 'asyncio_optimized.json'])

    # Clean up
    print("\nCleaning up...")
    for f in ['bench_concurrent_to_asyncio_original.py',
              'bench_concurrent_to_asyncio_optimized.py',
              'bench_asyncio_to_asyncio_original.py',
              'bench_asyncio_to_asyncio_optimized.py']:
        if os.path.exists(f):
            os.remove(f)

    print("\n=== Summary ===")
    print("concurrent.futures -> asyncio: Should show significant speedup")
    print("asyncio -> asyncio: Should show no regression (fallback path)")


if __name__ == "__main__":
    run_benchmarks()
```
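For context, the workload described above can be sketched as follows. This is an illustrative example (the paths and job count are made up, not from the PR): each `run_in_executor()` call chains a `concurrent.futures.Future` to an `asyncio.Future`, so `_copy_future_state` runs once per job, and its locking dominates scheduling cost when the `stat` itself hits the OS cache.

```python
import asyncio
import os

async def stat_many(paths):
    """Offload many tiny os.stat() calls to the default thread pool."""
    loop = asyncio.get_running_loop()
    # Each run_in_executor() call wraps a concurrent.futures.Future in an
    # asyncio.Future; the state copy happens once per completed job.
    return await asyncio.gather(
        *(loop.run_in_executor(None, os.stat, p) for p in paths)
    )

results = asyncio.run(stat_many(["."] * 100))
print(len(results))  # one stat result per dispatched job
```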
@bdraco committed on May 18, 2025
commit 6b76a377fbe4098cd1afaf0c8bfa86ab60bd4f29
16 changes: 15 additions & 1 deletion in Lib/asyncio/futures.py
```diff
@@ -353,10 +353,24 @@ def _copy_future_state(source, dest):

     The other Future may be a concurrent.futures.Future.
     """
-    assert source.done()
     if dest.cancelled():
         return
     assert not dest.done()
+
+    # Use _get_snapshot for futures that support it
+    if hasattr(source, '_get_snapshot'):
+        done, cancelled, result, exception = source._get_snapshot()
+        assert done
+        if cancelled:
+            dest.cancel()
+        elif exception is not None:
+            dest.set_exception(_convert_future_exc(exception))
+        else:
+            dest.set_result(result)
+        return
+
+    # Traditional fallback needs done check
+    assert source.done()
     if source.cancelled():
         dest.cancel()
     else:
```
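The optimized branch in `_copy_future_state` is reached through `_chain_future`, for example via `asyncio.wrap_future()`. A minimal sketch of that flow (illustrative, not code from the PR):

```python
import asyncio
import concurrent.futures

async def main():
    source = concurrent.futures.Future()
    # wrap_future() calls _chain_future(), whose done callback invokes
    # _copy_future_state() once `source` completes.
    wrapped = asyncio.wrap_future(source)
    source.set_result("done")  # completion schedules the state copy
    return await wrapped

result = asyncio.run(main())
print(result)
```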
27 changes: 27 additions & 0 deletions in Lib/concurrent/futures/_base.py
```diff
@@ -558,6 +558,33 @@ def set_exception(self, exception):
             self._condition.notify_all()
         self._invoke_callbacks()

+    def _get_snapshot(self):
+        """Get a snapshot of the future's current state.
+
+        This method atomically retrieves the state in one lock acquisition,
+        which is significantly faster than multiple method calls.
+
+        Returns:
+            Tuple of (done, cancelled, result, exception)
+            - done: True if the future is done (cancelled or finished)
+            - cancelled: True if the future was cancelled
+            - result: The result if available and not cancelled
+            - exception: The exception if available and not cancelled
+        """
+        # Fast path: check if already finished without lock
+        if self._state == FINISHED:
+            return True, False, self._result, self._exception
+
+        # Need lock for other states since they can change
+        with self._condition:
+            # We have to check the state again after acquiring the lock
+            # because it may have changed in the meantime.
+            if self._state == FINISHED:
+                return True, False, self._result, self._exception
+            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
+                return True, True, None, None
+            return False, False, None, None
+
     __class_getitem__ = classmethod(types.GenericAlias)


 class Executor(object):
```
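To make the "one lock acquisition instead of four" claim concrete, here is a rough illustration. It is not stdlib code, and it pokes at the future's private `_condition` attribute purely for demonstration, so it assumes current CPython internals:

```python
import concurrent.futures

class CountingCondition:
    """Wraps a future's condition variable to count lock acquisitions."""
    def __init__(self, cond):
        self._cond = cond
        self.acquisitions = 0
    def __enter__(self):
        self.acquisitions += 1
        return self._cond.__enter__()
    def __exit__(self, *exc):
        return self._cond.__exit__(*exc)
    def __getattr__(self, name):  # forward notify_all(), wait(), etc.
        return getattr(self._cond, name)

f = concurrent.futures.Future()
f._condition = CountingCondition(f._condition)
f.set_result(42)

before = f._condition.acquisitions
f.done(); f.cancelled(); f.exception(); f.result()  # the pre-patch reads
reads = f._condition.acquisitions - before
print(reads)  # each of the four calls takes the lock once
```

A finished future served by `_get_snapshot()` instead takes zero lock acquisitions on the fast path.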
50 changes: 50 additions & 0 deletions in Lib/test/test_asyncio/test_futures.py
```diff
@@ -454,6 +454,56 @@ def test_copy_state(self):
         newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
         self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)

+    def test_copy_state_from_concurrent_futures(self):
+        """Test _copy_future_state from concurrent.futures.Future.
+
+        This tests the optimized path using _get_snapshot when available.
+        """
+        from asyncio.futures import _copy_future_state
+
+        # Test with a result
+        f_concurrent = concurrent.futures.Future()
+        f_concurrent.set_result(42)
+        f_asyncio = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent, f_asyncio)
+        self.assertTrue(f_asyncio.done())
+        self.assertEqual(f_asyncio.result(), 42)
+
+        # Test with an exception
+        f_concurrent_exc = concurrent.futures.Future()
+        f_concurrent_exc.set_exception(ValueError("test exception"))
+        f_asyncio_exc = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent_exc, f_asyncio_exc)
+        self.assertTrue(f_asyncio_exc.done())
+        with self.assertRaises(ValueError) as cm:
+            f_asyncio_exc.result()
+        self.assertEqual(str(cm.exception), "test exception")
+
+        # Test with cancelled state
+        f_concurrent_cancelled = concurrent.futures.Future()
+        f_concurrent_cancelled.cancel()
+        f_asyncio_cancelled = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
+        self.assertTrue(f_asyncio_cancelled.cancelled())
+
+        # Test that destination already cancelled prevents copy
+        f_concurrent_result = concurrent.futures.Future()
+        f_concurrent_result.set_result(10)
+        f_asyncio_precancelled = self._new_future(loop=self.loop)
+        f_asyncio_precancelled.cancel()
+        _copy_future_state(f_concurrent_result, f_asyncio_precancelled)
+        self.assertTrue(f_asyncio_precancelled.cancelled())
+
+        # Test exception type conversion
+        f_concurrent_invalid = concurrent.futures.Future()
+        f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
+        f_asyncio_invalid = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
+        self.assertTrue(f_asyncio_invalid.done())
+        with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
+            f_asyncio_invalid.result()
+        self.assertEqual(str(cm.exception), "invalid")
+
     def test_iter(self):
         fut = self._new_future(loop=self.loop)
```
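The exception-type conversion exercised by the last test can also be observed directly. This small check uses the private `_copy_future_state` helper, as the test itself does, and a standalone event loop instead of the test harness:

```python
import asyncio
import concurrent.futures
from asyncio.futures import _copy_future_state

cf = concurrent.futures.Future()
cf.set_exception(concurrent.futures.InvalidStateError("invalid"))

loop = asyncio.new_event_loop()
try:
    af = loop.create_future()
    # The copy converts concurrent.futures exception types to their
    # asyncio equivalents via _convert_future_exc().
    _copy_future_state(cf, af)
    converted = af.exception()
    print(type(converted).__name__)
finally:
    loop.close()
```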
62 changes: 62 additions & 0 deletions in Lib/test/test_concurrent_futures/test_future.py
```diff
@@ -282,6 +282,68 @@ def test_multiple_set_exception(self):

         self.assertEqual(f.exception(), e)

+    def test_get_snapshot(self):
+        """Test the _get_snapshot method for atomic state retrieval."""
+        # Test with a pending future
+        f = Future()
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertFalse(done)
+        self.assertFalse(cancelled)
+        self.assertIsNone(result)
+        self.assertIsNone(exception)
+
+        # Test with a finished future (successful result)
+        f = Future()
+        f.set_result(42)
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertTrue(done)
+        self.assertFalse(cancelled)
+        self.assertEqual(result, 42)
+        self.assertIsNone(exception)
+
+        # Test with a finished future (exception)
+        f = Future()
+        exc = ValueError("test error")
+        f.set_exception(exc)
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertTrue(done)
+        self.assertFalse(cancelled)
+        self.assertIsNone(result)
+        self.assertEqual(exception, exc)
+
+        # Test with a cancelled future
+        f = Future()
+        f.cancel()
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertTrue(done)
+        self.assertTrue(cancelled)
+        self.assertIsNone(result)
+        self.assertIsNone(exception)
+
+        # Test concurrent access (basic thread safety check)
+        f = Future()
+        f.set_result(100)
+        results = []
+
+        def get_snapshot():
+            for _ in range(1000):
+                snapshot = f._get_snapshot()
+                results.append(snapshot)
+
+        threads = []
+        for _ in range(4):
+            t = threading.Thread(target=get_snapshot)
+            threads.append(t)
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        # All snapshots should be identical for a finished future
+        expected = (True, False, 100, None)
+        for result in results:
+            self.assertEqual(result, expected)
+
+
 def setUpModule():
     setup_module()
```