/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/pjrt/gpu_device.h"

#include "absl/container/flat_hash_map.h"
#include "tensorflow/compiler/xla/pjrt/pjrt_stream_executor_client.h"

#ifdef NCCL_ENABLED
#include "third_party/nccl/nccl.h"
#endif  // NCCL_ENABLED
#include "tensorflow/compiler/xla/client/client_library.h"
#include "tensorflow/compiler/xla/service/gpu/gpu_executable_run_options.h"
#include "tensorflow/compiler/xla/service/platform_util.h"
#include "tensorflow/compiler/xla/statusor.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/core/common_runtime/device/device_host_allocator.h"
#include "tensorflow/core/common_runtime/device/device_id.h"
#include "tensorflow/core/common_runtime/device/device_mem_allocator.h"
#include "tensorflow/core/util/env_var.h"
#include "tensorflow/stream_executor/tf_allocator_adapter.h"

namespace xla {
namespace {

// A custom PjRtClient that overrides the device assignment method.
class GpuClient : public xla::PjRtStreamExecutorClient {
 public:
  using xla::PjRtStreamExecutorClient::PjRtStreamExecutorClient;

  xla::StatusOr<xla::DeviceAssignment> GetDefaultDeviceAssignment(
      int num_replicas, int num_partitions) const override;
};

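// Prefers the client's own addressable devices for single-partition
// computations that fit locally; otherwise defers to the base class, which
// produces a global (potentially multi-node) assignment.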
xla::StatusOr<xla::DeviceAssignment> GpuClient::GetDefaultDeviceAssignment(
    int num_replicas, int num_partitions) const {
  if (num_partitions == 1 && num_replicas <= addressable_devices().size()) {
    xla::DeviceAssignment assignment(num_replicas, 1);
    for (int i = 0; i < num_replicas; ++i) {
      assignment(i, 0) = addressable_devices().at(i)->id();
    }
    return assignment;
  }
  // Fall back to the default global device assignment if we can't run locally.
  return PjRtStreamExecutorClient::GetDefaultDeviceAssignment(num_replicas,
                                                              num_partitions);
}

// Builds an xla::LocalClient for the GPU platform.
StatusOr<LocalClient*> GetGpuXlaClient() {
  // "gpu" will be substituted by the default platform name defined in
  // platform_util.cc.
  TF_ASSIGN_OR_RETURN(se::Platform * platform,
                      PlatformUtil::GetPlatform("gpu"));
  if (platform->VisibleDeviceCount() <= 0) {
    return FailedPrecondition("No visible GPU devices.");
  }
  LocalClientOptions options;
  options.set_platform(platform);
  return ClientLibrary::GetOrCreateLocalClient(options);
}

// Builds a LocalDeviceState for each GPU present.
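// `asynchronous` is forwarded to each LocalDeviceState and controls whether
// computations and transfers may run asynchronously with respect to the host;
// synchronous mode is mainly useful for debugging.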
StatusOr<std::vector<std::unique_ptr<LocalDeviceState>>> BuildLocalDeviceStates(
    LocalClient* xla_client, bool asynchronous) {
  std::vector<std::unique_ptr<LocalDeviceState>> addressable_devices;
  for (int i = 0; i < xla_client->device_count(); ++i) {
    se::StreamExecutor* executor =
        xla_client->backend().stream_executor(i).ValueOrDie();
    addressable_devices.push_back(absl::make_unique<LocalDeviceState>(
        executor, xla_client, LocalDeviceState::kComputeSynchronized,
        asynchronous,
        /*allow_event_reuse=*/true));
  }
  return std::move(addressable_devices);
}

// Builds a BFCAllocator for all local GPUs.
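// `memory_fraction` is the fraction of the currently free device memory given
// to the allocator (the full device memory when unified memory is enabled),
// and `preallocate` reserves that region up front instead of growing it on
// demand.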
StatusOr<std::unique_ptr<se::MultiDeviceAdapter>> CreateBFCAllocator(
    absl::Span<std::unique_ptr<LocalDeviceState> const> addressable_devices,
    double memory_fraction, bool preallocate) {
  CHECK_GT(addressable_devices.size(), 0);
  const se::Platform* platform =
      addressable_devices.front()->executor()->platform();
  std::vector<se::MultiDeviceAdapter::AllocatorWithStream> allocators;
  bool enable_unified_memory;
  Status status = tensorflow::ReadBoolFromEnvVar("TF_FORCE_UNIFIED_MEMORY",
                                                 false, &enable_unified_memory);
  if (!status.ok()) {
    LOG(ERROR) << "Unable to read TF_FORCE_UNIFIED_MEMORY: "
               << status.error_message();
  }

  for (auto& local_device : addressable_devices) {
    se::StreamExecutor* executor = local_device->executor();
    int device_ordinal = executor->device_ordinal();
    auto sub_allocator = absl::make_unique<tensorflow::DeviceMemAllocator>(
        executor, tensorflow::PlatformDeviceId(device_ordinal),
        /*use_unified_memory=*/enable_unified_memory,
        /*alloc_visitors=*/std::vector<tensorflow::SubAllocator::Visitor>(),
        /*free_visitors=*/std::vector<tensorflow::SubAllocator::Visitor>());

    int64 free_memory;
    int64 total_memory;
    if (!executor->DeviceMemoryUsage(&free_memory, &total_memory)) {
      return Unavailable("Failed to query available memory from device %i",
                         device_ordinal);
    }
    // Make the full GPU memory visible to the BFC allocator when unified
    // memory is enabled.
    size_t allocator_memory =
        enable_unified_memory ? total_memory : free_memory * memory_fraction;
    if (preallocate) {
      LOG(INFO) << "XLA backend allocating " << allocator_memory
                << " bytes on device " << device_ordinal
                << " for BFCAllocator.";
    } else {
      LOG(INFO) << "XLA backend will use up to " << allocator_memory
                << " bytes on device " << device_ordinal
                << " for BFCAllocator.";
    }
    auto gpu_bfc_allocator = absl::make_unique<tensorflow::BFCAllocator>(
        sub_allocator.release(), allocator_memory,
        /*allow_growth=*/!preallocate,
        absl::StrCat("GPU_", device_ordinal, "_bfc"));
    allocators.emplace_back(std::move(gpu_bfc_allocator),
                            local_device->compute_stream());
  }
  return absl::make_unique<se::MultiDeviceAdapter>(platform,
                                                   std::move(allocators));
}

// Constructs a GPU device memory allocator to use, according to the allocator
// configuration the client requested.
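// For the kPlatform kind no allocator is constructed here; returning nullptr
// is expected to make the client fall back to the StreamExecutor platform's
// default allocator.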
StatusOr<std::unique_ptr<se::DeviceMemoryAllocator>> GetGpuDeviceAllocator(
    const GpuAllocatorConfig& allocator_config,
    absl::Span<std::unique_ptr<LocalDeviceState> const> addressable_devices) {
  std::unique_ptr<se::DeviceMemoryAllocator> allocator;
  if (allocator_config.kind != GpuAllocatorConfig::Kind::kPlatform) {
    TF_ASSIGN_OR_RETURN(allocator,
                        CreateBFCAllocator(addressable_devices,
                                           allocator_config.memory_fraction,
                                           allocator_config.preallocate));
  }
  return std::move(allocator);
}

// Returns a GPU pinned host memory allocator to use when staging host->GPU
// transfers. The pool grows on demand, up to a 64GiB limit of pinned memory.
std::unique_ptr<tensorflow::BFCAllocator> GetGpuHostAllocator(
    se::StreamExecutor* executor) {
  tensorflow::SubAllocator* sub_allocator = new tensorflow::DeviceHostAllocator(
      executor, /*numa_node=*/0, /*alloc_visitors=*/{}, /*free_visitors=*/{});
  // TODO(phawkins): allow the user to tune this.
  const int64 kGpuHostMemoryLimitBytes = 64 * (1LL << 30);
  return absl::make_unique<tensorflow::BFCAllocator>(
      sub_allocator, kGpuHostMemoryLimitBytes, /*allow_growth=*/true,
      /*name=*/"xla_gpu_host_bfc");
}

// A table mapping NcclCliqueKeys to ncclUniqueId values encoded as strings.
// In a distributed setup the table of NCCL IDs is kept on the master node
// (node 0). The node of the first participating device will create the unique
// id.
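// Other nodes that participate in the same clique obtain the ID by blocking on
// the distributed key-value store until the creating node has published it.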
class NcclIdStore {
 public:
  NcclIdStore(int node_id, std::shared_ptr<DistributedRuntimeClient> client,
              absl::flat_hash_map<GlobalDeviceId, int> device_to_node)
      : node_id_(node_id),
        client_(std::move(client)),
        device_to_node_(std::move(device_to_node)) {}

  StatusOr<std::string> GetNcclUniqueId(const gpu::NcclCliqueKey& key);

 private:
  const int node_id_;
  const std::shared_ptr<DistributedRuntimeClient> client_;
  const absl::flat_hash_map<GlobalDeviceId, int> device_to_node_;

  absl::Mutex mu_;
  absl::flat_hash_map<gpu::NcclCliqueKey, std::string> cache_
      ABSL_GUARDED_BY(mu_);
};

StatusOr<std::string> NcclIdStore::GetNcclUniqueId(
    const gpu::NcclCliqueKey& key) {
  // The caller must ensure that threads calling this method concurrently have
  // unique keys, otherwise the global key-value store may hold the wrong
  // value.
  {
    absl::MutexLock lock(&mu_);
    auto it = cache_.find(key);
    if (it != cache_.end()) {
      return it->second;
    }
  }
  std::string id_string;
  int primary_node_id = device_to_node_.at(key.devices()[0]);
  if (node_id_ == primary_node_id) {
#ifdef NCCL_ENABLED
    ncclUniqueId id;
    ncclResult_t r = ncclGetUniqueId(&id);
    TF_RET_CHECK(r == ncclSuccess);
    id_string = std::string(id.internal, NCCL_UNIQUE_ID_BYTES);
    TF_RETURN_IF_ERROR(client_->KeyValueSet(key.ToString(), id_string));
#else
    return FailedPrecondition("NCCL support was not built into the XLA binary.");
#endif
  } else {
    TF_ASSIGN_OR_RETURN(id_string, client_->BlockingKeyValueGet(
                                       key.ToString(), absl::Minutes(5)));
  }
  absl::MutexLock lock(&mu_);
  auto result = cache_.emplace(key, std::move(id_string));
  TF_RET_CHECK(result.second) << "Unique ID already in cache.";
  return result.first->second;
}

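// Wraps each LocalDeviceState in a GpuDevice for the single-node case: device
// IDs are the local device ordinals and the node ID is always 0.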
std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> BuildLocalDevices(
    std::vector<std::unique_ptr<LocalDeviceState>> local_device_states) {
  std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> devices;
  for (auto& local_device : local_device_states) {
    int device_ordinal = local_device->device_ordinal();
    const se::DeviceDescription& description =
        local_device->executor()->GetDeviceDescription();
    auto device = absl::make_unique<GpuDevice>(
        device_ordinal, std::move(local_device), description.name(),
        /*node_id=*/0);
    devices.push_back(std::move(device));
  }
  return devices;
}

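// Exchanges the local device topology with the other nodes through the
// distributed runtime client, then builds one GpuDevice per device in the
// resulting global topology. Devices on this node take ownership of their
// LocalDeviceState; remote devices have none. Also installs an NCCL unique-id
// callback backed by the distributed key-value store.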
Status BuildDistributedDevices(
    std::vector<std::unique_ptr<LocalDeviceState>> local_device_states,
    std::shared_ptr<DistributedRuntimeClient> distributed_client, int node_id,
    std::vector<std::unique_ptr<PjRtStreamExecutorDevice>>* devices,
    gpu::GpuExecutableRunOptions* gpu_executable_run_options) {
  LocalTopologyProto local_topology;
  local_topology.set_node_id(node_id);
  for (const auto& local_device : local_device_states) {
    const se::Platform* platform = local_device->executor()->platform();
    TF_ASSIGN_OR_RETURN(
        std::unique_ptr<xla::se::DeviceDescription> desc,
        platform->DescriptionForDevice(local_device->device_ordinal()));
    TF_RET_CHECK(local_device->device_ordinal() ==
                 local_topology.devices_size());
    DeviceProto* device_proto = local_topology.add_devices();
    device_proto->set_local_device_ordinal(local_device->device_ordinal());
    device_proto->set_name(desc->name());
    device_proto->set_vendor(desc->device_vendor());
  }

  GlobalTopologyProto global_topology;
  TF_RETURN_IF_ERROR(
      distributed_client->EnumerateDevices(local_topology, &global_topology));

  std::vector<GlobalDeviceId> gpu_device_ids(local_device_states.size());
  absl::flat_hash_map<GlobalDeviceId, int> device_to_node;
  for (const LocalTopologyProto& node : global_topology.nodes()) {
    for (const DeviceProto& device_proto : node.devices()) {
      GlobalDeviceId global_device_id(device_proto.global_device_id());
      device_to_node[global_device_id] = node.node_id();
      std::unique_ptr<LocalDeviceState> local_device;
      if (node.node_id() == node_id) {
        TF_RET_CHECK(device_proto.local_device_ordinal() >= 0 &&
                     device_proto.local_device_ordinal() <
                         local_device_states.size());
        TF_RET_CHECK(local_device_states[device_proto.local_device_ordinal()] !=
                     nullptr);
        local_device =
            std::move(local_device_states[device_proto.local_device_ordinal()]);
        gpu_device_ids[device_proto.local_device_ordinal()] = global_device_id;
      }
      auto device = absl::make_unique<GpuDevice>(
          device_proto.global_device_id(), std::move(local_device),
          device_proto.name(), node.node_id());
      devices->push_back(std::move(device));
    }
  }
  for (const auto& device : local_device_states) {
    TF_RET_CHECK(device == nullptr);
  }
  gpu_executable_run_options->set_gpu_global_device_ids(
      std::move(gpu_device_ids));
  auto nccl_id_store = std::make_shared<NcclIdStore>(
      node_id, distributed_client, device_to_node);
  gpu_executable_run_options->set_nccl_unique_id_callback(
      [nccl_id_store](const gpu::NcclCliqueKey& key) {
        return nccl_id_store->GetNcclUniqueId(key);
      });
  return Status::OK();
}

}  // namespace

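// GpuDevice forwards its arguments directly to the PjRtStreamExecutorDevice
// base class; `node_id` identifies the process that owns the device in a
// multi-node setup.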
GpuDevice::GpuDevice(int id,
                     std::unique_ptr<LocalDeviceState> local_device_state,
                     std::string device_kind, int node_id)
    : PjRtStreamExecutorDevice(id, std::move(local_device_state),
                               std::move(device_kind), node_id) {}

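// Builds a PjRtClient for the local GPUs. When a DistributedRuntimeClient is
// provided, devices from every node are included and cross-node NCCL
// collectives are wired up; otherwise only the local devices are used.
//
// A minimal single-node usage sketch (assuming a default-constructed
// GpuAllocatorConfig selects the BFC allocator defaults declared in
// gpu_device.h):
//
//   TF_ASSIGN_OR_RETURN(
//       std::unique_ptr<PjRtClient> client,
//       GetGpuClient(/*asynchronous=*/true, GpuAllocatorConfig(),
//                    /*distributed_client=*/nullptr, /*node_id=*/0));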
StatusOr<std::unique_ptr<PjRtClient>> GetGpuClient(
    bool asynchronous, const GpuAllocatorConfig& allocator_config,
    std::shared_ptr<DistributedRuntimeClient> distributed_client, int node_id) {
  TF_ASSIGN_OR_RETURN(LocalClient * xla_client, GetGpuXlaClient());
  TF_ASSIGN_OR_RETURN(
      std::vector<std::unique_ptr<LocalDeviceState>> local_device_states,
      BuildLocalDeviceStates(xla_client, asynchronous));
  TF_ASSIGN_OR_RETURN(
      auto allocator,
      GetGpuDeviceAllocator(allocator_config, local_device_states));
  auto host_memory_allocator =
      GetGpuHostAllocator(local_device_states.front()->executor());

  std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> devices;
  auto gpu_run_options = absl::make_unique<gpu::GpuExecutableRunOptions>();
  if (distributed_client) {
    TF_RETURN_IF_ERROR(BuildDistributedDevices(
        std::move(local_device_states), std::move(distributed_client), node_id,
        &devices, gpu_run_options.get()));
  } else {
    devices = BuildLocalDevices(std::move(local_device_states));
  }

  return std::unique_ptr<PjRtClient>(std::make_unique<GpuClient>(
      kGpuName, xla_client, std::move(devices),
      /*node_id=*/node_id, std::move(allocator),
      std::move(host_memory_allocator),
      /*should_stage_host_to_device_transfers=*/true,
      /*gpu_run_options=*/std::move(gpu_run_options)));
}

}  // namespace xla