[BE] split seq_id to collective_seq_id and p2p_seq_id (#125727)

Summary:
Split out `seq_id` into `collective_seq_id` and `p2p_seq_id`. The main idea here is that collectives that go to all machines should have identical `collective_seq_id`, which therefore makes it easier to spot if one of the machines isn't handling a collective operation.
Next, we can attempt to match up p2p operations to ensure that the sender(s)/receiver(s) are in sync.

Resolves issue: https://github.com/pytorch/pytorch/issues/125173

Test Plan:
Unit tests.

Reviewers:

Subscribers:

Tasks:

Tags:

Pull Request resolved: https://github.com/pytorch/pytorch/pull/125727
Approved by: https://github.com/zdevito
This commit is contained in:
Chirag Pandya
2024-05-20 17:10:51 -07:00
committed by PyTorch MergeBot
parent 5f64086d08
commit a83e745356
5 changed files with 94 additions and 53 deletions

View File

@@ -69,7 +69,7 @@ class ProcessGroupNCCLSimulateErrors : public c10d::ProcessGroupNCCL {
const std::vector<at::Tensor>& outputs = {},
bool record = false) override {
return c10::make_intrusive<WorkNCCLSimulateErrors>(
device, simulateError_, rank, opType, seq_);
device, simulateError_, rank, opType, seqCollective_);
}
size_t getNCCLCommCacheSize() {
@@ -131,7 +131,7 @@ class ProcessGroupNCCLTimedOutErrors : public ProcessGroupNCCLSimulateErrors {
const std::vector<at::Tensor>& outputs = {},
bool record = false) override {
return c10::make_intrusive<WorkNCCLTimedoutErrors>(
device, setTimedoutError_, rank, opType, seq_);
device, setTimedoutError_, rank, opType, seqCollective_);
}
void setTimedoutError() {

View File

@@ -3524,7 +3524,7 @@ class NCCLTraceTest(NCCLTraceTestBase):
t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
ver = t["version"]
self.assertEqual(ver, "1.5")
self.assertEqual(ver, "2.0")
pg_config = t["pg_config"]
self.assertEqual(len(pg_config), 1)
default_pg_info = pg_config["0"]
@@ -3548,7 +3548,7 @@ class NCCLTraceTest(NCCLTraceTestBase):
self.assertIn("test_c10d_nccl.py", str(last["frames"]))
self.assertEqual(last["input_sizes"], ((3, 4),))
self.assertEqual(last["output_sizes"], ((3, 4),))
self.assertEqual(last["seq_id"], 2)
self.assertEqual(last["collective_seq_id"], 2)
now = datetime.now()
event_created_time = datetime.fromtimestamp(
last["time_created_ns"] / 1000000000
@@ -3629,7 +3629,7 @@ class NCCLTraceTest(NCCLTraceTestBase):
self.assertIn("test_c10d_nccl.py", str(last["frames"]))
self.assertEqual(last["input_sizes"], ((3, 4),))
self.assertEqual(last["output_sizes"], ((3, 4),))
self.assertEqual(last["seq_id"] - first["seq_id"], 9)
self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9)
@requires_nccl()
@skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
@@ -3659,10 +3659,10 @@ class NCCLTraceTest(NCCLTraceTestBase):
t = t["entries"]
self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce")
if self.rank == 0:
self.assertEqual(t[-1]["seq_id"], 1)
self.assertEqual(t[-1]["collective_seq_id"], 1)
self.assertEqual(t[-1]["state"], "completed")
else:
self.assertEqual(t[-1]["seq_id"], 2)
self.assertEqual(t[-1]["collective_seq_id"], 2)
self.assertEqual(
t[-1]["state"], self.started_or_scheduled(timing_enabled)
)
@@ -3704,10 +3704,10 @@ class NCCLTraceTest(NCCLTraceTestBase):
t = t["entries"]
self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce")
if self.rank == 0:
self.assertEqual(t[-1]["seq_id"], 1)
self.assertEqual(t[-1]["collective_seq_id"], 1)
self.assertEqual(t[-1]["state"], "completed")
else:
self.assertEqual(t[-1]["seq_id"], 2)
self.assertEqual(t[-1]["collective_seq_id"], 2)
self.assertEqual(
t[-1]["state"], self.started_or_scheduled(timing_enabled)
)
@@ -3799,7 +3799,9 @@ class NCCLTraceTest(NCCLTraceTestBase):
self.assertEqual(
t["entries"][p2p_op_idx]["profiling_name"], profiling_name
)
self.assertEqual(t["entries"][p2p_op_idx]["seq_id"], expected_seq)
self.assertEqual(
t["entries"][p2p_op_idx]["collective_seq_id"], expected_seq
)
self.assertEqual(t["entries"][p2p_op_idx]["op_id"], expected_op_id)
expected_op_id += 1
self.assertEqual(t["entries"][p2p_op_idx]["input_sizes"], [input_sizes])
@@ -3819,7 +3821,9 @@ class NCCLTraceTest(NCCLTraceTestBase):
self.assertEqual(
t["entries"][coalesced_op]["profiling_name"], "nccl:coalesced"
)
self.assertEqual(t["entries"][coalesced_op]["seq_id"], expected_seq)
self.assertEqual(
t["entries"][coalesced_op]["collective_seq_id"], expected_seq
)
expected_seq += 1
self.assertEqual(t["entries"][coalesced_op]["state"], "completed")
self.assertEqual(t["entries"][coalesced_op]["input_sizes"], [])
@@ -3875,7 +3879,7 @@ class NCCLTraceTest(NCCLTraceTestBase):
input_sizes = op_sizes[seq % ops_per_repeat]
profiling_name = "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0"
self.assertEqual(t["entries"][seq]["profiling_name"], profiling_name)
self.assertEqual(t["entries"][seq]["seq_id"], expected_seq)
self.assertEqual(t["entries"][seq]["p2p_seq_id"], expected_seq)
expected_seq += 1
self.assertEqual(t["entries"][seq]["op_id"], expected_op_id)
expected_op_id += 1
@@ -3935,7 +3939,7 @@ class NCCLTraceTest(NCCLTraceTestBase):
self.assertEqual(
t["entries"][0]["profiling_name"], "nccl:reduce_scatter_tensor_coalesced"
)
self.assertEqual(t["entries"][0]["seq_id"], 1)
self.assertEqual(t["entries"][0]["collective_seq_id"], 1)
self.assertEqual(t["entries"][0]["input_sizes"], [[2, 2], [2, 2]])
self.assertEqual(
t["entries"][0]["output_sizes"],
@@ -4003,9 +4007,9 @@ class NCCLTraceTestDumpOnTimeout(NCCLTraceTestDumpOnTimeoutBase):
t = pickle.load(f)
t = t["entries"]
self.assertEqual(len(t), 2)
self.assertEqual(t[0]["seq_id"], 1)
self.assertEqual(t[0]["collective_seq_id"], 1)
self.assertEqual(t[0]["state"], "completed")
self.assertEqual(t[1]["seq_id"], 2)
self.assertEqual(t[1]["collective_seq_id"], 2)
self.assertEqual(
t[1]["state"], self.started_or_scheduled(timing_enabled)
)
@@ -4066,7 +4070,7 @@ class NCCLTraceTestTimeoutDumpOnStuckRanks(NCCLTraceTestDumpOnTimeoutBase):
t = pickle.load(f)
t = t["entries"]
self.assertEqual(len(t), 1)
self.assertEqual(t[0]["seq_id"], 1)
self.assertEqual(t[0]["collective_seq_id"], 1)
self.assertEqual(t[0]["state"], "completed")
return

View File

@@ -931,7 +931,7 @@ void ProcessGroupNCCL::setSequenceNumberForGroup() {
} // NCCL just starts sequence numbers at 0.
uint64_t ProcessGroupNCCL::getSequenceNumberForGroup() {
return seq_;
return seqCollective_;
}
void ProcessGroupNCCL::registerOnCompletionHook(
@@ -2246,7 +2246,7 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
device,
rank,
opType,
seq_,
seqCollective_,
profilingTitle,
profilingTitle != nullptr ? std::optional<std::vector<at::Tensor>>(inputs)
: c10::nullopt,
@@ -2254,6 +2254,7 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
enableTiming_.load(),
dist_debug_level_);
if (record) {
bool isP2P = isP2POp(opType);
// Ideally record every work that we enqueue, rather than every work we
// create.
// - at the time of this PR we do not currently enqueue every created work
@@ -2270,13 +2271,15 @@ c10::intrusive_ptr<ProcessGroupNCCL::WorkNCCL> ProcessGroupNCCL::initWork(
r->trace_id_ = NCCLTraceBuffer::get()->record(
uid_,
std::make_tuple(pg_name_, pg_desc_),
seq_,
seqCollective_,
seqP2P_,
op_id_,
profilingTitle ? profilingTitle : "",
inputs,
outputs,
r->ncclStartEvent_.get(),
r->ncclEndEvent_.get());
r->ncclEndEvent_.get(),
isP2P);
}
return r;
}
@@ -2328,10 +2331,6 @@ ProcessGroupNCCL::Options::Options(bool is_high_priority_stream)
static constexpr int CoalActive = 0x01, CoalColl = 0x02, CoalP2P = 0x04;
void ProcessGroupNCCL::startCoalescing() {
coalescedDevice_.set_index(-1);
coalescedComm_ = nullptr;
coalescing_state_ |= CoalActive;
groupStart();
// Other collective ops bump seq_ before creating a work. Thus, if coalesced
// ops bump seq_ only after initing a work they will collide with (reuse) the
// seq_ of the last non-coalesced collective. Previously, seq_ was bumped
@@ -2340,10 +2339,19 @@ void ProcessGroupNCCL::startCoalescing() {
// same seq_ for those ops and its 'endCoalescing' op. Hence we bump during
// start, which has one minor downside- we burn a seq_ if someone ever does a
// 'start' and 'end' coalescing region without doing an operation inbetween.
seq_++;
// Don't bump op_id_ here, becuase startCoalescing isn't a logical operation.
// Don't bump op_id_ here, because startCoalescing isn't a logical operation.
// Bump it for each logical op inside the coalescing group.
if (coalescing_state_ & CoalP2P) {
seqP2P_++;
} else {
seqCollective_++;
}
coalescedDevice_.set_index(-1);
coalescedComm_ = nullptr;
coalescing_state_ |= CoalActive;
groupStart();
}
// `optype` is for specifying a composite optype, such as ALLGATHER and
@@ -2441,7 +2449,7 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::collective(
errorIfCapturingNonCapturableNCCL(capture_status);
// Bump collective counter
seq_++;
seqCollective_++;
op_id_++;
auto device = getDevice(input);
@@ -2596,9 +2604,10 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::collectiveCoalesced(
errorIfCapturingNonCapturableNCCL(capture_status);
// Bump collective counter
seq_++;
seqCollective_++;
// For coalescingManager collectives, there is no individual c++ call per
// collective so there is no flight record and we increment seq_ and op_id_
// collective so there is no flight record and we increment seq*_ and op_id_
// together. Compare this to startCoalesing/endCoalescing flow where we
// increment seq_ once per group and increment op_id_ once per indvidual
// operation within the group
@@ -2826,9 +2835,9 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
p2pTargetRank = isSendRecvSelf ? 0 : 1 - p2pRank;
if (!coalescing_state_) {
// Bump sequence number. Don't do so if it's a batch P2P, it will be
// bumped in `endCoalescing`.
seq_++;
// Bump P2P sequence number. Don't do so if it's a batch P2P, it will be
// bumped in `startCoalescing`.
seqP2P_++;
}
}
@@ -2869,13 +2878,15 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
auto trace_id = NCCLTraceBuffer::get()->record(
uid_,
std::make_tuple(pg_name_, pg_desc_),
seq_,
seqCollective_,
seqP2P_,
op_id_,
profilingTitle,
{tensor},
{tensor},
nullptr,
nullptr);
nullptr,
/*isP2P=*/true);
// TODO(whc) if we want to make the per-p2p-op flightrecorder entries get
// their timings/states updated by proxy when the Work obj representing the
// coalesce group gets its update, we could accumulate these trace_ids
@@ -2894,19 +2905,21 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
// output, not sure what
work->outputs_ = std::make_shared<std::vector<at::Tensor>>();
work->outputs_->push_back(tensor);
// TODO(whc) becuase we don't pass output {tensor} to initWork, we tell
// TODO(whc) because we don't pass output {tensor} to initWork, we tell
// initWork to not record, and then we manually call record passing all the
// information it wants.
work->trace_id_ = NCCLTraceBuffer::get()->record(
uid_,
std::make_tuple(pg_name_, pg_desc_),
seq_,
seqCollective_,
seqP2P_,
op_id_,
profilingTitle,
{tensor},
{tensor},
work->ncclStartEvent_.get(),
work->ncclEndEvent_.get());
work->ncclEndEvent_.get(),
/*isP2P=*/true);
}
// is gpuGuard needed for the if block below, or can i swap them

