Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

[Dashboard] RevisitedJobManager log fetching infra to avoid blocking the event-loop#45117

Open
alexeykudinkin wants to merge17 commits intoray-project:master
base:master
Choose a base branch
Loading
fromalexeykudinkin:ak/dshb-asnc-tl-logs-fix

Conversation

alexeykudinkin
Copy link
Contributor

@alexeykudinkinalexeykudinkin commentedMay 2, 2024
edited
Loading

Changes

Currently, some operations involving fetching logs for Ray Jobs is implemented in a way that could be blocking the event-loop.

This PR revisits some of these operations to make sure that we appropriately release the event-loop frequently to avoid stalls.

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e.,git commit -s) in this PR.
  • I've runscripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed forhttps://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it indoc/source/tune/api/ under the
      corresponding.rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures athttps://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

… files (ray-project#45116)"This reverts commit4ac54f1.Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…instead moving sleeping upstream)Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…cking the event-loop)Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Tidying up;Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Swapped default to make `get_last_n_log_lines` fetch unbounded number of logs by default;Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…atforms (MacOS);Fixed INT32_MAX to be `2^32 - 1` as it shouldSigned-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkinalexeykudinkinforce-pushed theak/dshb-asnc-tl-logs-fix branch from78c4297 toe38ce12CompareMay 2, 2024 22:23
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkinalexeykudinkinforce-pushed theak/dshb-asnc-tl-logs-fix branch frome38ce12 to575f333CompareMay 2, 2024 22:28
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@@ -30,7 +30,7 @@

# The maximum field value for int32 id's -- which is also the maximum
# number of simultaneous in-flight requests.
INT32_MAX = (2**31) - 1
INT32_MAX = (2**32) - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

is this safe? This is assuming it's an unsigned int?

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Python ints are bigints so there's no overflow.

Also can leave this change out if folks are nervous about it (perfectionist in me couldn't pass this one and not try to correct it)

Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I don't know how this is used, but if all of it's usages are safe for this change, I'm fine with it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

One usage seems related to pass the value into a grpc client, so it might not stay in pythonland

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

this is definitely a little dangerous given it seems to be used when interacting with our cpp IDs & gRPC. let's not couple it together with this change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

why do we need to define our own value, instead ofsys.maxsize

alexeykudinkin reacted with thumbs up emoji
Copy link
ContributorAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Ack, will get rid of this one

@@ -30,7 +30,7 @@

# The maximum field value for int32 id's -- which is also the maximum
# number of simultaneous in-flight requests.
INT32_MAX = (2**31) - 1
INT32_MAX = (2**32) - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

I don't know how this is used, but if all of it's usages are safe for this change, I'm fine with it.

@@ -30,7 +30,7 @@

# The maximum field value for int32 id's -- which is also the maximum
# number of simultaneous in-flight requests.
INT32_MAX = (2**31) - 1
INT32_MAX = (2**32) - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

One usage seems related to pass the value into a grpc client, so it might not stay in pythonland

Comment on lines +193 to +194
# NOTE: Sending chunk over the web-socket is an async operation,
# allowing sync tailing iteration to yield the event-loop
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

What does this mean?tail_job_logs is an async iterator. so don't know what you mean by "sync tailing iteration"

Comment on lines -16 to +21
NUM_LOG_LINES_ON_ERROR = 10
#Maximum number of characters to print out of the logs to avoid
MAX_LOG_LINES_ON_ERROR = 10
#Max number of characters to print out of the logs to avoid
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

let's please avoid pure renaming/refactoring in addition to logic changes. it makes our lives harder as reviewers to cut through the noise and increases the chance of bugs slipping through

Comment on lines +64 to +66
# log_tail_iter can return batches of lines at a time.
for line in lines:
log_tail_deque.append(line)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Suggested change
# log_tail_iter can return batches of lines at a time.
forlineinlines:
log_tail_deque.append(line)
# log_tail_iter can return batches of lines at a time.
log_tail_deque.extend(lines)

@@ -84,8 +88,8 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
# - We accumulated at least MAX_CHUNK_CHAR_LENGTH total chars
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

update comment above with hardcoded10

Comment on lines +29 to +34
def tail_logs(
self,
job_id: str,
*,
max_lines_per_chunk: int = MAX_LINES_PER_CHUNK,
) -> Iterator[List[str]]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

it might make more sense to convert this iterator toasync directly because this is called in other places where we similarly need to avoid blocking the loop

Comment on lines -40 to +53
num_log_lines: The number of lines to return.
max_log_lines: The number of lines to return.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

again please avoid unnecessary pure refactoring

Comment on lines +72 to +73
if read_lines_count % max_lines_per_chunk == 1:
await asyncio.sleep(0)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

couldn't this avoid yielding if batch sizes not matchingmax_lines_per_chunk are returned repeatedly?

given thatself.tail_logs already takes themax_lines_per_chunk, why not just always yield the loop each iteration?

@@ -30,7 +30,7 @@

# The maximum field value for int32 id's -- which is also the maximum
# number of simultaneous in-flight requests.
INT32_MAX = (2**31) - 1
INT32_MAX = (2**32) - 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

this is definitely a little dangerous given it seems to be used when interacting with our cpp IDs & gRPC. let's not couple it together with this change

*,
max_lines_per_chunk: int = MAX_LINES_PER_CHUNK,
max_chunk_char_size: int = MAX_CHUNK_CHAR_SIZE,
) -> Iterator[Optional[List[str]]]:
"""Yield lines from a file as it's written.

Returns lines in batches of up to 10 lines or 20000 characters,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

this docstring can be updated?

@@ -105,9 +109,6 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
# Add line to current chunk
lines.append(curr_line)
chunk_char_count += len(curr_line)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

here thetime.sleep(1) is removed making it a busy loop if all logs are read. What about:

change the signature to returnAsyncIterator[Optional[List[str]]] or evenAsyncIterator[List[str]], and if it reached EOF, doasyncio.sleep(1).

And consequently changeJobLogStorageClient.tail_logs to returnAsyncIterator.

In fact this works nicely soJobManager.tail_job_logs no longer needs its ownasyncio.sleep.

edoakes reacted with thumbs up emoji
@@ -30,7 +30,7 @@

# The maximum field value for int32 id's -- which is also the maximum
# number of simultaneous in-flight requests.
INT32_MAX = (2**31) - 1
INT32_MAX = (2**32) - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

why do we need to define our own value, instead ofsys.maxsize

alexeykudinkin reacted with thumbs up emoji
max_lines_per_chunk = min(self.MAX_LINES_PER_CHUNK, max_log_lines)
read_lines_count = 0

for lines in self.tail_logs(job_id, max_lines_per_chunk=max_lines_per_chunk):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

As I commented if we change the iterator to AsyncIterator this can beasync for and we no longer need aawait asyncio.sleep(0).

Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment
Reviewers

@edoakesedoakesedoakes left review comments

@rynewangrynewangrynewang left review comments

@alanwguoalanwguoalanwguo approved these changes

Assignees
No one assigned
Labels
None yet
Projects
None yet
Milestone
No milestone
Development

Successfully merging this pull request may close these issues.

4 participants
@alexeykudinkin@alanwguo@edoakes@rynewang

[8]ページ先頭

©2009-2025 Movatter.jp