mirror of
https://github.com/zebrajr/pytorch.git
synced 2026-01-15 12:15:51 +00:00
Make c10d tests -Werror clean (#69703)
Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/69703 Test Plan: Imported from OSS Reviewed By: seemethere Differential Revision: D32997001 Pulled By: malfet fbshipit-source-id: 38b5f195c04f2b3b920e6883a96fe9a36345b9d2
This commit is contained in:
committed by
Facebook GitHub Bot
parent
be757addfa
commit
3bb20ae49f
@@ -12,7 +12,6 @@ function(c10d_add_test test_src)
|
||||
target_link_libraries(${test_name} ${ARGN})
|
||||
if(NOT WIN32)
|
||||
target_link_libraries(${test_name} pthread)
|
||||
target_compile_options(${test_name} PRIVATE -Wno-error)
|
||||
endif()
|
||||
add_test(NAME ${test_name} COMMAND $<TARGET_FILE:${test_name}>)
|
||||
endfunction()
|
||||
|
||||
@@ -81,14 +81,14 @@ void stressTestStore(std::string path, std::string prefix = "") {
|
||||
std::vector<std::thread> threads;
|
||||
c10d::test::Semaphore sem1, sem2;
|
||||
|
||||
for (const auto i : c10::irange(numThreads)) {
|
||||
threads.push_back(std::thread([&] {
|
||||
for (C10_UNUSED const auto i : c10::irange(numThreads)) {
|
||||
threads.emplace_back(std::thread([&] {
|
||||
auto fileStore =
|
||||
c10::make_intrusive<c10d::FileStore>(path, numThreads + 1);
|
||||
c10d::PrefixStore store(prefix, fileStore);
|
||||
sem1.post();
|
||||
sem2.wait();
|
||||
for (const auto j : c10::irange(numIterations)) {
|
||||
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
|
||||
store.add("counter", 1);
|
||||
}
|
||||
}));
|
||||
|
||||
@@ -62,11 +62,11 @@ void stressTestStore(std::string prefix = "") {
|
||||
auto hashStore = c10::make_intrusive<c10d::HashStore>();
|
||||
c10d::PrefixStore store(prefix, hashStore);
|
||||
|
||||
for (const auto i : c10::irange(numThreads)) {
|
||||
threads.push_back(std::thread([&] {
|
||||
for (C10_UNUSED const auto i : c10::irange(numThreads)) {
|
||||
threads.emplace_back(std::thread([&] {
|
||||
sem1.post();
|
||||
sem2.wait();
|
||||
for (const auto j : c10::irange(numIterations)) {
|
||||
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
|
||||
store.add("counter", 1);
|
||||
}
|
||||
}));
|
||||
|
||||
@@ -16,12 +16,12 @@ using c10d::ProcessGroup;
|
||||
template <typename T, typename... Args>
|
||||
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
|
||||
std::vector<T> tests;
|
||||
for (const auto i : c10::irange(N)) {
|
||||
for (C10_UNUSED const auto i : c10::irange(N)) {
|
||||
tests.push_back(std::move(T(path, std::forward<Args>(args)...)));
|
||||
}
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
for (const auto i : c10::irange(N)) {
|
||||
for (C10_UNUSED const auto i : c10::irange(N)) {
|
||||
threads.push_back(std::thread([i, N, &tests] { tests[i].start(i, N); }));
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
|
||||
|
||||
class AsyncTest {
|
||||
public:
|
||||
AsyncTest(const std::string& path) : path_(path) {}
|
||||
AsyncTest(std::string path) : path_(std::move(path)) {}
|
||||
|
||||
AsyncTest(AsyncTest&& other) {
|
||||
path_ = std::move(other.path_);
|
||||
|
||||
@@ -126,13 +126,13 @@ class CollectiveTest {
|
||||
int num,
|
||||
bool delayed = false) {
|
||||
std::vector<CollectiveTest> tests;
|
||||
for (const auto i : c10::irange(num)) {
|
||||
tests.push_back(CollectiveTest(path));
|
||||
for (C10_UNUSED const auto i : c10::irange(num)) {
|
||||
tests.emplace_back(CollectiveTest(path));
|
||||
}
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
for (const auto i : c10::irange(num)) {
|
||||
threads.push_back(std::thread(
|
||||
threads.emplace_back(std::thread(
|
||||
[i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }));
|
||||
}
|
||||
for (auto& thread : threads) {
|
||||
@@ -142,7 +142,7 @@ class CollectiveTest {
|
||||
return tests;
|
||||
}
|
||||
|
||||
CollectiveTest(const std::string& path) : path_(path) {}
|
||||
CollectiveTest(std::string path) : path_(std::move(path)) {}
|
||||
|
||||
CollectiveTest(CollectiveTest&& other) {
|
||||
path_ = std::move(other.path_);
|
||||
|
||||
@@ -31,8 +31,8 @@ c10::intrusive_ptr<c10d::TCPStore> _createServer(
|
||||
|
||||
// Different ports for different tests.
|
||||
void testHelper(const std::string& prefix = "") {
|
||||
const auto numThreads = 16;
|
||||
const auto numWorkers = numThreads + 1;
|
||||
constexpr auto numThreads = 16;
|
||||
constexpr auto numWorkers = numThreads + 1;
|
||||
|
||||
auto serverTCPStore = _createServer(numWorkers);
|
||||
|
||||
@@ -79,7 +79,7 @@ void testHelper(const std::string& prefix = "") {
|
||||
|
||||
// Hammer on TCPStore
|
||||
std::vector<std::thread> threads;
|
||||
const auto numIterations = 1000;
|
||||
constexpr auto numIterations = 1000;
|
||||
c10d::test::Semaphore sem1, sem2;
|
||||
|
||||
c10d::TCPStoreOptions opts{};
|
||||
@@ -100,14 +100,12 @@ void testHelper(const std::string& prefix = "") {
|
||||
std::to_string(numThreads * numIterations + 1);
|
||||
|
||||
for (const auto i : c10::irange(numThreads)) {
|
||||
threads.emplace_back(std::thread([&sem1,
|
||||
threads.emplace_back(std::thread([=,
|
||||
&sem1,
|
||||
&sem2,
|
||||
&clientStores,
|
||||
i,
|
||||
&expectedCounterRes,
|
||||
&numIterations,
|
||||
&numThreads] {
|
||||
for (const auto j : c10::irange(numIterations)) {
|
||||
&expectedCounterRes] {
|
||||
for (C10_UNUSED const auto j : c10::irange(numIterations)) {
|
||||
clientStores[i]->add("counter", 1);
|
||||
}
|
||||
// Let each thread set and get key on its client store
|
||||
@@ -163,13 +161,12 @@ void testWatchKeyCallback(const std::string& prefix = "") {
|
||||
// were run
|
||||
std::promise<int> numCallbacksExecutedPromise;
|
||||
std::atomic<int> numCallbacksExecuted{0};
|
||||
const int numThreads = 16;
|
||||
const int keyChangeOperation = 3;
|
||||
constexpr int numThreads = 16;
|
||||
constexpr int keyChangeOperation = 3;
|
||||
c10d::WatchKeyCallback callback =
|
||||
[&numCallbacksExecuted,
|
||||
&numCallbacksExecutedPromise,
|
||||
&numThreads,
|
||||
&keyChangeOperation](
|
||||
[=,
|
||||
&numCallbacksExecuted,
|
||||
&numCallbacksExecutedPromise](
|
||||
c10::optional<std::string> /* unused */,
|
||||
c10::optional<std::string> /* unused */) {
|
||||
numCallbacksExecuted++;
|
||||
@@ -210,12 +207,11 @@ void testWatchKeyCallback(const std::string& prefix = "") {
|
||||
std::vector<std::thread> threads;
|
||||
std::atomic<int> keyChangeOperationCount{0};
|
||||
for (const auto i : c10::irange(numThreads)) {
|
||||
threads.emplace_back(std::thread([&clientStores,
|
||||
threads.emplace_back(std::thread([=,
|
||||
&clientStores,
|
||||
&internalKey,
|
||||
&internalKeyCount,
|
||||
&keyChangeOperationCount,
|
||||
&keyChangeOperation,
|
||||
i] {
|
||||
&keyChangeOperationCount] {
|
||||
// Let each thread set and get key on its client store
|
||||
std::string key = internalKey + std::to_string(i);
|
||||
std::string keyCounter = internalKeyCount + std::to_string(i);
|
||||
@@ -275,8 +271,7 @@ void testKeyChangeHelper(
|
||||
c10d::WatchKeyCallback callback = [expectedOldValue,
|
||||
expectedNewValue,
|
||||
&callbackPromise,
|
||||
&eptr,
|
||||
&key](
|
||||
&eptr](
|
||||
c10::optional<std::string> oldValue,
|
||||
c10::optional<std::string> newValue) {
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user