[Dashboard] Revisited `JobManager` log fetching infra to avoid blocking the event-loop #45117
… files (ray-project#45116)" This reverts commit 4ac54f1. Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…instead moving sleeping upstream) Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…cking the event-loop) Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Tidying up; Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Swapped default to make `get_last_n_log_lines` fetch unbounded number of logs by default; Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…atforms (MacOS); Fixed INT32_MAX to be `2^32 - 1` as it should Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
78c4297 to e38ce12
e38ce12 to 575f333
@@ -30,7 +30,7 @@
 # The maximum field value for int32 id's -- which is also the maximum
 # number of simultaneous in-flight requests.
-INT32_MAX = (2**31) - 1
+INT32_MAX = (2**32) - 1
is this safe? This is assuming it's an unsigned int?
Python ints are bigints so there's no overflow.
Also can leave this change out if folks are nervous about it (perfectionist in me couldn't pass this one and not try to correct it)
I don't know how this is used, but if all of its usages are safe for this change, I'm fine with it.
One usage seems to involve passing the value into a gRPC client, so it might not stay in pythonland
this is definitely a little dangerous given it seems to be used when interacting with our cpp IDs & gRPC. let's not couple it together with this change
why do we need to define our own value, instead of `sys.maxsize`?
Ack, will get rid of this one
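The concern raised in this thread can be illustrated with a minimal sketch. Python integers never overflow on their own, but a value that leaves Python for a signed 32-bit field has to fit in 32 bits. The snippet uses the standard `struct` module as a stand-in for such a field; that stand-in and the helper name `fits_signed_int32` are assumptions for illustration, not how Ray or gRPC actually serializes the value.

```python
import struct

INT32_MAX = (2**31) - 1   # largest value of a signed 32-bit integer
UINT32_MAX = (2**32) - 1  # largest value of an unsigned 32-bit integer


def fits_signed_int32(value: int) -> bool:
    """Return True if `value` can be packed as a signed 32-bit integer.

    Hypothetical helper: `struct` stands in for any fixed-width int32 field.
    """
    try:
        struct.pack("<i", value)
        return True
    except struct.error:
        return False


# Python ints are arbitrary precision, so plain arithmetic is always fine...
assert UINT32_MAX + 1 == 2**32

# ...but only the signed maximum fits into a signed 32-bit slot.
print(fits_signed_int32(INT32_MAX))   # True
print(fits_signed_int32(UINT32_MAX))  # False
```

Under these assumptions, `(2**32) - 1` is the unsigned maximum, so it is only safe where every consumer treats the field as unsigned.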
# NOTE: Sending chunk over the web-socket is an async operation,
# allowing sync tailing iteration to yield the event-loop
What does this mean? `tail_job_logs` is an async iterator, so I don't know what you mean by "sync tailing iteration".
-NUM_LOG_LINES_ON_ERROR = 10
-# Maximum number of characters to print out of the logs to avoid
+MAX_LOG_LINES_ON_ERROR = 10
+# Max number of characters to print out of the logs to avoid
let's please avoid pure renaming/refactoring in addition to logic changes. it makes our lives harder as reviewers to cut through the noise and increases the chance of bugs slipping through
# log_tail_iter can return batches of lines at a time.
for line in lines:
    log_tail_deque.append(line)
-# log_tail_iter can return batches of lines at a time.
-for line in lines:
-    log_tail_deque.append(line)
+# log_tail_iter can return batches of lines at a time.
+log_tail_deque.extend(lines)
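A small sketch of why the suggestion is behavior-preserving: `collections.deque.extend` appends each element in order, so a bounded deque ends up with the same tail either way. The function names and the use of `maxlen` are assumptions for illustration; the PR's actual deque construction is not shown here.

```python
from collections import deque
from typing import Iterable, List


def keep_last_lines_append(batches: Iterable[List[str]], max_lines: int) -> List[str]:
    """Keep the last `max_lines` lines, appending one line at a time."""
    log_tail_deque = deque(maxlen=max_lines)  # assumption: bounded deque
    for lines in batches:
        for line in lines:
            log_tail_deque.append(line)
    return list(log_tail_deque)


def keep_last_lines_extend(batches: Iterable[List[str]], max_lines: int) -> List[str]:
    """Same result, but each batch is added with a single `extend` call."""
    log_tail_deque = deque(maxlen=max_lines)
    for lines in batches:
        log_tail_deque.extend(lines)
    return list(log_tail_deque)


batches = [["a\n", "b\n"], ["c\n"], ["d\n", "e\n"]]
print(keep_last_lines_append(batches, 3) == keep_last_lines_extend(batches, 3))  # True
```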
@@ -84,8 +88,8 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
 # - We accumulated at least MAX_CHUNK_CHAR_LENGTH total chars
update comment above with hardcoded `10`
def tail_logs(
    self,
    job_id: str,
    *,
    max_lines_per_chunk: int = MAX_LINES_PER_CHUNK,
) -> Iterator[List[str]]:
it might make more sense to convert this iterator to `async` directly because this is called in other places where we similarly need to avoid blocking the loop
-num_log_lines: The number of lines to return.
+max_log_lines: The number of lines to return.
again please avoid unnecessary pure refactoring
if read_lines_count % max_lines_per_chunk == 1:
    await asyncio.sleep(0)
couldn't this avoid yielding if batch sizes not matching `max_lines_per_chunk` are returned repeatedly?
given that `self.tail_logs` already takes the `max_lines_per_chunk`, why not just always yield the loop each iteration?
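The reviewer's point can be checked with a tiny simulation. It assumes, hypothetically, that `read_lines_count` grows by the size of each returned batch; how the PR actually updates the counter is not shown in this hunk. Both helper names are made up for the sketch.

```python
from typing import List


def count_yields_modulo(batch_sizes: List[int], max_lines_per_chunk: int) -> int:
    """Count how often the `% max_lines_per_chunk == 1` check would fire."""
    read_lines_count = 0
    yields = 0
    for size in batch_sizes:
        read_lines_count += size  # assumption: counter grows by the batch size
        if read_lines_count % max_lines_per_chunk == 1:
            yields += 1  # this is where `await asyncio.sleep(0)` would run
    return yields


def count_yields_always(batch_sizes: List[int]) -> int:
    """Count yields when the loop is released on every iteration."""
    return len(batch_sizes)


# Batches of 2 lines keep the counter even, so the remainder is never 1.
print(count_yields_modulo([2] * 50, max_lines_per_chunk=10))  # 0
print(count_yields_always([2] * 50))                          # 50
```

Under that assumption, the modulo check only fires when the running count happens to land on a remainder of 1, while yielding unconditionally gives a predictable release per batch.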
    *,
    max_lines_per_chunk: int = MAX_LINES_PER_CHUNK,
    max_chunk_char_size: int = MAX_CHUNK_CHAR_SIZE,
) -> Iterator[Optional[List[str]]]:
    """Yield lines from a file as it's written.

    Returns lines in batches of up to 10 lines or 20000 characters,
this docstring can be updated?
@@ -105,9 +109,6 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
 # Add line to current chunk
 lines.append(curr_line)
 chunk_char_count += len(curr_line)
here the `time.sleep(1)` is removed, making it a busy loop if all logs are read. What about:
change the signature to return `AsyncIterator[Optional[List[str]]]` or even `AsyncIterator[List[str]]`, and if it reached EOF, do `asyncio.sleep(1)`.
And consequently change `JobLogStorageClient.tail_logs` to return `AsyncIterator`.
In fact this works nicely so `JobManager.tail_job_logs` no longer needs its own `asyncio.sleep`.
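A minimal sketch of the shape this comment proposes, not the PR's implementation: an async generator that yields batches of lines and awaits `asyncio.sleep` only when it hits EOF with nothing buffered. The name `async_file_tail_iterator` and the `eof_sleep_s` parameter are hypothetical, the character-count limit is omitted, partially written lines are not handled, and the file reads themselves are still synchronous.

```python
import asyncio
from typing import AsyncIterator, List


async def async_file_tail_iterator(
    path: str,
    *,
    max_lines_per_chunk: int = 10,
    eof_sleep_s: float = 1.0,  # hypothetical knob; the comment suggests 1 second
) -> AsyncIterator[List[str]]:
    """Yield batches of lines from `path` as it is written (sketch only)."""
    with open(path, "r") as f:
        lines: List[str] = []
        while True:
            curr_line = f.readline()  # returns "" at EOF
            if curr_line:
                lines.append(curr_line)
                if len(lines) < max_lines_per_chunk:
                    continue  # keep filling the current chunk
            if lines:
                # Chunk is full, or EOF was reached with lines buffered.
                yield lines
                lines = []
            else:
                # EOF with nothing buffered: wait without blocking the
                # event loop instead of spinning in a busy loop.
                await asyncio.sleep(eof_sleep_s)
```

A caller would consume it with `async for lines in async_file_tail_iterator(path): ...` and stop iterating on its own condition, since the generator never terminates by itself.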
max_lines_per_chunk = min(self.MAX_LINES_PER_CHUNK, max_log_lines)
read_lines_count = 0
for lines in self.tail_logs(job_id, max_lines_per_chunk=max_lines_per_chunk):
As I commented, if we change the iterator to `AsyncIterator` this can be `async for` and we no longer need an `await asyncio.sleep(0)`.
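For illustration, the consumer side of that suggestion could look roughly like the sketch below. `fake_tail_logs` is a hypothetical stand-in for an async `tail_logs`, and `last_n_log_lines_sketch` is a made-up name rather than the real `get_last_n_log_lines`; the only point is the `async for` shape with no explicit `asyncio.sleep(0)` in the consumer.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, List


async def fake_tail_logs(chunks: List[List[str]]) -> AsyncIterator[List[str]]:
    """Hypothetical async source that yields pre-built chunks of lines."""
    for chunk in chunks:
        # Simulated suspension point inside the source iterator.
        await asyncio.sleep(0)
        yield chunk


async def last_n_log_lines_sketch(
    chunks_iter: AsyncIterator[List[str]], max_log_lines: int
) -> str:
    """Collect the last `max_log_lines` lines from an async chunk iterator."""
    log_tail_deque = deque(maxlen=max_log_lines)
    async for lines in chunks_iter:
        # Each chunk may hold several lines.
        log_tail_deque.extend(lines)
    return "".join(log_tail_deque)


result = asyncio.run(
    last_n_log_lines_sketch(fake_tail_logs([["a\n", "b\n"], ["c\n"]]), 2)
)
print(repr(result))  # 'b\nc\n'
```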
Changes
Currently, some operations that fetch logs for Ray Jobs are implemented in a way that could block the event-loop.
This PR revisits some of these operations to make sure that we release the event-loop frequently enough to avoid stalls.