[c10d][tcp_store] Fix connection reset caused by wrong socket close (#150987)

While fixing the memory leak in https://github.com/pytorch/pytorch/pull/145757, we accidentally close the socket for the case when nread == 0 and thought it is the case when connection is closed. This is not true. According to libuv doc: https://docs.libuv.org/en/v1.x/stream.html#c.uv_read_cb.

> nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under read(2).

We found this bug when debugging a broken pipe issue when users first call a set and then wait for all keys right afterwards on 128 ranks. This might also cause other broken pipe issues we have seen in the prod jobs recently.

Added a unit test to test this case.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/150987
Approved by: https://github.com/d4l3k, https://github.com/XilunWu
This commit is contained in:
fduwjj
2025-04-09 22:36:51 -07:00
committed by PyTorch MergeBot
parent e7ed50f27b
commit f663aa4e81
2 changed files with 55 additions and 13 deletions

View File

@@ -246,6 +246,52 @@ TEST(TCPStoreTest, testLibUVPartialRead) {
clientThread.join();
}
TEST(TCPStoreTest, testLibUVSetAndWait) {
int numWorkers = 128; // thread 0 creates both server and client
std::vector<std::thread> threads;
// server part
c10d::TCPStoreOptions server_opts{
0,
true, // is master
numWorkers,
false, // don't wait otherwise client thread won't spawn
std::chrono::seconds(defaultTimeout)};
server_opts.useLibUV = true;
auto serverTCPStore =
std::make_unique<c10d::TCPStore>("127.0.0.1", server_opts);
// client part
c10d::TCPStoreOptions client_opts{
serverTCPStore->getPort(),
false, // is master
numWorkers,
false, // wait workers
std::chrono::seconds(defaultTimeout)};
client_opts.useLibUV = true;
for (const auto i : c10::irange(numWorkers)) {
threads.emplace_back([=, &client_opts] {
auto clientTCPStore =
c10::make_intrusive<c10d::TCPStore>("127.0.0.1", client_opts);
std::string key("k_" + std::to_string(i));
std::string value("v_" + std::to_string(i));
std::vector<uint8_t> valueBuf(value.begin(), value.end());
clientTCPStore->set(key, valueBuf);
std::vector<std::string> all_keys;
for (const auto j : c10::irange(numWorkers)) {
all_keys.push_back("k_" + std::to_string(j));
}
clientTCPStore->wait(all_keys);
});
}
for (auto& thread : threads) {
thread.join();
}
}
void testMultiTenantStores(bool libUV) {
c10d::TCPStoreOptions opts{};
opts.isServer = true;

View File

@@ -120,25 +120,21 @@ class UvTcpSocket : public UvHandle {
if (nread > 0) {
try {
uv_socket->processBuf(buf, nread);
return; // We do free inside processBuf.
} catch (std::exception& ex) {
C10D_WARNING("Error processing client message: {}", ex.what());
uv_socket->close();
}
} else {
// Handle error and EOF cases
if (nread < 0) {
C10D_DEBUG(
"Read callback failed. code:{} name:{} desc:{}",
nread,
uv_err_name(nread),
uv_strerror(nread));
} else {
C10D_DEBUG("Remote peer closed the connection.");
}
} else if (nread < 0) { // Handle error and EOF cases
C10D_DEBUG(
"Read callback failed. code:{} name:{} desc:{}",
nread,
uv_err_name(nread),
uv_strerror(nread));
uv_socket->close();
// NOLINTNEXTLINE(cppcoreguidelines-no-malloc)
free(buf->base);
}
// NOLINTNEXTLINE(cppcoreguidelines-no-malloc)
free(buf->base);
}
public: