- Notifications
You must be signed in to change notification settings - Fork26.3k
c10d/Store: add queues#150969
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
c10d/Store: add queues#150969
Uh oh!
There was an error while loading.Please reload this page.
Conversation
pytorch-botbot commentedApr 9, 2025 • edited
Loading Uh oh!
There was an error while loading.Please reload this page.
edited
Uh oh!
There was an error while loading.Please reload this page.
🔗 Helpful Links🧪 See artifacts and rendered test results athud.pytorch.org/pr/150969
Note: Links to docs will display an error until the docs builds have been completed. ✅ No FailuresAs of commite2cde44 with merge basef136443 ( This comment was automatically generated by Dr. CI and updates every 15 minutes. |
XilunWu left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
overall LGTM! Left some questions.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
fduwjj left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Overall lgtm. I just have some questions for the libuv part changes
| // Queues | ||
| std::unordered_map<std::string, std::deque<std::vector<uint8_t>>> queues_; | ||
| std::unordered_map<std::string, std::deque<c10::intrusive_ptr<UvHandle>>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
looks like this is not being used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
good catch -- this was before I switched to using WAIT
| } | ||
| } | ||
| voidLibUVStoreDaemon::wakeupOneWaitingClient(const std::string& key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Just n00b question, why we don't have this for other operations within libuv backend?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
Normally with set/wait all of the waiting clients get woken up. With queues we only want a single worker to wake up and try and get the item so that's why we need a special "wakeupOne"
4079813 to6ce1313Compared4l3k commentedApr 10, 2025
@pytorchbot merge |
pytorchmergebot commentedApr 10, 2025
Merge startedYour change will be merged once all checks pass (ETA 0-4 Hours). Learn more about merging in thewiki. Questions? Feedback? Please reach out to thePyTorch DevX Team |
d4l3k commentedApr 10, 2025
@pytorchmergebot cancel |
❌ 🤖 pytorchbot command failed: Try |
pytorchmergebot commentedApr 10, 2025
The mergejob was canceled or timed out. This most often happen if two merge requests were issued for the same PR, or if merge job was waiting for more than 6 hours for tests to finish. In later case, please do not hesitate to reissue the merge command |
d4l3k commentedApr 11, 2025
@pytorchbot merge |
pytorchmergebot commentedApr 11, 2025
Merge startedYour change will be merged once all checks pass (ETA 0-4 Hours). Learn more about merging in thewiki. Questions? Feedback? Please reach out to thePyTorch DevX Team |
This adds queue operations as described inpytorch#150943.This works by adding two new operations `queue_push` and `queue_pop`. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first call `wait` until the key is ready and then pop the value from the queue.This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.`wait` and `check` work for queue operations though queue_push will only wake up the first waiter rather than all of them.This also has a few cleanups to error types/documentation in related code.Example trace:```[I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816[I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816[I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816[I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816[I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816[I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816[I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816[W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout[W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms[I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816```Test plan:```$ pytest test/distributed/test_store.py -k queue -v -stest/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues)test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s]test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues)test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues)test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s]test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s]```Pull Requestresolved:pytorch#150969Approved by:https://github.com/XilunWu,https://github.com/fduwjj
This adds queue operations as described inpytorch#150943.This works by adding two new operations `queue_push` and `queue_pop`. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first call `wait` until the key is ready and then pop the value from the queue.This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.`wait` and `check` work for queue operations though queue_push will only wake up the first waiter rather than all of them.This also has a few cleanups to error types/documentation in related code.Example trace:```[I409 16:51:43.963833529 TCPStoreLibUvBackend.cpp:829] [c10d - trace] validate magic:1015412686 address:[localhost]:55816[I409 16:51:43.963845838 TCPStoreLibUvBackend.cpp:842] [c10d - trace] ping nonce:2840795 address:[localhost]:55816[I409 16:51:43.963902914 TCPStoreLibUvBackend.cpp:911] [c10d - trace] add key:init/ val:1 address:[localhost]:55816[I409 16:51:43.963939389 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:init/ address:[localhost]:55816[I409 16:51:43.963974842 TCPStoreLibUvBackend.cpp:893] [c10d - trace] get key:init/ address:[localhost]:55816[I409 16:51:43.964071909 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/test_queue_support address:[localhost]:55816[I409 16:51:43.964080221 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964108584 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964123207 TCPStoreLibUvBackend.cpp:1121] [c10d - trace] queue_push key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964128194 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964156347 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964187493 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964217709 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964324300 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964354495 TCPStoreLibUvBackend.cpp:1133] [c10d - trace] queue_pop key:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964416299 TCPStoreLibUvBackend.cpp:940] [c10d - trace] check key_count:1 keys[0]:/test_prefix/foo address:[localhost]:55816[I409 16:51:43.964458733 TCPStoreLibUvBackend.cpp:977] [c10d - trace] wait key_count:1 keys[0]:/test_prefix/non_existant address:[localhost]:55816[W409 16:51:43.974516585 socket.cpp:460] [c10d] waitForInput: poll for socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) returned 0, likely a timeout[W409 16:51:43.974559169 socket.cpp:485] [c10d] waitForInput: socket SocketImpl(fd=75, addr=[localhost]:55816, remote=[localhost]:46641) timed out after 10ms[I409 16:51:43.974600451 TCPStoreLibUvBackend.cpp:1101] [c10d - trace] cancel_wait address:[localhost]:55816```Test plan:```$ pytest test/distributed/test_store.py -k queue -v -stest/distributed/test_store.py::FileStoreTest::test_queues SKIPPED [0.4351s] (Store does not support queues)test/distributed/test_store.py::HashStoreTest::test_queues PASSED [0.0009s]test/distributed/test_store.py::PrefixFileStoreTest::test_queues SKIPPED [0.0006s] (Store does not support queues)test/distributed/test_store.py::TCPStoreTest::test_queues SKIPPED [0.0012s] (Store does not support queues)test/distributed/test_store.py::LibUvTCPStoreTest::test_queues PASSED [0.0014s]test/distributed/test_store.py::PrefixTCPStoreTest::test_queues PASSED [0.0014s]```Pull Requestresolved:pytorch#150969Approved by:https://github.com/XilunWu,https://github.com/fduwjj
Uh oh!
There was an error while loading.Please reload this page.
This adds queue operations as described in#150943.
This works by adding two new operations
queue_pushandqueue_pop. The semantics are designed to be blocking with a timeout. Pushing will always succeed as the queue is infinite size. Popping will first callwaituntil the key is ready and then pop the value from the queue.This implements queues for only: HashStore, TCPStore w/ libuv. FileStore and the legacy backends are not supported.
waitandcheckwork for queue operations though queue_push will only wake up the first waiter rather than all of them.This also has a few cleanups to error types/documentation in related code.
Example trace:
Test plan:
cc@H-Huang@awgu@wanchaol@fegin@fduwjj@wz337@wconstab