Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

c10d/Store: add queues#150969

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Closed
d4l3k wants to merge1 commit intomainfromd4l3k/store_queue
Closed

c10d/Store: add queues#150969

d4l3k wants to merge1 commit intomainfromd4l3k/store_queue

Conversation

@d4l3k
Copy link
Member

@d4l3kd4l3k commentedApr 9, 2025
edited
Loading

This adds queue operations as described in#150943.

This works by adding two new operationsqueue_push andqueue_pop. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first callwait until the key is ready and then pop the value from the queue.

This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.

wait andcheck work for queue operations though queue_push will only wake up the first waiter rather than all of them.

This also has a few cleanups to error types/documentation in related code.

Example trace:

[I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816[I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816[I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816[I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816[I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816[I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816[I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816[W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout[W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms[I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816

Test plan:

$ pytest test/distributed/test_store.py -k queue -v -s                                                                                                                                                                                                                               test/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues)test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s]test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues)test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues)test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s]test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s]

cc@H-Huang@awgu@wanchaol@fegin@fduwjj@wz337@wconstab

@d4l3kd4l3k requested review fromfduwjj andwconstabApril 9, 2025 23:52
@pytorch-botpytorch-botbot added oncall: distributedAdd this issue/PR to distributed oncall triage queue release notes: distributed (c10d)release notes category labelsApr 9, 2025
@pytorch-bot
Copy link

pytorch-botbot commentedApr 9, 2025
edited
Loading

🔗 Helpful Links

🧪 See artifacts and rendered test results athud.pytorch.org/pr/150969

Note: Links to docs will display an error until the docs builds have been completed.

✅ No Failures

As of commite2cde44 with merge basef136443 (image):
💚 Looks good so far! There are no failures yet. 💚

This comment was automatically generated by Dr. CI and updates every 15 minutes.

@d4l3kd4l3k requested a review fromXilunWuApril 9, 2025 23:53
Copy link
Contributor

@XilunWuXilunWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

overall LGTM! Left some questions.

Copy link
Contributor

@fduwjjfduwjj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Overall lgtm. I just have some questions for the libuv part changes


// Queues
std::unordered_map<std::string, std::deque<std::vector<uint8_t>>> queues_;
std::unordered_map<std::string, std::deque<c10::intrusive_ptr<UvHandle>>>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

looks like this is not being used?

d4l3k and XilunWu reacted with thumbs up emoji
Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

good catch -- this was before I switched to using WAIT

}
}

voidLibUVStoreDaemon::wakeupOneWaitingClient(const std::string& key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Just n00b question, why we don't have this for other operations within libuv backend?

Copy link
MemberAuthor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

Normally with set/wait all of the waiting clients get woken up. With queues we only want a single worker to wake up and try and get the item so that's why we need a special "wakeupOne"

@d4l3kd4l3kforce-pushed thed4l3k/store_queue branch 3 times, most recently from4079813 to6ce1313CompareApril 10, 2025 16:18
@d4l3k
Copy link
MemberAuthor

@pytorchbot merge

pytorch-bot[bot] reacted with thumbs up emoji

@pytorch-botpytorch-botbot added the ciflow/trunkTrigger trunk jobs on your pull request labelApr 10, 2025
@pytorchmergebot
Copy link
Collaborator

Merge started

Your change will be merged once all checks pass (ETA 0-4 Hours).

Learn more about merging in thewiki.

Questions? Feedback? Please reach out to thePyTorch DevX Team

Advanced Debugging
Check the merge workflow status
here

@d4l3k
Copy link
MemberAuthor

@pytorchmergebot cancel

@pytorch-bot
Copy link

❌ 🤖 pytorchbot command failed:

@pytorchbot: error: argument command: invalid choice: 'cancel' (choose from 'merge', 'revert', 'rebase', 'label', 'drci', 'cherry-pick', 'close')usage: @pytorchbot [-h] {merge,revert,rebase,label,drci,cherry-pick,close} ...

Try@pytorchbot --help for more info.

@pytorchmergebot
Copy link
Collaborator

The mergejob was canceled or timed out. This most often happen if two merge requests were issued for the same PR, or if merge job was waiting for more than 6 hours for tests to finish. In later case, please do not hesitate to reissue the merge command
For more information seepytorch-bot wiki.

@d4l3k
Copy link
MemberAuthor

@pytorchbot merge

pytorch-bot[bot] reacted with thumbs up emoji

@pytorchmergebot
Copy link
Collaborator

Merge started

Your change will be merged once all checks pass (ETA 0-4 Hours).

Learn more about merging in thewiki.

Questions? Feedback? Please reach out to thePyTorch DevX Team

Advanced Debugging
Check the merge workflow status
here

@d4l3kd4l3k deleted the d4l3k/store_queue branchApril 11, 2025 19:44
timocafe pushed a commit to timocafe/pytorch that referenced this pull requestApr 16, 2025
This adds queue operations as described inpytorch#150943.This works by adding two new operations `queue_push` and `queue_pop`. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first call `wait` until the key is ready and then pop the value from the queue.This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.`wait` and `check` work for queue operations though queue_push will only wake up the first waiter rather than all of them.This also has a few cleanups to error types/documentation in related code.Example trace:```[I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816[I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816[I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816[I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816[I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816[I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816[I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816[W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout[W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms[I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816```Test plan:```$ pytest test/distributed/test_store.py -k queue -v -stest/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues)test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s]test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues)test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues)test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s]test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s]```Pull Requestresolved:pytorch#150969Approved by:https://github.com/XilunWu,https://github.com/fduwjj
amathewc pushed a commit to amathewc/pytorch that referenced this pull requestApr 17, 2025
This adds queue operations as described inpytorch#150943.This works by adding two new operations `queue_push` and `queue_pop`. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first call `wait` until the key is ready and then pop the value from the queue.This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.`wait` and `check` work for queue operations though queue_push will only wake up the first waiter rather than all of them.This also has a few cleanups to error types/documentation in related code.Example trace:```[I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816[I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816[I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816[I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816[I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816[I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816[I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816[W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout[W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms[I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816```Test plan:```$ pytest test/distributed/test_store.py -k queue -v -stest/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues)test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s]test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues)test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues)test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s]test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s]```Pull Requestresolved:pytorch#150969Approved by:https://github.com/XilunWu,https://github.com/fduwjj
Sign up for freeto join this conversation on GitHub. Already have an account?Sign in to comment

Reviewers

@fduwjjfduwjjfduwjj approved these changes

@XilunWuXilunWuXilunWu approved these changes

@wconstabwconstabAwaiting requested review from wconstab

Assignees

No one assigned

Labels

ciflow/trunkTrigger trunk jobs on your pull requestMergedoncall: distributedAdd this issue/PR to distributed oncall triage queuerelease notes: distributed (c10d)release notes category

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

5 participants

@d4l3k@pytorchmergebot@fduwjj@XilunWu

[8]ページ先頭

©2009-2025 Movatter.jp