Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot#134174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
kumaraditya303 merged 9 commits intopython:mainfrombdraco:thread_to_asyncio_slow
May 18, 2025
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletionsLib/asyncio/futures.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source):
def _copy_future_state(source, dest):
"""Internal helper to copy state from another Future.

The other Futuremay be a concurrent.futures.Future.
The other Futuremust be a concurrent.futures.Future.
"""
assert source.done()
if dest.cancelled():
return
assert not dest.done()
if source.cancelled():
done, cancelled, result, exception = source._get_snapshot()
assert done
if cancelled:
dest.cancel()
elif exception is not None:
dest.set_exception(_convert_future_exc(exception))
else:
exception = source.exception()
if exception is not None:
dest.set_exception(_convert_future_exc(exception))
else:
result = source.result()
dest.set_result(result)

dest.set_result(result)

def _chain_future(source, destination):
"""Chain two futures so that when one completes, so does the other.
Expand Down
27 changes: 27 additions & 0 deletionsLib/concurrent/futures/_base.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -558,6 +558,33 @@ def set_exception(self, exception):
self._condition.notify_all()
self._invoke_callbacks()

def _get_snapshot(self):
"""Get a snapshot of the future's current state.

This method atomically retrieves the state in one lock acquisition,
which is significantly faster than multiple method calls.

Returns:
Tuple of (done, cancelled, result, exception)
- done: True if the future is done (cancelled or finished)
- cancelled: True if the future was cancelled
- result: The result if available and not cancelled
- exception: The exception if available and not cancelled
"""
# Fast path: check if already finished without lock
if self._state == FINISHED:
return True, False, self._result, self._exception

# Need lock for other states since they can change
with self._condition:
# We have to check the state again after acquiring the lock
# because it may have changed in the meantime.
if self._state == FINISHED:
return True, False, self._result, self._exception
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
return True, True, None, None
return False, False, None, None

__class_getitem__ = classmethod(types.GenericAlias)

class Executor(object):
Expand Down
58 changes: 54 additions & 4 deletionsLib/test/test_asyncio/test_futures.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -413,23 +413,23 @@ def func_repr(func):
def test_copy_state(self):
from asyncio.futures import _copy_future_state

f =self._new_future(loop=self.loop)
f =concurrent.futures.Future()
f.set_result(10)

newf = self._new_future(loop=self.loop)
_copy_future_state(f, newf)
self.assertTrue(newf.done())
self.assertEqual(newf.result(), 10)

f_exception =self._new_future(loop=self.loop)
f_exception =concurrent.futures.Future()
f_exception.set_exception(RuntimeError())

newf_exception = self._new_future(loop=self.loop)
_copy_future_state(f_exception, newf_exception)
self.assertTrue(newf_exception.done())
self.assertRaises(RuntimeError, newf_exception.result)

f_cancelled =self._new_future(loop=self.loop)
f_cancelled =concurrent.futures.Future()
f_cancelled.cancel()

newf_cancelled = self._new_future(loop=self.loop)
Expand All@@ -441,7 +441,7 @@ def test_copy_state(self):
except BaseException as e:
f_exc = e

f_conexc =self._new_future(loop=self.loop)
f_conexc =concurrent.futures.Future()
f_conexc.set_exception(f_exc)

newf_conexc = self._new_future(loop=self.loop)
Expand All@@ -454,6 +454,56 @@ def test_copy_state(self):
newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
self.assertEqual(newf_tb.count('raise concurrent.futures.InvalidStateError'), 1)

def test_copy_state_from_concurrent_futures(self):
"""Test _copy_future_state from concurrent.futures.Future.

This tests the optimized path using _get_snapshot when available.
"""
from asyncio.futures import _copy_future_state

# Test with a result
f_concurrent = concurrent.futures.Future()
f_concurrent.set_result(42)
f_asyncio = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent, f_asyncio)
self.assertTrue(f_asyncio.done())
self.assertEqual(f_asyncio.result(), 42)

# Test with an exception
f_concurrent_exc = concurrent.futures.Future()
f_concurrent_exc.set_exception(ValueError("test exception"))
f_asyncio_exc = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent_exc, f_asyncio_exc)
self.assertTrue(f_asyncio_exc.done())
with self.assertRaises(ValueError) as cm:
f_asyncio_exc.result()
self.assertEqual(str(cm.exception), "test exception")

# Test with cancelled state
f_concurrent_cancelled = concurrent.futures.Future()
f_concurrent_cancelled.cancel()
f_asyncio_cancelled = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
self.assertTrue(f_asyncio_cancelled.cancelled())

# Test that destination already cancelled prevents copy
f_concurrent_result = concurrent.futures.Future()
f_concurrent_result.set_result(10)
f_asyncio_precancelled = self._new_future(loop=self.loop)
f_asyncio_precancelled.cancel()
_copy_future_state(f_concurrent_result, f_asyncio_precancelled)
self.assertTrue(f_asyncio_precancelled.cancelled())

# Test exception type conversion
f_concurrent_invalid = concurrent.futures.Future()
f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
f_asyncio_invalid = self._new_future(loop=self.loop)
_copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
self.assertTrue(f_asyncio_invalid.done())
with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
f_asyncio_invalid.result()
self.assertEqual(str(cm.exception), "invalid")

def test_iter(self):
fut = self._new_future(loop=self.loop)

Expand Down
57 changes: 57 additions & 0 deletionsLib/test/test_concurrent_futures/test_future.py
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -6,6 +6,7 @@
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)

from test import support
from test.support import threading_helper

from .util import (
PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
Expand DownExpand Up@@ -282,6 +283,62 @@ def test_multiple_set_exception(self):

self.assertEqual(f.exception(), e)

def test_get_snapshot(self):
"""Test the _get_snapshot method for atomic state retrieval."""
# Test with a pending future
f = Future()
done, cancelled, result, exception = f._get_snapshot()
self.assertFalse(done)
self.assertFalse(cancelled)
self.assertIsNone(result)
self.assertIsNone(exception)

# Test with a finished future (successful result)
f = Future()
f.set_result(42)
done, cancelled, result, exception = f._get_snapshot()
self.assertTrue(done)
self.assertFalse(cancelled)
self.assertEqual(result, 42)
self.assertIsNone(exception)

# Test with a finished future (exception)
f = Future()
exc = ValueError("test error")
f.set_exception(exc)
done, cancelled, result, exception = f._get_snapshot()
self.assertTrue(done)
self.assertFalse(cancelled)
self.assertIsNone(result)
self.assertIs(exception, exc)

# Test with a cancelled future
f = Future()
f.cancel()
done, cancelled, result, exception = f._get_snapshot()
self.assertTrue(done)
self.assertTrue(cancelled)
self.assertIsNone(result)
self.assertIsNone(exception)

# Test concurrent access (basic thread safety check)
f = Future()
f.set_result(100)
results = []

def get_snapshot():
for _ in range(1000):
snapshot = f._get_snapshot()
results.append(snapshot)

threads = [threading.Thread(target=get_snapshot) for _ in range(4)]
with threading_helper.start_threads(threads):
pass
# All snapshots should be identical for a finished future
expected = (True, False, 100, None)
for result in results:
self.assertEqual(result, expected)


def setUpModule():
setup_module()
Expand Down
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
Speed up :mod:`asyncio` performance of transferring state from thread
pool :class:`concurrent.futures.Future` by up to 4.4x. Patch by J. Nick
Koston.
Loading

[8]ページ先頭

©2009-2025 Movatter.jp