[Distributed] [16/N] Fix clang-tidy warnings in torch/csrc/distributed/c10d (#137404)

Follows #137072
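
The changes are mechanical and fall into a few recurring patterns visible across the hunks below: pass read-only std::string / intrusive_ptr-style parameters by const reference (or take them by value and std::move into members), replace bare new with std::make_unique, construct threads in place via emplace_back instead of wrapping a temporary std::thread, give members and locals explicit initializers, prefer empty() over size() comparisons, add static_casts for narrowing index arithmetic, and write '\n' instead of std::endl. A minimal, self-contained sketch of these patterns follows (they likely correspond to checks such as performance-unnecessary-value-param, modernize-make-unique, readability-container-size-empty, cppcoreguidelines-init-variables and performance-avoid-endl); the Worker/greet names are illustrative placeholders, not c10d code:

// Hypothetical example of the fix patterns above; not taken from the c10d sources.
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

struct Worker {
  // Sink argument: take by value and move into the member instead of copying.
  explicit Worker(std::string path) : path_(std::move(path)) {}
  std::string path_;
};

// Read-only string parameter becomes a const reference.
void greet(const std::string& name) {
  // '\n' instead of std::endl avoids an unnecessary stream flush.
  std::cout << "hello " << name << '\n';
}

int main() {
  // std::make_unique instead of std::unique_ptr<Worker>(new Worker(...)).
  auto worker = std::make_unique<Worker>("/tmp/example");

  std::vector<std::thread> threads;
  // Build the thread in place rather than emplace_back(std::thread([] { ... })).
  threads.emplace_back([] { greet("worker"); });
  for (auto& t : threads) {
    t.join();
  }

  // empty() instead of size() == 0, and locals get an initializer
  // instead of being declared uninitialized.
  bool ok = !worker->path_.empty();
  return ok ? 0 : 1;
}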

Pull Request resolved: https://github.com/pytorch/pytorch/pull/137404
Approved by: https://github.com/Skylion007
Author: cyy
Date: 2024-10-10 18:05:32 +00:00
Committed by: PyTorch MergeBot
Parent: 20815c7cb9
Commit: 94e12f97dc
22 changed files with 256 additions and 289 deletions

@@ -1,9 +1,6 @@
#include <c10/util/irange.h>
#include "StoreTestCommon.hpp"
#include <iostream>
#include <thread>
#include <torch/csrc/distributed/c10d/Backoff.hpp>
TEST(BackoffTest, exponentialBackoffDefaults) {

@@ -40,7 +40,7 @@ std::string tmppath() {
}
#endif
void testGetSet(std::string path, std::string prefix = "") {
void testGetSet(const std::string& path, const std::string& prefix = "") {
// Basic Set/Get on File Store
{
auto fileStore = c10::make_intrusive<c10d::FileStore>(path, 2);
@@ -100,7 +100,7 @@ void stressTestStore(std::string path, std::string prefix = "") {
c10d::test::Semaphore sem1, sem2;
for (C10_UNUSED const auto i : c10::irange(numThreads)) {
threads.emplace_back(std::thread([&] {
threads.emplace_back([&] {
auto fileStore =
c10::make_intrusive<c10d::FileStore>(path, numThreads + 1);
c10d::PrefixStore store(prefix, fileStore);
@@ -109,7 +109,7 @@ void stressTestStore(std::string path, std::string prefix = "") {
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
store.add("counter", 1);
}
}));
});
}
sem1.wait(numThreads);

@@ -3,15 +3,15 @@
#include <unistd.h>
#include <iostream>
#include <thread>
#include <torch/csrc/distributed/c10d/HashStore.hpp>
#include <torch/csrc/distributed/c10d/PrefixStore.hpp>
#include <utility>
constexpr int64_t kShortStoreTimeoutMillis = 100;
void testGetSet(std::string prefix = "") {
void testGetSet(const std::string& prefix = "") {
// Basic set/get
{
auto hashStore = c10::make_intrusive<c10d::HashStore>();
@@ -60,16 +60,16 @@ void stressTestStore(std::string prefix = "") {
std::vector<std::thread> threads;
c10d::test::Semaphore sem1, sem2;
auto hashStore = c10::make_intrusive<c10d::HashStore>();
c10d::PrefixStore store(prefix, hashStore);
c10d::PrefixStore store(std::move(prefix), hashStore);
for (C10_UNUSED const auto i : c10::irange(numThreads)) {
threads.emplace_back(std::thread([&] {
threads.emplace_back([&] {
sem1.post();
sem2.wait();
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
store.add("counter", 1);
}
}));
});
}
sem1.wait(numThreads);

@@ -6,14 +6,13 @@
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"
using namespace c10d::test;
using at::cuda::CUDAStream;
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
std::vector<T> initialize(const std::string& path, size_t N, Args&&... args) {
std::vector<T> tests;
for (C10_UNUSED const auto i : c10::irange(N)) {
tests.push_back(std::move(T(path, std::forward<Args>(args)...)));
@@ -35,10 +34,7 @@ class AsyncTest {
public:
AsyncTest(std::string path) : path_(std::move(path)) {}
AsyncTest(AsyncTest&& other) {
path_ = std::move(other.path_);
pg_ = std::move(other.pg_);
}
AsyncTest(AsyncTest&& other) noexcept = default;
::c10d::ProcessGroupGloo& getProcessGroup() {
return *pg_;
@@ -53,8 +49,8 @@ class AsyncTest {
options->devices.push_back(
::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));
pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
new ::c10d::ProcessGroupGloo(store, rank, size, options));
pg_ =
std::make_unique<::c10d::ProcessGroupGloo>(store, rank, size, options);
}
protected:
@@ -88,7 +84,7 @@ class AsyncInputIsOutputTest : public AsyncTest {
at::cuda::OptionalCUDAGuard deviceGuard;
streams_.reserve(numDevices_);
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(i));
streams_.push_back(at::cuda::getStreamFromPool());
}
}
@@ -118,7 +114,9 @@ class AsyncInputIsOutputTest : public AsyncTest {
}
protected:
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
const int numTensors_;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
const int numDevices_;
std::vector<at::Tensor> inputs_;
std::vector<CUDAStream> streams_;
@@ -136,13 +134,13 @@ class AsyncAllreduceTest : public AsyncInputIsOutputTest {
// Launch sleep on every stream
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
cudaSleep(streams_[i], 10 * 1000 * 1000);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(i));
cudaSleep(streams_[i], 10ull * 1000 * 1000);
}
// Launch value initialization for every tensor
for (const auto i : c10::irange(numTensors_)) {
deviceGuard.set_index(i % numDevices_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(i % numDevices_));
inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
}
@@ -155,26 +153,26 @@ class AsyncBroadcastTest : public AsyncInputIsOutputTest {
AsyncBroadcastTest(const std::string& path, int numTensors)
: AsyncInputIsOutputTest(path, numTensors) {}
c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
c10::intrusive_ptr<c10d::Work> run(size_t rootRank, size_t rootTensor) {
// For the duration of this function, make THC use our streams
c10::cuda::CUDAMultiStreamGuard guard(streams_);
// Launch sleep on every stream
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(i);
cudaSleep(streams_[i], 10 * 1000 * 1000);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(i));
cudaSleep(streams_[i], 10ull * 1000 * 1000);
}
// Launch value initialization for every tensor
for (const auto i : c10::irange(numTensors_)) {
deviceGuard.set_index(i % numDevices_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(i % numDevices_));
inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
}
::c10d::BroadcastOptions options;
options.rootRank = rootRank;
options.rootTensor = rootTensor;
options.rootRank = static_cast<int64_t>(rootRank);
options.rootTensor = static_cast<int64_t>(rootTensor);
return pg_->broadcast(inputs_, options);
}
};

@@ -1,15 +1,12 @@
#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#include <csignal>
#endif
#include <sys/types.h>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <sstream>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
@@ -30,7 +27,7 @@ constexpr auto kWaitTimeout = std::chrono::milliseconds(1);
#ifndef _WIN32
class SignalTest {
public:
SignalTest(const std::string& path) : path_(path) {}
SignalTest(std::string path) : path_(std::move(path)) {}
~SignalTest() {
if (arm_.joinable()) {
@@ -41,7 +38,7 @@ class SignalTest {
// Arms test to send signal to PID when the semaphore unlocks. This
// happens as soon as the first collective completes successfully.
void arm(int pid, int signal) {
arm_ = std::thread([=] {
arm_ = std::thread([this, pid, signal] {
sem_.wait();
kill(pid, signal);
});
@@ -108,7 +105,7 @@ class ProcessGroupGlooDelayed : public ::c10d::ProcessGroupGloo {
int rank,
int size,
c10::intrusive_ptr<Options> options)
: ProcessGroupGloo(store, rank, size, options) {}
: ProcessGroupGloo(store, rank, size, std::move(options)) {}
c10::intrusive_ptr<::c10d::Work> send(
std::vector<at::Tensor>& tensors,
@@ -127,13 +124,13 @@ class CollectiveTest {
bool delayed = false) {
std::vector<CollectiveTest> tests;
for (C10_UNUSED const auto i : c10::irange(num)) {
tests.emplace_back(CollectiveTest(path));
tests.emplace_back(path);
}
std::vector<std::thread> threads;
for (const auto i : c10::irange(num)) {
threads.emplace_back(std::thread(
[i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }));
threads.emplace_back(
[i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); });
}
for (auto& thread : threads) {
thread.join();
@@ -144,16 +141,13 @@ class CollectiveTest {
CollectiveTest(std::string path) : path_(std::move(path)) {}
CollectiveTest(CollectiveTest&& other) {
path_ = std::move(other.path_);
pg_ = std::move(other.pg_);
}
CollectiveTest(CollectiveTest&& other) noexcept = default;
::c10d::ProcessGroupGloo& getProcessGroup() {
return *pg_;
}
void start(int rank, int size, bool delayed) {
void start(int rank, size_t size, bool delayed) {
auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);
// Set a timeout that is small enough to make this test run fast, but also
@@ -164,11 +158,11 @@ class CollectiveTest {
::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));
if (!delayed) {
pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
new ::c10d::ProcessGroupGloo(store, rank, size, options));
pg_ = std::make_unique<::c10d::ProcessGroupGloo>(
store, rank, size, options);
} else {
pg_ = std::unique_ptr<ProcessGroupGlooDelayed>(
new ProcessGroupGlooDelayed(store, rank, size, options));
pg_ =
std::make_unique<ProcessGroupGlooDelayed>(store, rank, size, options);
}
}
@@ -192,13 +186,13 @@ std::vector<std::vector<at::Tensor>> copyTensors(
}
std::vector<std::vector<at::Tensor>> waitWork(
std::vector<c10::intrusive_ptr<c10d::Work>> works) {
const std::vector<c10::intrusive_ptr<c10d::Work>>& works) {
std::vector<std::vector<at::Tensor>> outputTensors;
for (auto& work : works) {
try {
work->wait();
} catch (const std::exception& ex) {
LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
LOG(ERROR) << "Exception received: " << ex.what() << '\n';
}
outputTensors.emplace_back(work->result());
}
@@ -206,14 +200,14 @@ std::vector<std::vector<at::Tensor>> waitWork(
}
std::vector<std::vector<at::Tensor>> waitFuture(
std::vector<c10::intrusive_ptr<c10d::Work>> works) {
const std::vector<c10::intrusive_ptr<c10d::Work>>& works) {
std::vector<std::vector<at::Tensor>> outputTensors;
for (auto& work : works) {
auto fut = work->getFuture();
try {
fut->wait();
} catch (const std::exception& ex) {
LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
LOG(ERROR) << "Exception received: " << ex.what() << '\n';
}
auto result = fut->value();
if (result.isNone()) {
@@ -288,8 +282,7 @@ void testAllreduce(
auto outputs = waitFuture(work);
auto event_lists = disableProfilerLegacy();
checkProfiledEvents(
std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);
checkProfiledEvents(event_lists, GLOO_ALLREDUCE_STR, size, allShapes);
// Verify outputs
const auto expected = (size * (size - 1)) / 2;
@@ -334,8 +327,7 @@ void testAllreduceUsingWorkAPI(
auto outputs = waitWork(work);
auto event_lists = disableProfilerLegacy();
checkProfiledEvents(
std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);
checkProfiledEvents(event_lists, GLOO_ALLREDUCE_STR, size, allShapes);
// Verify outputs
const auto expected = (size * (size - 1)) / 2;
@@ -371,7 +363,8 @@ void testBroadcast(
at::OptionalDeviceGuard deviceGuard;
for (const auto l : c10::irange(stride)) {
if (b == at::DeviceType::CUDA) {
deviceGuard.reset_device(at::Device(at::kCUDA, l));
deviceGuard.reset_device(
at::Device(at::kCUDA, static_cast<c10::DeviceIndex>(l)));
}
inputs[k][l] =
at::ones(shapes, at::dtype(dtype).device(b)) * (k * stride + l);
@@ -396,8 +389,7 @@ void testBroadcast(
auto outputs = waitFuture(work);
auto event_lists = disableProfilerLegacy();
checkProfiledEvents(
std::move(event_lists), GLOO_BROADCAST_STR, size, allShapes);
checkProfiledEvents(event_lists, GLOO_BROADCAST_STR, size, allShapes);
// Verify outputs
const auto expected = (i * stride + j);
@@ -427,8 +419,9 @@ void testAlltoall(const std::string& path, const at::DeviceType b) {
{30, 31, 32, 33, 34, 35, 36},
};
for (const auto rank : c10::irange(size)) {
const std::vector<int32_t>& blob = blobs[rank];
inputs[rank] = at::from_blob((int32_t*)(blob.data()), blob.size()).to(b);
std::vector<int32_t>& blob = blobs[rank];
inputs[rank] =
at::from_blob(blob.data(), static_cast<int64_t>(blob.size())).to(b);
}
// Allocate outputs
@@ -478,7 +471,7 @@ void testAlltoall(const std::string& path, const at::DeviceType b) {
}
auto event_lists = disableProfilerLegacy();
checkProfiledEvents(std::move(event_lists), GLOO_A2A_STR, size, allShapes);
checkProfiledEvents(event_lists, GLOO_A2A_STR, size, allShapes);
// Verify outputs
std::vector<std::vector<int32_t>> expected = {
{0, 1, 10, 11, 12, 20, 21, 30, 31},
@@ -516,7 +509,7 @@ void testBarrier(const std::string& path) {
std::vector<std::vector<int64_t>> allShapes;
// Barrier does not use tensors, so skip shape checking.
checkProfiledEvents(
std::move(event_lists),
event_lists,
GLOO_STR,
size,
allShapes,
@@ -533,7 +526,7 @@ void testMonitoredBarrier(const std::string& path) {
std::vector<std::thread> threads;
threads.reserve(size);
for (const auto r : c10::irange(size)) {
threads.emplace_back(std::thread([=]() { runMonitoredBarrier(r); }));
threads.emplace_back([=]() { runMonitoredBarrier(r); });
}
for (auto& t : threads) {
t.join();
@@ -555,8 +548,7 @@ void testMonitoredBarrier(const std::string& path) {
};
threads.clear();
for (const auto r : c10::irange(size)) {
threads.emplace_back(
std::thread([=]() { runMonitoredBarrierWithException(r); }));
threads.emplace_back([=]() { runMonitoredBarrierWithException(r); });
}
for (auto& t : threads) {
t.join();
@@ -613,14 +605,14 @@ void testSend(const std::string& path) {
enableProfilerLegacy(ProfilerConfig(
ProfilerState::CPU, /* report_input_shapes */ true, false));
auto sendWork = pg.send(tensors, dstRank, tag);
bool sendCompleted;
bool sendCompleted = false;
std::thread waitSendThreadAbort([&]() { sendCompleted = sendWork->wait(); });
sendWork->abort();
// Block until the sendWork gets successfully aborted
waitSendThreadAbort.join();
EXPECT_FALSE(sendCompleted);
auto event_lists = disableProfilerLegacy();
checkProfiledEvents(std::move(event_lists), GLOO_SEND_STR, 1, allShapes);
checkProfiledEvents(event_lists, GLOO_SEND_STR, 1, allShapes);
// Now create a separate sender thread to ensure that future waitsends can
// complete successfully.
@@ -663,14 +655,14 @@ void testRecv(const std::string& path) {
enableProfilerLegacy(ProfilerConfig(
ProfilerState::CPU, /* report_input_shapes */ true, false));
auto recvWork = pg.recv(tensors, srcRank, tag);
bool recvCompleted;
bool recvCompleted = false;
std::thread waitRecvThreadAbort([&]() { recvCompleted = recvWork->wait(); });
recvWork->abort();
// Block until the first recv gets successfully aborted
waitRecvThreadAbort.join();
EXPECT_FALSE(recvCompleted);
auto event_lists = disableProfilerLegacy();
checkProfiledEvents(std::move(event_lists), GLOO_RECV_STR, 1, allShapes);
checkProfiledEvents(event_lists, GLOO_RECV_STR, 1, allShapes);
// Now create a separate receiver thread to ensure that future waits can
// complete successfully.

@@ -5,23 +5,21 @@
#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)
// Wait for work to complete
std::vector<std::vector<at::Tensor>> waitWork(
c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
std::vector<c10::intrusive_ptr<c10d::Work>> works) {
const c10::intrusive_ptr<::c10d::ProcessGroupMPI>& pg,
const std::vector<c10::intrusive_ptr<c10d::Work>>& works) {
std::vector<std::vector<at::Tensor>> outputTensors;
for (auto& work : works) {
try {
work->wait();
} catch (const std::exception& ex) {
std::cerr << "Exception received: " << ex.what() << std::endl;
std::cerr << "Exception received: " << ex.what() << '\n';
pg->abort();
}
outputTensors.emplace_back(work->result());
@@ -31,15 +29,15 @@ std::vector<std::vector<at::Tensor>> waitWork(
// Wait using Futures
std::vector<std::vector<at::Tensor>> waitFuture(
c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
std::vector<c10::intrusive_ptr<c10d::Work>> works) {
const c10::intrusive_ptr<::c10d::ProcessGroupMPI>& pg,
const std::vector<c10::intrusive_ptr<c10d::Work>>& works) {
std::vector<std::vector<at::Tensor>> outputTensors;
for (auto& work : works) {
auto fut = work->getFuture();
try {
fut->wait();
} catch (const std::exception& ex) {
std::cerr << "Exception received: " << ex.what() << std::endl;
std::cerr << "Exception received: " << ex.what() << '\n';
pg->abort();
}
auto result = fut->value();
@@ -78,7 +76,7 @@ void testAllreduce(int iter = 1000) {
const auto expected = worldSize * i;
auto data = outputTensors[i][0].data_ptr<float>();
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
if (data[j] != expected) {
if (data[j] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -110,7 +108,7 @@ void testBroadcast(int iter = 10000) {
const auto expected = i;
auto data = outputTensors[i][0].data_ptr<float>();
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
if (data[j] != expected) {
if (data[j] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -140,7 +138,7 @@ void testReduce(int iter = 10000) {
const auto expected = worldSize * i;
auto data = outputTensors[i][0].data_ptr<float>();
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
if (data[j] != expected) {
if (data[j] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -179,7 +177,7 @@ void testAllgather(int iter = 10000) {
const auto expected = i * j;
auto data = outputTensors[i][j].data_ptr<float>();
for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
if (data[k] != expected) {
if (data[k] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -222,7 +220,7 @@ void testGather(int iter = 10000) {
const auto expected = i * j;
auto data = outputTensors[i][j].data_ptr<float>();
for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
if (data[k] != expected) {
if (data[k] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -230,7 +228,7 @@ void testGather(int iter = 10000) {
}
} else {
for (const auto i : c10::irange(iter)) {
if (outputTensors[i].size() != 0) {
if (!outputTensors[i].empty()) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -271,7 +269,7 @@ void testScatter(int iter = 1) {
const auto expected = i * j;
auto data = outputTensors[i][0].data_ptr<float>();
for (auto k = 0; k < outputTensors[i][0].numel(); ++k) {
if (data[k] != expected) {
if (data[k] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -331,7 +329,7 @@ void testSendRecv(bool recvAnysource, int iter = 10000) {
const auto expected = i;
auto data = outputTensors[i][0].data_ptr<float>();
for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
if (data[j] != expected) {
if (data[j] != static_cast<float>(expected)) {
TORCH_CHECK(false, "BOOM!");
}
}
@@ -349,7 +347,7 @@ int main(int argc, char** argv) {
#ifdef MPIEXEC
// If we are within an openmpi mpirun, then skip the exec
if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << '\n';
execl(STR(MPIEXEC), "-np 2", argv[0], (char*)nullptr);
}
@@ -363,7 +361,7 @@ int main(int argc, char** argv) {
testSendRecv(true);
testBackendName();
std::cout << "Test successful" << std::endl;
std::cout << "Test successful" << '\n';
#else
std::cout << "MPI executable not found, skipping test" << std::endl;
#endif

@@ -8,6 +8,7 @@
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/NCCLUtils.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp>
#include <utility>
#include "CUDATest.hpp"
#include "TestUtils.hpp"
@@ -47,7 +48,7 @@ class ProcessGroupNCCLSimulateErrors : public c10d::ProcessGroupNCCL {
int rank,
int size,
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts)
: ProcessGroupNCCL(store, rank, size, opts), simulateError_(false) {}
: ProcessGroupNCCL(store, rank, size, std::move(opts)) {}
std::exception_ptr checkForNCCLErrors(
std::shared_ptr<c10d::NCCLComm>& ncclComm) override {
@@ -93,7 +94,7 @@ class ProcessGroupNCCLSimulateErrors : public c10d::ProcessGroupNCCL {
}
private:
bool simulateError_;
bool simulateError_{false};
};
class WorkNCCLTimedoutErrors : public c10d::ProcessGroupNCCL::WorkNCCL {
@@ -127,9 +128,7 @@ class ProcessGroupNCCLTimedOutErrors : public ProcessGroupNCCLSimulateErrors {
int rank,
int size,
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts)
: ProcessGroupNCCLSimulateErrors(store, rank, size, opts),
watchDogDebugInfoFinished_(false),
setTimedoutError_(false) {}
: ProcessGroupNCCLSimulateErrors(store, rank, size, std::move(opts)) {}
c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> initWork(
at::Device& device,
@@ -177,10 +176,10 @@ class ProcessGroupNCCLTimedOutErrors : public ProcessGroupNCCLSimulateErrors {
watchDogDebugInfoFinished_ = true;
return "";
}
bool watchDogDebugInfoFinished_;
bool watchDogDebugInfoFinished_{false};
private:
bool setTimedoutError_;
bool setTimedoutError_{false};
};
class ProcessGroupNCCLNoHeartbeatCaught
@@ -191,8 +190,7 @@ class ProcessGroupNCCLNoHeartbeatCaught
int rank,
int size,
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts)
: ProcessGroupNCCLTimedOutErrors(store, rank, size, opts),
hasMonitorThreadCaughtError_(false) {}
: ProcessGroupNCCLTimedOutErrors(store, rank, size, std::move(opts)) {}
std::mutex& getWatchdogMutex() {
return workMetaListMutex_;
@@ -223,11 +221,11 @@ class ProcessGroupNCCLNoHeartbeatCaught
// It's really hard to unit test std::abort. So we override it instead.
// Commented this override, we do see process aborted with core dump without
// this override.
void terminateProcess(std::string errMsg) override {
void terminateProcess(const std::string& errMsg) override {
throw std::runtime_error(errMsg);
}
bool hasMonitorThreadCaughtError_;
bool hasMonitorThreadCaughtError_{false};
};
class ProcessGroupNCCLDebugInfoStuck
@@ -238,7 +236,7 @@ class ProcessGroupNCCLDebugInfoStuck
int rank,
int size,
c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts)
: ProcessGroupNCCLNoHeartbeatCaught(store, rank, size, opts) {}
: ProcessGroupNCCLNoHeartbeatCaught(store, rank, size, std::move(opts)) {}
protected:
// Override the heartbeat monitor function to set a long timeout to mimic the
@@ -357,7 +355,7 @@ std::string readTraceFromFile(const std::string& filename, size_t size) {
// Read the strings from the file
if (file) { // While the file stream is in good state
std::string str(size, '\0');
file.read(&str[0], size);
file.read(&str[0], static_cast<std::streamsize>(size));
if (file) {
return str;
}
@@ -368,7 +366,7 @@ std::string readTraceFromFile(const std::string& filename, size_t size) {
// Extend the nested class outside the parent class
class TestDebugInfoWriter : public c10d::DebugInfoWriter {
public:
TestDebugInfoWriter(std::string namePrefix)
TestDebugInfoWriter(const std::string& namePrefix)
: DebugInfoWriter(namePrefix, 0) {}
void write(const std::string& ncclTrace) override {
@@ -433,7 +431,7 @@ TEST_F(ProcessGroupNCCLErrorsTest, testNCCLErrorsNoHeartbeat) {
EXPECT_TRUE(pg.getErrorCaughtFlag());
}
work->wait();
EXPECT_TRUE(traces.size() > 0);
EXPECT_TRUE(!traces.empty());
auto filename = c10::str(tempFilename, 0);
auto traceFromStorage = readTraceFromFile(filename, traces.size());
// Check the traces read from storage match with the original nccl trace.

@@ -21,15 +21,12 @@ using at::cuda::CUDAStream;
class NCCLTestBase {
public:
NCCLTestBase(
const std::string& path,
std::string path,
const std::chrono::milliseconds pgTimeout =
c10d::kProcessGroupNCCLDefaultTimeout)
: path_(path), pgTimeout_(pgTimeout) {}
: path_(std::move(path)), pgTimeout_(pgTimeout) {}
NCCLTestBase(NCCLTestBase&& other) {
path_ = std::move(other.path_);
pg_ = std::move(other.pg_);
}
NCCLTestBase(NCCLTestBase&& other) noexcept = default;
std::shared_ptr<::c10d::ProcessGroupNCCL> getProcessGroup() {
return pg_;
@@ -41,7 +38,7 @@ class NCCLTestBase {
void initialize(
int rank,
int size,
size_t size,
std::optional<::std::shared_ptr<::c10d::ProcessGroupNCCL>> split_from =
std::nullopt) {
store_ = c10::make_intrusive<::c10d::FileStore>(path_, size);
@@ -55,8 +52,8 @@ class NCCLTestBase {
opts->split_color = ++color_;
}
#endif
pg_ = std::unique_ptr<::c10d::ProcessGroupNCCL>(
new ::c10d::ProcessGroupNCCL(store_, rank, size, std::move(opts)));
pg_ = std::make_unique<::c10d::ProcessGroupNCCL>(
store_, rank, size, std::move(opts));
}
protected:
@@ -76,10 +73,7 @@ class NCCLTest : public NCCLTestBase {
std::chrono::milliseconds pgTimeout =
c10d::kProcessGroupNCCLDefaultTimeout,
int inputDim = 3)
: NCCLTestBase(path, pgTimeout),
numDevices_(1), // one device per rank (thread)
rank_(rank),
worldSize_(worldSize) {
: NCCLTestBase(path, pgTimeout), rank_(rank), worldSize_(worldSize) {
// Each device has a single tensor to perf the NCCL op
::at::globalContext().lazyInitDevice(c10::DeviceType::CUDA);
tensors_.resize(numDevices_);
@@ -88,10 +82,10 @@ class NCCLTest : public NCCLTestBase {
at::cuda::OptionalCUDAGuard deviceGuard;
assert(numDevices_ == 1);
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(rank_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(rank_));
tensors_[i] = at::empty({inputDim, inputDim}, at::kCUDA);
inputs_[i].resize(worldSize_ * numDevices_);
outputs_[i].resize(worldSize_ * numDevices_);
inputs_[i].resize(static_cast<size_t>(worldSize_) * numDevices_);
outputs_[i].resize(static_cast<size_t>(worldSize_) * numDevices_);
for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
inputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA);
outputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA);
@@ -106,7 +100,7 @@ class NCCLTest : public NCCLTestBase {
// getters to retrieve the current stream).
//
// 1 device only, hence 1 stream only
deviceGuard.set_index(rank_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(rank_));
streams_.push_back(at::cuda::getStreamFromPool());
}
@@ -148,7 +142,8 @@ class NCCLTest : public NCCLTestBase {
std::vector<std::vector<at::Tensor>>& tensor_lists) {
std::vector<std::vector<at::Tensor>> outputs(numDevices_);
for (auto& output : outputs) {
output = std::vector<at::Tensor>(worldSize_ * numDevices_);
output = std::vector<at::Tensor>(
static_cast<size_t>(worldSize_ * numDevices_));
}
// For the duration of this function, make THC use our streams
@@ -169,8 +164,8 @@ class NCCLTest : public NCCLTestBase {
void launchDeviceSleep() {
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(rank_);
cudaSleep(streams_[i], 2000 * 1000 * 1000);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(rank_));
cudaSleep(streams_[i], 2000ull * 1000 * 1000);
}
}
@@ -178,7 +173,7 @@ class NCCLTest : public NCCLTestBase {
void valueInitialization() {
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(rank_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(rank_));
tensors_[i].fill_(pg_->getRank() * numDevices_ + i);
}
}
@@ -199,14 +194,15 @@ class NCCLTest : public NCCLTestBase {
void valueInitializationForSparse() {
at::cuda::OptionalCUDAGuard deviceGuard;
for (const auto i : c10::irange(numDevices_)) {
deviceGuard.set_index(rank_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(rank_));
tensors_[i].fill_(pg_->getRank() * numDevices_ + i + 1);
// Convert the dense tensor to a sparse tensor in COO row format
tensors_[i] = to_sparse_row_indices_format(tensors_[i]);
}
}
const int numDevices_;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
const int numDevices_{1}; // one device per rank (thread)
int rank_;
int worldSize_;
std::vector<at::Tensor> tensors_;
@@ -374,7 +370,7 @@ class ReduceScatterBaseNCCLTest : public NCCLTest {
ReduceScatterBaseNCCLTest(const std::string& path, int rank, int worldSize)
: NCCLTest(path, rank, worldSize) {
at::cuda::OptionalCUDAGuard deviceGuard;
deviceGuard.set_index(rank_);
deviceGuard.set_index(static_cast<c10::DeviceIndex>(rank_));
output_tensor_ = at::empty({1}, at::kCUDA);
input_tensor_ = at::empty({worldSize}, at::kCUDA);
for (const auto i : c10::irange(worldSize)) {
@@ -755,7 +751,7 @@ class ProcessGroupNCCLTest : public ::testing::Test {
std::vector<std::thread> threads;
threads.reserve(size_);
for (const auto rank : c10::irange(size_)) {
threads.emplace_back(std::thread(testFunc, file.path, rank, size_));
threads.emplace_back(testFunc, file.path, rank, size_);
}
for (const auto rank : c10::irange(size_)) {
threads[rank].join();
@@ -827,7 +823,7 @@ TEST_F(ProcessGroupNCCLTest, testBackendName) {
}
TemporaryFile file;
auto test = NCCLTestBase(file.path);
test.initialize(/*rank=*/0, /*world_size=*/1);
test.initialize(/*rank=*/0, /*size=*/1);
EXPECT_EQ(
test.getProcessGroup()->getBackendName(),
std::string(c10d::NCCL_BACKEND_NAME));

@@ -1,11 +1,9 @@
#ifdef USE_C10D_UCC
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/UCCUtils.hpp>
#include <utility>
#include <vector>
using namespace c10d;
TEST(ProcessGroupUCCTest, testTrim) {
std::vector<std::pair<std::string, std::string>> tests = {
{" allreduce ", "allreduce"},
@@ -13,7 +11,7 @@ TEST(ProcessGroupUCCTest, testTrim) {
{"send\n", "send"},
};
for (auto entry : tests) {
ASSERT_EQ(trim(entry.first), entry.second);
ASSERT_EQ(c10d::trim(entry.first), entry.second);
}
}
@@ -24,12 +22,13 @@ TEST(ProcessGroupUCCTest, testToLower) {
{"send", "send"},
};
for (auto entry : tests) {
ASSERT_EQ(tolower(entry.first), entry.second);
ASSERT_EQ(c10d::tolower(entry.first), entry.second);
}
}
TEST(ProcessGroupUCCTest, testParseList) {
std::string input = "\tAllReduce, ALLGATHER, send\n";
std::vector<std::string> expect{"allreduce", "allgather", "send"};
ASSERT_EQ(parse_list(input), expect);
ASSERT_EQ(c10d::parse_list(input), expect);
}
#endif

@@ -2,10 +2,7 @@
#include "StoreTestCommon.hpp"
#include <cstdlib>
#include <future>
#include <iostream>
#include <string>
#include <system_error>
#include <thread>
#include <gtest/gtest.h>
@@ -104,33 +101,32 @@ void testHelper(bool useLibUV, const std::string& prefix = "") {
std::to_string(numThreads * numIterations + 1);
for (const auto i : c10::irange(numThreads)) {
threads.emplace_back(
std::thread([=, &sem1, &sem2, &clientStores, &expectedCounterRes] {
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
clientStores[i]->add("counter", 1);
}
// Let each thread set and get key on its client store
std::string key = "thread_" + std::to_string(i);
for (const auto j : c10::irange(numIterations)) {
std::string val = "thread_val_" + std::to_string(j);
c10d::test::set(*clientStores[i], key, val);
c10d::test::check(*clientStores[i], key, val);
}
threads.emplace_back([=, &sem1, &sem2, &clientStores, &expectedCounterRes] {
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
clientStores[i]->add("counter", 1);
}
// Let each thread set and get key on its client store
std::string key = "thread_" + std::to_string(i);
for (const auto j : c10::irange(numIterations)) {
std::string val = "thread_val_" + std::to_string(j);
c10d::test::set(*clientStores[i], key, val);
c10d::test::check(*clientStores[i], key, val);
}
sem1.post();
sem2.wait();
// Check the counter results
c10d::test::check(*clientStores[i], "counter", expectedCounterRes);
// Now check other threads' written data
for (const auto j : c10::irange(numThreads)) {
if (j == i) {
continue;
}
std::string key = "thread_" + std::to_string(i);
std::string val = "thread_val_" + std::to_string(numIterations - 1);
c10d::test::check(*clientStores[i], key, val);
}
}));
sem1.post();
sem2.wait();
// Check the counter results
c10d::test::check(*clientStores[i], "counter", expectedCounterRes);
// Now check other threads' written data
for (const auto j : c10::irange(numThreads)) {
if (j == i) {
continue;
}
std::string key = "thread_" + std::to_string(i);
std::string val = "thread_val_" + std::to_string(numIterations - 1);
c10d::test::check(*clientStores[i], key, val);
}
});
}
sem1.wait(numThreads);

@@ -1,13 +1,12 @@
#include <torch/csrc/distributed/c10d/Backoff.hpp>
#include <exception>
#include <stdexcept>
namespace c10d {
namespace {
constexpr std::chrono::milliseconds kZeroInterval{0};
int32_t randSeed() {
std::random_device::result_type randSeed() {
std::random_device rd;
return rd();
}

@@ -418,7 +418,7 @@ class AllToAllSingle : public torch::autograd::Function<AllToAllSingle> {
static torch::autograd::variable_list backward(
torch::autograd::AutogradContext* ctx,
torch::autograd::variable_list grad_out_list) {
const torch::autograd::variable_list& grad_out_list) {
const std::vector<int64_t>& output_split_sizes =
ctx->saved_data["output_split_sizes"].toIntVector();
const std::vector<int64_t>& input_split_sizes =
@@ -476,12 +476,12 @@ class ReduceScatterTensor
static torch::autograd::variable_list backward(
torch::autograd::AutogradContext* ctx,
torch::autograd::variable_list grad_out_list) {
const torch::autograd::variable_list& grad_out_list) {
const int64_t group_size = ctx->saved_data["group_size"].toInt();
const std::string& group_name = ctx->saved_data["group_name"].toStringRef();
DCHECK(grad_out_list.size() == 1);
auto grad_out = grad_out_list[0];
const auto& grad_out = grad_out_list[0];
auto out =
c10::Dispatcher::singleton()
@@ -532,12 +532,12 @@ class AllGatherIntoTensor
static torch::autograd::variable_list backward(
torch::autograd::AutogradContext* ctx,
torch::autograd::variable_list grad_out_list) {
const torch::autograd::variable_list& grad_out_list) {
const int64_t group_size = ctx->saved_data["group_size"].toInt();
const std::string& group_name = ctx->saved_data["group_name"].toStringRef();
DCHECK(grad_out_list.size() == 1);
auto grad_out = grad_out_list[0];
const auto& grad_out = grad_out_list[0];
auto out =
c10::Dispatcher::singleton()

@@ -2,9 +2,8 @@
#include <torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp>
#include <torch/csrc/distributed/c10d/control_plane/Handlers.hpp>
#include <c10/util/CallOnce.h>
#include <c10/util/env.h>
#include <algorithm>
#include <fstream>
#ifdef USE_C10D_NCCL
#include <vector>
@@ -47,7 +46,7 @@ void NCCLComm::waitUntilInitialized(int timeoutSecs) {
auto startTimepoint = std::chrono::steady_clock::now();
while (!initialized_) {
if (ncclComm_) {
ncclResult_t result;
ncclResult_t result{};
ncclCommGetAsyncError(ncclComm_, &result);
if (result == ncclSuccess) {
LOG(INFO) << "Rank " << rank_ << ": NCCL communicator is initialized.";
@@ -98,7 +97,7 @@ std::string getNcclVersion() {
static std::string versionString;
c10::call_once(ncclGetVersionFlag, []() {
int version;
int version = 0;
ncclResult_t status = ncclGetVersion(&version);
// can't compute the version if call did not return successfully or version
// code < 100 (corresponding to 0.1.0)
@@ -116,7 +115,7 @@ std::string getNcclVersion() {
std::to_string(ncclMinor) + "." + std::to_string(ncclPatch);
#ifdef NCCL_SUFFIX
const auto ncclSuffix = std::string(NCCL_SUFFIX);
if (ncclSuffix.length()) {
if (!ncclSuffix.empty()) {
versionString += "." + ncclSuffix;
}
#endif
@@ -134,16 +133,14 @@ size_t hashTensors(const std::vector<at::Tensor>& tensors) {
size_t data_size = tensor.storage().nbytes();
if (data_size > 0 && tensor.storage().data_ptr()) {
auto src = static_cast<const char*>(tensor.storage().data_ptr().get());
char* dst = (char*)std::calloc(data_size, sizeof(char));
std::vector<char> dst(data_size);
// This is needed so that we trigger a device synchronization so we can
// get the collective finished if launched on GPU and hash its output.
cudaMemcpy(dst, src, data_size, cudaMemcpyDeviceToHost);
cudaMemcpy(dst.data(), src, data_size, cudaMemcpyDeviceToHost);
for (size_t i = 0; i < data_size; ++i) {
// Update the hash for each byte in the tensor
hash = c10::hash_combine(
hash, c10::get_hash(((char*)dst)[i], data_size));
hash = c10::hash_combine(hash, c10::get_hash(dst[i], data_size));
}
free(dst);
}
}
}
@@ -199,7 +196,7 @@ std::string getNcclErrorDetailStr(
std::string interpret;
std::string err;
#ifdef ENABLE_NCCL_GET_LAST_ERROR
auto ret = ncclGetLastError(NULL);
auto ret = ncclGetLastError(nullptr);
if (ret) {
err = "\nLast error:\n" + std::string(ret);
} else {
@@ -244,7 +241,7 @@ std::string getNcclErrorDetailStr(
control_plane::RegisterHandler dumpHandler{
"dump_nccl_trace_pickle",
[](const control_plane::Request& req, control_plane::Response& res) {
const auto params = req.params();
const auto& params = req.params();
size_t validParamCount = 0;
// valid params
@@ -292,7 +289,7 @@ control_plane::RegisterHandler dumpHandler{
control_plane::RegisterHandler jsonDumpHandler{
"dump_nccl_trace_json",
[](const control_plane::Request& req, control_plane::Response& res) {
const auto params = req.params();
const auto& params = req.params();
size_t validParamCount = 0;
// valid params
@@ -347,6 +344,11 @@ void DebugInfoWriter::write(const std::string& ncclTrace) {
}
file.write(ncclTrace.data(), ncclTrace.size());
if (!file) {
LOG(ERROR) << "Error opening file for writing NCCLPG debug info: "
<< filename_;
return;
}
LOG(INFO) << "Finished writing NCCLPG debug info to " << filename_;
}
@@ -391,7 +393,7 @@ std::optional<size_t> NCCLTraceBuffer::record(
}
if (all_pg_status_.find(pg_id) == all_pg_status_.end()) {
// Current pg_status is not in FR.
all_pg_status_[pg_id] = pg_status;
all_pg_status_[pg_id] = std::move(pg_status);
}
auto traceback =
torch::CapturedTraceback::gather(true, true, capture_cpp_stack_);
@@ -406,8 +408,8 @@ std::optional<size_t> NCCLTraceBuffer::record(
op_id,
std::move(profiling_name),
std::move(traceback),
std::move(start),
std::move(end),
start,
end,
c10::getTime(),
timeout_ms.count(),
isP2P,
@@ -424,14 +426,14 @@ std::optional<size_t> NCCLTraceBuffer::record(
for (const auto& input : inputs) {
c10::IntArrayRef sizes = input.sizes();
te.input_dtypes_.push_back(input.dtype().toScalarType());
te.input_dims_.push_back(sizes.size());
te.input_dims_.push_back(static_cast<int64_t>(sizes.size()));
te.sizes_.insert(te.sizes_.end(), sizes.begin(), sizes.end());
}
for (const auto& output : outputs) {
c10::IntArrayRef sizes = output.sizes();
te.output_dtypes_.push_back(output.dtype().toScalarType());
te.output_dims_.push_back(sizes.size());
te.output_dims_.push_back(static_cast<int64_t>(sizes.size()));
te.sizes_.insert(te.sizes_.end(), sizes.begin(), sizes.end());
}
@@ -453,7 +455,7 @@ void NCCLTraceBuffer::record_pg_ranks(
return;
}
std::lock_guard<std::mutex> guard(mutex_);
pg_name_to_ranks_[pg_name] = ranks;
pg_name_to_ranks_[pg_name] = std::move(ranks);
}
void NCCLTraceBuffer::update_state(Entry& r) {
@@ -475,8 +477,14 @@ std::vector<NCCLTraceBuffer::Entry> NCCLTraceBuffer::dump_entries() {
std::lock_guard<std::mutex> guard(mutex_);
std::vector<Entry> result;
result.reserve(entries_.size());
result.insert(result.end(), entries_.begin() + next_, entries_.end());
result.insert(result.end(), entries_.begin(), entries_.begin() + next_);
result.insert(
result.end(),
entries_.begin() + static_cast<std::ptrdiff_t>(next_),
entries_.end());
result.insert(
result.end(),
entries_.begin(),
entries_.begin() + static_cast<std::ptrdiff_t>(next_));
// query any remaining events
for (auto& r : result) {
update_state(r);
@@ -566,7 +574,7 @@ const c10::List<c10::IValue> NCCLTraceBuffer::getCollectiveTrace(
if (includeStacktraces) {
auto& tb = stracebacks.tracebacks.at(i);
auto frames = new_list();
for (int64_t frame : tb) {
for (auto frame : tb) {
frames.push_back(all_frames.at(frame));
}
dict.insert(frames_key, frames);
@@ -585,7 +593,7 @@ const c10::List<c10::IValue> NCCLTraceBuffer::getCollectiveTrace(
}
auto it = e.sizes_.begin();
auto read_sizes = [&](const c10::SmallVector<int, 4>& dims) {
auto read_sizes = [&](const c10::SmallVector<int64_t, 4>& dims) {
auto sizes = new_list();
for (auto dim : dims) {
auto arg_sizes = new_list();
@@ -601,14 +609,14 @@ const c10::List<c10::IValue> NCCLTraceBuffer::getCollectiveTrace(
std::vector<std::string> input_dtypes_strs;
input_dtypes_strs.reserve(e.input_dtypes_.size());
for (const auto& input_dtype : e.input_dtypes_) {
input_dtypes_strs.push_back(c10::toString(input_dtype));
input_dtypes_strs.emplace_back(c10::toString(input_dtype));
}
dict.insert(input_dtypes_key, input_dtypes_strs);
dict.insert(output_sizes_key, read_sizes(e.output_dims_));
std::vector<std::string> output_dtypes_strs;
output_dtypes_strs.reserve(e.output_dtypes_.size());
for (const auto& output_dtype : e.output_dtypes_) {
output_dtypes_strs.push_back(c10::toString(output_dtype));
output_dtypes_strs.emplace_back(c10::toString(output_dtype));
}
dict.insert(output_dtypes_key, output_dtypes_strs);
if (e.time_discovered_completed_.has_value()) {
@@ -723,10 +731,10 @@ std::string NCCLTraceBuffer::dump_json(
j[duration_key_str] = *e.duration_;
}
auto it = e.sizes_.begin();
auto read_sizes = [&](const c10::SmallVector<int, 4>& dims) {
auto sizes = std::list<std::list<int>>();
auto read_sizes = [&](const c10::SmallVector<int64_t, 4>& dims) {
auto sizes = std::list<std::list<int64_t>>();
for (auto dim : dims) {
auto arg_sizes = std::list<int>();
auto arg_sizes = std::list<int64_t>();
for (auto i : c10::irange(dim)) {
(void)i;
arg_sizes.push_back(*it++);
@@ -739,14 +747,14 @@ std::string NCCLTraceBuffer::dump_json(
std::vector<std::string> input_dtypes_strs;
input_dtypes_strs.reserve(e.input_dtypes_.size());
for (const auto& input_dtype : e.input_dtypes_) {
input_dtypes_strs.push_back(c10::toString(input_dtype));
input_dtypes_strs.emplace_back(c10::toString(input_dtype));
}
j[input_dtypes_key_str] = input_dtypes_strs;
j[output_sizes_key_str] = read_sizes(e.output_dims_);
std::vector<std::string> output_dtypes_strs;
output_dtypes_strs.reserve(e.output_dtypes_.size());
for (const auto& output_dtype : e.output_dtypes_) {
output_dtypes_strs.push_back(c10::toString(output_dtype));
output_dtypes_strs.emplace_back(c10::toString(output_dtype));
}
j[output_dtypes_key_str] = output_dtypes_strs;
if (e.time_discovered_completed_.has_value()) {
@@ -770,7 +778,7 @@ std::string NCCLTraceBuffer::dump_json(
entries.emplace_back(j);
}
if (entries.size() > 0) {
if (!entries.empty()) {
result[entries_key_str] = entries;
}
}
@@ -811,7 +819,7 @@ std::string NCCLTraceBuffer::dump(
per_comm_dict.insert(ncclId, inner_dict);
}
}
if (per_comm_dict.size() > 0) {
if (!per_comm_dict.empty()) {
result.insert(nccl_comm_key, per_comm_dict);
}
return pickle_str(result);

@@ -636,9 +636,9 @@ struct NCCLTraceBuffer {
std::optional<c10::time_t> time_discovered_completed_;
// size information for input/output tensors
c10::SmallVector<int, 4> input_dims_;
c10::SmallVector<int64_t, 4> input_dims_;
std::vector<c10::ScalarType> input_dtypes_;
c10::SmallVector<int, 4> output_dims_;
c10::SmallVector<int64_t, 4> output_dims_;
std::vector<c10::ScalarType> output_dtypes_;
c10::SmallVector<int64_t, 8> sizes_; // flattened from inputs, outputs
bool retired_ = false; // is this work entry no longer in the workMetaList_?

@@ -2,13 +2,11 @@
#include <dlfcn.h>
#include <exception>
#include <fstream>
#include <map>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <ATen/cuda/CUDAContext.h>
@@ -87,7 +85,7 @@ ncclDataType_t getNcclDataType(at::ScalarType type) {
return it->second;
}
bool complexViewAsRealAllowed(const ReduceOp reduceOp) {
bool complexViewAsRealAllowed(const ReduceOp& reduceOp) {
switch (reduceOp) {
case ReduceOp::SUM:
return true;
@@ -110,7 +108,7 @@ ncclRedOpRAII unpackPreMulSum(
const ncclComm_t& comm) {
const auto* preMulSupplement =
reinterpret_cast<NCCLPreMulSumSupplement*>(reduceOp.supplement_.get());
ncclRedOp_t preMulSum;
ncclRedOp_t preMulSum{};
bool has_tensor = preMulSupplement->tensor_factor.defined();
auto residence = has_tensor ? ncclScalarDevice : ncclScalarHostImmediate;
const T* ptr_factor = has_tensor
@@ -161,8 +159,7 @@ ncclRedOpRAII getNcclReduceOp(
default:
C10_THROW_ERROR(
TypeError, "PreMulSum Data type must be half, float, or double");
ncclRedOp_t unused;
return unused;
return ncclRedOp_t{};
}
#else
C10_THROW_ERROR(ValueError, "PreMulSum requires NCCL>=2.11.1");
@@ -260,7 +257,7 @@ std::string buildNcclUniqueIdStr(const ncclUniqueId& ncclID) {
return oss.str();
}
std::string getNcclAbortedCommStoreKey(const std::string ncclIdStr) {
std::string getNcclAbortedCommStoreKey(const std::string& ncclIdStr) {
return std::string(kNCCLAbortedCommStoreKey) + ":" + ncclIdStr;
}
@@ -513,8 +510,8 @@ std::ostream& operator<<(
}
ProcessGroupNCCL::WorkNCCL::WorkNCCL(
const std::string& pgUID,
const std::string& pgDesc,
std::string pgUID,
std::string pgDesc,
at::Device& device,
int rank,
OpType opType,
@@ -527,8 +524,8 @@ ProcessGroupNCCL::WorkNCCL::WorkNCCL(
bool cudaEventCacheEnabled,
DebugLevel distDebugLevel)
: Work(rank, opType, profilingTitle, inputs),
pgUID_(pgUID),
pgDesc_(pgDesc),
pgUID_(std::move(pgUID)),
pgDesc_(std::move(pgDesc)),
device_(device),
workStartTime_(std::chrono::steady_clock::now()),
seq_(seq),
@@ -622,7 +619,7 @@ const std::string& ProcessGroupNCCL::WorkNCCL::logPrefix() const {
void ProcessGroupNCCL::WorkNCCL::setException(
std::exception_ptr exception_ptr) {
std::unique_lock<std::mutex> lock(mutex_);
exception_ = exception_ptr;
exception_ = std::move(exception_ptr);
}
// Helper that checks if the NCCL kernels are completed on the GPUs
@@ -805,7 +802,7 @@ void ProcessGroupNCCL::WorkNCCL::abort() {
ncclCommDevIdxMapMutex.unlock();
}
ProcessGroupNCCL::CUDAEventCache::CUDAEventCache() {}
ProcessGroupNCCL::CUDAEventCache::CUDAEventCache() = default;
// CUDA event is used to record the start/end of one Work.
// Instead of let the CUDA event gets destroyed, we now reuse it after the Work
@@ -854,8 +851,8 @@ ProcessGroupNCCL::ProcessGroupNCCL(
c10::intrusive_ptr<Options> options)
: Backend(rank, size),
store_(store),
options_(options),
ncclCommCounter_(0),
options_(std::move(options)),
traceKeyStart_(getTraceStartKey("NCCL", rank)),
traceKeyEnd_(getTraceEndKey("NCCL", rank)),
terminateProcessGroup_(false),
@@ -1154,7 +1151,7 @@ void ProcessGroupNCCL::waitForFutureOrTimeout(
::c10d::C10dLoggingData data;
if (log) {
data.integers["pg_id"] = local_id_;
data.integers["pg_id"] = static_cast<int64_t>(local_id_);
data.integers["rank"] = rank_;
data.integers["global_rank"] = globalRank();
data.strings["flight_recorder_version"] = c10d::version_val_str;
@@ -1221,7 +1218,7 @@ void ProcessGroupNCCL::waitForFutureOrTimeout(
void ProcessGroupNCCL::abortCommsFromMap(
std::unordered_map<std::string, std::shared_ptr<NCCLComm>>& ncclCommsMap,
std::optional<std::string> abortReason) {
const std::optional<std::string>& abortReason) {
// The process may control multiple devices, loop through the communicators on
// each device
for (auto& it : ncclCommsMap) {
@@ -1255,7 +1252,7 @@ void ProcessGroupNCCL::abortCommsFromMap(
}
// Abort all communicators on this rank
bool ProcessGroupNCCL::abort(std::optional<std::string> abortReason) {
bool ProcessGroupNCCL::abort(const std::optional<std::string>& abortReason) {
// This will log counter for how long the abort actually takes.
STATIC_SCOPED_WAIT_COUNTER(pytorch.ProcessGroupNCCL__abort);
// Remove record from global ncclCommDevIdxMapMutex before aboarting,
@@ -1276,7 +1273,7 @@ bool ProcessGroupNCCL::abort(std::optional<std::string> abortReason) {
return true;
}
void ProcessGroupNCCL::shutdown(std::optional<std::string> reason) {
void ProcessGroupNCCL::shutdown(const std::optional<std::string>& reason) {
// Don't join threads here since the purpose of this method is to abort all
// communicators and signal the threads to exit. Joining on the threads could
// potentially block and hence avoid it in this method.
@@ -1357,13 +1354,13 @@ bool ProcessGroupNCCL::dumpDebuggingInfo() {
return false;
}
void ProcessGroupNCCL::terminateProcess(std::string errMsg) {
void ProcessGroupNCCL::terminateProcess(const std::string& errMsg) {
// Logging with `FATAL`, after errMsg printed, it calls `std::abort()`
// to terminate the program execution.
LOG(FATAL) << logPrefix() << errMsg;
}
int computeDeltaMS(
long computeDeltaMS(
std::chrono::time_point<std::chrono::steady_clock> start,
std::chrono::time_point<std::chrono::steady_clock> end) {
return std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
@@ -1752,7 +1749,7 @@ void ProcessGroupNCCL::addEphemeralTimeout(
}
bool ProcessGroupNCCL::verifyWorkTimeoutForTest(
const c10::intrusive_ptr<Work> work,
const c10::intrusive_ptr<Work>& work,
const std::chrono::milliseconds& timeout) {
// Since collective returns a c10d::Work, we need to cast it to WorkNCCL.
if (auto workNCCL = c10::dynamic_intrusive_pointer_cast<WorkNCCL>(work)) {
@@ -1936,7 +1933,7 @@ void ProcessGroupNCCL::watchdogHandler() {
// multiple times after the start
if (pgStatus_->lastStartedSeq < static_cast<int64_t>(work.seq_) &&
work.isStarted()) {
pgStatus_->lastStartedSeq = work.seq_;
pgStatus_->lastStartedSeq = static_cast<int64_t>(work.seq_);
pgStatus_->lastStartedWorkName = opTypeToString(work.opType_);
}
@@ -1950,7 +1947,7 @@ void ProcessGroupNCCL::watchdogHandler() {
ephemeralTimeoutInflight_ -= work.ownedEphermeralTimeout_;
}
}
pgStatus_->lastCompletedSeq = work.seq_;
pgStatus_->lastCompletedSeq = static_cast<int64_t>(work.seq_);
pgStatus_->lastCompletedWorkName = opTypeToString(work.opType_);
pgStatus_->lastCompletedNumelIn = work.numelIn_;
pgStatus_->lastCompletedNumelOut = work.numelOut_;
@@ -2259,7 +2256,7 @@ std::shared_ptr<NCCLComm> ProcessGroupNCCL::getNCCLComm(
}
// GPU world size and GPU rank
int numRanks, rank;
int numRanks = -1, rank = -1;
if (!singleP2POp) {
// Collective, all-to-all, or batch P2P
@@ -2372,7 +2369,7 @@ std::shared_ptr<NCCLComm> ProcessGroupNCCL::getNCCLComm(
C10D_NCCL_CHECK(ncclGroupStart(), std::nullopt);
}
ncclStreams_.emplace(deviceKey, std::move(streamVal));
ncclStreams_.emplace(deviceKey, streamVal);
// Note: these events are created with the (default) cudaEventDisableTiming
// flag This flag provides the best performance when used with
@@ -2461,7 +2458,7 @@ void check_gpu_single_tensor(
// condition may be a challenge because the test would need to pass tensors on
// different devices in the same process.
int64_t check_gpu_tensors_same_device(const std::vector<at::Tensor>& tensors) {
if (tensors.size() == 0) {
if (tensors.empty()) {
C10_THROW_ERROR(ValueError, "Tensor list must be nonempty");
}
@@ -2600,7 +2597,7 @@ void ProcessGroupNCCL::assignTimeoutToWork(
}
void ProcessGroupNCCL::workEnqueue(
c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> work) {
const c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL>& work) {
if (!terminateProcessGroup_.load()) {
std::lock_guard<std::mutex> lock(workMetaListMutex_);
// Avoid view tensors to be processed in cleanup thread.
@@ -4265,7 +4262,8 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::barrier(const BarrierOptions& opts) {
ValueError,
barDevIdx >= 0,
"Failed to infer a GPU device id to perform barrier. ");
auto barDevice = at::Device(at::DeviceType::CUDA, barDevIdx);
auto barDevice = at::Device(
at::DeviceType::CUDA, static_cast<c10::DeviceIndex>(barDevIdx));
// Create a dummy tensor on the device
// Note: we use zeros() instead of empty() to prevent barrier from triggering
@@ -4291,7 +4289,7 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::alltoall_base(
const AllToAllOptions& /* unused */) {
check_gpu_single_tensor(outputTensor, true);
check_gpu_single_tensor(inputTensor, true);
if (outputSplitSizes.size() == 0 && inputSplitSizes.size() == 0) {
if (outputSplitSizes.empty() && inputSplitSizes.empty()) {
RECORD_PARAM_COMMS_DATA(
std::make_tuple(
static_cast<int64_t>(seqCollective_) + 1,
@@ -4553,7 +4551,8 @@ void ProcessGroupNCCL::groupEnd() {
--ncclActiveGroupCounter_;
}
void ProcessGroupNCCL::groupEndNonblocking(std::shared_ptr<NCCLComm> comm) {
void ProcessGroupNCCL::groupEndNonblocking(
const std::shared_ptr<NCCLComm>& comm) {
#ifndef NCCL_HAS_COMM_NONBLOCKING
C10D_NCCL_CHECK(ncclGroupEnd(), std::nullopt);
#else
@@ -4602,7 +4601,7 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::gather(
outputs = outputTensors[0];
} else {
// if not in the root rank, initialize outputs as empty list
if (outputTensors.size() != 0) {
if (!outputTensors.empty()) {
invalidArgument("requires empty output on non-root");
}
outputs = {};
@@ -4643,13 +4642,14 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::gather(
const auto root = opts.rootRank;
if (getRank() == root) {
if (!avoidRecordStreams_) {
for (auto output : outputs) {
for (auto const& output : outputs) {
c10::cuda::CUDACachingAllocator::recordStream(
output.storage().data_ptr(), stream);
}
}
}
torch::cuda::nccl::gather(inputTensor, outputs, comm, stream, root);
torch::cuda::nccl::gather(
inputTensor, outputs, comm, stream, static_cast<int32_t>(root));
return ncclSuccess;
},
[](at::cuda::CUDAStream&,
@@ -4696,7 +4696,7 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::scatter(
} else {
// if not in the root rank, initialize inputTensors as empty place holder
// with an empty list
if (inputTensors.size() != 0) {
if (!inputTensors.empty()) {
invalidArgument("requires empty input on non-root");
}
inputs = {};
@@ -4740,13 +4740,14 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::scatter(
at::cuda::CUDAStream& stream) {
if (getRank() == root) {
if (!avoidRecordStreams) {
for (auto input : inputs) {
for (auto const& input : inputs) {
c10::cuda::CUDACachingAllocator::recordStream(
input.storage().data_ptr(), stream);
}
}
}
torch::cuda::nccl::scatter(inputs, outputTensor, comm, stream, root);
torch::cuda::nccl::scatter(
inputs, outputTensor, comm, stream, static_cast<int32_t>(root));
return ncclSuccess;
},
[](at::cuda::CUDAStream&,

@@ -272,8 +272,8 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Constructor takes a list of CUDA devices
WorkNCCL(
const std::string& pgUID,
const std::string& pgDesc,
std::string pgUID,
std::string pgDesc,
at::Device& device,
int rank,
OpType opType,
@@ -373,7 +373,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
bool avoidRecordStreams_{false};
// Clone of opTimeout_ from ProcessGroupNCCL.
std::chrono::milliseconds opTimeout_;
std::chrono::milliseconds opTimeout_{};
// Ephemeral timeouts are owned by exactly one work,
// and reset after that work completes.
@@ -457,7 +457,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
private:
std::mutex cacheMutex_;
// NOTE: We intentionaly store raw pointers so that
// NOTE: We intentionally store raw pointers so that
// we do not attempt to destroy the event objects on process exit,
// because cuda may be gone.
std::vector<at::cuda::CUDAEvent*>
@@ -520,7 +520,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
int size,
const std::string& groupName,
c10::intrusive_ptr<Options> options = Options::create())
: ProcessGroupNCCL(store, rank, size, options) {}
: ProcessGroupNCCL(store, rank, size, std::move(options)) {}
~ProcessGroupNCCL() override;
@@ -643,7 +643,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
void groupEnd();
void groupEndNonblocking(std::shared_ptr<NCCLComm> comm);
void groupEndNonblocking(const std::shared_ptr<NCCLComm>& comm);
c10::intrusive_ptr<Work> gather(
std::vector<std::vector<at::Tensor>>& outputTensors,
@@ -682,16 +682,16 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Helper function for iteratively aborting communicators in the provided map
void abortCommsFromMap(
std::unordered_map<std::string, std::shared_ptr<NCCLComm>>& ncclCommsMap,
std::optional<std::string> abortReason);
const std::optional<std::string>& abortReason);
c10::intrusive_ptr<intra_node_comm::IntraNodeComm> initIntraNodeComm();
// Provides an API to abort the ProcessGroup (similar to ncclCommAbort)
// instead of relying on ProcessGroupNCCL destructor.
// return true if abort is successful, otherwise false
bool abort(std::optional<std::string> abortReason = std::nullopt);
bool abort(const std::optional<std::string>& abortReason = std::nullopt);
void shutdown(std::optional<std::string> reason = std::nullopt);
void shutdown(const std::optional<std::string>& reason = std::nullopt);
void eagerConnectSingleDevice(at::Device device) override;
@@ -712,7 +712,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// `opTimeout_` of the provided WorkNCCL instance is the same as the specified
// timeout.
bool verifyWorkTimeoutForTest(
const c10::intrusive_ptr<Work> work,
const c10::intrusive_ptr<Work>& work,
const std::chrono::milliseconds& timeout);
protected:
@@ -903,7 +903,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Function that directly trigger std::abort so that the whole process
// gets terminated.
virtual void terminateProcess(std::string errMsg);
virtual void terminateProcess(const std::string& errMsg);
// A helper function to wait for a future to complete or timeout.
void waitForFutureOrTimeout(
@@ -1089,7 +1089,7 @@ class TORCH_API ProcessGroupNCCL : public Backend {
std::list<ProcessGroupNCCL::WorkNCCL> completedWorkList_;
// Add Work Pointer to workVector
void workEnqueue(c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL>);
void workEnqueue(const c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL>&);
// The CUDA streams used by NCCL kernels
std::unordered_map<std::string, at::cuda::CUDAStream> ncclStreams_;

@@ -8,13 +8,10 @@
#include <torch/csrc/distributed/c10d/Utils.hpp>
#include <torch/csrc/jit/serialization/pickler.h>
#include <torch/csrc/profiler/combined_traceback.h>
#include <chrono>
#include <sys/types.h>
#include <cstdlib>
#include <fstream>
#include <string>
#include <system_error>
#include <vector>
namespace c10d {
@@ -27,7 +24,7 @@ struct ProcessGroupStatus {
int64_t lastEnqueuedSeq{-1};
// the sequential number of the last collective started as the kernel
int64_t lastStartedSeq{-1};
// the sequential number of the last colletive completed marked by
// the sequential number of the last collective completed marked by
// the watchdog thread
// initialized to be -1 to indicate no collective has been completed
int64_t lastCompletedSeq{-1};
@@ -129,7 +126,7 @@ inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
std::string report =
"\n\t - To our best knowledge, the lagging/dead/mismatched ranks "
"that caused the desync are:";
if (startRanks.size()) {
if (!startRanks.empty()) {
report += c10::str(
"\n\t - [",
ranksToString(startRanks),
@@ -137,7 +134,7 @@ inline std::string analyzeLaggingRanks(const TraceMap& traceMap) {
lagSeq,
" (count from 1)");
}
if (endRanks.size()) {
if (!endRanks.empty()) {
report += c10::str(
"\n\t [",
ranksToString(endRanks),
@@ -169,7 +166,7 @@ inline std::string dumpSnapshot(TraceMap& traceMap) {
}
}
if (collectivesStart.size()) {
if (!collectivesStart.empty()) {
report += c10::str("\n\t #", seq, " started ranks:");
for (auto& mapPair : collectivesStart) {
report += c10::str(
@@ -179,7 +176,7 @@ inline std::string dumpSnapshot(TraceMap& traceMap) {
mapPair.first);
}
}
if (collectivesEnd.size()) {
if (!collectivesEnd.empty()) {
report += c10::str("\n\t #", seq, " finished ranks:");
for (auto& mapPair : collectivesEnd) {
report += c10::str(
@@ -218,7 +215,7 @@ inline std::string retrieveDesyncReport(
int worldSize) {
std::string report;
uint64_t thisSeq;
uint64_t thisSeq = 0;
std::string thisCol;
std::vector<int> missingRanks;
@@ -226,7 +223,7 @@ inline std::string retrieveDesyncReport(
for (const auto rank : c10::irange(worldSize)) {
// Build traceMapStart.
uint64_t seqStart;
uint64_t seqStart = 0;
{
std::string traceKeyStart = getTraceStartKey(pgName, rank);
if (!store->check({traceKeyStart})) {
@@ -250,7 +247,7 @@ inline std::string retrieveDesyncReport(
if (!store->check({traceKeyEnd})) {
continue;
}
uint64_t seq;
uint64_t seq = 0;
std::string col;
if (!parseTraceValue(store, traceKeyEnd, seq, col)) {
return report;
@@ -323,7 +320,7 @@ inline std::string get_python_cpp_trace() {
auto frame_id = s_tb[idx];
const auto& frame = s_tbs.all_frames.at(frame_id);
oss << "#" << idx << " " << frame.funcname << " from " << frame.filename
<< ":" << frame.lineno << std::endl;
<< ":" << frame.lineno << '\n';
}
return oss.str();
}

@@ -2,12 +2,8 @@
#include <torch/csrc/python_headers.h>
namespace torch {
namespace distributed {
namespace c10d {
namespace torch::distributed::c10d {
PyMethodDef* python_functions();
} // namespace c10d
} // namespace distributed
} // namespace torch
} // namespace torch::distributed::c10d

@@ -45,12 +45,10 @@ struct formatter<std::error_code> {
} // namespace fmt
namespace c10d {
namespace detail {
namespace c10d::detail {
inline std::error_code lastError() noexcept {
return std::error_code{errno, std::generic_category()};
}
} // namespace detail
} // namespace c10d
} // namespace c10d::detail

@@ -12,8 +12,7 @@
#include <c10/util/Logging.h>
#include <fmt/format.h>
namespace c10d {
namespace detail {
namespace c10d::detail {
enum class LogLevel { Trace, Debug, Info, Warning, Error };
@@ -24,8 +23,7 @@ std::string formatLogMessage(fmt::string_view fmt, T&&... args) {
return fmt::vformat(fmt, fmt::make_format_args(args...));
}
} // namespace detail
} // namespace c10d
} // namespace c10d::detail
#define C10D_ERROR(...) \
if (c10d::detail::isLogLevelEnabled(c10d::detail::LogLevel::Error)) \

@@ -16,8 +16,7 @@
#include <torch/csrc/distributed/c10d/Backoff.hpp>
#include <torch/csrc/distributed/c10d/exception.h>
namespace c10d {
namespace detail {
namespace c10d::detail {
class SocketOptions {
public:
@@ -103,5 +102,4 @@ class Socket {
std::unique_ptr<SocketImpl> impl_;
};
} // namespace detail
} // namespace c10d
} // namespace c10d::detail

@@ -22,11 +22,9 @@ as it exposes the underlying platform specific socket headers.
#include <netinet/in.h>
#endif
namespace c10d {
namespace detail {
namespace c10d::detail {
// Returns a human-readable representation of the given socket address.
std::string formatSockAddr(const struct ::sockaddr* addr, socklen_t len);
} // namespace detail
} // namespace c10d
} // namespace c10d::detail