
gh-74028: `concurrent.futures.Executor.map`: introduce `buffersize` param for lazier behavior #125663


Merged
picnixz merged 45 commits into python:main from ebonnal:fix-issue-29842
Mar 13, 2025

Conversation

@ebonnal (Contributor) commented Oct 17, 2024 (edited)

Context recap (#74028)

Let's consider that we have an input `iterable` and `N = len(iterable)`.

The current `concurrent.futures.Executor.map` is $O(N)$ in space (unnecessarily expensive on large iterables, completely impossible to use on infinite iterables):
The call `results: Iterator = executor.map(func, iterable)` iterates over all the elements of the `iterable`, submitting $N$ tasks to the executor (futures collected into a list of size $N$). Subsequent calls to `next(results)` take the oldest future from the list (FIFO), wait for its result, and return it.

Proposal: add an optional `buffersize` param

With this proposal, the call `results: Iterator = executor.map(func, iterable, buffersize=b)` will iterate only over the first $b$ elements of `iterable`, submitting $b$ tasks to the executor (futures stored in a buffer `deque`), and will then return the results iterator.

Calls to `next(results)` will get the next input element from `iterable` and submit a task to the executor for it (enqueuing another future), then wait for the oldest future in the buffer queue to complete (FIFO), then return its result.

Benefits:

  • The space complexity becomes $O(b)$
  • When using a `buffersize`, the client code takes back control over the speed of iteration over the input `iterable`: after an initial spike of $b$ calls to `func` to fill the buffer, the iteration over the input `iterable` follows the rate of iteration over `results` (controlled by the client), which is critical when `func` involves talking to services that you don't want to overload.
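The buffered behavior described above can be sketched in plain Python (a minimal illustration of the FIFO-buffer idea, not the actual implementation; `buffered_map` and its single-iterable signature are simplifications — the real `Executor.map` accepts `*iterables` and a `timeout`):

```python
import collections
import itertools
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def buffered_map(executor, fn, iterable, buffersize):
    # Keep at most `buffersize` not-yet-yielded futures alive (FIFO).
    args_iter = iter(iterable)
    fs = collections.deque(
        executor.submit(fn, arg) for arg in islice(args_iter, buffersize)
    )

    def result_iterator():
        try:
            while fs:
                # Pop the oldest future, then refill the buffer with one
                # more task so submission follows the consumption rate.
                result = fs.popleft().result()
                arg = next(args_iter, None)  # None sentinel: a simplification
                if arg is not None:
                    fs.append(executor.submit(fn, arg))
                yield result
        finally:
            for future in fs:
                future.cancel()

    return result_iterator()

with ThreadPoolExecutor(max_workers=2) as ex:
    # Works even on an infinite input, since only `buffersize`
    # pending futures exist at any time.
    results = buffered_map(ex, str, itertools.count(0), buffersize=2)
    first_five = [next(results) for _ in range(5)]

print(first_five)  # ['0', '1', '2', '3', '4']
```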

Why a new PR

It turns out this is very similar to the initial work of @MojoVampire in #707 back in 2017 (followed up by @graingert in #18566 and @Jason-Y-Z in #114975): use a queue of fixed size to hold the not-yet-yielded future results.

In addition, this PR:

  • uses the intuitive term "buffer"
  • decouples the buffer size from the number of workers
  • leaves the default behavior unaltered: it keeps the exact same current list-based behavior when `buffersize=None` (the default)
  • integrates concisely into the existing logic

📚 Documentation preview 📚: https://cpython-previews--125663.org.readthedocs.build/

@ghost commented Oct 17, 2024 (edited)

All commit authors signed the Contributor License Agreement.
CLA signed

@bedevere-app

Most changes to Python require a NEWS entry. Add one using the blurb_it web app or the blurb command-line tool.

If this change has little impact on Python users, wait for a maintainer to apply the skip news label instead.


@Zheaoli (Contributor)

Thanks for the PR.

First, I think this is a big behavior change for Executor. I think we need to discuss it on https://discuss.python.org/ first.

In my personal opinion, I don't think it is a good choice to add the `buffersize` argument to the API. For now, the API design is based on the original `map` API; I think this argument will bring more inconsistency into the codebase. And by the way, even if we do need the `buffersize` argument, I think it's not reasonable to forbid using both `timeout` and `buffersize` at the same time:

> The returned iterator raises a TimeoutError if next() is called and the result isn't available after timeout seconds from the original call to Executor.map().

@ebonnal force-pushed the fix-issue-29842 branch 2 times, most recently from 9eef605 to e5c867a on October 18, 2024 13:49
@ebonnal (Contributor, Author) commented Oct 18, 2024 (edited)

Hi @Zheaoli, thank you for your comment!

> First, I think this is a big behavior change for Executor.

You mean a big *alternative* behavior, right? (The default behavior when omitting `buffersize` remains unchanged.)

> I think we need to discuss it in the https://discuss.python.org/ first.

Fair, I will start a thread there and ping you.

> For now, the API design is based on the original map API. I think this argument will bring more inconsistent into the codebase.

I'm not sure I get it, could you detail that point? 🙏🏻

> I think it's not reasonable to forbids the usage of both timeout and buffersize at the same time

You are completely right, that makes more sense! I have fixed it (commit).

@Zheaoli (Contributor)

> I'm not sure I get it, could you detail that point? 🙏🏻

For me, the basic `map` API's behavior is: when we pass an infinite iterator, the result is infinite and only stops when the iterator has been stopped. I think we need to keep the same behavior between `map` and `executor.map`.

@ebonnal (Contributor, Author) commented Oct 20, 2024 (edited)

Hi @Zheaoli,

> For me, the basic map API's behavior is when we put an infinite iterator, the result would be infinite and only stop when the iterator has been stoped. I think we need to keep the same behavior between map and executor.map

There may be a misunderstanding here: the goal of this PR is precisely to make `Executor.map` closer to the built-in `map` behavior, i.e. to make it lazier. (`map` and the current `executor.map` do not have the same behavior.)

I will recap the behaviors so that everybody is on the same page:

built-in `map`

```python
infinite_iterator = itertools.count(0)
# a `map` instance is created and the func and iterable are just stored as attributes
mapped_iterator = map(str, infinite_iterator)
# retrieves the first element of its input iterator, applies
# the transformation and returns the result
assert next(mapped_iterator) == "0"
# the next element in the input iterator is the 2nd
assert next(infinite_iterator) == 1
# one can next infinitely
assert next(mapped_iterator) == "2"
assert next(mapped_iterator) == "3"
assert next(mapped_iterator) == "4"
assert next(mapped_iterator) == "5"
...
```

`Executor.map` without `buffersize` (= current `Executor.map`)

```python
infinite_iterator = itertools.count(0)
# this line runs FOREVER, trying to iterate over input iterator until exhaustion
mapped_iterator = executor.map(str, infinite_iterator)
```

⏫ This line will run forever because it collects the entire input iterable eagerly, in order to build the entire future-results list (`fs = [self.submit(fn, *args) for args in zip(*iterables)]`), which requires infinite time and memory.

`Executor.map` with `buffersize`

```python
infinite_iterator = itertools.count(0)
# retrieves the first 2 elements (=buffersize) and submits 2 tasks for them
mapped_iterator = executor.map(str, infinite_iterator, buffersize=2)
# retrieves the 3rd element of input iterator and submits a task for it,
# then waits for the oldest future in the buffer to complete and returns the result
assert next(mapped_iterator) == "0"
# the next element of the input iterator is the 4th
assert next(infinite_iterator) == 3
# one can next infinitely while only a buffer of finite
# not-yet-yielded future results is kept in memory
assert next(mapped_iterator) == "1"
assert next(mapped_iterator) == "2"
assert next(mapped_iterator) == "4"
assert next(mapped_iterator) == "5"
...
```

Note

I used the example of an infinite input iterator because it is a case where the current `Executor.map` is just unusable at all. But even for *finite* input iterables, if a developer writes `mapped_iterator = executor.map(fn, iterable)`, they often don't want the iterable to be eagerly exhausted right away, but rather iterated at the same rate as `mapped_iterator`. This PR's proposal is to allow them to do so by setting a `buffersize`.
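The eager exhaustion of a finite input is easy to observe with a generator that records what it hands out (a small demonstration of the current, pre-`buffersize` behavior; `tracked` and `consumed` are illustrative names):

```python
from concurrent.futures import ThreadPoolExecutor

consumed = []

def tracked():
    # A finite input that records every element it yields.
    for i in range(100):
        consumed.append(i)
        yield i

with ThreadPoolExecutor(max_workers=2) as ex:
    results = ex.map(str, tracked())
    # The whole input has already been consumed and 100 tasks
    # submitted before a single result is requested.
    eagerly_consumed = len(consumed)
    first = next(results)

print(eagerly_consumed, first)  # 100 0
```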

@ebonnal (Contributor, Author)

Hey @rruuaanng, FYI I have applied your requested changes regarding the integration of the unit tests into the existing class 🙏🏻

```python
args_iter = iter(zip(*iterables))
if buffersize:
    fs = collections.deque(
        self.submit(fn, *args) for args in islice(args_iter, buffersize)
```
Contributor:

Isn't buffersize empty? Can you introduce it? (Forgive me for not understanding it.)

Contributor (Author):

Absolutely np, thank you for taking the time to review my proposal. To be sure I understand the question well: what do you mean by "Isn't buffersize empty?"

Contributor (Author):

Hey @rruuaanng, I have reworked the PR's description, I hope it makes things clearer!

@ebonnal (Contributor, Author)

Hey @NewUserHa @AA-Turner @serhiy-storchaka, this may interest you given your recent activity on #14221 🙏🏻

```python
if (
    buffersize
    and (executor := executor_weakref())
    and (args := next(args_iter, None))
```
Member:

`args` may be empty, so you need to check for `args is not None`.

Contributor (Author):

Are you referring to the case where one calls `executor.map(func)` without any input iterable?

Member:

Yes. You can't always assume that `func` needs an input (or do you?)

@ebonnal commented Dec 3, 2024 (edited):

You are right! But in such a case we don't enter the `while fs:` (`fs` being empty in that case), right?

@ebonnal commented Dec 5, 2024 (edited):

@picnixz I have added unit tests checking the behavior with multiple input iterables and without any input iterables.

Member:

> But in such a case we don't enter the while fs.

Not necessarily. What I meant is that you call `executor.map` with an input iterable that yields `args = ()` every time.

Note that it also doesn't hurt to check `is not None`, because it's probably slightly faster: otherwise you need to call `__bool__` on the args being yielded.

Contributor (Author):

So, for example, a call like `executor.map(func, [()])`? In such a call we get `iterables = ([()],)` and `args_iter = iter(zip(*([()],)))`, and `next(args_iter)` will be `((),)` (not `()`). You may have missed the zipping in your reasoning?

In terms of pure readability of the code I struggle to have an opinion: do you feel that `(args := next(args_iter, None)) is not None` is more natural?
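The zipping point can be checked directly in an interpreter (a quick sanity check, not part of the PR):

```python
iterables = ([()],)
args_iter = iter(zip(*iterables))

# zip wraps each "row" in a tuple, so even an empty-tuple element
# arrives as the non-empty (and truthy) ((),):
args = next(args_iter, None)
print(args)        # ((),)
print(bool(args))  # True

# Only exhaustion produces the falsy sentinel:
print(next(args_iter, None))  # None
```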

Member:

> You may have missed the zipping in your reasoning?

I did :) Sorry, my bad!

> do you feel that (args := next(args_iter, None)) is not None is more natural?

I feel it would at least help avoid questions like mine! (And it would still probably be slightly better performance-wise, but this claim is just my gut feeling.)

@ebonnal commented Dec 5, 2024 (edited):

@picnixz oh yes, I see... I have renamed `args_iter` to the more self-explanatory `zipped_iterables`; do you think that would be enough to avoid the confusion?

(Because I am scared that the addition of `is not None` may mislead some of our fellow Pythonistas into wondering "wait, why is this `not None` check necessary here, what am I missing 🤔?")

@picnixz (Member) commented Dec 13, 2024 (edited):

Personally, I like having the `is not None` just so that I don't have to wonder what `args_iter` is precisely yielding. I can assume that it's yielding a tuple-like object, but I don't necessarily know the shape of that tuple. So `is not None` discriminates probable items from the sentinel value; I'd say it's still Pythonic.

Performance-wise it should be roughly the same (one checks that the tuple's size != 0 and the other just compares against the None singleton, but both are essentially a single comparison).

Now, up to you. If others didn't observe (like me) that `args_iter` never yields an empty tuple, then it's probably better to keep the `is not None` check for clarity.

@ebonnal left a comment:

Many thanks for your review @picnixz 🙏🏻!

@picnixz (Member)

I'll rerun the CI tomorrow if it still fails, don't worry.

@serhiy-storchaka (Member) left a comment:

LGTM. 👍
@picnixz (Member) left a comment:

I think the new line I wanted you to add has been gobbled by another commit :(
@picnixz self-assigned this Mar 12, 2025
@picnixz (Member)

I'll merge this tomorrow or on Friday (today is review's day!)

@picnixz merged commit a005835 into python:main Mar 13, 2025
39 checks passed
@picnixz (Member)

Thank you for the contribution!

@ebonnal (Contributor, Author) commented Mar 13, 2025 (edited)

Thank you @picnixz @gpshead @serhiy-storchaka for making this change go through! 🙏🏻

@ebonnal deleted the fix-issue-29842 branch March 13, 2025 17:26
plashchynski pushed a commit to plashchynski/cpython that referenced this pull request Mar 17, 2025:

> …cutor.map` for lazier behavior (python#125663)
> `concurrent.futures.Executor.map` now supports limiting the number of submitted tasks whose results have not yet been yielded via the new `buffersize` parameter.
> Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>

seehwan pushed a commit to seehwan/cpython that referenced this pull request Apr 16, 2025
```python
# reverse to keep finishing order
fs.reverse()
while fs:
    if (
```
@dalcinl (Contributor):

@ebonnal I believe you got this part slightly wrong ("off-by-one"). IIUC, the number of pending futures should not be larger than `buffersize`. However, after the initial submission of `buffersize` tasks, here in this branch you are appending an EXTRA task to the queue, and now you have `buffersize + 1` tasks that have potentially not been yielded yet.

Fortunately, it looks like the fix is trivial: you simply have to yield first, then append to the queue:

```diff
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd58..3b9ccf4d651 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -628,17 +628,17 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    # Careful not to keep a reference to the popped future
+                    if timeout is None:
+                        yield _result_or_cancel(fs.pop())
+                    else:
+                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
                     if (
                         buffersize
                         and (executor := executor_weakref())
                         and (args := next(zipped_iterables, None))
                     ):
                         fs.appendleft(executor.submit(fn, *args))
-                    # Careful not to keep a reference to the popped future
-                    if timeout is None:
-                        yield _result_or_cancel(fs.pop())
-                    else:
-                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
             finally:
                 for future in fs:
                     future.cancel()
```

@ebonnal (Contributor, Author) commented Apr 28, 2025 (edited):

Hi @dalcinl,

TL;DR: FYI, #131467 is open and tackles this "off-by-one" situation. It would be great to get your review there! 🙏🏻
I did not propose this variation at first because I think it makes sense as an optional follow-up PR, given that it integrates slightly less smoothly into the existing logic.

You will notice that it is not "just" moving the yield before the enqueue; let's see why on a simple scenario, explaining the 3 behaviors:

Scenario:

```python
it = executor.map(fn, iterable, buffersize=buffersize)  # point A
next(it)  # point B
next(it)  # point C
```

"enqueue -> wait -> yield" (current, introduced by this PR) behavior

  • buffersbuffersize tasks
  • at# point A there isbuffersize buffered tasks
  • 1st call tonext:
    • enqueue a task, jumping tobuffersize+1 tasks in buffer
    • wait for the next result to be available
    • yield the next result, finally going down tobuffersize tasks in buffer
  • at# point B there is stillbuffersize buffered tasks
  • 2nd call tonext: same

pro:buffersize tasks in buffer between two calls tonext
con: while waiting for the next result we havebuffersize+1 tasks in buffer

"wait -> yield -> enqueue" (your proposal) behavior

  • buffersbuffersize tasks
  • at# point A there isbuffersize buffered tasks
  • call tonext:
    • wait for the next result to be available
    • yield this result, going down tobuffersize - 1 tasks in buffer
  • at# point B there isbuffersize - 1 buffered tasks
  • call tonext:
    • enqueue a task, jumping back tobuffersize tasks in buffer
    • wait for the next result to be available
    • yield next result, going down tobuffersize - 1 tasks in buffer
  • at# point C there is stillbuffersize-1 buffered tasks

pro: never exceedbuffersize
con: between two calls tonext we have onlybuffersize - 1 tasks in buffer

"wait -> enqueue -> yield" (#131467) behavior

  • buffersbuffersize tasks
  • at# point A there isbuffersize buffered tasks
  • 1st call tonext:
    • wait for next result to be available
    • enqueue a task, jumping tobuffersize + 1 tasks in buffer
    • yield already available result without needing to wait, going back tobuffersize tasks in buffer
  • at# point B there is stillbuffersize buffered tasks
  • 2nd call tonext: same

pros:

  • buffersize tasks in buffer between two calls tonext
  • jumps tobuffersize + 1 during a call tonext but is back instantly tobuffersize because the next result is already available when we enqueued the next task (the closest we can get to a yield-and-enqueue-at-the-same-time).

Let me know if it makes sense 🙏🏻
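The occupancy difference between the first two orderings can be checked with a toy simulation (integers stand in for futures, and waiting is ignored since it doesn't change the counts; `simulate` and its argument names are illustrative, not from the PR):

```python
from collections import deque

def simulate(order, buffersize, n_inputs=6):
    # Replay the buffer operations of one ordering and track the
    # peak number of not-yet-yielded tasks held at any moment.
    inputs = iter(range(n_inputs))
    fs = deque(next(inputs) for _ in range(buffersize))
    fs.reverse()  # oldest task at the right, as in the real code
    peak = len(fs)
    out = []
    while fs:
        for op in order:
            if op == "enqueue":
                nxt = next(inputs, None)
                if nxt is not None:
                    fs.appendleft(nxt)
            else:  # "yield"
                out.append(fs.pop())
            peak = max(peak, len(fs))
    return out, peak

# "enqueue -> wait -> yield" (this PR): transiently holds buffersize + 1
out_pr, peak_pr = simulate(["enqueue", "yield"], buffersize=2)
# "wait -> yield -> enqueue" (the review suggestion): never exceeds buffersize
out_alt, peak_alt = simulate(["yield", "enqueue"], buffersize=2)

print(out_pr == out_alt == [0, 1, 2, 3, 4, 5])  # True
print(peak_pr, peak_alt)  # 3 2
```

Both orderings yield the same results in the same order; only the transient buffer occupancy differs, which is exactly the trade-off discussed above.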


Reviewers

@serhiy-storchaka approved these changes
@picnixz approved these changes
@gpshead: awaiting requested review
@dalcinl left review comments
@rruuaanng left review comments

Assignees

@gpshead
@picnixz

9 participants

@ebonnal @Zheaoli @picnixz @hugovk @gpshead @dalcinl @serhiy-storchaka @rruuaanng @encukou
