1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/kernels/collective_nccl_reducer.h"
16
17 #ifdef GOOGLE_CUDA
18
19 #include "tensorflow/core/common_runtime/collective_util.h"
20 #include "tensorflow/core/nccl/nccl_manager.h"
21
22 namespace tensorflow {
23 namespace {
NcclCollectiveKey(const string & exec_key,int step_id)24 string NcclCollectiveKey(const string& exec_key, int step_id) {
25 return strings::StrCat(exec_key, ":", step_id);
26 }
27 } // namespace
28
NcclReducer()29 NcclReducer::NcclReducer() : col_ctx_(nullptr), col_params_(nullptr) {}
30
InitializeCollectiveParams(CollectiveParams * col_params)31 Status NcclReducer::InitializeCollectiveParams(CollectiveParams* col_params) {
32 if (col_params->instance.type != REDUCTION_COLLECTIVE ||
33 col_params->instance.impl_details.collective_name != "NcclReduce") {
34 return errors::Internal("Unexpected collective type ",
35 col_params->instance.type, " expected ",
36 REDUCTION_COLLECTIVE, "; or collective name ",
37 col_params->instance.impl_details.collective_name,
38 " expected NcclReduce");
39 } else {
40 return Status::OK();
41 }
42 }
43
InitializeCollectiveContext(CollectiveContext * col_ctx)44 Status NcclReducer::InitializeCollectiveContext(CollectiveContext* col_ctx) {
45 col_ctx_ = col_ctx;
46 col_params_ = &col_ctx->col_params;
47 return collective_util::InitializeDeviceAndLocality(
48 col_ctx->dev_mgr, col_ctx->device_name, &col_ctx->device,
49 &col_ctx->device_locality);
50 }
51
InitializeInstanceBeforeGroupDiscovery(CollectiveParams * col_params)52 Status NcclReducer::InitializeInstanceBeforeGroupDiscovery(
53 CollectiveParams* col_params) {
54 if (col_params->default_rank == 0 && col_params->group.num_tasks > 1) {
55 col_params->instance.communicator_key =
56 NcclManager::instance()->GenerateCommunicatorKey();
57 }
58 return Status::OK();
59 }
60
ReductionOp(const string & merge_op,ncclRedOp_t * reduction_op)61 Status ReductionOp(const string& merge_op, ncclRedOp_t* reduction_op) {
62 if (merge_op == "Add") {
63 *reduction_op = ncclSum;
64 return Status::OK();
65 } else if (merge_op == "Mul") {
66 *reduction_op = ncclProd;
67 return Status::OK();
68 } else {
69 return errors::Internal("Expected merge_op to be either Add or Mul, found ",
70 merge_op);
71 }
72 }
73
Run(StatusCallback done)74 void NcclReducer::Run(StatusCallback done) {
75 ncclRedOp_t reduction_op;
76 Status s = ReductionOp(col_params_->merge_op->type_string(), &reduction_op);
77 if (!s.ok()) {
78 done(s);
79 return;
80 }
81
82 Tensor group_size;
83 Notification group_size_ready;
84 Status group_size_status;
85 if (col_params_->final_op) {
86 // Create an on-device scalar value from group_size_.
87 // TODO(ayushd, tucker): avoid this copy by either reusing across
88 // invocations or providing the scalar to the kernel in host memory.
89 Tensor group_size_val(col_ctx_->output->dtype(), TensorShape({}));
90 switch (col_ctx_->output->dtype()) {
91 case DT_FLOAT:
92 group_size_val.scalar<float>()() = col_params_->group.group_size;
93 break;
94 case DT_DOUBLE:
95 group_size_val.scalar<double>()() = col_params_->group.group_size;
96 break;
97 case DT_INT32:
98 group_size_val.scalar<int32>()() = col_params_->group.group_size;
99 break;
100 case DT_INT64:
101 group_size_val.scalar<int64>()() = col_params_->group.group_size;
102 break;
103 default:
104 done(errors::Internal("Unsupported type ", col_ctx_->output->dtype()));
105 return;
106 }
107 group_size = Tensor(
108 col_ctx_->device->GetAllocator(col_ctx_->op_ctx->input_alloc_attr(0)),
109 col_ctx_->output->dtype(), TensorShape({}));
110 DeviceContext* op_dev_ctx = col_ctx_->op_ctx->op_device_context();
111 // Enqueue copy on gpu stream.
112 op_dev_ctx->CopyCPUTensorToDevice(
113 &group_size_val, col_ctx_->device, &group_size,
114 [&group_size_ready, &group_size_status](const Status& s) {
115 group_size_status = s;
116 group_size_ready.Notify();
117 });
118 } else {
119 group_size_ready.Notify();
120 }
121
122 Notification nccl_done;
123 Status nccl_status;
124 auto* compute_stream = col_ctx_->op_ctx->op_device_context()->stream();
125 auto* gpu_info = col_ctx_->op_ctx->device()->tensorflow_gpu_device_info();
126 // `AddToAllReduce` performs consistency checks for the NCCL call and enqueues
127 // the `Participant` struct locally. When all local participants with this
128 // `nccl_collective_key` have called `AddToAllReduce` and
129 // `SignalMultiNodeReady`, all devices at this worker are ready to process
130 // this NCCL op.
131 //
132 // The `NcclManager` uses a dedicated CUDA stream for NCCL kernels. At this
133 // point, it synchronizes the NCCL stream with the compute stream, and then
134 // enqueues the NCCL kernel on the NCCL stream.
135 const int num_global_devices = col_params_->group.group_size;
136 const int num_local_devices = col_params_->instance.num_devices_per_task.at(
137 col_params_->instance.task_names[col_params_->default_rank]);
138 const string nccl_collective_key =
139 NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
140 auto done_callback = [&nccl_done, &nccl_status](const Status& s) {
141 nccl_status = s;
142 nccl_done.Notify();
143 };
144 auto participant = absl::make_unique<NcclManager::Participant>(
145 compute_stream->parent(), compute_stream, gpu_info->event_mgr,
146 gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
147 col_params_->default_rank, std::move(done_callback));
148 VLOG(1) << "NcclReducer calling NcclManager::AddToAllReduce num_tasks "
149 << col_params_->group.num_tasks << " current task "
150 << col_params_->instance.task_names[col_params_->default_rank]
151 << " num local devices " << num_local_devices
152 << " num global devices " << num_global_devices << " device "
153 << col_ctx_->device_name << " instance "
154 << col_params_->instance.instance_key;
155 NcclManager::instance()->AddToAllReduce(
156 std::move(participant),
157 {nccl_collective_key, num_local_devices, num_global_devices,
158 col_params_->instance.communicator_key},
159 reduction_op);
160
161 // NOTE(ayushd): We need to synchronize NCCL launches across nodes to prevent
162 // deadlocks. In the current implementation, we define a deterministic
163 // sequential launch order between potentially concurrent collective instances
164 // by introducing control information during static graph analysis in
165 // graph/collective_order.cc. This can be either in the form of explicit
166 // control edges or via `wait_for` attribute on the collective op.
167 //
168 // The other end of the design spectrum would have a distinguished node
169 // dynamically signal the next collective to launch to all other participants.
170 // This has higher degree of runtime coordination, but it may be able to
171 // achieve better performance if the (arbitrary) static execution order
172 // assigned in the first approach turns out to not be good from a scheduling
173 // perspective. e.g. consider a graph in which c1, c2, and c3 are three
174 // concurrent collective instances, and the static ordering assigns c1 -> c2
175 // -> c3. In practice, it could turn out that c3 is always ready to execute
176 // before c1 or c2.
177 //
178 // `WaitForDependencies` may block if the collective instances on which this
179 // op depends have not yet launched. When this function returns, this op is
180 // ready to go.
181 col_ctx_->col_exec->WaitForDependencies(*col_params_);
182 NcclManager::instance()->SignalMultiNodeReady(nccl_collective_key);
183 // When all devices at this worker have called `SignalMultiNodeReady`, the
184 // `NcclManager` will enqueue the NCCL kernel on the NCCL stream. Thus the
185 // implementation of `Launched` keeps track of the number of devices that have
186 // launched.
187 col_ctx_->col_exec->Launched(*col_params_);
188
189 // Wait for nccl op and group_size copy to succeed, then do final_op.
190 group_size_ready.WaitForNotification();
191 nccl_done.WaitForNotification();
192 Status final_status =
193 group_size_status.ok() ? nccl_status : group_size_status;
194 if (final_status.ok() && col_params_->final_op) {
195 final_status = collective_util::ComputeBinOp(
196 col_ctx_->op_ctx, col_ctx_->op_params, col_ctx_->device,
197 col_params_->final_op.get(), col_ctx_->output, &group_size);
198 }
199 done(final_status);
200 }
201
202 REGISTER_COLLECTIVE(NcclReduce, NcclReducer);
203
204 } // namespace tensorflow
205
206 #endif // GOOGLE_CUDA
207