1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/compiler/xla/pjrt/gpu_device.h"
17 
18 #include <map>
19 #include <optional>
20 #include <set>
21 #include <string>
22 #include <utility>
23 #include <vector>
24 
25 #include "absl/base/attributes.h"
26 #include "absl/container/flat_hash_map.h"
27 #include "tensorflow/compiler/xla/pjrt/pjrt_stream_executor_client.h"
28 #include "tensorflow/stream_executor/device_memory.h"
29 
30 #ifdef GOOGLE_CUDA
31 #include "third_party/gpus/cuda/include/cuda.h"
32 #include "third_party/gpus/cuda/include/cuda_runtime_api.h"
33 #include "tensorflow/compiler/xla/pjrt/nccl_id_store.h"
34 #include "tensorflow/core/common_runtime/gpu/gpu_cudamallocasync_allocator.h"
35 #include "tensorflow/stream_executor/cuda/cuda_activation.h"
36 #endif  // GOOGLE_CUDA
37 
38 #ifdef TENSORFLOW_USE_ROCM
39 #include "rocm/rocm_config.h"
40 #endif  // TENSORFLOW_USE_ROCM
41 
42 #include "tensorflow/compiler/xla/client/client_library.h"
43 #include "tensorflow/compiler/xla/service/gpu/gpu_executable_run_options.h"
44 #include "tensorflow/compiler/xla/service/platform_util.h"
45 #include "tensorflow/compiler/xla/statusor.h"
46 #include "tensorflow/compiler/xla/util.h"
47 #include "tensorflow/core/common_runtime/device/device_host_allocator.h"
48 #include "tensorflow/core/common_runtime/device/device_id.h"
49 #include "tensorflow/core/common_runtime/device/device_mem_allocator.h"
50 #include "tensorflow/core/util/env_var.h"
51 #include "tensorflow/stream_executor/tf_allocator_adapter.h"
52 
53 namespace xla {
54 namespace {
55 
56 #if defined(GOOGLE_CUDA) && CUDA_VERSION >= 11020
57 
CreateCudaAsyncAllocator(se::Platform * platform,const std::map<int,std::unique_ptr<LocalDeviceState>> & addressable_devices,double memory_fraction,bool preallocate)58 StatusOr<std::unique_ptr<se::MultiDeviceAdapter>> CreateCudaAsyncAllocator(
59     se::Platform* platform,
60     const std::map<int, std::unique_ptr<LocalDeviceState>>& addressable_devices,
61     double memory_fraction, bool preallocate) {
62   CHECK_GT(addressable_devices.size(), 0);
63   std::vector<se::MultiDeviceAdapter::AllocatorWithStream> allocators;
64 
65   for (auto& ordinal_and_device : addressable_devices) {
66     se::StreamExecutor* executor = ordinal_and_device.second->executor();
67     int device_ordinal = executor->device_ordinal();
68 
69     int64_t free_memory;
70     int64_t total_memory;
71     if (!executor->DeviceMemoryUsage(&free_memory, &total_memory)) {
72       return Unavailable("Failed to query available memory from device %i",
73                          device_ordinal);
74     }
75     // To allow full GPU memory to be visible to the BFC allocator if using
76     // unified memory.
77     // When unified memory is enabled, allow GPU memory oversubscription by
78     // setting memory_fraction > 1.
79     size_t allocator_memory = free_memory * memory_fraction;
80     if (preallocate) {
81       LOG(INFO) << "XLA backend allocating " << allocator_memory
82                 << " bytes on device " << device_ordinal
83                 << " for BFCAllocator.";
84     } else {
85       LOG(INFO) << "XLA backend will use up to " << allocator_memory
86                 << " bytes on device " << device_ordinal
87                 << " for BFCAllocator.";
88     }
89 
90     auto allocator = std::make_unique<tensorflow::GpuCudaMallocAsyncAllocator>(
91         tensorflow::PlatformDeviceId(device_ordinal), allocator_memory,
92         preallocate);
93     allocator->SetStreamAndPreallocateMemory(
94         ordinal_and_device.second->compute_stream()
95             ->implementation()
96             ->GpuStreamMemberHack());
97     allocators.emplace_back(std::move(allocator),
98                             ordinal_and_device.second->compute_stream());
99   }
100   return std::make_unique<se::MultiDeviceAdapter>(platform,
101                                                   std::move(allocators));
102 }
103 
104 #else  // defined(GOOGLE_CUDA) && CUDA_VERSION >= 11020
105 
106 StatusOr<std::unique_ptr<se::MultiDeviceAdapter>> CreateCudaAsyncAllocator(
107     se::Platform* platform,
108     const std::map<int, std::unique_ptr<LocalDeviceState>>& addressable_devices,
109     double memory_fraction, bool preallocate) {
110   return FailedPrecondition("CUDA async allocator requires CUDA >= 11.2");
111 }
112 
113 #endif  // defined(GOOGLE_CUDA) && CUDA_VERSION >= 11020
114 
115 // A custom PjRtClient that overrides the device assignment method.
116 class GpuClient : public xla::PjRtStreamExecutorClient {
117  public:
118   using xla::PjRtStreamExecutorClient::PjRtStreamExecutorClient;
119 
120   xla::StatusOr<xla::DeviceAssignment> GetDefaultDeviceAssignment(
121       int num_replicas, int num_partitions) const override;
122 
platform_version() const123   absl::string_view platform_version() const override {
124 #define STRINGIFY2(X) #X
125 #define STRINGIFY(X) STRINGIFY2(X)
126 #if TENSORFLOW_USE_ROCM && defined(TF_ROCM_VERSION)  // rocm
127     // TF_ROCM_VERSION fomrat may change in future. Use it
128     // cautiously
129     return "rocm " STRINGIFY(TF_ROCM_VERSION);
130 #elif GOOGLE_CUDA && defined(CUDART_VERSION)  // cuda
131     return "cuda " STRINGIFY(CUDART_VERSION);
132 #else
133     return "<unknown>";
134 #endif  // TENSORFLOW_USE_ROCM && defined(TF_ROCM_VERSION)
135   }
136 };
137 
GetDefaultDeviceAssignment(int num_replicas,int num_partitions) const138 xla::StatusOr<xla::DeviceAssignment> GpuClient::GetDefaultDeviceAssignment(
139     int num_replicas, int num_partitions) const {
140   if (num_partitions == 1 && num_replicas <= addressable_devices().size()) {
141     xla::DeviceAssignment assignment(num_replicas, 1);
142     for (int i = 0; i < num_replicas; ++i) {
143       assignment(i, 0) = addressable_devices().at(i)->id();
144     }
145     return assignment;
146   }
147   // Fallback to default global device assignment if we can't run locally.
148   return PjRtStreamExecutorClient::GetDefaultDeviceAssignment(num_replicas,
149                                                               num_partitions);
150 }
151 
152 // Builds an xla::LocalClient for the GPU platform.
GetGpuXlaClient(const std::optional<std::string> & platform_name,const std::optional<std::set<int>> & allowed_devices)153 StatusOr<LocalClient*> GetGpuXlaClient(
154     const std::optional<std::string>& platform_name,
155     const std::optional<std::set<int>>& allowed_devices) {
156   TF_ASSIGN_OR_RETURN(
157       se::Platform * platform,
158       PlatformUtil::GetPlatform(platform_name ? *platform_name : "gpu"));
159   if (platform->VisibleDeviceCount() <= 0) {
160     return FailedPrecondition("No visible GPU devices.");
161   }
162   LocalClientOptions options;
163   options.set_platform(platform);
164   options.set_allowed_devices(allowed_devices);
165   return ClientLibrary::GetOrCreateLocalClient(options);
166 }
167 
EnablePeerAccess(absl::Span<se::StreamExecutor * const> executors)168 void EnablePeerAccess(absl::Span<se::StreamExecutor* const> executors) {
169   for (int i = 0; i < executors.size(); ++i) {
170     for (int j = 0; j < executors.size(); ++j) {
171       if (i == j) {
172         continue;
173       }
174       se::StreamExecutor* from = executors[i];
175       se::StreamExecutor* to = executors[j];
176       if (from->CanEnablePeerAccessTo(to)) {
177         Status status = from->EnablePeerAccessTo(to);
178         if (!status.ok()) {
179           LOG(WARNING) << "Unable to enable peer access between GPUs " << i
180                        << " and " << j << "; status: " << status;
181         } else {
182           VLOG(2) << "Enabled peer access from GPU " << i << " to GPU " << j;
183         }
184       }
185     }
186   }
187 }
188 
189 // Builds a LocalDeviceState for each GPU present.
190 StatusOr<std::map<int, std::unique_ptr<LocalDeviceState>>>
BuildLocalDeviceStates(LocalClient * xla_client,bool asynchronous)191 BuildLocalDeviceStates(LocalClient* xla_client, bool asynchronous) {
192   std::map<int, std::unique_ptr<LocalDeviceState>> addressable_devices;
193   for (se::StreamExecutor* executor :
194        xla_client->backend().stream_executors()) {
195     addressable_devices.emplace(
196         executor->device_ordinal(),
197         std::make_unique<LocalDeviceState>(
198             executor, xla_client, LocalDeviceState::kComputeSynchronized,
199             /*max_inflight_computations=*/32,
200             /*allow_event_reuse=*/true, /*use_callback_stream=*/true));
201   }
202   return std::move(addressable_devices);
203 }
204 
205 // Builds a BFCAllocator for all local GPUs.
CreateBFCAllocator(const std::map<int,std::unique_ptr<LocalDeviceState>> & addressable_devices,double memory_fraction,bool preallocate)206 StatusOr<std::unique_ptr<se::MultiDeviceAdapter>> CreateBFCAllocator(
207     const std::map<int, std::unique_ptr<LocalDeviceState>>& addressable_devices,
208     double memory_fraction, bool preallocate) {
209   CHECK_GT(addressable_devices.size(), 0);
210   const se::Platform* platform =
211       addressable_devices.begin()->second->executor()->platform();
212   std::vector<se::MultiDeviceAdapter::AllocatorWithStream> allocators;
213   bool enable_unified_memory;
214   Status status = tensorflow::ReadBoolFromEnvVar("TF_FORCE_UNIFIED_MEMORY",
215                                                  false, &enable_unified_memory);
216   if (!status.ok()) {
217     LOG(ERROR) << "Unable to read TF_FORCE_UNIFIED_MEMORY: "
218                << status.error_message();
219   }
220 
221   for (auto& ordinal_and_device : addressable_devices) {
222     se::StreamExecutor* executor = ordinal_and_device.second->executor();
223     int device_ordinal = executor->device_ordinal();
224     auto sub_allocator = std::make_unique<tensorflow::DeviceMemAllocator>(
225         executor, tensorflow::PlatformDeviceId(device_ordinal),
226         /*use_unified_memory=*/enable_unified_memory,
227         /*alloc_visitors=*/std::vector<tensorflow::SubAllocator::Visitor>(),
228         /*free_visitors=*/std::vector<tensorflow::SubAllocator::Visitor>());
229 
230     int64_t free_memory;
231     int64_t total_memory;
232     if (!executor->DeviceMemoryUsage(&free_memory, &total_memory)) {
233       return Unavailable("Failed to query available memory from device %i",
234                          device_ordinal);
235     }
236     // To allow full GPU memory to be visible to the BFC allocator if using
237     // unified memory.
238     // When unified memory is enabled, allow GPU memory oversubscription by
239     // setting memory_fraction > 1.
240     size_t allocator_memory = enable_unified_memory
241                                   ? total_memory * fmax(1.0, memory_fraction)
242                                   : free_memory * memory_fraction;
243     if (preallocate) {
244       LOG(INFO) << "XLA backend allocating " << allocator_memory
245                 << " bytes on device " << device_ordinal
246                 << " for BFCAllocator.";
247     } else {
248       LOG(INFO) << "XLA backend will use up to " << allocator_memory
249                 << " bytes on device " << device_ordinal
250                 << " for BFCAllocator.";
251     }
252 
253     tensorflow::BFCAllocator::Options opts;
254     opts.allow_growth = !preallocate;
255     auto gpu_bfc_allocator = std::make_unique<tensorflow::BFCAllocator>(
256         std::move(sub_allocator), allocator_memory,
257         absl::StrCat("GPU_", device_ordinal, "_bfc"), opts);
258     allocators.emplace_back(std::move(gpu_bfc_allocator),
259                             ordinal_and_device.second->compute_stream());
260   }
261   return std::make_unique<se::MultiDeviceAdapter>(platform,
262                                                   std::move(allocators));
263 }
264 
265 // Constructs a GPU device memory allocator to use, according to the allocator
266 // configuration the client requested.
GetGpuDeviceAllocator(se::Platform * platform,const GpuAllocatorConfig & allocator_config,const std::map<int,std::unique_ptr<LocalDeviceState>> & addressable_devices)267 StatusOr<std::unique_ptr<se::DeviceMemoryAllocator>> GetGpuDeviceAllocator(
268     se::Platform* platform, const GpuAllocatorConfig& allocator_config,
269     const std::map<int, std::unique_ptr<LocalDeviceState>>&
270         addressable_devices) {
271   std::unique_ptr<se::DeviceMemoryAllocator> allocator;
272   switch (allocator_config.kind) {
273     case GpuAllocatorConfig::Kind::kCudaAsync: {
274       auto allocator_or = CreateCudaAsyncAllocator(
275           platform, addressable_devices, allocator_config.memory_fraction,
276           allocator_config.preallocate);
277       if (allocator_or.ok()) {
278         LOG(INFO) << "Using CUDA async allocator.";
279         allocator = std::move(allocator_or.ValueOrDie());
280         break;
281       }
282       LOG(ERROR) << "Failed to initialize CUDA async allocator: "
283                  << allocator_or.status() << "; falling back to BFC.";
284       [[fallthrough]];
285     }
286 
287     case GpuAllocatorConfig::Kind::kDefault:
288     case GpuAllocatorConfig::Kind::kBFC: {
289       LOG(INFO) << "Using BFC allocator.";
290       TF_ASSIGN_OR_RETURN(allocator,
291                           CreateBFCAllocator(addressable_devices,
292                                              allocator_config.memory_fraction,
293                                              allocator_config.preallocate));
294       break;
295     }
296 
297     case GpuAllocatorConfig::Kind::kPlatform:
298       LOG(INFO) << "Using platform allocator.";
299       break;
300   }
301   return std::move(allocator);
302 }
303 
304 // Returns a GPU pinned host memory allocator to use when staging host->GPU
305 // transfers. We use a fixed 64MB pool of pinned memory.
GetGpuHostAllocator(se::StreamExecutor * executor)306 std::unique_ptr<tensorflow::BFCAllocator> GetGpuHostAllocator(
307     se::StreamExecutor* executor) {
308   std::unique_ptr<tensorflow::SubAllocator> sub_allocator(
309       new tensorflow::DeviceHostAllocator(executor, /*numa_node=*/0,
310                                           /*alloc_visitors=*/{},
311                                           /*free_visitors=*/{}));
312   // TODO(phawkins): allow the user to tune this.
313   const int64_t kGpuHostMemoryLimitBytes = 64 * (1LL << 30);
314 
315   tensorflow::BFCAllocator::Options opts;
316   opts.allow_growth = true;
317   return std::make_unique<tensorflow::BFCAllocator>(
318       std::move(sub_allocator), kGpuHostMemoryLimitBytes,
319       /*name=*/"xla_gpu_host_bfc", opts);
320 }
321 
BuildLocalDevices(std::map<int,std::unique_ptr<LocalDeviceState>> local_device_states)322 std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> BuildLocalDevices(
323     std::map<int, std::unique_ptr<LocalDeviceState>> local_device_states) {
324   std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> devices;
325   for (auto& ordinal_and_device : local_device_states) {
326     const se::DeviceDescription& description =
327         ordinal_and_device.second->executor()->GetDeviceDescription();
328     auto device = std::make_unique<GpuDevice>(
329         ordinal_and_device.first, std::move(ordinal_and_device.second),
330         description.name(), description.device_vendor(),
331         /*node_id=*/0);
332     devices.push_back(std::move(device));
333   }
334   return devices;
335 }
336 
BuildDistributedDevices(std::map<int,std::unique_ptr<LocalDeviceState>> local_device_states,std::shared_ptr<DistributedRuntimeClient> distributed_client,int node_id,std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> * devices,gpu::GpuExecutableRunOptions * gpu_executable_run_options)337 Status BuildDistributedDevices(
338     std::map<int, std::unique_ptr<LocalDeviceState>> local_device_states,
339     std::shared_ptr<DistributedRuntimeClient> distributed_client, int node_id,
340     std::vector<std::unique_ptr<PjRtStreamExecutorDevice>>* devices,
341     gpu::GpuExecutableRunOptions* gpu_executable_run_options) {
342   LocalTopologyProto local_topology;
343   local_topology.set_node_id(node_id);
344   for (const auto& ordinal_and_device : local_device_states) {
345     const se::Platform* platform =
346         ordinal_and_device.second->executor()->platform();
347     TF_ASSIGN_OR_RETURN(
348         std::unique_ptr<xla::se::DeviceDescription> desc,
349         platform->DescriptionForDevice(ordinal_and_device.first));
350     DeviceProto* device_proto = local_topology.add_devices();
351     device_proto->set_local_device_ordinal(ordinal_and_device.first);
352     device_proto->set_name(desc->name());
353     device_proto->set_vendor(desc->device_vendor());
354   }
355 
356   GlobalTopologyProto global_topology;
357   TF_RETURN_IF_ERROR(
358       distributed_client->EnumerateDevices(local_topology, &global_topology));
359 
360   std::map<int, GlobalDeviceId> gpu_device_ids;
361   absl::flat_hash_map<GlobalDeviceId, int> device_to_node;
362   for (const LocalTopologyProto& node : global_topology.nodes()) {
363     for (const DeviceProto& device_proto : node.devices()) {
364       GlobalDeviceId global_device_id(device_proto.global_device_id());
365       device_to_node[global_device_id] = node.node_id();
366       std::unique_ptr<LocalDeviceState> local_device;
367       if (node.node_id() == node_id) {
368         auto it = local_device_states.find(device_proto.local_device_ordinal());
369         TF_RET_CHECK(it != local_device_states.end())
370             << device_proto.local_device_ordinal();
371         TF_RET_CHECK(it->second != nullptr);
372         local_device = std::move(it->second);
373         gpu_device_ids[device_proto.local_device_ordinal()] = global_device_id;
374       }
375       auto device = std::make_unique<GpuDevice>(
376           device_proto.global_device_id(), std::move(local_device),
377           device_proto.name(), device_proto.vendor(), node.node_id());
378       devices->push_back(std::move(device));
379     }
380   }
381   for (const auto& device : local_device_states) {
382     TF_RET_CHECK(device.second == nullptr);
383   }
384   std::vector<GlobalDeviceId> sorted_global_device_ids;
385   sorted_global_device_ids.reserve(gpu_device_ids.size());
386   for (const auto& e : gpu_device_ids) {
387     sorted_global_device_ids.push_back(e.second);
388   }
389   gpu_executable_run_options->set_gpu_global_device_ids(
390       std::move(sorted_global_device_ids));
391 #ifdef GOOGLE_CUDA
392   auto nccl_id_store = std::make_shared<NcclIdStore>(
393       node_id, distributed_client, device_to_node);
394   gpu_executable_run_options->set_nccl_unique_id_callback(
395       [nccl_id_store](const gpu::NcclCliqueKey& key) {
396         return nccl_id_store->GetNcclUniqueId(key);
397       });
398 #endif  // GOOGLE_CUDA
399   return OkStatus();
400 }
401 
402 }  // namespace
403 
GpuDevice(int id,std::unique_ptr<LocalDeviceState> local_device_state,std::string device_kind,std::string device_vendor,int node_id)404 GpuDevice::GpuDevice(int id,
405                      std::unique_ptr<LocalDeviceState> local_device_state,
406                      std::string device_kind, std::string device_vendor,
407                      int node_id)
408     : PjRtStreamExecutorDevice(id, std::move(local_device_state),
409                                std::move(device_kind), node_id),
410       device_vendor_(std::move(device_vendor)) {
411   attributes_ = {
412       {"device_vendor", PjRtDeviceAttribute(device_vendor_)},
413   };
414   to_string_ = absl::StrFormat("GpuDevice(id=%i, process_index=%i)", id,
415                                process_index());
416 }
417 
device_vendor()418 absl::string_view GpuDevice::device_vendor() { return device_vendor_; }
419 
ToString() const420 absl::string_view GpuDevice::ToString() const { return to_string_; }
421 
GetGpuClient(bool asynchronous,const GpuAllocatorConfig & allocator_config,std::shared_ptr<DistributedRuntimeClient> distributed_client,int node_id,const std::optional<std::set<int>> & allowed_devices,std::optional<std::string> platform_name)422 StatusOr<std::unique_ptr<PjRtClient>> GetGpuClient(
423     bool asynchronous, const GpuAllocatorConfig& allocator_config,
424     std::shared_ptr<DistributedRuntimeClient> distributed_client, int node_id,
425     const std::optional<std::set<int>>& allowed_devices,
426     std::optional<std::string> platform_name) {
427   TF_ASSIGN_OR_RETURN(LocalClient * xla_client,
428                       GetGpuXlaClient(platform_name, allowed_devices));
429   std::map<int, std::unique_ptr<LocalDeviceState>> local_device_states;
430   TF_ASSIGN_OR_RETURN(local_device_states,
431                       BuildLocalDeviceStates(xla_client, asynchronous));
432   EnablePeerAccess(xla_client->backend().stream_executors());
433   TF_ASSIGN_OR_RETURN(
434       auto allocator,
435       GetGpuDeviceAllocator(xla_client->platform(), allocator_config,
436                             local_device_states));
437   auto host_memory_allocator =
438       GetGpuHostAllocator(local_device_states.begin()->second->executor());
439 
440   std::vector<std::unique_ptr<PjRtStreamExecutorDevice>> devices;
441   auto gpu_run_options = std::make_unique<gpu::GpuExecutableRunOptions>();
442   if (distributed_client) {
443     TF_RETURN_IF_ERROR(BuildDistributedDevices(
444         std::move(local_device_states), std::move(distributed_client), node_id,
445         &devices, gpu_run_options.get()));
446   } else {
447     devices = BuildLocalDevices(std::move(local_device_states));
448   }
449 
450   return std::unique_ptr<PjRtClient>(std::make_unique<GpuClient>(
451       GpuName(), xla_client, std::move(devices),
452       /*node_id=*/node_id, std::move(allocator),
453       std::move(host_memory_allocator),
454       /*should_stage_host_to_device_transfers=*/true,
455       /*gpu_run_options=*/std::move(gpu_run_options)));
456 }
457 
458 }  // namespace xla
459