gh-74028: `concurrent.futures.Executor.map`: introduce `buffersize` param for lazier behavior #125663
Conversation
ghost commented Oct 17, 2024 • edited by ghost
Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool. If this change has little impact on Python users, wait for a maintainer to apply the `skip news` label instead.
Zheaoli commented Oct 18, 2024

Thanks for the PR. First, I think this is a big behavior change for Executor. I think we need to discuss it on https://discuss.python.org/ first. In my personal opinion, I think this is not a good choice to add the […]
Force-pushed 9eef605 to e5c867a

ebonnal commented Oct 18, 2024 • edited
Hi @Zheaoli, thank you for your comment!

You mean big alternative behavior, right? (the default behavior when omitting […])

Fair, I will start a thread there and ping you.

I'm not sure I get it, could you detail that point? 🙏🏻

You are completely right, that makes more sense! I have fixed it (commit).
Zheaoli commented Oct 19, 2024

For me, the basic […]
ebonnal commented Oct 20, 2024 • edited
Hi @Zheaoli,

There may be a misunderstanding here: the goal of this PR is precisely to make […]

I will recap the behaviors so that everybody is on the same page: built-in […]
ebonnal commented Oct 25, 2024

Hey @rruuaanng, FYI I have applied your requested changes regarding the integration of the unit tests into the existing class 🙏🏻
Lib/concurrent/futures/_base.py Outdated

```python
args_iter = iter(zip(*iterables))
if buffersize:
    fs = collections.deque(
        self.submit(fn, *args) for args in islice(args_iter, buffersize)
    )
```
Isn't buffersize empty? Can you introduce it? (Forgive me for not understanding it).
Absolutely np, thank you for taking the time to review my proposal. To be sure I understand the question well, what do you mean by "Isn't buffersize empty?"
Hey @rruuaanng, I have reworked the PR's description; I hope it makes things clearer!
ebonnal commented Oct 28, 2024

Hey @NewUserHa @AA-Turner @serhiy-storchaka, this may interest you given your recent activity on #14221 🙏🏻
Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst Outdated (resolved)
Lib/concurrent/futures/_base.py Outdated

```python
if (
    buffersize
    and (executor := executor_weakref())
    and (args := next(args_iter, None))
):
```
`args` may be empty, so you need to check for `args is not None`.
Are you referring to the case where one calls `executor.map(func)` without any input iterable?
Yes. You can't always assume that `func` needs an input (or do you?)
You are right! But in such a case we don't enter the `while fs:` loop (`fs` being empty in that case), right?
@picnixz I have added unit tests checking the behavior with multiple input iterables and without any input iterables.
> But in such a case we don't enter the while fs.

Not necessarily. What I meant is that you call `executor.map` with an input iterable that yields `args = ()` every time.

Note that it also doesn't hurt to check `is not None` because it's probably slightly faster: otherwise you need to call `__bool__` on the args being yielded.
So, for example, a call like `executor.map(func, [()])`? In such a call we get `iterables = ([()],)` and `args_iter = iter(zip(*([()],)))`, and `next(args_iter)` will be `((),)` (not `()`). You may have missed the zipping in your reasoning?

In terms of pure readability of the code I struggle to have an opinion: do you feel that `(args := next(args_iter, None)) is not None` is more natural?
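A quick standalone check of this zipping point (plain Python, not the PR's code):

```python
# executor.map(func, [()]) receives iterables = ([()],).
iterables = ([()],)
args_iter = iter(zip(*iterables))

# Each yielded item is a 1-tuple wrapping the inner empty tuple,
# so it is truthy: the empty tuple itself is never yielded.
first = next(args_iter, None)
assert first == ((),)       # a 1-tuple, not ()
assert bool(first) is True  # hence the truthiness check holds
```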
> You may have missed the zipping in your reasoning?

I did :) Sorry, my bad!

> do you feel that (args := next(args_iter, None)) is not None is more natural?

I feel it would at least help avoid questions like mine! (And it would probably still be slightly better performance-wise, but that claim is just my gut feeling.)
@picnixz oh yes, I see... I have renamed `args_iter` to the more self-explanatory `zipped_iterables`; do you think that is enough to avoid the confusion?

(Because I am scared that adding `is not None` may mislead some of our fellow Pythonistas into wondering "wait, why is this `not None` check necessary here, what am I missing? 🤔")
Personally, I like having the `is not None` just so that I don't have to wonder what `args_iter` is precisely yielding. I can assume that it's yielding a tuple-like object, but I don't necessarily know the shape of that tuple. So `is not None` distinguishes probable items from the sentinel value. I'd say it's still Pythonic.

Performance-wise it should be roughly the same (one checks that the tuple's size != 0 and the other just compares against the None singleton, but both are essentially a single comparison).

Now it's up to you. If others didn't notice (like me) that `args_iter` never yields an empty tuple, then it's probably better to keep the `is not None` check for clarity.
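The trade-off discussed in this thread can be summarized in a tiny sketch (hypothetical values, not the PR's code): both checks behave identically while the iterator yields non-empty tuples, but `is not None` makes the sentinel explicit:

```python
# An iterator standing in for zip(*iterables): it only yields non-empty tuples.
args_iter = iter([(0,), (1,)])

# Truthiness check: works here because zip() never yields an empty tuple.
args = next(args_iter, None)
assert bool(args)

# Sentinel check: states the intent (item vs. exhausted iterator) explicitly.
args = next(args_iter, None)
assert args is not None

# Once exhausted, both forms see the None sentinel.
assert next(args_iter, None) is None
```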
ebonnal left a comment
Many thanks for your review @picnixz 🙏🏻!
Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
picnixz commented Feb 24, 2025

I'll rerun the CI tomorrow if it still fails, don't worry.
serhiy-storchaka left a comment
LGTM. 👍
picnixz left a comment
I think the new line I wanted you to add has been gobbled by another commit :(
Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
picnixz commented Mar 12, 2025

I'll merge this tomorrow or on Friday (today is review day!)
Merged a005835 into python:main
picnixz commented Mar 13, 2025

Thank you for the contribution!
ebonnal commented Mar 13, 2025 • edited
Thank you @picnixz @gpshead @serhiy-storchaka for making this change go through! 🙏🏻
…cutor.map` for lazier behavior (python#125663)

`concurrent.futures.Executor.map` now supports limiting the number of submitted tasks whose results have not yet been yielded via the new `buffersize` parameter.

---------

Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
```python
# reverse to keep finishing order
fs.reverse()
while fs:
    if (
```
@ebonnal I believe you got this part slightly wrong ("off-by-one"). IIUC, the number of pending futures cannot be larger than `buffersize`. However, after the initial submission of `buffersize` tasks, here in this branch you are appending an EXTRA task to the queue, and now you have `buffersize + 1` tasks that have potentially not yielded yet.

Fortunately, it looks like the fix is trivial: you simply have to yield first, then append to the queue:
```diff
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
             # reverse to keep finishing order
             fs.reverse()
             while fs:
+                # Careful not to keep a reference to the popped future
+                if timeout is None:
+                    yield _result_or_cancel(fs.pop())
+                else:
+                    yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
                 if (
                     buffersize
                     and (executor := executor_weakref())
                     and (args := next(zipped_iterables, None))
                 ):
                     fs.appendleft(executor.submit(fn, *args))
-                # Careful not to keep a reference to the popped future
-                if timeout is None:
-                    yield _result_or_cancel(fs.pop())
-                else:
-                    yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
         finally:
             for future in fs:
                 future.cancel()
```
Hi @dalcinl,

TL;DR: FYI we have #131467 open, tackling this "off-by-one" situation. It would be great to get your review there! 🙏🏻

I did not propose this variation at first because I think it makes sense as an optional follow-up PR, given that it integrates slightly less smoothly into the existing logic.

You will notice that it is not "just" moving the yield before the enqueue; let's see why with a simple scenario, explaining the 3 behaviors:

Scenario:

```python
it = executor.map(fn, iterable, buffersize=buffersize)  # point A
next(it)  # point B
next(it)  # point C
```
"enqueue -> wait -> yield" (current, introduced by this PR) behavior

- buffers `buffersize` tasks
- at `# point A` there are `buffersize` buffered tasks
- 1st call to `next`:
  - enqueue a task, jumping to `buffersize + 1` tasks in buffer
  - wait for the next result to be available
  - yield the next result, finally going down to `buffersize` tasks in buffer
- at `# point B` there are still `buffersize` buffered tasks
- 2nd call to `next`: same

pro: `buffersize` tasks in buffer between two calls to `next`

con: while waiting for the next result we have `buffersize + 1` tasks in buffer
"wait -> yield -> enqueue" (your proposal) behavior

- buffers `buffersize` tasks
- at `# point A` there are `buffersize` buffered tasks
- call to `next`:
  - wait for the next result to be available
  - yield this result, going down to `buffersize - 1` tasks in buffer
- at `# point B` there are `buffersize - 1` buffered tasks
- call to `next`:
  - enqueue a task, jumping back to `buffersize` tasks in buffer
  - wait for the next result to be available
  - yield the next result, going down to `buffersize - 1` tasks in buffer
- at `# point C` there are still `buffersize - 1` buffered tasks

pro: never exceeds `buffersize`

con: between two calls to `next` we have only `buffersize - 1` tasks in buffer
"wait -> enqueue -> yield" (#131467) behavior

- buffers `buffersize` tasks
- at `# point A` there are `buffersize` buffered tasks
- 1st call to `next`:
  - wait for the next result to be available
  - enqueue a task, jumping to `buffersize + 1` tasks in buffer
  - yield the already-available result without needing to wait, going back to `buffersize` tasks in buffer
- at `# point B` there are still `buffersize` buffered tasks
- 2nd call to `next`: same

pros:

- `buffersize` tasks in buffer between two calls to `next`
- jumps to `buffersize + 1` during a call to `next` but is back instantly to `buffersize`, because the next result is already available when we enqueue the next task (the closest we can get to a yield-and-enqueue-at-the-same-time)

Let me know if it makes sense 🙏🏻
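As a concrete illustration of these orderings, here is a minimal standalone sketch (`buffered_map` is a hypothetical helper, not the PR's code) following the "wait -> yield -> enqueue" variant: the buffer is refilled only when the caller resumes the generator, so the pending-future count never exceeds `buffersize`:

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def buffered_map(executor, fn, iterable, buffersize):
    # Eagerly submit only the first `buffersize` tasks.
    args_iter = iter(iterable)
    fs = deque(executor.submit(fn, arg) for arg in islice(args_iter, buffersize))

    def results():
        while fs:
            # Wait for and yield the oldest future (FIFO)...
            yield fs.popleft().result()
            # ...then, when the caller resumes, refill the buffer by one task.
            for arg in islice(args_iter, 1):
                fs.append(executor.submit(fn, arg))

    return results()

with ThreadPoolExecutor(max_workers=2) as ex:
    out = list(buffered_map(ex, lambda x: x * x, range(6), buffersize=2))
assert out == [0, 1, 4, 9, 16, 25]  # results arrive in input order
```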
Context recap (#74028)
Let's consider that we have an input `iterable` and `N = len(iterable)`. The current `concurrent.futures.Executor.map` is $O(N)$ in space (unnecessarily expensive on large iterables, completely impossible to use on infinite iterables):

The call `results: Iterator = executor.map(func, iterable)` iterates over all the elements of the `iterable`, submitting $N$ tasks to the `executor` (futures collected into a list of size $N$). Following calls to `next(results)` take the oldest future from the list (FIFO), then wait for its result and return it.
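This eagerness is easy to observe (a standalone illustration, not code from the PR): `Executor.map` consumes its whole input at call time, before any result is requested:

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []

def source():
    # Record each element as executor.map pulls it from the input iterable.
    for i in range(5):
        consumed.append(i)
        yield i

with ThreadPoolExecutor(max_workers=2) as ex:
    results = ex.map(lambda x: x * 2, source())
    # The whole input was consumed (and submitted) before any next(results):
    assert consumed == [0, 1, 2, 3, 4]
    assert list(results) == [0, 2, 4, 6, 8]
```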
Proposal: add an optional `buffersize` param

With this proposal, the call `results: Iterator = executor.map(func, iterable, buffersize=b)` will iterate only over the first $b$ elements of `iterable`, submitting $b$ tasks to the `executor` (futures stored in the buffer `deque`), and then will return the results iterator.

Calls to `next(results)` will get the next input element from `iterable` and submit a task to the `executor` for it (enqueuing another future), then wait for the oldest future in the buffer queue to complete (FIFO), then return its result.
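This is what makes infinite inputs usable. A manual equivalent of the proposed `buffersize` mechanics, sketched by hand here (since the parameter only exists with this PR applied), driven on an infinite iterable:

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import count, islice

b = 3                       # the buffer size
inputs = iter(count())      # an infinite input iterable
func = lambda x: x + 100

with ThreadPoolExecutor(max_workers=2) as ex:
    # Submit only the first b tasks up front...
    buffer = deque(ex.submit(func, x) for x in islice(inputs, b))
    results = []
    for _ in range(5):      # ...then one new task per result consumed.
        results.append(buffer.popleft().result())
        buffer.append(ex.submit(func, next(inputs)))

assert results == [100, 101, 102, 103, 104]
assert len(buffer) == b     # pending futures stay bounded by b
```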
Benefits: with `buffersize` the client code takes back control over the speed of iteration over the input `iterable`: after an initial spike of calls to `func` to fill the buffer, the iteration over the input `iterable` follows the rate of the iteration over the `results` (controlled by the client), which is critical when `func` involves talking to services that you don't want to overload.
Why a new PR

It turns out this is very similar to the initial work of @MojoVampire in #707 back in 2017 (followed up by @graingert in #18566 and @Jason-Y-Z in #114975): use a queue of fixed size to hold the not-yet-yielded future results.

In addition, this PR keeps the current behavior unchanged for `buffersize=None` (the default).

📚 Documentation preview 📚: https://cpython-previews--125663.org.readthedocs.build/