#include
#include

#ifdef USE_C10D_GLOO

#include
#include
#include
#include

#ifdef _WIN32
#include
#include
#include
#else
#include
#include
#include
#endif

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#ifdef _WIN32
#define GENERATE_ALL_TYPES(type, func, ...)      \
  switch (type) {                                \
    case ::at::ScalarType::Float:                \
      func<float>(__VA_ARGS__);                  \
      break;                                     \
    case ::at::ScalarType::Double:               \
      func<double>(__VA_ARGS__);                 \
      break;                                     \
    case ::at::ScalarType::Half:                 \
      func<gloo::float16>(__VA_ARGS__);          \
      break;                                     \
    case ::at::ScalarType::BFloat16:             \
      func<c10::BFloat16>(__VA_ARGS__);          \
      break;                                     \
    case ::at::ScalarType::Char:                 \
      func<int8_t>(__VA_ARGS__);                 \
      break;                                     \
    case ::at::ScalarType::Byte:                 \
    case ::at::ScalarType::Bool:                 \
      func<uint8_t>(__VA_ARGS__);                \
      break;                                     \
    case ::at::ScalarType::Int:                  \
      func<int32_t>(__VA_ARGS__);                \
      break;                                     \
    case ::at::ScalarType::Long:                 \
      func<int64_t>(__VA_ARGS__);                \
      break;                                     \
    default:                                     \
      TORCH_CHECK(false, "Invalid scalar type"); \
  }

#define HOST_NAME_MAX 256
#else
#define GENERATE_ALL_TYPES(type, func, args...)  \
  switch (type) {                                \
    case ::at::ScalarType::Float:                \
      func<float>(args);                         \
      break;                                     \
    case ::at::ScalarType::Double:               \
      func<double>(args);                        \
      break;                                     \
    case ::at::ScalarType::Half:                 \
      func<gloo::float16>(args);                 \
      break;                                     \
    case ::at::ScalarType::BFloat16:             \
      func<c10::BFloat16>(args);                 \
      break;                                     \
    case ::at::ScalarType::Char:                 \
      func<int8_t>(args);                        \
      break;                                     \
    case ::at::ScalarType::Byte:                 \
    case ::at::ScalarType::Bool:                 \
      func<uint8_t>(args);                       \
      break;                                     \
    case ::at::ScalarType::Int:                  \
      func<int32_t>(args);                       \
      break;                                     \
    case ::at::ScalarType::Long:                 \
      func<int64_t>(args);                       \
      break;                                     \
    default:                                     \
      TORCH_CHECK(false, "Invalid scalar type"); \
  }
#endif

namespace c10d {

namespace {

using steady_clock_time_point =
    std::chrono::time_point<std::chrono::steady_clock>;

std::chrono::milliseconds getRemainingTime(
    steady_clock_time_point startTime,
    const std::chrono::milliseconds& timeout,
    bool waitAllRanks) {
  if (waitAllRanks) {
    // See Note in monitoredBarrier
    return timeout;
  }
  auto elapsedTime = std::chrono::steady_clock::now() - startTime;
  auto remainingMillis = timeout -
      std::chrono::duration_cast<std::chrono::milliseconds>(elapsedTime);

  // If no more remaining time, return -1 to indicate to caller.
  if (remainingMillis.count() <= 0) {
    return std::chrono::milliseconds(-1);
  }

  return remainingMillis;
}

// Emits a LOG(ERROR) and throws using TORCH_CHECK with the given messages.
void logAndThrow(
    const std::string& logMessage,
    const std::string& errorMessage) {
  LOG(ERROR) << logMessage;
  TORCH_CHECK(false, errorMessage);
}

// For monitoredBarrier, checks the remaining time left to finish processing
// ranks and throws an error if the timeout has been exceeded.
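// A negative `remainingTime` (see getRemainingTime above) means the deadline
// has already passed; the error raised below then lists which ranks were
// processed in time, which is what surfaces to users of monitored_barrier()
// when some rank hangs.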
void checkRemainingTime(
    const std::chrono::milliseconds& monitoredBarrierTimeout,
    const std::chrono::milliseconds& remainingTime,
    const std::vector<int>& processedRanks,
    int currentRank) {
  const std::string kNoRemainingTimeError = c10::str(
      "Rank ",
      currentRank,
      " timed out in monitoredBarrier after ",
      monitoredBarrierTimeout.count(),
      " ms.");
  if (remainingTime.count() < 0) {
    std::string rankInfo;
    if (!processedRanks.empty()) {
      rankInfo = c10::str(
          "Successfully processed ranks: ", c10::Join(", ", processedRanks));
    } else {
      rankInfo = "No ranks successfully processed in monitoredBarrier.";
    }
    auto error = c10::str(kNoRemainingTimeError, "\n", rankInfo);
    logAndThrow(error, error);
  }
}

typedef void (*ReduceFunc)(void*, const void*, const void*, size_t);

template <typename T, std::enable_if_t<!std::is_integral_v<T>, int> = 0>
ReduceFunc toFunction(const ReduceOp& r) {
  switch (r) {
    case ReduceOp::SUM:
      return ReduceFunc(&::gloo::sum<T>);
    case ReduceOp::PRODUCT:
      return ReduceFunc(&::gloo::product<T>);
    case ReduceOp::MIN:
      return ReduceFunc(&::gloo::min<T>);
    case ReduceOp::MAX:
      return ReduceFunc(&::gloo::max<T>);
    case ReduceOp::BAND:
      TORCH_CHECK(false, "Cannot use ReduceOp.BAND with non-integral dtype");
      break;
    case ReduceOp::BOR:
      TORCH_CHECK(false, "Cannot use ReduceOp.BOR with non-integral dtype");
      break;
    case ReduceOp::BXOR:
      TORCH_CHECK(false, "Cannot use ReduceOp.BXOR with non-integral dtype");
      break;
    case ReduceOp::AVG:
      TORCH_CHECK(false, "Cannot use ReduceOp.AVG with Gloo");
      break;
    case ReduceOp::PREMUL_SUM:
      TORCH_CHECK(false, "Cannot use ReduceOp.PREMUL_SUM with Gloo");
      break;
    case ReduceOp::UNUSED:
    default:
      break;
  }
  TORCH_CHECK(false, "Unhandled ReduceOp");
}

// Bitwise AND with SFINAE guard for integral types.
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
void band(void* c, const void* a, const void* b, size_t n) {
  auto tc = static_cast<T*>(c);
  auto ta = static_cast<const T*>(a);
  auto tb = static_cast<const T*>(b);
  for (const auto i : c10::irange(n)) {
    tc[i] = ta[i] & tb[i];
  }
}

// Bitwise OR with SFINAE guard for integral types.
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
void bor(void* c, const void* a, const void* b, size_t n) {
  auto tc = static_cast<T*>(c);
  auto ta = static_cast<const T*>(a);
  auto tb = static_cast<const T*>(b);
  for (const auto i : c10::irange(n)) {
    tc[i] = ta[i] | tb[i];
  }
}

// Bitwise XOR with SFINAE guard for integral types.
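// Like band/bor above, bxor is handed to Gloo as a plain ReduceFunc pointer:
// Gloo provides sum/product/min/max reduction helpers but no bitwise ones, so
// the bitwise ops are implemented locally and wired up in the integral
// toFunction overload below.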
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
void bxor(void* c, const void* a, const void* b, size_t n) {
  auto tc = static_cast<T*>(c);
  auto ta = static_cast<const T*>(a);
  auto tb = static_cast<const T*>(b);
  for (const auto i : c10::irange(n)) {
    tc[i] = ta[i] ^ tb[i];
  }
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
ReduceFunc toFunction(const ReduceOp& r) {
  switch (r) {
    case ReduceOp::SUM:
      return ReduceFunc(&::gloo::sum<T>);
    case ReduceOp::PRODUCT:
      return ReduceFunc(&::gloo::product<T>);
    case ReduceOp::MIN:
      return ReduceFunc(&::gloo::min<T>);
    case ReduceOp::MAX:
      return ReduceFunc(&::gloo::max<T>);
    case ReduceOp::BAND:
      return ReduceFunc(&band<T>);
    case ReduceOp::BOR:
      return ReduceFunc(&bor<T>);
    case ReduceOp::BXOR:
      return ReduceFunc(&bxor<T>);
    case ReduceOp::AVG:
      TORCH_CHECK(false, "Cannot use ReduceOp.AVG with Gloo");
      break;
    case ReduceOp::PREMUL_SUM:
      TORCH_CHECK(false, "Cannot use ReduceOp.PREMUL_SUM with Gloo");
      break;
    case ReduceOp::UNUSED:
    default:
      break;
  }
  TORCH_CHECK(false, "Unhandled ReduceOp");
}

template <typename T, typename O>
void setInputs(O& opts, std::vector<at::Tensor>& tensors) {
  opts.setInputs(getDataPointers<T>(tensors), tensors[0].numel());
}

template <typename T, typename O>
void setInput(O& opts, at::Tensor& tensor) {
  opts.setInput(getDataPointer<T>(tensor), tensor.numel());
}

template <typename T, typename O>
void setInput(O& opts, at::Tensor& tensor, std::vector<size_t>& counts) {
  opts.setInput(getDataPointer<T>(tensor), counts);
}

template <typename T, typename O>
void setInput(O& opts, at::Tensor& tensor, std::vector<int64_t>& counts) {
  opts.setInput(getDataPointer<T>(tensor), counts);
}

template <typename T, typename O>
void setOutputs(O& opts, std::vector<at::Tensor>& tensors) {
  opts.setOutputs(getDataPointers<T>(tensors), tensors[0].numel());
}

template <typename T, typename O>
void setOutput(O& opts, at::Tensor& tensor) {
  opts.setOutput(getDataPointer<T>(tensor), tensor.numel());
}

template <typename T, typename O>
void setOutput(O& opts, at::Tensor& tensor, std::vector<size_t>& counts) {
  opts.setOutput(getDataPointer<T>(tensor), counts);
}

template <typename T, typename O>
void setOutput(O& opts, at::Tensor& tensor, std::vector<int64_t>& counts) {
  opts.setOutput(getDataPointer<T>(tensor), counts);
}

at::Tensor pinnedLike(at::Tensor& tensor) {
  auto* allocator = at::detail::getCUDAHooks().getPinnedMemoryAllocator();
  auto storage = c10::Storage(
      c10::Storage::use_byte_size_t(),
      static_cast<int64_t>(at::detail::computeStorageNbytes(
          tensor.sizes(), tensor.strides(), tensor.dtype().itemsize())),
      allocator,
      /*resizable=*/false);
  return at::empty({0}, tensor.options().device(at::kCPU))
      .set_(storage, 0, tensor.sizes(), tensor.strides());
}

// This function initializes a vector of CUDA streams, one for every
// tensor in the input tensor vector, and ensures that these streams are
// synchronized with the current default streams. This is needed so
// that new work on the new streams is serialized w.r.t. all operations
// on the tensors.
void initializeStreamsEvents(
    const std::vector<at::Tensor>& tensors,
    std::vector<c10::Stream>& streams,
    std::vector<c10::Event>& events) {
  streams.reserve(tensors.size());
  events.reserve(tensors.size());
  for (const auto i : c10::irange(tensors.size())) {
    c10::Device device = tensors[i].device();
    c10::impl::VirtualGuardImpl impl(device.type());
    // Record event on current stream
    events.emplace_back(device.type());
    events[i].record(impl.getStream(device));
    // Get a non-default stream to execute asynchronous CUDA operations
    // on for this device. This ensures that the default stream used
    // by the caller is not occupied by c10d related operations.
    streams.push_back(
        impl.getStreamFromGlobalPool(device, /*isHighPriority=*/true));
    // Ensure the new stream is synchronized with the current stream.
    events[i].block(streams[i]);

    // `tensors` are created on a different stream.
Hence, they must record // new streams in this Work to prevent being freed before the Work finishes. if (tensors[i].is_sparse()) { if (tensors[i].is_coalesced()) { impl.recordDataPtrOnStream( tensors[i].indices().storage().data_ptr(), streams[i]); impl.recordDataPtrOnStream( tensors[i].values().storage().data_ptr(), streams[i]); } else { // We will need to coalesce first, which means new tensors will // be allocated on the streams we just allocated, and there // is no need to record them separately. } } else { impl.recordDataPtrOnStream(tensors[i].storage().data_ptr(), streams[i]); } } } // This function initializes a vector of CUDA streams, one per device, // and ensures that these streams are synchronized with the current default // streams. It is assumed that the tensors in the nested tensor vectors are // on the same device. void initializeStreamsEvents( std::vector>& tensors, std::vector& streams, std::vector& events) { // Ensure that the tensors in the nested tensor vectors are on the same // device. for (const auto& tensorgroup : tensors) { const auto device_id = tensorgroup[0].device().index(); for (const auto& tensor : tensorgroup) { if (tensor.device().index() != device_id) { TORCH_CHECK( false, "tensors in the nested tensor vectors need to " "be on the same device"); } } } streams.reserve(tensors.size()); events.reserve(tensors.size()); for (const auto i : c10::irange(tensors.size())) { c10::Device device = tensors[i][0].device(); c10::impl::VirtualGuardImpl impl(device.type()); // Record event on current stream events.emplace_back(device.type()); events[i].record(impl.getStream(device)); // Get a non-default stream to execute asynchronous CUDA operations // on for this output. This ensures that the default stream used // by the caller is not occupied by c10d related operations. streams.push_back( impl.getStreamFromGlobalPool(device, /*isHighPriority=*/true)); // Ensure the new stream is synchronized with the current stream. events[i].block(streams[i]); for (at::Tensor& tensor : tensors[i]) { // `tensors` are created on a different stream. Hence, they must record // new streams in this Work to prevent being freed before the Work // finishes. impl.recordDataPtrOnStream(tensor.storage().data_ptr(), streams[i]); } } } const auto kLoopbackAddress = "127.0.0.1"; } // namespace // static void ProcessGroupGloo::AsyncWork::execute( const c10::intrusive_ptr& work) { if (work->recordFunctionBeforeCallback_) { work->recordFunctionBeforeCallback_(); } try { work->run(); } catch (...) { work->finishWorkGlooError(std::current_exception()); return; } // FIXME: We need to call it here since Future completion requires all // the work to be synchronized to CUDA. work->synchronize(); work->finishWorkGloo(); } std::vector ProcessGroupGloo::AsyncWork::result() { TORCH_CHECK( isCompleted(), "Work needs to be completed before calling result(). " "Should call wait() before result()."); TORCH_CHECK( outputTensors_.size() <= 1, "work result does not support list of lists, use .getFuture() and value()"); return outputTensors_.empty() ? 
std::vector() : outputTensors_.at(0); } c10::intrusive_ptr ProcessGroupGloo::AsyncWork:: getFuture() { return future_; } namespace { c10::intrusive_ptr createFutureAsOutput( const std::vector>& outputTensors) { if (outputTensors.size() > 1) { return c10::make_intrusive( c10::ListType::create(c10::ListType::create(c10::TensorType::get()))); } return c10::make_intrusive( c10::ListType::create(c10::TensorType::get())); } void returnFutureWithOutput( c10::intrusive_ptr& future, const std::vector>& outputTensors) { if (outputTensors.empty()) { future->markCompleted(c10::IValue(std::vector())); return; } if (outputTensors.size() > 1) { future->markCompleted(c10::IValue(outputTensors)); return; } future->markCompleted(c10::IValue(outputTensors[0])); } } // namespace inline void ProcessGroupGloo::AsyncWork::recordAsyncWorkProfilingInfo( const char* profilingTitle, const std::optional>& inputTensors) { auto recordingFunction = std::make_shared(at::RecordScope::USER_SCOPE); if (recordingFunction->isActive()) { std::function before_handler = [inputTensors, profilingTitle, recordingFunction]() { // The work will be started and completed by different threads. recordingFunction->_setAsync(); std::vector inputs; if (inputTensors) { inputs.reserve(inputTensors->size()); for (const auto& tensor : *inputTensors) { inputs.emplace_back(tensor); } } recordingFunction->before( profilingTitle, c10::ArrayRef(inputs.data(), inputs.size())); }; recordFunctionBeforeCallback_ = at::wrapPropagateTLSState(std::move(before_handler)); std::function end_handler = [recordingFunction]() { recordingFunction->end(); }; recordFunctionEndCallback_ = at::wrapPropagateTLSState(end_handler); } } ProcessGroupGloo::AsyncWork::AsyncWork( std::vector> outputTensors, OpType opType, uint64_t seq, const char* profilingTitle, const std::optional>& inputTensors) // Profiler: Pass nullptr as profilingTitle to parent constructor to // replace default profiler implementation with async version that reports // correct timestamps for work that is asynchronously executed. : Work(-1, opType, nullptr, inputTensors), outputTensors_(std::move(outputTensors)), future_(createFutureAsOutput(outputTensors_)), seq_(seq) { if (profilingTitle != nullptr) { recordAsyncWorkProfilingInfo(profilingTitle, inputTensors); } } uint64_t ProcessGroupGloo::AsyncWork::getSequencenumber() const { return seq_; } void ProcessGroupGloo::AsyncWork::finishWorkGlooError( const std::exception_ptr& eptr) { future_->setError(eptr); finish(eptr); } void ProcessGroupGloo::AsyncWork::finishWorkGloo() { returnFutureWithOutput(future_, outputTensors_); finish(); } ProcessGroupGloo::SendWork::SendWork( at::Tensor& tensor, std::unique_ptr<::gloo::transport::UnboundBuffer> buffer, uint64_t seq) : Work( -1, OpType::SEND, "gloo:send", std::optional>({tensor})), tensor_(tensor), buffer_(std::move(buffer)), seq_(seq) {} uint64_t ProcessGroupGloo::SendWork::getSequencenumber() const { return seq_; } bool ProcessGroupGloo::SendWork::wait(std::chrono::milliseconds timeout) { bool sendCompleted = false; std::exception_ptr exception{nullptr}; try { if (timeout == kNoTimeout) { sendCompleted = buffer_->waitSend(); } else { sendCompleted = buffer_->waitSend(timeout); } } catch (...) { exception = std::current_exception(); } // Completes the Work object and throws the exception. 
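  // finishAndThrow() marks this Work as completed even if waitSend() threw, so
  // a caller that catches the exception still sees isCompleted() == true and
  // can drop the Work safely.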
finishAndThrow(exception); return sendCompleted; } void ProcessGroupGloo::SendWork::abort() { buffer_->abortWaitSend(); } ProcessGroupGloo::RecvWork::RecvWork( at::Tensor& tensor, std::unique_ptr<::gloo::transport::UnboundBuffer> buffer, OpType opType, uint64_t seq, const char* profilingTitle) : Work( -1, opType, profilingTitle, std::optional>({tensor})), tensor_(tensor), buffer_(std::move(buffer)), srcRank_(-1), seq_(seq) {} uint64_t ProcessGroupGloo::RecvWork::getSequencenumber() const { return seq_; } int ProcessGroupGloo::RecvWork::sourceRank() const { std::lock_guard lock(mutex_); return srcRank_; } bool ProcessGroupGloo::RecvWork::wait(std::chrono::milliseconds timeout) { bool recvCompleted = false; std::exception_ptr exception{nullptr}; try { if (timeout == kNoTimeout) { recvCompleted = buffer_->waitRecv(&srcRank_); } else { recvCompleted = buffer_->waitRecv(&srcRank_, timeout); } } catch (...) { exception = std::current_exception(); } // Completes the Work object and throws the exception. finishAndThrow(exception); return recvCompleted; } void ProcessGroupGloo::RecvWork::abort() { buffer_->abortWaitRecv(); } ProcessGroupGloo::Options::Options(std::chrono::milliseconds timeout) : Backend::Options(GLOO_BACKEND_NAME, timeout), threads(2) {} namespace { void socketInitialize() { #ifdef _WIN32 ::gloo::init_winsock(); #endif } // Gloo assumes that this machine's hostname can always be resolved // to an address. If it doesn't it throws a runtime error saying // that it can't be resolved. Instead of catching it, we choose // to proactively check if an address can be resolved, so we can // gracefully fall back to an alternative if it doesn't. bool doesHostnameResolveToUsableAddress(const std::string& hostname) { socketInitialize(); struct addrinfo hints {}; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; struct addrinfo* result = nullptr; auto rv = getaddrinfo(hostname.c_str(), nullptr, &hints, &result); if (rv < 0) { return false; } struct addrinfo* rp = nullptr; for (rp = result; rp != nullptr; rp = rp->ai_next) { auto fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (fd == -1) { continue; } rv = bind(fd, rp->ai_addr, rp->ai_addrlen); #ifdef _WIN32 closesocket(fd); #else close(fd); #endif if (rv == -1) { continue; } break; } freeaddrinfo(result); return rp != nullptr; } } // namespace std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: createDeviceForInterface(const std::string& interface_name) { return ::c10d::GlooDeviceFactory::makeDeviceForInterface(interface_name); } std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: createDeviceForHostname(const std::string& hostname) { TORCH_CHECK( doesHostnameResolveToUsableAddress(hostname), "Cannot resolve ", hostname, " to a (local) address"); return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname); } #if defined(__linux__) || defined(_WIN32) std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: createDefaultDevice() { // Use the hostname to resolve the network address to // use. Note: if the hostname does not resolve to an address (e.g. // because of misconfigured /etc/hosts file), this will not work. socketInitialize(); std::array hostname{}; auto rv = gethostname(hostname.data(), HOST_NAME_MAX); if (rv != 0) { C10_THROW_ERROR(DistBackendError, std::strerror(errno)); } // Use this machine's hostname if it resolves to an address. 
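  // Resolution typically fails when the hostname is missing from /etc/hosts or
  // DNS. The fallback below binds to the loopback address, which only works
  // for single-host jobs, hence the warning pointing at GLOO_SOCKET_IFNAME.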
if (doesHostnameResolveToUsableAddress(hostname.data())) { return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.data()); } // Otherwise, use the loopback address. TORCH_WARN_ONCE( "Unable to resolve hostname to a (local) address. ", "Using the loopback address as fallback. ", "Manually set the network interface to bind to with GLOO_SOCKET_IFNAME."); return createDeviceForHostname(kLoopbackAddress); } #endif #ifdef __APPLE__ std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo:: createDefaultDevice() { // Use the hostname to resolve the network address to // use. Note: if the hostname does not resolve to an address (e.g. // because of misconfigured /etc/hosts file), this will not work. const auto hostNameMax = sysconf(_SC_HOST_NAME_MAX); auto hostname = std::unique_ptr(new char[hostNameMax]); auto rv = gethostname(hostname.get(), hostNameMax); if (rv != 0) { C10_THROW_ERROR(DistBackendError, std::strerror(errno)); } // Use this machine's hostname if it resolves to an address. if (doesHostnameResolveToUsableAddress(hostname.get())) { return ::c10d::GlooDeviceFactory::makeDeviceForHostname(hostname.get()); } // Otherwise, use the loopback address. TORCH_WARN_ONCE( "Unable to resolve hostname to a (local) address. ", "Using the loopback address as fallback. ", "Manually set the network interface to bind to with GLOO_SOCKET_IFNAME."); return createDeviceForHostname(kLoopbackAddress); } #endif ProcessGroupGloo::ProcessGroupGloo( const c10::intrusive_ptr& store, int rank, int size, c10::intrusive_ptr options) : Backend(rank, size), store_(new GlooStore(store)), options_(std::move(options)), stop_(false), collectiveCounter_(0) { auto& devices = options_->devices; if (devices.empty()) { TORCH_CHECK(false, "No device(s) specified"); } // Create and connect a context for every device. // // Note that the same device can be specified multiple times, either // the same object, or the same logical device as different objects. // Either mode is fine and only has performance implications. // // Using the same object multiple times means all contexts share a // single I/O thread. If you use different objects for the same // logical device they will have independent I/O threads. The latter // option is needed if you have a fast NIC that cannot be saturated // by a single I/O thread. // contexts_.reserve(options_->devices.size()); for (const auto i : c10::irange(options_->devices.size())) { auto context = std::make_shared<::gloo::rendezvous::Context>(rank_, size_); auto store = ::gloo::rendezvous::PrefixStore(std::to_string(i), *store_); context->setTimeout(options_->timeout); try { context->connectFullMesh(store, options_->devices[i]); } catch (const std::runtime_error& e) { auto err = e.what(); // TORCH_CHECK to print the cpp stacktrace. auto msg = c10::str("Gloo connectFullMesh failed with ", err); logAndThrow(msg, msg); } contexts_.push_back(std::move(context)); } // Every worker thread stores the AsyncWork object it's currently // working on in the workInProgress_ vector. It must have size equal // to the number of workers such that they can simply index into it // using the worker index they are started with. 
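  // Collectives submitted to this process group run on a small pool of
  // background threads; the rough flow for a caller is:
  //
  //   auto work = pg.allreduce(tensors);  // validates args and enqueue()s work
  //   ...                                 // a runLoop() worker picks it up and
  //                                       // calls AsyncWork::execute(work)
  //   work->wait();                       // caller blocks on the result
  //
  // The pool size is options_->threads (2 by default, see Options above).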
workInProgress_.resize(options_->threads); threads_.resize(options_->threads); for (const auto i : c10::irange(threads_.size())) { threads_[i] = std::thread(&ProcessGroupGloo::runLoop, this, i); } init(); } ProcessGroupGloo::~ProcessGroupGloo() { std::unique_lock lock(workMutex_); workConsumeCV_.wait(lock, [&] { return workQueue_.empty(); }); // Queue is empty, signal stop stop_ = true; // Release lock to allow threads to terminate lock.unlock(); workProduceCV_.notify_all(); // Wait for worker threads to terminate for (auto& thread : threads_) { thread.join(); } } uint32_t ProcessGroupGloo::nextTag() { return collectiveCounter_++; } std::shared_ptr<::gloo::Context> ProcessGroupGloo::getContext(uint32_t tag) { return contexts_[tag % contexts_.size()]; } void ProcessGroupGloo::runLoop(int workerIndex) { std::unique_lock lock(workMutex_); while (!stop_) { if (workQueue_.empty()) { workProduceCV_.wait(lock); continue; } auto work = std::move(workQueue_.front()); workQueue_.pop_front(); workInProgress_[workerIndex] = work; lock.unlock(); // Notify after releasing the lock so that the waiter // does not immediately block. workConsumeCV_.notify_one(); AsyncWork::execute(work); lock.lock(); workInProgress_[workerIndex].reset(); } } void ProcessGroupGloo::enqueue(c10::intrusive_ptr work) { std::unique_lock lock(workMutex_); workQueue_.push_back(std::move(work)); lock.unlock(); // Notify after releasing the lock so that the waiter // does not immediately block. workProduceCV_.notify_one(); } namespace { class AsyncBroadcastWork : public ProcessGroupGloo::AsyncWork { public: AsyncBroadcastWork( const std::shared_ptr& context, std::vector& inputs, int rootRank, int rootTensor, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( {inputs}, OpType::BROADCAST, seq, "gloo:broadcast", inputs), context(context), inputs(inputs), rootRank(rootRank), rootTensor(rootTensor), tag(tag) {} std::shared_ptr context; std::vector inputs{}; const int rootRank; const int rootTensor; const uint32_t tag; void broadcast(at::Tensor& tensor) { const auto& scalarType = tensor.scalar_type(); gloo::BroadcastOptions opts(context); opts.setRoot(rootRank); opts.setTag(tag); GENERATE_ALL_TYPES(scalarType, setOutput, opts, tensor); gloo::broadcast(opts); } void run() override { broadcast(inputs[rootTensor]); // Copy to non-root tensors for (const auto i : c10::irange(inputs.size())) { if (i == static_cast(rootTensor)) { continue; } inputs[i].copy_(inputs[rootTensor]); } } }; class AsyncBroadcastCUDAWork : public AsyncBroadcastWork { public: AsyncBroadcastCUDAWork( const std::shared_ptr& context, std::vector& inputs, int rootRank, int rootTensor, uint32_t tag, uint64_t seq) : AsyncBroadcastWork(context, inputs, rootRank, rootTensor, tag, seq) { initializeStreamsEvents(inputs, streams, events); // Create pinned host side tensors. tmp = pinnedLike(inputs[rootTensor]); c10::OptionalStreamGuard guard; if (context->rank == rootRank) { guard.reset_stream(streams[rootTensor]); tmp.copy_(inputs[rootTensor], /* non_blocking */ true); } } void run() override { // Synchronize with copy operation if applicable. if (context->rank == rootRank) { streams[rootTensor].synchronize(); } // Run broadcast on host side tensors. broadcast(tmp); // Kick off copy back to the CUDA tensors. 
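    // The copies below are non_blocking and issued on the side streams set up
    // in initializeStreamsEvents(); synchronize() then blocks the caller's
    // current stream on the recorded events instead of blocking the host.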
c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); inputs[i].copy_(tmp, /* non_blocking */ true); events[i].record(streams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. for (const auto i : c10::irange(inputs.size())) { c10::Device device = inputs[i].device(); events[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } at::Tensor tmp; std::vector streams{}; std::vector events{}; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::broadcast( std::vector& inputs, const BroadcastOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::broadcast: " + msg); }; assertRootRank(invalidArgument, opts.rootRank, size_); assertRootTensor( invalidArgument, opts.rootTensor, static_cast(inputs.size())); assertDense(invalidArgument, inputs); assertTypeAndSizesMatch(invalidArgument, inputs); const auto& device = inputs[0].device(); switch (device.type()) { case at::kCPU: break; case at::kCUDA: // If the user gave us a CUDA tensor then CUDA must be loaded. TORCH_INTERNAL_ASSERT(at::hasCUDA()); break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { work = c10::make_intrusive( std::move(context), inputs, opts.rootRank, opts.rootTensor, tag, seq_); } else if (device.type() == at::kCUDA) { work = c10::make_intrusive( std::move(context), inputs, opts.rootRank, opts.rootTensor, tag, seq_); } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } namespace { class AsyncAllreduceWork : public ProcessGroupGloo::AsyncWork { public: AsyncAllreduceWork( const std::shared_ptr& context, std::vector& inputs, ReduceOp reduceOp, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( {inputs}, OpType::ALLREDUCE, seq, "gloo:all_reduce", inputs), context(context), inputs(inputs), reduceOp(std::move(reduceOp)), tag(tag) {} std::shared_ptr context; std::vector inputs{}; const ReduceOp reduceOp; const uint32_t tag; void allreduce(std::vector& tensors) { const auto& scalarType = tensors[0].scalar_type(); gloo::AllreduceOptions opts(context); opts.setReduceFunction(getFunction(scalarType, reduceOp)); opts.setTag(tag); GENERATE_ALL_TYPES(scalarType, setOutputs, opts, tensors); gloo::allreduce(opts); } void run() override { allreduce(inputs); } template void getFunction(gloo::AllreduceOptions::Func& fn, const ReduceOp op) { fn = toFunction(op); } gloo::AllreduceOptions::Func getFunction( const at::ScalarType& dtype, const ReduceOp& op) { gloo::AllreduceOptions::Func fn; GENERATE_ALL_TYPES(dtype, getFunction, fn, op); return fn; } }; class AsyncAllreduceCoalescedWork : public AsyncAllreduceWork { public: AsyncAllreduceCoalescedWork( const std::shared_ptr& context, std::vector& inputs, ReduceOp reduceOp, uint32_t tag, uint64_t seq) : AsyncAllreduceWork(context, inputs, std::move(reduceOp), tag, seq) {} void run() override { allreduceCoalesced(inputs); } private: void allreduceCoalesced(std::vector& tensors) { // reduce coalesced, flattened tensors. at::Tensor coalescedTensor = flattenDenseTensors(tensors); std::vector allreduceInput = {coalescedTensor}; allreduce(allreduceInput); // separate and reshape tensors. 
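    // E.g. with two tensors of 3 and 5 elements, coalescedTensor holds 8
    // elements and the loop copies slices [0, 3) and [3, 8) back into the
    // originals, restoring their shapes via view().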
size_t offset = 0; for (at::Tensor& tensor : tensors) { const int64_t tensorNumel = tensor.numel(); const c10::IntArrayRef tensorShape = tensor.sizes(); tensor.copy_(coalescedTensor.slice(0, offset, offset + tensorNumel) .view(tensorShape)); offset += tensorNumel; } } }; class AsyncSparseAllreduceWork : public ProcessGroupGloo::AsyncWork { public: AsyncSparseAllreduceWork( const std::shared_ptr& context, std::vector& inputs, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( {inputs}, OpType::_ALLREDUCE_SPARSE, seq, "gloo:sparse_all_reduce", inputs), context(context), inputs(inputs), tag(tag) {} std::shared_ptr context; std::vector inputs{}; const uint32_t tag; // We share dimensionality about the sparse tensors before collecting // their contents. We assume here that the maximum number of sparse // and dense dimensions is 4. This is stored in a contiguous piece of // memory so that we can easily run allgather on it. // // The layout of this memory is as follows: // // - [0:4]: sparse dims // - [4:8]: dense dims // - [8]: nnz // class SparseTensorMetadata { public: static constexpr auto dim = 9; // Construct from an existing metadata tensor to facilitate structured // access to metadata from peers, after gathering it. explicit SparseTensorMetadata(at::Tensor metadata) : metadata_(std::move(metadata)), data_(metadata_.mutable_data_ptr()) { AT_ASSERT(metadata_.scalar_type() == at::kLong); AT_ASSERT(metadata_.dim() == 1); AT_ASSERT(metadata_.size(0) == dim); } // Populate the metadata. void populate_from_sparse_tensor(const at::Tensor& tensor) { const auto sparse_dim = tensor.sparse_dim(); AT_ASSERT(sparse_dim <= 4); for (const auto i : c10::irange(4)) { if (i < sparse_dim) { data_[i] = tensor.size(i); } } const auto dense_dim = tensor.dense_dim(); AT_ASSERT(dense_dim <= 4); for (const auto i : c10::irange(4)) { if (i < dense_dim) { data_[i + 4] = tensor.size(sparse_dim + i); } } data_[8] = tensor._nnz(); } std::vector sizes() const { std::vector sizes; // Sparse sizes for (const auto i : c10::irange(4)) { if (data_[i] <= 0) { break; } sizes.push_back(data_[i]); } // Dense sizes for (const auto i : c10::irange(4, 8)) { if (data_[i] <= 0) { break; } sizes.push_back(data_[i]); } return sizes; } int64_t nnz() const { return data_[8]; } protected: at::Tensor metadata_; int64_t* data_; }; // Sparse allreduce is implemented with allgather on indices and values. // Every process then sums the resulting sparse tensors locally. // The nnz for sparse tensors may be different across processes, so first // we run allgather on the nnz, and then allgather with max(nnz). at::Tensor allreduce(std::vector& tensors) { // TODO: This is a massive hack! There is some confusion about // Variable/Tensor inside the body of this function. Turning off // grad smooths over the confusion for now. This fixes // test/test_c10d_gloo.py ProcessGroupGlooTest.test_sparse_allreduce_basics // // The correct fix is to stop allocating tensors that are not variables, // but to conveniently do this c10d must depend on torch not ATen at::AutoDispatchBelowAutograd guard; auto input = tensors[0]; // Perform local reduction if we have multiple inputs. for (const auto i : c10::irange(1, tensors.size())) { input += tensors[i]; } // Need to coalesce before we can access indices and values. input = input.coalesce(); // Gather metadata information from all ranks. auto metadata = allgather_metadata(input); // Sanity check dimensionality across ranks. 
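    // Every rank must report the same overall (sparse + dense) sizes; only nnz
    // may differ across ranks, which is why the index/value exchanges below
    // use allgatherv with per-rank counts.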
{ const auto expected = metadata[context->rank].sizes(); for (const auto i : c10::irange(context->size)) { if (i == context->rank) { continue; } const auto actual = metadata[i].sizes(); TORCH_CHECK(actual == expected, "Sparse dimensions do not match"); } } // Gather all indices and all values. auto indices = allgather_indices(input, metadata); auto values = allgather_values(input, metadata); // Perform global reduction. AT_ASSERT(static_cast(indices.size()) == context->size); AT_ASSERT(static_cast(values.size()) == context->size); auto output = at::sparse_coo_tensor( indices[0], values[0], input.sizes(), input.options()); for (const auto i : c10::irange(1, context->size)) { output += at::sparse_coo_tensor( indices[i], values[i], input.sizes(), input.options()); } // Coalesce for good measure. return output.coalesce(); } void run() override { auto output = allreduce(inputs); // This copy is needed when we run a multi-gpu version of reduce (multiple // inputs per rank). for (const auto i : c10::irange(inputs.size())) { inputs[i].copy_(output); } } private: std::vector allgather_metadata( const at::Tensor& tensor) { auto buffer = at::zeros({context->size, SparseTensorMetadata::dim}, at::kLong); // Prepare metadata vector (1 entry per rank) std::vector metadata; metadata.reserve(context->size); for (const auto i : c10::irange(context->size)) { metadata.emplace_back(buffer.select(0, i)); } // Populate data for this rank metadata[context->rank].populate_from_sparse_tensor(tensor); // Allgather metadata gloo::AllgatherOptions opts(context); opts.setOutput(buffer.mutable_data_ptr(), buffer.numel()); opts.setTag(tag); gloo::allgather(opts); return metadata; } std::vector allgather_indices( const at::Tensor& tensor, const std::vector& metadata) { const auto sparseDim = tensor.sparse_dim(); std::vector counts(context->size); size_t totalSize = 0; for (const auto i : c10::irange(metadata.size())) { counts[i] = metadata[i].nnz() * sparseDim; totalSize += counts[i]; } auto output = at::empty({static_cast(totalSize)}, at::kLong); // tensors copied from cuda may not be contiguous, get a contiguous // tensor before use its data_ptr auto input = tensor.indices().contiguous(); // Allgatherv indices. gloo::AllgathervOptions opts(context); opts.setInput( // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) const_cast(input.const_data_ptr()), input.numel()); opts.setOutput(output.mutable_data_ptr(), counts); opts.setTag(tag); gloo::allgatherv(opts); // Compile indices tensor per rank. std::vector indices; indices.reserve(metadata.size()); int64_t offset = 0; for (const auto& i : metadata) { const auto nnz = i.nnz(); const auto numel = sparseDim * nnz; indices.push_back( output.narrow(0, offset, numel).reshape({sparseDim, nnz})); offset += numel; } return indices; } std::vector allgather_values( const at::Tensor& tensor, const std::vector& metadata) { // There are nnz #dense_dim()-dimensional tensors per rank. const auto valueShape = tensor.sizes().slice(tensor.sparse_dim()); int64_t denseNumel = 1; for (auto dim : valueShape) { denseNumel *= dim; } std::vector counts(context->size); int64_t totalSize = 0; for (const auto i : c10::irange(metadata.size())) { counts[i] = metadata[i].nnz() * denseNumel; totalSize += static_cast(counts[i]); } auto output = at::empty({totalSize}, tensor.scalar_type()); // Allgatherv indices. 
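    // (This second allgatherv exchanges the values; each rank contributes
    // nnz * denseNumel elements, mirroring the index exchange above.)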
gloo::AllgathervOptions opts(context); // tensors copied from cuda may not be contiguous, get a contiguous // tensor before use its data_ptr at::Tensor valueTensor = tensor.values().contiguous(); GENERATE_ALL_TYPES(valueTensor.scalar_type(), setInput, opts, valueTensor); GENERATE_ALL_TYPES( valueTensor.scalar_type(), setOutput, opts, output, counts); opts.setTag(tag); gloo::allgatherv(opts); // Compile values tensor per rank. std::vector values; values.reserve(metadata.size()); int64_t offset = 0; for (const auto& i : metadata) { const auto nnz = i.nnz(); const auto numel = denseNumel * nnz; auto tensorShape = std::vector({(int64_t)nnz}); std::copy( valueShape.begin(), valueShape.end(), std::back_inserter(tensorShape)); values.push_back(output.narrow(0, offset, numel).reshape(tensorShape)); offset += numel; } return values; } }; class AsyncAllreduceCUDAWork : public AsyncAllreduceWork { public: AsyncAllreduceCUDAWork( const std::shared_ptr& context, std::vector& inputs, ReduceOp reduceOp, uint32_t tag, uint64_t seq) : AsyncAllreduceWork(context, inputs, std::move(reduceOp), tag, seq) { initializeStreamsEvents(inputs, streams, events); // Kick off copy from CUDA tensors to pinned CPU tensors. tmp.reserve(inputs.size()); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); tmp.push_back(pinnedLike(inputs[i]).copy_(inputs[i], true)); } } void run() override { // Synchronize with copy operations. for (const auto i : c10::irange(inputs.size())) { streams[i].synchronize(); } // Run allreduce on host side tensors. allreduce(tmp); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); inputs[i].copy_(tmp[i], /* non_blocking */ true); events[i].record(streams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. for (const auto i : c10::irange(inputs.size())) { c10::Device device = inputs[i].device(); events[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } std::vector tmp; std::vector streams{}; std::vector events{}; }; class AsyncSparseAllreduceCUDAWork : public AsyncSparseAllreduceWork { public: AsyncSparseAllreduceCUDAWork( const std::shared_ptr& context, std::vector& inputs, uint32_t tag, uint64_t seq) : AsyncSparseAllreduceWork(context, inputs, tag, seq) { initializeStreamsEvents(inputs, streams, events); // Kick off copy from CUDA tensors to CPU tensors. // Note that both coalescing the sparse tensor and copying it to CPU // memory must be performed asynchronously, or we block the caller. tmp.reserve(inputs.size()); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); tmp.push_back( inputs[i].coalesce().to(at::DeviceType::CPU, /*non_blocking=*/true)); } } void run() override { // Synchronize with copy operations. for (const auto i : c10::irange(inputs.size())) { streams[i].synchronize(); } // Run allreduce on host side tensors. auto output = allreduce(tmp); // Kick off copy back to the CUDA tensors. c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); inputs[i].copy_(output, /*non_blocking=*/true); events[i].record(streams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. 
for (const auto i : c10::irange(inputs.size())) { c10::Device device = inputs[i].device(); events[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } std::vector tmp{}; std::vector streams{}; std::vector events{}; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::allreduce( std::vector& inputs, const AllreduceOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::allreduce: " + msg); }; assertNonEmpty(invalidArgument, inputs); assertLayoutMatch(invalidArgument, inputs); assertTypeAndSizesMatch(invalidArgument, inputs); const auto& device = inputs[0].device(); switch (device.type()) { case at::kCPU: break; case at::kCUDA: // If the user gave us a CUDA tensor then CUDA must be loaded. TORCH_INTERNAL_ASSERT(at::hasCUDA()); break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } const auto& layout = inputs[0].layout(); if (layout == c10::kSparse && opts.reduceOp != ReduceOp::SUM) { invalidArgument( "unsupported reduction operation " "(allreduce of sparse tensors only works with ReduceOp.SUM)"); } c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { if (layout == c10::kStrided) { work = c10::make_intrusive( std::move(context), inputs, opts.reduceOp, tag, seq_); } else if (layout == c10::kSparse) { work = c10::make_intrusive( std::move(context), inputs, tag, seq_); } else { invalidArgument("unsupported layout"); } } else if (device.type() == at::kCUDA) { if (layout == c10::kStrided) { work = c10::make_intrusive( std::move(context), inputs, opts.reduceOp, tag, seq_); } else if (layout == c10::kSparse) { work = c10::make_intrusive( std::move(context), inputs, tag, seq_); } else { invalidArgument("unsupported layout"); } } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } c10::intrusive_ptr ProcessGroupGloo::allreduce_sparse( std::vector& inputs, const AllreduceOptions& opts) { // all reduce sparse calls into default allreduce which // implemented with all_gathering indices and values // we do ths we do not have a native cuda implementation return allreduce(inputs, opts); } c10::intrusive_ptr ProcessGroupGloo::allreduce_coalesced( std::vector& tensors, const AllreduceCoalescedOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::allreduce_coalesced: " + msg); }; assertNonEmpty(invalidArgument, tensors); // tensors will be flattened and concatenated (coalesced). This means that // input // tensors must have the same device, layout and type. assertLayoutMatch(invalidArgument, tensors); if (!std::all_of(tensors.begin(), tensors.end(), [&](at::Tensor& t) { return t.options().type_equal(tensors[0].options()); })) { invalidArgument("tensors must all have the same type"); } if (!std::all_of(tensors.begin(), tensors.end(), [&](at::Tensor& t) { return t.device() == tensors[0].device(); })) { invalidArgument("tensors must all be on the same device"); } const c10::Device& device = tensors[0].device(); const c10::Layout& layout = tensors[0].layout(); // invalid arguments are detected early here before any calls to nextTag() // which result in the collectiveCounter_ being incremented. 
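  // Keeping collectiveCounter_ untouched on the error path matters because the
  // tag derived from it must match across ranks; bumping it on only some ranks
  // would desynchronize every subsequent collective.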
switch (device.type()) { case c10::kCPU: break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } switch (layout) { case c10::kStrided: break; default: invalidArgument("unsupported layout"); } c10::intrusive_ptr work; const uint32_t tag = nextTag(); std::shared_ptr context = getContext(tag); ++seq_; if (device.type() == c10::kCPU) { if (layout == c10::kStrided) { work = c10::make_intrusive( std::move(context), tensors, opts.reduceOp, tag, seq_); } else { invalidArgument("unsupported layout"); } } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } namespace { class AsyncReduceWork : public ProcessGroupGloo::AsyncWork { public: AsyncReduceWork( const std::shared_ptr& context, std::vector& inputs, int rootRank, int rootTensor, ReduceOp reduceOp, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( {inputs}, OpType::REDUCE, seq, "gloo:reduce", inputs), context(context), inputs(inputs), rootRank(rootRank), rootTensor(rootTensor), reduceOp(std::move(reduceOp)), tag(tag) {} std::shared_ptr context; std::vector inputs{}; const int rootRank; const int rootTensor; const ReduceOp reduceOp; const uint32_t tag; void reduce(std::vector& tensors) { const auto& scalarType = tensors[0].scalar_type(); gloo::ReduceOptions opts(context); opts.setRoot(rootRank); opts.setTag(tag); opts.setReduceFunction(getFunction(scalarType, reduceOp)); GENERATE_ALL_TYPES(scalarType, setOutput, opts, tensors[0]); gloo::reduce(opts); } void run() override { reduce(inputs); } protected: template void getFunction(gloo::ReduceOptions::Func& fn, const ReduceOp op) { fn = toFunction(op); } gloo::ReduceOptions::Func getFunction( const at::ScalarType& dtype, const ReduceOp& op) { gloo::ReduceOptions::Func fn; GENERATE_ALL_TYPES(dtype, getFunction, fn, op); return fn; } }; class AsyncReduceCUDAWork : public AsyncReduceWork { public: AsyncReduceCUDAWork( const std::shared_ptr& context, std::vector& inputs, int rootRank, int rootTensor, ReduceOp reduceOp, uint32_t tag, uint64_t seq) : AsyncReduceWork( context, inputs, rootRank, rootTensor, std::move(reduceOp), tag, seq) { initializeStreamsEvents(inputs, streams, events); // Kick off copy from CUDA tensors to pinned CPU tensors. tmp.reserve(inputs.size()); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); tmp.push_back(pinnedLike(inputs[i]).copy_(inputs[i], true)); } } void run() override { // Synchronize with copy operations. for (const auto i : c10::irange(inputs.size())) { streams[i].synchronize(); } // Run reduce on host side tensors. reduce(tmp); // Kick off copy back to the CUDA tensors. c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(streams[i]); inputs[i].copy_(tmp[i], /* non_blocking */ true); events[i].record(streams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. 
for (const auto i : c10::irange(inputs.size())) { c10::Device device = inputs[i].device(); events[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } std::vector tmp{}; std::vector streams{}; std::vector events{}; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::reduce( std::vector& inputs, const ReduceOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::reduce: " + msg); }; assertRootRank(invalidArgument, opts.rootRank, size_); assertRootTensor( invalidArgument, opts.rootTensor, static_cast(inputs.size())); assertSingleElement(invalidArgument, inputs); assertDense(invalidArgument, inputs); const auto& device = inputs[0].device(); switch (device.type()) { case at::kCPU: break; case at::kCUDA: // If the user gave us a CUDA tensor then CUDA must be loaded. TORCH_INTERNAL_ASSERT(at::hasCUDA()); break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { work = c10::make_intrusive( std::move(context), inputs, opts.rootRank, opts.rootTensor, opts.reduceOp, tag, seq_); } else if (device.type() == at::kCUDA) { work = c10::make_intrusive( std::move(context), inputs, opts.rootRank, opts.rootTensor, opts.reduceOp, tag, seq_); } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } namespace { class AsyncAllgatherWork : public ProcessGroupGloo::AsyncWork { public: AsyncAllgatherWork( const std::shared_ptr& context, std::vector>& outputs, std::vector& inputs, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( outputs, OpType::ALLGATHER, seq, "gloo:all_gather", inputs), context(context), outputs(outputs), inputs(inputs), tag(tag) {} std::shared_ptr context; std::vector> outputs{}; std::vector inputs{}; const uint32_t tag; void allgather( std::vector>& outputs, std::vector& inputs) { const auto& scalarType = inputs[0].scalar_type(); gloo::AllgatherOptions opts(context); opts.setTag(tag); // Use single flattened input tensor. at::Tensor flatInputTensor = flattenDenseTensors(inputs); GENERATE_ALL_TYPES(scalarType, setInput, opts, flatInputTensor); // Use single flat output tensor. // The first dimension corresponds to the index into outputs[N], // so copying into the actual output later is easy. at::Tensor flatOutputTensor = newLikeFlat(outputs[0]); GENERATE_ALL_TYPES(scalarType, setOutput, opts, flatOutputTensor); gloo::allgather(opts); // Unflatten into output tensors. for (auto& outputgroup : outputs) { for (const auto j : c10::irange(outputgroup.size())) { outputgroup[j].copy_(flatOutputTensor[static_cast(j)]); } } } void run() override { allgather(outputs, inputs); } }; // Note: current CUDA implementation holds the assumption that the // tensors in the nested output tensor vectors are on the same device. class AsyncAllgatherCUDAWork : public AsyncAllgatherWork { public: AsyncAllgatherCUDAWork( const std::shared_ptr& context, std::vector>& outputs, std::vector& inputs, uint32_t tag, uint64_t seq) : AsyncAllgatherWork(context, outputs, inputs, tag, seq) { initializeStreamsEvents(inputs, inputStreams, inputEvents); initializeStreamsEvents(outputs, outputStreams, outputEvents); // Kick off copy from CUDA tensors to pinned CPU tensors. 
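    // Pinned (page-locked) host buffers let these device-to-host copies run
    // asynchronously on the side streams; see pinnedLike() above.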
tmpInputs.reserve(inputs.size()); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(inputStreams[i]); tmpInputs.push_back(pinnedLike(inputs[i]).copy_(inputs[i], true)); } tmpOutputs.resize(outputs.size()); for (const auto i : c10::irange(outputs.size())) { tmpOutputs[i].reserve(outputs[i].size()); for (const auto j : c10::irange(outputs[i].size())) { tmpOutputs[i].push_back(pinnedLike(outputs[i][j])); } } } void run() override { // Synchronize with copy operations. for (const auto i : c10::irange(inputs.size())) { inputStreams[i].synchronize(); } for (const auto i : c10::irange(outputs.size())) { outputStreams[i].synchronize(); } // Run allgather on host side tensors. allgather(tmpOutputs, tmpInputs); // Kick off copy back to the CUDA tensors. c10::OptionalStreamGuard guard; for (const auto i : c10::irange(outputs.size())) { guard.reset_stream(outputStreams[i]); for (const auto j : c10::irange(outputs[i].size())) { outputs[i][j].copy_(tmpOutputs[i][j], /* non_blocking */ true); } outputEvents[i].record(outputStreams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. for (const auto i : c10::irange(outputs.size())) { c10::Device device = outputs[i][0].device(); outputEvents[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } std::vector tmpInputs{}; std::vector inputStreams{}; std::vector inputEvents{}; std::vector> tmpOutputs{}; std::vector outputStreams{}; std::vector outputEvents{}; }; // A work that takes an lambda on construction and calls it on wait. // It is useful for add a continuation to another work, and/or // composing multiple works together. class LambdaWork : public Work { public: LambdaWork(std::function fn) : fn_(std::move(fn)) {} bool wait(std::chrono::milliseconds /* unused */) override { fn_(); return true; } private: std::function fn_; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::_reduce_scatter_base( at::Tensor& outputTensor, at::Tensor& inputTensor, const ReduceScatterOptions& opts) { std::vector outputTensors = {outputTensor}; std::vector inputTensors = {inputTensor}; return reduce_scatter_tensor_coalesced(outputTensors, inputTensors, opts); } c10::intrusive_ptr ProcessGroupGloo::reduce_scatter_tensor_coalesced( std::vector& outputTensors, std::vector& inputTensors, const ReduceScatterOptions& opts) { if (outputTensors.size() != inputTensors.size()) { TORCH_CHECK( false, "requires input/output tensor lists to have the same length"); } const auto rank = getRank(); const auto worldSize = getSize(); std::vector buffers; for (const auto i : c10::irange(inputTensors.size())) { auto inputShape = inputTensors[i].sizes().vec(); auto outputShape = outputTensors[i].sizes().vec(); TORCH_CHECK_EQ(outputTensors[i].dtype(), inputTensors[i].dtype()); TORCH_CHECK_EQ(outputShape[0] * worldSize, inputShape[0]); for (size_t i = 1; i < outputShape.size(); ++i) { TORCH_CHECK_EQ(outputShape[i], inputShape[i]); } buffers.push_back(inputTensors[i].clone()); } std::vector> works; for (const auto i : c10::irange(buffers.size())) { std::vector inp = {buffers[i]}; AllreduceOptions arOpts; arOpts.reduceOp = opts.reduceOp; works.push_back(allreduce(inp)); } return c10::make_intrusive( [rank, worldSize, buffers, outputTensors, works = std::move(works)]() { for (const auto i : c10::irange(outputTensors.size())) { works[i]->wait(); outputTensors[i].copy_(buffers[i].chunk(worldSize)[rank]); } }); } c10::intrusive_ptr ProcessGroupGloo::_allgather_base( at::Tensor& output_tensor, 
at::Tensor& input_tensor, const AllgatherOptions& opts) { auto tensor_list = at::chunk(output_tensor, this->getSize(), 0); std::vector> outputs = {tensor_list}; std::vector inputs = {input_tensor}; return this->allgather(outputs, inputs, opts); } // Note: current CUDA implementation holds the assumption that the // tensors in the nested output tensor vectors are on the same device. c10::intrusive_ptr ProcessGroupGloo::allgather( std::vector>& outputs, std::vector& inputs, const AllgatherOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::allgather: " + msg); }; if (inputs.empty()) { invalidArgument("requires non-empty input tensor list"); } if (inputs.size() != outputs.size()) { invalidArgument( "requires input/output tensor lists to have the same length"); } for (const auto i : c10::irange(outputs.size())) { const auto expected = inputs.size() * getSize(); const auto actual = outputs[i].size(); if (actual != expected) { invalidArgument( "invalid output tensor list at index " + std::to_string(i) + " (expected length " + std::to_string(expected) + ", got " + std::to_string(actual) + ")"); } } assertDense(invalidArgument, inputs); // Expect all input/output tensors to have the same type and sizes const auto& options = inputs[0].options(); const auto& sizes = inputs[0].sizes(); assertTypeAndSizesMatch(invalidArgument, inputs, options, sizes); for (const auto& output : outputs) { assertTypeAndSizesMatch(invalidArgument, output, options, sizes); } const auto& device = inputs[0].device(); switch (device.type()) { case at::kCPU: break; case at::kCUDA: // If the user gave us a CUDA tensor then CUDA must be loaded. TORCH_INTERNAL_ASSERT(at::hasCUDA()); break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { work = c10::make_intrusive( std::move(context), outputs, inputs, tag, seq_); } else if (device.type() == at::kCUDA) { work = c10::make_intrusive( std::move(context), outputs, inputs, tag, seq_); } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } namespace { class AsyncAllgatherCoalescedWork : public ProcessGroupGloo::AsyncWork { public: AsyncAllgatherCoalescedWork( const std::shared_ptr& context, std::vector>& output_lists, std::vector& input_list, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( output_lists, OpType::ALLGATHER_COALESCED, seq, "gloo:all_gather", input_list), context(context), output_lists(output_lists), input_list(input_list), tag(tag) {} std::shared_ptr context; std::vector> output_lists{}; std::vector input_list{}; const uint32_t tag; void allgather_coalesced() { assert(!output_lists.empty()); assert(!output_lists[0].empty()); assert(!input_list.empty()); const auto& scalarType = input_list[0].scalar_type(); gloo::AllgatherOptions opts(context); opts.setTag(tag); // Use single flattened input tensor. at::Tensor flatInputTensor = flattenDenseTensors(input_list); GENERATE_ALL_TYPES(scalarType, setInput, opts, flatInputTensor); // Compute total number of elements we need to allocate for all tensors // requested. int64_t output_numel = 0; for (const auto& t : output_lists[0]) { output_numel += t.numel(); } output_numel *= static_cast(output_lists.size()); // Use single flat output tensor. 
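    // The flat buffer is laid out rank-major: all of rank 0's tensors first,
    // then rank 1's, and so on, which is why the copy-out loop below walks
    // output_lists in order while advancing current_element.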
at::Tensor flatOutputTensor = at::empty({output_numel}, output_lists[0][0].options()); GENERATE_ALL_TYPES(scalarType, setOutput, opts, flatOutputTensor); gloo::allgather(opts); int64_t current_element = 0; for (auto& output_list : output_lists) { for (auto& output_tensor : output_list) { output_tensor.copy_( flatOutputTensor.narrow(0, current_element, output_tensor.numel()) .reshape(output_tensor.sizes()), true); current_element += output_tensor.numel(); } } } void run() override { allgather_coalesced(); } }; } // namespace c10::intrusive_ptr ProcessGroupGloo::allgather_coalesced( std::vector>& output_lists, std::vector& input_list, const AllgatherOptions& /* unused */) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::allgather_coalesced: " + msg); }; if (input_list.empty()) { invalidArgument("requires non-empty input tensor list"); } if (output_lists.size() != static_cast(getSize())) { invalidArgument("output lists should be equal to world size"); } assertSameDevice(invalidArgument, input_list); // Expect i'th tensor of each list from 'output_lists' match i'th tensor // from 'input_list' in type and size. for (const auto& output_list : output_lists) { if (output_list.size() != input_list.size()) { invalidArgument( "invalid output size: (expected length " + std::to_string(input_list.size()) + ", got " + std::to_string(output_list.size()) + ")"); } for (const auto i : c10::irange(output_list.size())) { const auto expected = input_list[i].sizes(); const auto actual = output_list[i].sizes(); if (actual != expected) { invalidArgument( "invalid size of output tensor at index " + std::to_string(i) + " (expected length " + toString(expected) + ", got " + toString(actual) + ")"); } if (!input_list[i].options().type_equal(output_list[i].options())) { invalidArgument( "invalid tensor type at index " + std::to_string(i) + " (expected " + input_list[i].toString() + ", got " + output_list[i].toString() + ")"); } } } assertDense(invalidArgument, input_list); auto tag = nextTag(); auto context = getContext(tag); ++seq_; auto work = c10::make_intrusive( std::move(context), output_lists, input_list, tag, seq_); enqueue(work); return work; } c10::intrusive_ptr ProcessGroupGloo::allgather_into_tensor_coalesced( std::vector& outputs, std::vector& inputs, const AllgatherOptions& opts) { TORCH_CHECK_EQ(outputs.size(), inputs.size()); std::vector> output_lists(getSize()); for (auto& output : outputs) { auto chunks = output.chunk(getSize()); for (const auto i : c10::irange(output_lists.size())) { output_lists[i].push_back(std::move(chunks[i])); } } return allgather_coalesced(output_lists, inputs, opts); } namespace { class AsyncGatherWork : public ProcessGroupGloo::AsyncWork { public: AsyncGatherWork( const std::shared_ptr& context, std::vector>& outputs, std::vector& inputs, int root, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( outputs, OpType::GATHER, seq, "gloo:gather", inputs), context(context), outputs(outputs), inputs(inputs), root(root), tag(tag) {} std::shared_ptr context; std::vector> outputs{}; std::vector inputs{}; const int root; const uint32_t tag; void gather( std::vector>& outputs, std::vector& inputs) { const auto scalarType = inputs[0].scalar_type(); gloo::GatherOptions opts(context); opts.setRoot(root); opts.setTag(tag); // Set single temporary tensor on root process. // This is later scattered to the separate output tensors. 
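    // Non-root ranks only contribute an input below; flatOutputTensor is left
    // unallocated for them and gloo::gather() delivers data to the root only.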
at::Tensor flatOutputTensor; if (context->rank == root) { flatOutputTensor = newLikeFlat(outputs[0]); GENERATE_ALL_TYPES(scalarType, setOutput, opts, flatOutputTensor); } // Set single input tensor on all processes. GENERATE_ALL_TYPES(scalarType, setInput, opts, inputs[0]); gloo::gather(opts); // Unflatten into output tensors on root process. if (context->rank == root) { for (const auto i : c10::irange(outputs[0].size())) { outputs[0][i].copy_(flatOutputTensor[static_cast(i)]); } } } void run() override { gather(outputs, inputs); } }; // Note: current CUDA implementation holds the assumptions: // - inputs.size() is 1 // - outputs.size() is 1 // - the size of the nested output tensors is world size, i.e., // outputs[0].size, is world size class AsyncGatherCUDAWork : public AsyncGatherWork { public: AsyncGatherCUDAWork( const std::shared_ptr& context, std::vector>& outputs, std::vector& inputs, int root, uint32_t tag, uint64_t seq) : AsyncGatherWork(context, outputs, inputs, root, tag, seq) { initializeStreamsEvents(inputs, inputStreams, inputEvents); initializeStreamsEvents(outputs, outputStreams, outputEvents); // Kick off copy from CUDA tensors to pinned CPU tensors. tmpInputs.reserve(inputs.size()); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(inputStreams[i]); tmpInputs.push_back(pinnedLike(inputs[i]).copy_(inputs[i], true)); } tmpOutputs.resize(outputs.size()); for (const auto i : c10::irange(outputs.size())) { tmpOutputs[i].reserve(outputs[i].size()); for (const auto j : c10::irange(outputs[i].size())) { tmpOutputs[i].push_back(pinnedLike(outputs[i][j])); } } } void run() override { // Synchronize with copy operations. for (const auto i : c10::irange(inputs.size())) { inputStreams[i].synchronize(); } for (const auto i : c10::irange(outputs.size())) { outputStreams[i].synchronize(); } // Run gather on host side tensors. gather(tmpOutputs, tmpInputs); // Kick off copy back to the CUDA tensors. c10::OptionalStreamGuard guard; for (const auto i : c10::irange(outputs.size())) { guard.reset_stream(outputStreams[i]); for (const auto j : c10::irange(outputs[i].size())) { outputs[i][j].copy_(tmpOutputs[i][j], /* non_blocking */ true); } outputEvents[i].record(outputStreams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. for (const auto i : c10::irange(outputs.size())) { c10::Device device = outputs[i][0].device(); outputEvents[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } std::vector tmpInputs{}; std::vector inputStreams{}; std::vector inputEvents{}; std::vector> tmpOutputs{}; std::vector outputStreams{}; std::vector outputEvents{}; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::gather( std::vector>& outputs, std::vector& inputs, const GatherOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::gather: " + msg); }; assertRootRank(invalidArgument, opts.rootRank, size_); assertSingleElementInput(invalidArgument, inputs); assertDense(invalidArgument, inputs); if (getRank() == opts.rootRank) { if (outputs.size() != 1) { std::stringstream ss; ss << "requires a single-element output list containing a list with " << getSize() << " tensors."; invalidArgument(ss.str()); } else if (outputs[0].size() != static_cast(getSize())) { std::stringstream ss; ss << "Incorrect output list size " << outputs[0].size() << ". 
Output list size should be " << getSize() << ", same as size of the process group."; invalidArgument(ss.str()); } const auto& options = inputs[0].options(); const auto& sizes = inputs[0].sizes(); assertTypeAndSizesMatch(invalidArgument, outputs[0], options, sizes); } else { if (!outputs.empty()) { invalidArgument("requires empty output on non-root"); } } const auto& device = inputs[0].device(); switch (device.type()) { case at::kCPU: break; case at::kCUDA: // If the user gave us a CUDA tensor then CUDA must be loaded. TORCH_INTERNAL_ASSERT(at::hasCUDA()); break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { work = c10::make_intrusive( std::move(context), outputs, inputs, opts.rootRank, tag, seq_); } else if (device.type() == at::kCUDA) { work = c10::make_intrusive( std::move(context), outputs, inputs, opts.rootRank, tag, seq_); } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } namespace { class AsyncScatterWork : public ProcessGroupGloo::AsyncWork { public: AsyncScatterWork( const std::shared_ptr& context, std::vector& outputs, std::vector>& inputs, int root, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( {outputs}, OpType::SCATTER, seq, "gloo:scatter", !inputs.empty() ? std::optional>(inputs[0]) : std::nullopt), context(context), outputs(outputs), inputs(inputs), root(root), tag(tag) {} std::shared_ptr context; std::vector outputs{}; std::vector> inputs{}; const int root; const uint32_t tag; void scatter( std::vector& outputs, std::vector>& inputs) { const auto scalarType = outputs[0].scalar_type(); gloo::ScatterOptions opts(context); opts.setRoot(root); opts.setTag(tag); // Set list of input tensors on root process if (context->rank == root) { GENERATE_ALL_TYPES(scalarType, setInputs, opts, inputs[0]); } // Set single output tensor on all processes GENERATE_ALL_TYPES(scalarType, setOutput, opts, outputs[0]); gloo::scatter(opts); } void run() override { scatter(outputs, inputs); } }; class AsyncScatterCUDAWork : public AsyncScatterWork { public: AsyncScatterCUDAWork( const std::shared_ptr& context, std::vector& outputs, std::vector>& inputs, int root, uint32_t tag, uint64_t seq) : AsyncScatterWork(context, outputs, inputs, root, tag, seq) { initializeStreamsEvents(inputs, inputStreams, inputEvents); initializeStreamsEvents(outputs, outputStreams, outputEvents); // Kick off copy from CUDA tensors to pinned CPU tensors. tmpInputs.resize(inputs.size()); c10::OptionalStreamGuard guard; for (const auto i : c10::irange(inputs.size())) { guard.reset_stream(inputStreams[i]); tmpInputs[i].reserve(inputs[i].size()); for (const auto j : c10::irange(inputs[i].size())) { tmpInputs[i].push_back( pinnedLike(inputs[i][j]).copy_(inputs[i][j], true)); } } tmpOutputs.reserve(outputs.size()); for (auto& output : outputs) { tmpOutputs.push_back(pinnedLike(output)); } } void run() override { // Synchronize with copy operations. for (const auto i : c10::irange(inputs.size())) { inputStreams[i].synchronize(); } for (const auto i : c10::irange(outputs.size())) { outputStreams[i].synchronize(); } // Run scatter on host side tensors. scatter(tmpOutputs, tmpInputs); // Kick off copy back to the CUDA tensors. 
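// The scatter ran on pinned host staging tensors; copy each result back to
// its CUDA output on that output's dedicated stream and record an event so
// synchronize() can make the caller's current stream wait for the copy.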
c10::OptionalStreamGuard guard; for (const auto i : c10::irange(outputs.size())) { guard.reset_stream(outputStreams[i]); outputs[i].copy_(tmpOutputs[i], /* non_blocking */ true); outputEvents[i].record(outputStreams[i]); } } void synchronize() override { // Synchronize with the copy back to CUDA tensors. for (const auto i : c10::irange(outputs.size())) { c10::Device device = outputs[i].device(); outputEvents[i].block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } } std::vector tmpOutputs{}; std::vector outputStreams{}; std::vector outputEvents{}; std::vector> tmpInputs{}; std::vector inputStreams{}; std::vector inputEvents{}; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::scatter( std::vector& outputs, std::vector>& inputs, const ScatterOptions& opts) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::scatter: " + msg); }; assertRootRank(invalidArgument, opts.rootRank, size_); assertSingleElementOutput(invalidArgument, outputs); assertDense(invalidArgument, outputs); if (getRank() == opts.rootRank) { if (inputs.size() != 1) { std::stringstream ss; ss << "requires a single-element input list containing a list with " << getSize() << " tensors"; invalidArgument(ss.str()); } else if (inputs[0].size() != static_cast(getSize())) { std::stringstream ss; ss << "Incorrect input list size " << inputs[0].size() << ". Input list size should be " << getSize() << ", same as size of the process group."; invalidArgument(ss.str()); } const auto& options = outputs[0].options(); const auto& sizes = outputs[0].sizes(); assertTypeAndSizesMatch(invalidArgument, inputs[0], options, sizes); } else { if (!inputs.empty()) { invalidArgument("requires empty input on non-root"); } } const auto& device = outputs[0].device(); switch (device.type()) { case at::kCPU: break; case at::kCUDA: // If the user gave us a CUDA tensor then CUDA must be loaded. 
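// CUDA tensors are staged through pinned host memory by the
// AsyncScatterCUDAWork constructed below; the Gloo transport itself only
// operates on CPU buffers.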
TORCH_INTERNAL_ASSERT(at::hasCUDA()); break; default: invalidArgument(c10::str("unsupported device type ", device.type())); } c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { work = c10::make_intrusive( std::move(context), outputs, inputs, opts.rootRank, tag, seq_); } else if (device.type() == at::kCUDA) { work = c10::make_intrusive( std::move(context), outputs, inputs, opts.rootRank, tag, seq_); } else { TORCH_CHECK(false, "Invalid backend"); } enqueue(work); return work; } c10::intrusive_ptr ProcessGroupGloo::reduce_scatter( std::vector& outputs, std::vector>& inputs, const ReduceScatterOptions& opts) { TORCH_CHECK(false, "ProcessGroupGloo does not support reduce_scatter"); } namespace { class AsyncAlltoallWork : public ProcessGroupGloo::AsyncWork { public: AsyncAlltoallWork( const std::shared_ptr& context, at::Tensor& outputTensor, at::Tensor& inputTensor, std::vector& outputCounts, std::vector& inputCounts, uint32_t tag, uint64_t seq) : ProcessGroupGloo::AsyncWork( {{outputTensor}}, OpType::ALLTOALL, seq, "gloo:all_to_all", std::optional>({inputTensor})), context(context), outputTensor(outputTensor), inputTensor(inputTensor), outputCounts(std::move(outputCounts)), inputCounts(std::move(inputCounts)), tag(tag) {} std::shared_ptr context; at::Tensor outputTensor; at::Tensor inputTensor; std::vector outputCounts{}; std::vector inputCounts{}; const uint32_t tag; void alltoall(at::Tensor& outputTensor, at::Tensor& inputTensor) { const auto scalarType = outputTensor.scalar_type(); if (outputCounts.empty() && inputCounts.empty()) { // Gloo alltoall gloo::AlltoallOptions opts(context); opts.setTag(tag); GENERATE_ALL_TYPES(scalarType, setInput, opts, inputTensor); GENERATE_ALL_TYPES(scalarType, setOutput, opts, outputTensor); gloo::alltoall(opts); } else { // Gloo alltoallv c10d::checkSplitSizes(inputCounts, inputTensor, context->size); c10d::checkSplitSizes(outputCounts, outputTensor, context->size); std::vector sendCounts(context->size); std::vector recvCounts(context->size); std::vector sendOffsets(context->size); std::vector recvOffsets(context->size); c10d::computeLengthsAndOffsets( inputCounts, inputTensor, &sendCounts, &sendOffsets); c10d::computeLengthsAndOffsets( outputCounts, outputTensor, &recvCounts, &recvOffsets); gloo::AlltoallvOptions opts(context); opts.setTag(tag); GENERATE_ALL_TYPES(scalarType, setInput, opts, inputTensor, sendCounts); GENERATE_ALL_TYPES(scalarType, setOutput, opts, outputTensor, recvCounts); gloo::alltoallv(opts); } } void run() override { alltoall(outputTensor, inputTensor); } }; class AsyncAlltoallCUDAWork : public AsyncAlltoallWork { public: AsyncAlltoallCUDAWork( const std::shared_ptr& context, at::Tensor& outputTensor, at::Tensor& inputTensor, std::vector& outputCounts, std::vector& inputCounts, uint32_t tag, uint64_t seq) : AsyncAlltoallWork( context, outputTensor, inputTensor, outputCounts, inputCounts, tag, seq) { initializeStreamsEvents({inputTensor}, inputStreams, inputEvents); initializeStreamsEvents({outputTensor}, outputStreams, outputEvents); // Kick off copy from CUDA tensors to pinned CPU tensors. c10::OptionalStreamGuard guard; guard.reset_stream(inputStreams.front()); cpuInput = pinnedLike(inputTensor).copy_(inputTensor, true); guard.reset_stream(outputStreams.front()); cpuOutput = pinnedLike(outputTensor); } void run() override { // Synchronize with copy operations. 
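// Wait for the device-to-pinned-host copies issued in the constructor to
// finish before handing the host tensors to Gloo.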
inputStreams.front().synchronize(); outputStreams.front().synchronize(); // Run alltoall on host side tensors. alltoall(cpuOutput, cpuInput); // Kick off copy back to the CUDA tensors. c10::OptionalStreamGuard guard; guard.reset_stream(outputStreams.front()); outputTensor.copy_(cpuOutput, /* non_blocking */ true); outputEvents.front().record(outputStreams.front()); } void synchronize() override { // Synchronize with the copy back to CUDA tensors. c10::Device device = outputTensor.device(); outputEvents.front().block( c10::impl::VirtualGuardImpl(device.type()).getStream(device)); } at::Tensor cpuOutput; std::vector outputStreams{}; std::vector outputEvents{}; at::Tensor cpuInput; std::vector inputStreams{}; std::vector inputEvents{}; }; } // namespace c10::intrusive_ptr ProcessGroupGloo::alltoall_base( at::Tensor& outputTensor, at::Tensor& inputTensor, std::vector& outputCounts, std::vector& inputCounts, const AllToAllOptions& /* unused */) { static auto invalidArgument = [](const std::string& msg) { TORCH_CHECK(false, "ProcessGroupGloo::alltoall_base: " + msg); }; TORCH_CHECK( outputTensor.device() == inputTensor.device(), "output tensor and input tensor must be on the same type of device"); assertDense(invalidArgument, {outputTensor}); assertDense(invalidArgument, {inputTensor}); const auto& device = outputTensor.device(); c10::intrusive_ptr work; auto tag = nextTag(); auto context = getContext(tag); ++seq_; if (device.type() == at::kCPU) { work = c10::make_intrusive( std::move(context), outputTensor, inputTensor, outputCounts, inputCounts, tag, seq_); } else if (device.type() == at::kCUDA) { work = c10::make_intrusive( std::move(context), outputTensor, inputTensor, outputCounts, inputCounts, tag, seq_); } else { invalidArgument(c10::str("unsupported device type ", device.type())); } enqueue(work); return work; } static at::Tensor& checkSingleTensor(std::vector& tensors) { if (tensors.size() != 1) { TORCH_CHECK(false, "ProcessGroupGloo::send takes a single tensor"); } auto& tensor = tensors[0]; if (!tensor.is_contiguous()) { TORCH_CHECK(false, "input tensor has to be contiguous"); } if (tensor.is_sparse()) { TORCH_CHECK(false, "input tensor has to be dense"); } return tensor; } static uint32_t checkTag(int32_t tag) { TORCH_CHECK(tag >= 0, "Tag must be nonnegative"); return (uint32_t)tag; } c10::intrusive_ptr ProcessGroupGloo::send( std::vector& tensors, int dstRank, int tag) { auto& tensor = checkSingleTensor(tensors); auto utag = checkTag(tag); auto ptr = tensor.const_data_ptr(); auto size = tensor.numel() * tensor.element_size(); // Construct unbound buffer. auto context = getContext(tag); // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) auto buf = context->createUnboundBuffer(const_cast(ptr), size); buf->send(dstRank, utag); ++seq_; // The work captures the tensor to prevent it being deallocated and // the unbound buffer to synchronize on completion of the send. return c10::make_intrusive(tensor, std::move(buf), seq_); } c10::intrusive_ptr ProcessGroupGloo::recv( std::vector& tensors, int srcRank, int tag) { auto& tensor = checkSingleTensor(tensors); auto utag = checkTag(tag); auto ptr = tensor.mutable_data_ptr(); auto size = tensor.numel() * tensor.element_size(); // Construct unbound buffer. auto context = getContext(tag); auto buf = context->createUnboundBuffer(ptr, size); buf->recv(srcRank, utag); ++seq_; // The work captures the tensor to prevent it being deallocated and // the unbound buffer to synchronize on completion of the recv. 
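// Unlike the collectives above, send/recv are not enqueued on the worker
// threads: they return immediately, and completion is driven by the Gloo
// unbound buffer that the returned work object waits on.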
return c10::make_intrusive<RecvWork>(
      tensor, std::move(buf), OpType::RECV, seq_, "gloo:recv");
}

c10::intrusive_ptr<Work> ProcessGroupGloo::recvAnysource(
    std::vector<at::Tensor>& tensors,
    int tag) {
  auto& tensor = checkSingleTensor(tensors);
  auto utag = checkTag(tag);
  auto ptr = tensor.mutable_data_ptr();
  auto size = tensor.numel() * tensor.element_size();

  // Construct unbound buffer.
  auto context = getContext(tag);
  auto buf = context->createUnboundBuffer(ptr, size);

  // Build list of ranks that this operation can recv from. In these
  // bindings we don't differentiate between ranks and can receive
  // from any other process in the group.
  std::vector<int> srcRanks;
  srcRanks.reserve(size_);
  for (const auto i : c10::irange(size_)) {
    srcRanks.push_back(i);
  }

  buf->recv(srcRanks, utag);
  ++seq_;

  // The work captures the tensor to prevent it being deallocated and
  // the unbound buffer to synchronize on completion of the recv.
  return c10::make_intrusive<RecvWork>(
      tensor,
      std::move(buf),
      OpType::RECVANYSOURCE,
      seq_,
      "gloo:recvAnySource");
}

namespace {

class AsyncBarrierWork : public ProcessGroupGloo::AsyncWork {
 public:
  AsyncBarrierWork(
      const std::shared_ptr<gloo::Context>& context,
      std::vector<c10::weak_intrusive_ptr<AsyncWork>> priorWork,
      uint32_t tag,
      uint64_t seq)
      : ProcessGroupGloo::AsyncWork(
            {},
            OpType::BARRIER,
            seq,
            "gloo:barrier",
            std::nullopt),
        context(context),
        priorWork(std::move(priorWork)),
        tag(tag) {}

  std::shared_ptr<gloo::Context> context;
  std::vector<c10::weak_intrusive_ptr<AsyncWork>> priorWork{};
  const uint32_t tag;

  void run() override {
    // Wait on prior work to complete
    for (auto& weakWork : priorWork) {
      auto work = weakWork.lock();
      if (work) {
        work->wait();
      }
    }
    gloo::BarrierOptions opts(context);
    opts.setTag(tag);
    gloo::barrier(opts);
  }
};

} // namespace

c10::intrusive_ptr<Work> ProcessGroupGloo::barrier(const BarrierOptions& opts) {
  std::vector<c10::weak_intrusive_ptr<AsyncWork>> priorWork;

  // Snapshot all in progress and pending work as weak_ptr.
  // When executing a barrier, we need to ensure that all prior work
  // has completed before completing itself.
  {
    std::unique_lock<std::mutex> lock(workMutex_);
    priorWork.insert(
        priorWork.end(), workInProgress_.begin(), workInProgress_.end());
    priorWork.insert(priorWork.end(), workQueue_.begin(), workQueue_.end());
  }

  auto tag = nextTag();
  auto context = getContext(tag);
  ++seq_;
  auto work = c10::make_intrusive<AsyncBarrierWork>(
      std::move(context), std::move(priorWork), tag, seq_);
  enqueue(work);
  return work;
}

void ProcessGroupGloo::monitoredBarrier(
    const BarrierOptions& opts,
    bool waitAllRanks) {
  C10_LOG_API_USAGE_ONCE("torch.distributed.monitored_barrier");
  // Use default timeout if no timeout was specified.
  auto monitoredBarrierTimeout =
      (opts.timeout == kUnsetTimeout) ? this->options_->timeout : opts.timeout;
  auto rank = this->getRank();
  auto t1 = nextTag();
  auto t2 = nextTag();
  std::vector<at::Tensor> commTensor = {at::tensor({rank})};
  // only enforce timeout on rank 0. This is so that other ranks aren't timed
  // out first, bringing down the job without reporting which rank timed out.
  if (rank != 0) {
    auto sendWork = send(commTensor, 0, static_cast<int>(t1));
    auto recvWork = recv(commTensor, 0, static_cast<int>(t2));
    try {
      sendWork->wait();
      recvWork->wait();
    } catch (const std::exception& e) {
      const std::string error = c10::str(
          "Rank ",
          rank,
          " successfully reached monitoredBarrier, but received errors while waiting",
          " for send/recv from rank 0. Please check rank 0 logs for faulty rank.");
      logAndThrow(
          error, c10::str(error, "\n Original exception: \n", e.what()));
    }
    return;
  }
  auto startTime = std::chrono::steady_clock::now();
  auto worldSize = this->getSize();
  // Mappings of rank to recvWork/sendWork respectively.
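// Barrier protocol: rank 0 posts a recv to every other rank (tag t1), waits
// for all of them within the remaining timeout, then answers each rank with a
// send (tag t2). Non-zero ranks block in their send/recv pair above, so either
// every rank passes the barrier or rank 0 reports which ranks failed to
// respond in time.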
std::map> recvWorkMap; std::map> sendWorkMap; // Kick off recvWork and wait to unblock sendWork->wait() from non-zero ranks. // Failed/hanging ranks will not ack this call, letting rank 0 know about the // failure. for (const auto dstRank : c10::irange(1, worldSize)) { recvWorkMap.emplace( dstRank, recv(commTensor, dstRank, static_cast(t1))); } auto waitLoop = [&](const std::map>& works) { std::vector processedRanks; for (auto& work : works) { bool rankResponded = false; try { // Note: if waitAllRanks=false, we recompute the time remaining in // barrier and use this recomputed time in wait(). However, if // waitAllRanks=true, we use the original timeout, since if we use // up the entire timeout waiting for response from rank n, then we // won't have any timeout left to query ranks beginning with n + 1. auto remainingTime = getRemainingTime(startTime, monitoredBarrierTimeout, waitAllRanks); if (!waitAllRanks) { checkRemainingTime( monitoredBarrierTimeout, remainingTime, processedRanks, rank); } work.second->wait(remainingTime); rankResponded = true; } catch (const std::exception& e) { const std::string error = c10::str( "[Rank 0]: Rank ", work.first, " failed to pass monitoredBarrier in ", monitoredBarrierTimeout.count(), " ms"); if (waitAllRanks) { LOG(ERROR) << error; } else { logAndThrow( error, c10::str(error, "\n Original exception: \n", e.what())); } } if (rankResponded) { processedRanks.push_back(work.first); } } // If we are collecting all failed ranks, check if we need to throw if // some ranks have not responded. // Ensure all ranks from 1, ... WORLD_SIZE -1 have been successfully // processed. auto rankFailure = (processedRanks.size() != static_cast(size_ - 1)); if (waitAllRanks && rankFailure) { std::vector failedRanks; for (const auto i : c10::irange(1, size_)) { if (std::find(processedRanks.begin(), processedRanks.end(), i) == processedRanks.end()) { failedRanks.push_back(i); } } TORCH_INTERNAL_ASSERT(!failedRanks.empty()); const std::string ranksStr = c10::Join(", ", failedRanks); const std::string error = c10::str( "[Rank 0]: Ranks ", ranksStr, " failed to pass monitoredBarrier in ", monitoredBarrierTimeout.count(), " ms"); logAndThrow(error, error); } }; waitLoop(recvWorkMap); // If we've reached here successfully, this means all ranks have acked in // monitoredBarrier. Unblock all ranks now by responding to their recv(). This // ensures that this is a true barrier in that all ranks exit it successfully // or none of them do. for (const auto dstRank : c10::irange(1, worldSize)) { sendWorkMap.emplace( dstRank, send(commTensor, dstRank, static_cast(t2))); } waitLoop(sendWorkMap); } void ProcessGroupGloo::setSequenceNumberForGroup() { } // Gloo just starts sequence numbers at 0. uint64_t ProcessGroupGloo::getSequenceNumberForGroup() { return seq_; } void ProcessGroupGloo::enableCollectivesTiming() { // Nothing to do to enable timing } } // namespace c10d #endif // USE_C10D_GLOO
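// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the original file). The Gloo backend
// is normally reached from Python via init_process_group("gloo"), but it can
// also be driven directly from C++. The store construction and option names
// below are assumptions based on the c10d public headers, not on code in this
// translation unit:
//
//   auto store = c10::make_intrusive<c10d::TCPStore>(/* host, options */ ...);
//   auto pgOptions = c10d::ProcessGroupGloo::Options::create();
//   pgOptions->devices.push_back(
//       c10d::ProcessGroupGloo::createDefaultDevice());
//   c10d::ProcessGroupGloo pg(store, rank, worldSize, pgOptions);
//
//   // allgather: one input tensor per rank, one output list of worldSize
//   // tensors (matching the validation in ProcessGroupGloo::allgather above).
//   std::vector<at::Tensor> inputs = {at::ones({16})};
//   std::vector<std::vector<at::Tensor>> outputs(1);
//   for (int i = 0; i < worldSize; ++i) {
//     outputs[0].push_back(at::empty({16}));
//   }
//   pg.allgather(outputs, inputs)->wait();
// ---------------------------------------------------------------------------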