View File

@@ -1055,13 +1055,16 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// Counting for the sequential number of NCCL collective call.
// (specifically, how many actual kernels we launched, which differs from
// op_id_ when coalescing is enabled)
uint64_t seq_{0};
uint64_t seqCollective_{0};
// Counting for the sequential number of NCCL P2P calls.
uint64_t seqP2P_{0};
// Incrementing counter for logical operations (collective or p2p) issued on
// the ProcessGroup
uint64_t op_id_{0};
// the sequential number of the last colletive enqueued into workMetaList_
// the sequential number of the last collective enqueued into workMetaList_
// This is useful for indentifying a rank that has not join a collective
// initialized to be -1 to indicate no collective has been enqueued
int64_t lastEnqueuedSeq_{-1};
@@ -1069,10 +1072,10 @@ class TORCH_API ProcessGroupNCCL : public Backend {
// the name of the last collective enqueued into workMetaList_
std::string lastEnqueuedWorkName_;
// the sequential number of the last colletive started as the kernal
// the sequential number of the last collective started as the kernel
int64_t lastStartedSeq_{-1};
// the name of the last collective started as the kernal
// the name of the last collective started as the kernel
std::string lastStartedWorkName_;
// the sequential number of the last colletive completed marked by

View File

@@ -2,17 +2,23 @@
#include <c10/util/ApproximateClock.h>
#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/NCCLUtils.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>
#include <ATen/cuda/CUDAEvent.h>
#include <torch/csrc/jit/serialization/pickler.h>
#include <torch/csrc/profiler/combined_traceback.h>
#include <sys/types.h>
#include <cstdlib>
#include <fstream>
#include <string>
#include <system_error>
#include <vector>
namespace c10d {
static c10::IValue entries_key = "entries";
@@ -20,12 +26,14 @@ static c10::IValue nccl_comm_key = "nccl_comm_state";
static c10::IValue version_key = "version";
// Update whenever changing contents or formatting of the dump
// (minor when adding fields, major when changing existing fields)
static c10::IValue version_val = "1.5";
static c10::IValue version_val = "2.0";
static c10::IValue pg_config_key = "pg_config";
static c10::IValue record_id_key = "record_id";
static c10::IValue pg_id_key = "pg_id";
static c10::IValue pg_name_key = "process_group";
static c10::IValue seq_id_key = "seq_id";
static c10::IValue collective_seq_id_key = "collective_seq_id";
static c10::IValue p2p_seq_id_key = "p2p_seq_id";
static c10::IValue is_p2p_key = "is_p2p";
static c10::IValue op_id_key = "op_id";
static c10::IValue profiling_name_key = "profiling_name";
static c10::IValue input_sizes_key = "input_sizes";
@@ -428,11 +436,14 @@ struct NCCLTraceBuffer {
size_t pg_id_;
std::tuple<std::string, std::string> pg_name_; // <group_name, group_desc>
// Both seq_id_ and op_id_ are per_pg incrementing counters
// seq_id refers to actual kernel launches (e.g. 1 per coalesced group)
// op_id refers to logical operations (e.g. one per op inside coalesced
// group)
size_t seq_id_;
// collective_seq_id and p2p_seq_id refer to actual kernel launches (e.g. 1
// per coalesced group).
// collective_seq_id only increments for true collective operations (over
// all ranks in the group). p2p_seq_id only increments over non-collective
// operations in the group. op_id refers to logical operations (e.g. one per
// op inside coalesced group)
size_t collective_seq_id_;
size_t p2p_seq_id_;
size_t op_id_;
std::string profiling_name_;
@@ -445,6 +456,10 @@ struct NCCLTraceBuffer {
// timestamp when the entry was created, likely close to the time the work
// was 'enqueued'- not necessarily started
c10::time_t time_created_;
// Is this a P2P event?
bool isP2P_;
std::optional<float> duration_;
// timestamp when our CPU threads discovered that the kernel started.
@@ -479,13 +494,15 @@ struct NCCLTraceBuffer {
std::optional<size_t> record(
size_t pg_id,
const std::tuple<std::string, std::string>& pg_name,
size_t seq_id,
size_t collective_seq_id,
size_t p2p_seq_id,
size_t op_id,
std::string profiling_name,
const std::vector<at::Tensor>& inputs,
const std::vector<at::Tensor>& outputs,
Event* start,
Event* end) {
Event* end,
bool isP2P) {
if (!enabled_) {
return c10::nullopt;
}
@@ -497,13 +514,15 @@ struct NCCLTraceBuffer {
id_,
pg_id,
pg_name,
seq_id,
collective_seq_id,
p2p_seq_id,
op_id,
std::move(profiling_name),
std::move(traceback),
std::move(start),
std::move(end),
c10::getTime()};
c10::getTime(),
isP2P};
for (const auto& input : inputs) {
c10::IntArrayRef sizes = input.sizes();
@@ -656,7 +675,8 @@ struct NCCLTraceBuffer {
dict.insert(record_id_key, int64_t(e.id_));
dict.insert(pg_id_key, int64_t(e.pg_id_));
dict.insert(pg_name_key, e.pg_name_);
dict.insert(seq_id_key, int64_t(e.seq_id_));
dict.insert(collective_seq_id_key, int64_t(e.collective_seq_id_));
dict.insert(p2p_seq_id_key, int64_t(e.p2p_seq_id_));
dict.insert(op_id_key, int64_t(e.op_id_));
dict.insert(profiling_name_key, e.profiling_name_);
dict.insert(time_created_key, int64_t(e.time_created_));
@@ -699,6 +719,7 @@ struct NCCLTraceBuffer {
? int64_t(*e.time_discovered_completed_)
: c10::IValue());
dict.insert(retired_key, e.retired_);
dict.insert(is_p2p_key, e.isP2P_);
auto frames = new_list();
for (int64_t frame : tb) {