/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_
#define TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_

#ifdef GOOGLE_CUDA

#include <functional>
#include <map>
#include <memory>
#include <unordered_map>
#include <vector>

// TODO(rmlarsen): Get rid of this workaround. "gpu_assert" is defined when
// setting EIGEN_USE_THREADS. But when defining EIGEN_USE_THREADS here,
// incAtomic and other CUDA specific symbols are no longer recognized.
#ifndef gpu_assert
#define gpu_assert(x)
#endif

#include "third_party/nccl/nccl.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"

namespace tensorflow {

// NCCL manager is used to make the asynchronous communicator calls and to
// manage the per-device streams used for communication.
//
// See nccl_ops.cc for example usage, including a description of memory
// management and stream synchronization.
class NcclManager {
 public:
  typedef std::function<void(Status)> DoneCallback;
  NcclManager();
  ~NcclManager();

  static NcclManager* instance();

  // Calls `ncclGetUniqueId` and returns the id as a string. The returned
  // value may be shared with other participants on different nodes and passed
  // in to multi-node collective invocations.
  string GenerateCommunicatorKey();

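  // For illustration only: one way a multi-node caller might distribute the
  // key. How the string travels between nodes is up to the caller and is not
  // part of this API.
  //
  //   // On one designated node:
  //   string communicator_key =
  //       NcclManager::instance()->GenerateCommunicatorKey();
  //   // Share `communicator_key` with every other node out of band, then
  //   // pass the identical string into the `Context` of each NcclManager
  //   // call that belongs to this multi-node collective.
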
  // A participant in a Collective.
  struct Participant {
    Participant(se::StreamExecutor* executor, se::Stream* tensor_stream,
                EventMgr* event_mgr, int gpu_device_id, const Tensor* input,
                Tensor* output, int global_rank, DoneCallback done_callback)
        : executor(executor),
          tensor_stream(tensor_stream),
          event_mgr(event_mgr),
          gpu_device_id(gpu_device_id),
          input(input),
          output(output),
          global_rank(global_rank),
          done_callback(std::move(done_callback)),
          root(false) {
      DCHECK(executor != nullptr);
      DCHECK(event_mgr != nullptr);
      DCHECK(tensor_stream != nullptr);
    }

    // StreamExecutor for the device. Expected to be live for the process
    // lifetime.
    se::StreamExecutor* const executor = nullptr;

    // `tensor_stream` is the stream that should be waited on to ensure
    // `input`'s data is available on the GPU for the communication stream to
    // access. It is also the stream that will use the produced data;
    // `done_callback` is not called until the next kernel launched on
    // `tensor_stream` would see the data. Owned by the caller, who must keep
    // it live until `done_callback` is called.
    se::Stream* const tensor_stream;

    // EventMgr which polls on executor.
    // Owned by the caller, who must keep it live until `done_callback` is
    // called.
    EventMgr* const event_mgr;

    const int gpu_device_id;

    // Owned by the caller, who must keep it live until `done_callback` is
    // called. Is NULL for participants that only receive data.
    const Tensor* input;

    // Owned by the caller, who must keep it live until `done_callback` is
    // called. Is NULL for participants that only send data.
    Tensor* output;

    // Rank across all devices and all nodes.
    // `global_rank` is not required for single-node collectives.
    const int global_rank;

    // The callback which is called at the completion of the NCCL operation.
    // When called, `output` has been set to the result of the operation.
    // (Note: the stream may not yet have been synced.)
    DoneCallback done_callback;

    // True if this is the root of the collective, e.g. the source of a
    // broadcast.
    bool root;
  };

  // Data that provides context for the collective operation, including the
  // operation key, number of participants, and communicator key.
  struct Context {
    Context(const string& collective_key, int num_local_devices,
            int num_global_devices, const string& communicator_key)
        : collective_key(collective_key),
          num_local_devices(num_local_devices),
          num_global_devices(num_global_devices),
          communicator_key(communicator_key) {}

    // Unique key for this collective instance.
    const string& collective_key;

    // Devices local to this node.
    int num_local_devices;

    // Devices across all nodes.
    int num_global_devices;

    // In order to use NCCL across nodes, the caller first has to generate a
    // `communicator_key` via the `GenerateCommunicatorKey()` function and
    // share it with all the other nodes. Each node should pass in this
    // `communicator_key` to the `NcclManager` functions.
    // `communicator_key` is not required for single-node collectives and can
    // be empty.
    const string& communicator_key;
  };

  // Adds one participant to an all-reduce.
  void AddToAllReduce(std::unique_ptr<Participant> participant,
                      const Context& context, ncclRedOp_t reduction_op);

  // Adds one participant to an all-gather.
  void AddToAllGather(std::unique_ptr<Participant> participant,
                      const Context& context);

  // AddBroadcastSend and AddBroadcastRecv combine to send data from one
  // sender to all receivers.
  void AddBroadcastSend(std::unique_ptr<Participant> participant,
                        const Context& context);
  void AddBroadcastRecv(std::unique_ptr<Participant> participant,
                        const Context& context);

  // AddReduceSend and AddReduceRecv combine to send data from all senders
  // to one receiver.
  void AddReduceSend(std::unique_ptr<Participant> participant,
                     const Context& context, ncclRedOp_t reduction_op);
  void AddReduceRecv(std::unique_ptr<Participant> participant,
                     const Context& context, ncclRedOp_t reduction_op);

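  // For illustration only: a minimal single-node all-reduce sketch. The
  // `executor`, `tensor_stream`, `event_mgr`, `input`, and `output` objects
  // below are assumed to be provided by the caller and are placeholders, not
  // part of this header.
  //
  //   const string collective_key = "allreduce_step_0";
  //   const string communicator_key = "";  // Empty: single-node collective.
  //   auto participant = std::make_unique<NcclManager::Participant>(
  //       executor, tensor_stream, event_mgr, /*gpu_device_id=*/0, &input,
  //       &output, /*global_rank=*/-1, [](Status s) { TF_CHECK_OK(s); });
  //   NcclManager::Context context(collective_key, /*num_local_devices=*/2,
  //                                /*num_global_devices=*/2,
  //                                communicator_key);
  //   NcclManager::instance()->AddToAllReduce(std::move(participant), context,
  //                                           ncclSum);
  //   // A matching call is made for the second local device; the collective
  //   // launches once both local participants have been added.
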
  // Signals that the `Collective` corresponding to `collective_key` is ready
  // to launch across all nodes participating in this multi-node collective
  // operation.
  //
  // This should only be called for multi-node collectives; single-node
  // collectives are implicitly ready when all participants have called an
  // Add* function.
  void SignalMultiNodeReady(const string& collective_key);

 private:
  enum CollectiveType {
    kAllReduce = 1,
    kBroadcast = 2,
    kReduce = 3,
    kAllGather = 4,
  };
  struct Collective;
  struct Communicator;
  struct CommunicatorMember;
  struct NcclStream;

  // Gets the `Communicator` object that will be used to enqueue NCCL kernels
  // for `collective`, and returns it via `communicator`.
  //
  // This may involve creating CUDA streams and NCCL initialization. If an
  // NCCL or CUDA error occurs in the process, this returns an INTERNAL error
  // with the corresponding NCCL/CUDA error string.
  Status GetCommunicator(Collective* collective, Communicator** communicator);

  // Adds a participant device to the local `Collective` instance
  // corresponding to `collective_key`. Launches the `Collective` if it is
  // ready, which it checks by calling `CheckReady()`. Also performs
  // consistency and sanity checks before launching.
  void AddParticipant(std::unique_ptr<Participant> participant,
                      const Context& context, CollectiveType collective_type,
                      ncclRedOp_t reduction_op);

  // If `collective` is ready to run, removes it from the `collectives_` map
  // and returns the pointer. Otherwise returns `nullptr`.
  // Assumes `collective_key` corresponds to `collective`.
  //
  // A collective is ready to run when all local participants have called an
  // Add* function, and the collective is signalled globally ready via
  // `SignalMultiNodeReady`.
  Collective* CheckReady(const string& collective_key, Collective* collective)
      EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Runs `collective`. This call takes ownership of `collective`.
  void RunCollective(Collective* collective);
  void LoopKernelLaunches(NcclStream* stream);

  mutex mu_;

  // Maps key to collectives currently being assembled or run.
  std::unordered_map<string, std::unique_ptr<Collective>> collectives_
      GUARDED_BY(mu_);

  // Maps a device to the communication streams that make up its collective.
  // This is used to share the stream across different communicators that
  // include the same device.
  std::map<se::StreamExecutor*, std::vector<std::unique_ptr<NcclStream>>>
      device_to_comm_streams_ GUARDED_BY(mu_);

  std::vector<std::unique_ptr<Communicator>> communicators_;

  TF_DISALLOW_COPY_AND_ASSIGN(NcclManager);
};

}  // namespace tensorflow

#endif  // GOOGLE_CUDA

#endif  // TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_