
gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot #134174


Merged

kumaraditya303 merged 9 commits into python:main from bdraco:thread_to_asyncio_slow on May 18, 2025

Conversation

bdraco (Contributor) commented May 18, 2025 (edited)

👋 from PyCon @ Pittsburgh

This PR significantly improves performance when transferring future state from concurrent.futures.Future to asyncio.Future, a common operation when dispatching executor jobs in asyncio applications.

The current _copy_future_state implementation requires multiple method calls and lock acquisitions to retrieve the source future's state:

  1. done() - acquires lock to check state
  2. cancelled() - acquires lock again
  3. exception() - acquires lock to get exception
  4. result() - acquires lock to get result

Each method call involves thread synchronization overhead, making this operation a bottleneck for high-frequency executor dispatches.
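The per-call cost is easy to observe. In CPython each of these accessors enters the future's internal condition variable separately, so a single state copy takes the lock four times. A minimal sketch that counts the acquisitions by proxying the private `_condition` attribute (a CPython implementation detail, used here only for illustration):

```python
import concurrent.futures

class CountingCondition:
    """Proxy around Future._condition that counts lock acquisitions."""
    def __init__(self, cond):
        self._cond = cond
        self.acquisitions = 0

    def __enter__(self):
        self.acquisitions += 1
        return self._cond.__enter__()

    def __exit__(self, *exc):
        return self._cond.__exit__(*exc)

    def __getattr__(self, name):
        # Delegate wait/notify and anything else to the real condition.
        return getattr(self._cond, name)

source = concurrent.futures.Future()
source.set_result(42)
source._condition = CountingCondition(source._condition)

# The pre-PR copy path queries each piece of state separately:
assert source.done()                 # acquisition 1
if source.cancelled():               # acquisition 2
    pass
elif source.exception() is None:     # acquisition 3
    value = source.result()          # acquisition 4

print(source._condition.acquisitions)  # 4 lock round-trips for one copy
```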

Our use case involves dispatching a large number of small executor jobs from asyncio to a thread pool. These jobs typically involve open or stat on files that are already cached by the OS, so the actual I/O returns almost instantly. However, we still have to offload them to avoid blocking the event loop, since there's no reliable way to determine in advance whether a read will hit the cache.

As a result, the majority of the overhead isn't from the I/O itself, but from the cost of scheduling. Most of the time is spent copying future state, which involves locking. This PR reduces that overhead, which has a meaningful impact at scale.
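The dispatch pattern in question looks like this (illustrative only; the paths and the default executor are arbitrary stand-ins, not from the PR). Every run_in_executor call completes by copying a concurrent.futures.Future's state into an asyncio.Future, which is exactly the per-job overhead this PR targets:

```python
import asyncio
import os

async def stat_many(paths):
    loop = asyncio.get_running_loop()
    # Each os.stat is offloaded to the default thread pool; on completion,
    # the worker-side concurrent.futures.Future state is copied into the
    # awaited asyncio.Future via _copy_future_state.
    return await asyncio.gather(
        *(loop.run_in_executor(None, os.stat, p) for p in paths)
    )

results = asyncio.run(stat_many([os.curdir, os.pardir]))
print(len(results))  # 2
```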

Add a new _get_snapshot() method to concurrent.futures.Future that atomically retrieves all state information in a single lock acquisition:

  • Returns a tuple: (done, cancelled, result, exception)
  • Uses optimized fast path for already-finished futures (no lock needed)
  • Provides atomic state capture for other states
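The snapshot logic can be sketched as a standalone function (it reads the private _state/_result/_exception attributes and the state constants from concurrent.futures._base, all CPython implementation details; the method added by this PR lives on the Future class itself):

```python
import concurrent.futures
from concurrent.futures._base import (
    FINISHED, CANCELLED, CANCELLED_AND_NOTIFIED,
)

def get_snapshot(fut):
    """One atomic read of (done, cancelled, result, exception)."""
    # Fast path: FINISHED is a terminal state, so it is safe to read
    # the result and exception without taking the lock.
    if fut._state == FINISHED:
        return True, False, fut._result, fut._exception
    # Any other state can still change concurrently: take the lock once
    # and classify the state under it.
    with fut._condition:
        if fut._state == FINISHED:
            return True, False, fut._result, fut._exception
        if fut._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
            return True, True, None, None
        return False, False, None, None

done = concurrent.futures.Future()
done.set_result("hello")
print(get_snapshot(done))     # (True, False, 'hello', None)

pending = concurrent.futures.Future()
print(get_snapshot(pending))  # (False, False, None, None)
```

The double-checked FINISHED test matters: the future may finish between the lock-free check and acquiring the condition, so the state is re-read under the lock before falling through to the cancelled/pending cases.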

The _copy_future_state function in asyncio now uses this snapshot method to retrieve the state from the concurrent.futures.Future.

Benchmark results show dramatic improvements for the common case:

This optimization particularly benefits applications that:

  • Dispatch many small executor jobs (e.g., filesystem operations, DNS lookups)

  • Use thread pools for I/O-bound operations in asyncio

  • Have high frequency of executor task completion

  • Adds _get_snapshot() to concurrent.futures.Future for atomic state retrieval

  • Updates _copy_future_state() to use the snapshot method

  • Maintains full backwards compatibility with existing code

  • Minimal code changes with focused optimization

These show consistent 4.4x+ speedup for the critical concurrent.futures→asyncio path.

```
=== 1. Benchmarking concurrent.futures -> asyncio ===
Writing benchmark scripts...

=== Benchmarking concurrent.futures -> asyncio ===
Running original...
concurrent_to_asyncio: Mean +- std dev: 977 ns +- 13 ns
Running optimized...
concurrent_to_asyncio: Mean +- std dev: 222 ns +- 3 ns

Comparison:
Mean +- std dev: [concurrent_original] 977 ns +- 13 ns -> [concurrent_optimized] 222 ns +- 3 ns: 4.40x faster

Cleaning up...
```

```python
import pyperf
import concurrent.futures
import asyncio
import subprocess
import os
import sys

def write_benchmark_scripts():
    """Write individual benchmark scripts for each scenario."""
    # Common helper code
    common_imports = '''
import pyperf
import concurrent.futures
import asyncio

def _convert_future_exc(exc):
    exc_class = type(exc)
    if exc_class is concurrent.futures.CancelledError:
        return asyncio.CancelledError(*exc.args)
    elif exc_class is concurrent.futures.TimeoutError:
        return asyncio.TimeoutError(*exc.args)
    elif exc_class is concurrent.futures.InvalidStateError:
        return asyncio.InvalidStateError(*exc.args)
    else:
        return exc
'''
    # Optimization patch code
    optimization_patch = '''
FINISHED = concurrent.futures._base.FINISHED
CANCELLED = concurrent.futures._base.CANCELLED
CANCELLED_AND_NOTIFIED = concurrent.futures._base.CANCELLED_AND_NOTIFIED

def _get_snapshot_implementation(self):
    """Get a snapshot of the future's current state."""
    # Fast path: check if already finished without lock
    if self._state == FINISHED:
        return True, False, self._result, self._exception
    # Need lock for other states since they can change
    with self._condition:
        if self._state == FINISHED:
            return True, False, self._result, self._exception
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
            return True, True, None, None
        return False, False, None, None

concurrent.futures.Future._get_snapshot = _get_snapshot_implementation
'''
    # Original copy implementation (for concurrent.futures.Future)
    original_copy = '''
def copy_future_original(source, dest):
    """Original implementation using individual method calls."""
    if dest.cancelled():
        return
    if hasattr(source, 'done'):
        assert source.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(_convert_future_exc(exception))
        else:
            result = source.result()
            dest.set_result(result)
'''
    # Optimized copy implementation (matches current code)
    optimized_copy = '''
def copy_future_optimized(source, dest):
    """Optimized implementation using _get_snapshot."""
    if dest.cancelled():
        return
    assert not dest.done()
    done, cancelled, result, exception = source._get_snapshot()
    assert done
    if cancelled:
        dest.cancel()
    elif exception is not None:
        dest.set_exception(_convert_future_exc(exception))
    else:
        dest.set_result(result)
'''
    # 1. concurrent.futures -> asyncio (original)
    with open('bench_concurrent_to_asyncio_original.py', 'w') as f:
        f.write(common_imports + original_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_original(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')
    # 2. concurrent.futures -> asyncio (optimized)
    with open('bench_concurrent_to_asyncio_optimized.py', 'w') as f:
        f.write(common_imports + optimization_patch + optimized_copy + '''
source = concurrent.futures.Future()
source.set_result(42)
loop = asyncio.new_event_loop()

def task():
    """Single copy operation benchmark."""
    dest = asyncio.Future(loop=loop)
    copy_future_optimized(source, dest)
    dest.cancel()

runner = pyperf.Runner()
runner.bench_func('concurrent_to_asyncio', task)
''')

def run_benchmarks():
    """Run all benchmarks and compare results."""
    print("Writing benchmark scripts...")
    write_benchmark_scripts()
    # Clean up old results
    for f in ['concurrent_original.json', 'concurrent_optimized.json']:
        if os.path.exists(f):
            os.remove(f)
    print("\n=== Benchmarking concurrent.futures -> asyncio ===")
    print("Running original...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_original.py',
                    '-o', 'concurrent_original.json', '--quiet'])
    print("Running optimized...")
    subprocess.run([sys.executable, 'bench_concurrent_to_asyncio_optimized.py',
                    '-o', 'concurrent_optimized.json', '--quiet'])
    print("\nComparison:")
    subprocess.run([sys.executable, '-m', 'pyperf', 'compare_to',
                    'concurrent_original.json', 'concurrent_optimized.json'])
    # Clean up
    print("\nCleaning up...")
    for f in ['bench_concurrent_to_asyncio_original.py',
              'bench_concurrent_to_asyncio_optimized.py']:
        if os.path.exists(f):
            os.remove(f)
    print("\n=== Summary ===")
    print("concurrent.futures -> asyncio: Should show significant speedup with _get_snapshot")

if __name__ == "__main__":
    run_benchmarks()
```

@bdraco changed the title from "GH-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot" to "gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot" on May 18, 2025
@kumaraditya303 merged commit 53da1e8 into python:main on May 18, 2025

43 checks passed
@bdraco (Contributor, Author) commented:

Thanks!

@bdraco deleted the thread_to_asyncio_slow branch on May 18, 2025 17:29

bdraco added a commit to home-assistant/docker-base that referenced this pull request on May 18, 2025
Reviewers

kumaraditya303 approved these changes
Awaiting requested review from 1st1 (code owner)
Awaiting requested review from asvetlov (code owner)
Awaiting requested review from willingc (code owner)

2 participants: @bdraco, @kumaraditya303
