Uh oh!
There was an error while loading.Please reload this page.
- Notifications
You must be signed in to change notification settings - Fork33.3k
gh-134173: Optimize concurrent.futures→asyncio state transfer with atomic snapshot#134174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Changes from2 commits
6b76a377083fc58bb0e0bb21092602577e1b0908edc4165de8839e71b78e000File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading.Please reload this page.
Jump to
Uh oh!
There was an error while loading.Please reload this page.
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -353,10 +353,24 @@ def _copy_future_state(source, dest): | ||
| The other Future may be a concurrent.futures.Future. | ||
| """ | ||
| if dest.cancelled(): | ||
| return | ||
| assert not dest.done() | ||
| # Use _get_snapshot for futures that support it | ||
| if hasattr(source, '_get_snapshot'): | ||
kumaraditya303 marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| done, cancelled, result, exception = source._get_snapshot() | ||
| assert done | ||
| if cancelled: | ||
| dest.cancel() | ||
| elif exception is not None: | ||
| dest.set_exception(_convert_future_exc(exception)) | ||
| else: | ||
| dest.set_result(result) | ||
| return | ||
| # Traditional fallback needs done check | ||
| assert source.done() | ||
| if source.cancelled(): | ||
| dest.cancel() | ||
| else: | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -282,6 +282,68 @@ def test_multiple_set_exception(self): | ||
| self.assertEqual(f.exception(), e) | ||
| def test_get_snapshot(self): | ||
| """Test the _get_snapshot method for atomic state retrieval.""" | ||
| # Test with a pending future | ||
| f = Future() | ||
| done, cancelled, result, exception = f._get_snapshot() | ||
| self.assertFalse(done) | ||
| self.assertFalse(cancelled) | ||
| self.assertIsNone(result) | ||
| self.assertIsNone(exception) | ||
| # Test with a finished future (successful result) | ||
| f = Future() | ||
| f.set_result(42) | ||
| done, cancelled, result, exception = f._get_snapshot() | ||
| self.assertTrue(done) | ||
| self.assertFalse(cancelled) | ||
| self.assertEqual(result, 42) | ||
| self.assertIsNone(exception) | ||
| # Test with a finished future (exception) | ||
| f = Future() | ||
| exc = ValueError("test error") | ||
| f.set_exception(exc) | ||
| done, cancelled, result, exception = f._get_snapshot() | ||
| self.assertTrue(done) | ||
| self.assertFalse(cancelled) | ||
| self.assertIsNone(result) | ||
| self.assertEqual(exception, exc) | ||
bdraco marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| # Test with a cancelled future | ||
| f = Future() | ||
| f.cancel() | ||
| done, cancelled, result, exception = f._get_snapshot() | ||
| self.assertTrue(done) | ||
| self.assertTrue(cancelled) | ||
| self.assertIsNone(result) | ||
| self.assertIsNone(exception) | ||
| # Test concurrent access (basic thread safety check) | ||
| f = Future() | ||
| f.set_result(100) | ||
| results = [] | ||
| def get_snapshot(): | ||
| for _ in range(1000): | ||
| snapshot = f._get_snapshot() | ||
| results.append(snapshot) | ||
| threads = [] | ||
| for _ in range(4): | ||
| t = threading.Thread(target=get_snapshot) | ||
| threads.append(t) | ||
| t.start() | ||
| for t in threads: | ||
kumaraditya303 marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| t.join() | ||
| # All snapshots should be identical for a finished future | ||
| expected = (True, False, 100, None) | ||
| for result in results: | ||
| self.assertEqual(result, expected) | ||
| def setUpModule(): | ||
| setup_module() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| Add ``_get_snapshot()`` method to:class:`concurrent.futures.Future` to | ||
| atomically retrieve all future state in a single lock acquisition. This speeds | ||
| up:mod:`asyncio`'s ``_copy_future_state()`` by up to 4x when transferring state | ||
kumaraditya303 marked this conversation as resolved. OutdatedShow resolvedHide resolvedUh oh!There was an error while loading.Please reload this page. | ||
| from:class:`concurrent.futures.Future` to:class:`asyncio.Future`. Patch by J. Nick Koston. | ||
Uh oh!
There was an error while loading.Please reload this page.