Fix blocking worker queue when scheduling in parallel #3013

Open
riga wants to merge 5 commits into spotify:master from riga:fix/blocking_parallel_scheduling

Conversation

riga (Contributor) commented Oct 28, 2020 (edited):

Hi,

this PR is meant to fix* and improve the parallel scheduling of tasks. To demonstrate what is currently failing (* hence the fix), I added the WIP label and only added one commit with a simple test that currently, against my expectation, does not pass but instead blocks the entire process.

Description

The blocking occurs when parallel scheduling is enabled but a task is not picklable. The scheduling is mostly implemented in Worker.add,

luigi/luigi/worker.py, lines 730 to 774 in cf2abbd:

def add(self, task, multiprocess=False, processes=0):
    """
    Add a Task for the worker to check and possibly schedule and run.
    Returns True if task and its dependencies were successfully scheduled or completed before.
    """
    if self._first_task is None and hasattr(task, 'task_id'):
        self._first_task = task.task_id
    self.add_succeeded = True
    if multiprocess:
        queue = multiprocessing.Manager().Queue()
        pool = multiprocessing.Pool(processes=processes if processes > 0 else None)
    else:
        queue = DequeQueue()
        pool = SingleProcessPool()
    self._validate_task(task)
    pool.apply_async(check_complete, [task, queue])
    # we track queue size ourselves because len(queue) won't work for multiprocessing
    queue_size = 1
    try:
        seen = {task.task_id}
        while queue_size:
            current = queue.get()
            queue_size -= 1
            item, is_complete = current
            for next in self._add(item, is_complete):
                if next.task_id not in seen:
                    self._validate_task(next)
                    seen.add(next.task_id)
                    pool.apply_async(check_complete, [next, queue])
                    queue_size += 1
    except (KeyboardInterrupt, TaskException):
        raise
    except Exception as ex:
        self.add_succeeded = False
        formatted_traceback = traceback.format_exc()
        self._log_unexpected_error(task)
        task.trigger_event(Event.BROKEN_TASK, task, ex)
        self._email_unexpected_error(task, formatted_traceback)
        raise
    finally:
        pool.close()
        pool.join()
    return self.add_succeeded

where processes of a pool put the results of task completeness checks back into a queue. However, when the task is not picklable, the queue is never filled, so queue.get() blocks forever without being able to access exceptions raised in the process. Instead, the exception is held by the AsyncResult object returned by apply_async, which is not used in Worker.add, so there is currently no handle to catch these cases.
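As an illustration (a standalone sketch, not luigi code), the failure mode can be reproduced with a bare multiprocessing pool: an unpicklable argument never reaches the worker, so the result queue stays empty, while the error is only retrievable from the AsyncResult that Worker.add currently discards. The names check, queue, and unpicklable below are illustrative only:

import multiprocessing


def check(obj, out_queue):
    # would normally report the completeness result back via the queue
    out_queue.put(obj is not None)


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    pool = multiprocessing.Pool(processes=1)

    unpicklable = lambda: None  # lambdas cannot be pickled
    result = pool.apply_async(check, [unpicklable, queue])

    # queue.get() here would block forever; the pickling error only
    # surfaces through the AsyncResult:
    try:
        result.get(timeout=5)
    except Exception as exc:
        print('error held by the AsyncResult:', exc)

    pool.close()
    pool.join()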

Possible solution

IMHO, the root cause of this issue is the difference between the actual multiprocessing.Pool.apply_async (returning an async result object) and the custom SingleProcessPool.apply_async (instantly returning the result of the function call). The latter should produce something like a SyncResult with the same API as AsyncResult. Following this, one could rely on its built-in functionality to wrap exceptions (re-raised when calling get()), and a lot of the custom exception handling in worker.py would become obsolete (such as AsyncCompletionException and TracebackWrapper), which simplifies things quite a lot.
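A rough sketch of such a SyncResult, assuming it only needs to mirror the parts of multiprocessing.pool.AsyncResult's interface used by the worker (illustrative only, not necessarily the implementation pushed in this PR):

class SyncResult:
    """Result of an immediate in-process call, mimicking AsyncResult."""

    def __init__(self, func, args=None, kwargs=None):
        try:
            self._value = func(*(args or ()), **(kwargs or {}))
            self._success = True
        except BaseException as e:
            # store the exception instead of raising, just like a pool worker would
            self._value = e
            self._success = False

    def ready(self):
        return True  # the call already happened

    def successful(self):
        return self._success

    def wait(self, timeout=None):
        pass  # nothing to wait for

    def get(self, timeout=None):
        if not self._success:
            raise self._value  # re-raised on get(), matching AsyncResult
        return self._value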

I already implemented this (all tests pass) and will push it if you agree :)

I know that you guys are (understandably) very cautious when it comes to changes in core code, but I think the parallel scheduling could really benefit from this change / fix.

Motivation and Context

Our dependency trees sometimes take on the order of hours to build, as our completeness checks require a lot of remote resources. Therefore, we make heavy use of the parallel scheduling feature. Besides this issue/PR, we plan to submit two additional PRs in the future:

  • We noticed that a list of tasks started with interface.build(many_tasks_in_a_list, ...) is not using the parallel scheduling at all (because of this loop).
  • We would like to start running tasks already while the dependency tree is being built. Most of the required functionality is actually already there and would need only minor additions.

Have you tested this? If so, how?

Yes, the test is included and will pass once the parallel scheduling is fixed.

i = luigi.IntParameter()

tasks = [UnpicklableTask(i=i) for i in range(5)]
self.assertFalse(self.schedule_parallel(tasks))
riga (Contributor, Author) commented Oct 28, 2020 (edited):

This line never terminates, as can be seen in the first build log.

riga (Contributor, Author) commented:

Fixed in 043b4c6.
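For illustration, an unpicklable task like the UnpicklableTask referenced in the test above could look roughly like the following (hypothetical sketch; the actual definition in the test may differ). The point is that an instance attribute such as a lambda makes the task impossible to pickle:

import luigi


class UnpicklableTask(luigi.Task):

    i = luigi.IntParameter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.callback = lambda: self.i  # lambdas cannot be pickled

    def complete(self):
        return False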

@riga changed the title from "[WIP] Fix blocking worker queue when scheduling in parallel" to "[WIP, test fails intentionally] Fix blocking worker queue when scheduling in parallel" on Oct 30, 2020
@riga changed the title from "[WIP, test fails intentionally] Fix blocking worker queue when scheduling in parallel" to "[WIP, test fails intentionally, waiting for feedback] Fix blocking worker queue when scheduling in parallel" on Nov 5, 2020
sognetic (Contributor) commented Jun 18, 2021 (edited):

This seems like a really interesting improvement; is there any chance of this making it into luigi? In addition, I would also be very interested in the two improvements mentioned above; could these be included in luigi without the PR suggested here?

lallea (Contributor) commented:

I am not a maintainer, but I think it would be great to get this in. We also use parallel scheduling to cut down latency until first task execution.

riga (Contributor, Author) commented:

I essentially have all changes I mentioned above ready in personal branches (but then work kicked in and I couldn't follow up on this anymore).

@dlstadther If you agree with the ideas above, I would open PRs.

dlstadther (Collaborator) commented:

I'm good with the submission of a fix to resolve this issue. Note that I don't use the Luigi project for work anymore, so I can only review as an outsider now. I'll do my best to be responsive, but response times might be slow.

Also, parallel scheduling is optional behaviour and is not enabled by default.

@riga requested a review from a team as a code owner on July 5, 2021 15:12
@riga changed the title from "[WIP, test fails intentionally, waiting for feedback] Fix blocking worker queue when scheduling in parallel" to "Fix blocking worker queue when scheduling in parallel" on Jul 5, 2021
riga (Contributor, Author) commented Jul 5, 2021:

I committed the fix of the loop doing the completeness checks during initial scheduling.

The main change is that queue objects are no longer required for storing the results of complete() calls across processes; instead, the current implementation fully relies on AsyncResult / SyncResult objects for the multiprocessing.Pool / SingleProcessPool cases. Actually, AsyncResults were already used before, but somewhat mixed with said queues.
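Roughly, the reworked scheduling loop can be pictured like this (a simplified sketch with hypothetical helper names such as add_via_results and add_to_scheduler, not the exact code in this PR):

import time


def check_complete(task):
    # runs in the pool; exceptions propagate via the returned result object
    return task.complete()


def add_via_results(pool, root_task, add_to_scheduler, wait_interval=0.1):
    # 'add_to_scheduler' stands in for Worker._add and yields dependencies
    pending = {root_task.task_id: (root_task, pool.apply_async(check_complete, [root_task]))}
    seen = {root_task.task_id}

    while pending:
        ready_ids = [tid for tid, (_, res) in pending.items() if res.ready()]
        if not ready_ids:
            time.sleep(wait_interval)
            continue
        for task_id in ready_ids:
            task, result = pending.pop(task_id)
            is_complete = result.get()  # re-raises exceptions, incl. pickling errors
            for dep in add_to_scheduler(task, is_complete):
                if dep.task_id not in seen:
                    seen.add(dep.task_id)
                    pending[dep.task_id] = (dep, pool.apply_async(check_complete, [dep]))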

IMHO, this simplifies the code quite a lot and allows adding the two additional features I mentioned in the PR description (in future PRs), but I understand that changes to the core require a lot of scrutiny. @dlstadther Any idea who we could include for additional review?

The tests pass, but for some reason the coverage drops by ~8% and I fail to see where / how exactly ...

dlstadther (Collaborator) commented:

@riga; sorry, I've been a bit busy lately and haven't gotten to review this yet. I'll try to get to it in the next week.

As for others to review, that's a bit challenging. Unsure if @honnix or other Spotify folks can review.

lallea (Contributor) commented:

With the usual disclaimer that I am not a maintainer, I am familiar with the code and would like this PR to get in, so I'll contribute a review. It overall looks good. The polling itches a bit, since it might add up to measurable CPU usage for long tasks, but I don't have a suggestion for a way to avoid it that wouldn't add complexity. Luigi is built on a concept of polling, so I guess we should not be too picky. :-)

I'll add a few nitpicks inline.

                break
        else:
            time.sleep(wait_interval)
            continue
lallea (Contributor) commented:

The for/else + continue makes the flow unclear. How about packing up the rest of the loop into a new method and calling it before break?
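One possible reading of that suggestion, as a hypothetical sketch (the surrounding code is not shown here in full): move everything that used to follow the break into a helper, so the inner loop body stays linear and the trailing continue disappears:

import time


def poll_pending(pending, handle_ready, wait_interval=0.1):
    # 'handle_ready' is a hypothetical helper packing up what used to follow the break
    while pending:
        for task_id, result in list(pending.items()):
            if result.ready():
                handle_ready(task_id, pending.pop(task_id))
                break
        else:
            time.sleep(wait_interval)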

dlstadther (Collaborator) left a comment:

I have an optional comment and a question. Assuming no impact, I'm good to approve. (Sorry for taking a month to return to this review when I said I'd be back in a week!)

"""

def __init__(self, func, args=None, kwargs=None):
super(SyncResult, self).__init__()
dlstadther (Collaborator) commented:

luigi isn't officially supporting Python 2.7 anymore, so this could just be super().__init__(), but that's totally optional.

Comment on lines -400 to +449
-    try:
-        is_complete = task.complete()
-    except Exception:
-        is_complete = TracebackWrapper(traceback.format_exc())
-    out_queue.put((task, is_complete))
+    return task.complete()
dlstadther (Collaborator) commented:

What's the potential for hitting an exception here? Was the previous catch never triggered and thus pointless? Or are we introducing the opportunity for an unhandled exception here?
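For context on where such an exception ends up, a standalone sketch (not luigi code): an exception raised inside a pool worker is stored on the AsyncResult and re-raised by get(), so it is not silently lost when the caller consumes the result object. The name failing_check is illustrative only:

import multiprocessing


def failing_check():
    # stand-in for a complete() call that raises
    raise RuntimeError('complete() failed')


if __name__ == '__main__':
    with multiprocessing.Pool(processes=1) as pool:
        result = pool.apply_async(failing_check)
        try:
            result.get(timeout=5)
        except RuntimeError as exc:
            print('re-raised by AsyncResult.get():', exc)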

The stale bot commented Jan 9, 2022:

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

The stale bot added the wontfix label on Jan 9, 2022.
Reviewers

lallea left review comments
dlstadther left review comments
Awaiting requested review from honnix
Awaiting requested review from Tarrasch

At least 1 approving review is required to merge this pull request.

Assignees: no one assigned
Labels: wontfix
Projects: none yet
Milestone: no milestone

4 participants: @riga, @sognetic, @lallea, @dlstadther
