/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.h"

#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <climits>  // for INT_MAX (used in CreateDevices)
#include <list>
#include <map>
#include <tuple>
#include <utility>
#include <vector>

#include "tensorflow/core/common_runtime/device/device_id.h"
#include "tensorflow/core/common_runtime/device/device_id_manager.h"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_init.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_process_state.h"
#include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_util.h"
#include "tensorflow/core/framework/allocator.h"

namespace tensorflow {
namespace {

int64_t MinSystemMemory(int64_t available_memory) {
  // We use the following heuristic for now:
  //
  // If available_memory is < 2GiB, allocate 225MiB to system memory;
  // otherwise, allocate max(300MiB, kMinSystemMemoryFraction *
  // available_memory) to system memory.
  //
  // In the future we could be more sophisticated by using a table of devices.
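  //
  // Worked example (illustrative numbers, not from the original source):
  // with 8 GiB available, 0.06 * 8192 MiB = ~492 MiB, which exceeds the
  // 300 MiB floor, so ~492 MiB is reserved for system memory.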
  int64_t min_system_memory;
  constexpr float kMinSystemMemoryFraction = 0.06;
  if (available_memory < (1LL << 31)) {
    // 225MiB
    min_system_memory = 225 * 1024 * 1024;
  } else {
    // max(300 MiB, kMinSystemMemoryFraction * available_memory)
    min_system_memory = std::max(
        int64_t{314572800},
        static_cast<int64_t>(available_memory * kMinSystemMemoryFraction));
  }
#if defined(__GNUC__) && defined(__OPTIMIZE__)
// Do nothing
#elif !defined(__GNUC__) && defined(NDEBUG)
// Do nothing
#else
  // Double the reserved system memory in non-opt builds (e.g. debug builds
  // on Windows), because non-opt builds need more system memory.
  min_system_memory *= 2;
#endif
  VLOG(5) << "available_memory = " << available_memory;
  VLOG(5) << "min_system_memory = " << min_system_memory;
  return min_system_memory;
}

// Get the memory limit for the virtual device being created on a
// PluggableDevice with 'platform_device_id', when that virtual device is the
// only virtual device being created on that PluggableDevice.
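//
// The fraction consulted below is GPUOptions.per_process_gpu_memory_fraction;
// a minimal sketch of how a client would set it (illustrative use of the
// standard ConfigProto fields):
//
//   tensorflow::SessionOptions options;
//   options.config.mutable_gpu_options()
//       ->set_per_process_gpu_memory_fraction(0.5);  // cap at 50% of total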
Status SingleVirtualDeviceMemoryLimit(const string& platform_name,
                                      const GPUOptions& device_options,
                                      PlatformDeviceId platform_device_id,
                                      int64_t* memory_limit) {
  int64_t total_memory = 0;
  int64_t available_memory = 0;
  se::Platform* platform = PluggableDeviceMachineManager(platform_name);
  se::StreamExecutor* se =
      DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_device_id)
          .ValueOrDie();
  if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
    return errors::Unknown(
        "Failed to query available memory for PluggableDevice ",
        platform_device_id.value());
  }

  int64_t allocated_memory = 0;
  const double per_process_device_memory_fraction =
      device_options.per_process_gpu_memory_fraction();
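  // Note: requesting a fraction > 1.0 amounts to memory oversubscription,
  // which would require unified memory support; hence the combined check
  // below.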
  if (per_process_device_memory_fraction > 1.0 ||
      device_options.experimental().use_unified_memory()) {
    return errors::Internal("Unified memory is not supported yet.");
  }

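  // A fraction of 0 (the default) means "use all available memory minus the
  // reservation computed by MinSystemMemory()"; a nonzero fraction carves the
  // limit out of total memory instead.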
  if (per_process_device_memory_fraction == 0) {
    allocated_memory = available_memory;
    const int64_t min_system_memory = MinSystemMemory(available_memory);
    if (min_system_memory < allocated_memory) {
      allocated_memory -= min_system_memory;
    }
  } else {
    allocated_memory = total_memory * per_process_device_memory_fraction;
  }
  *memory_limit = allocated_memory;
  return OkStatus();
}
}  // namespace

PluggableDeviceFactory::PluggableDeviceFactory(const string& device_type,
                                               const string& platform_name)
    : device_type_(device_type), platform_name_(platform_name) {}
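
// Note: an instance of this factory is registered with TensorFlow's
// DeviceFactory machinery during plugin initialization. A rough sketch of
// that registration (illustrative only; the exact Register() overload,
// priority value, and call site live in the plugin initialization path):
//
//   DeviceFactory::Register(device_type,
//                           new PluggableDeviceFactory(device_type,
//                                                      platform_name),
//                           /*priority=*/...);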

Status PluggableDeviceFactory::ListPhysicalDevices(
    std::vector<string>* devices) {
  TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
  se::Platform* platform = PluggableDeviceMachineManager(platform_name_);

  int device_count = platform->VisibleDeviceCount();
  for (int i = 0; i < device_count; ++i) {
    const string device_name =
        strings::StrCat("/physical_device:", device_type_, ":", i);
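    // Produces names like "/physical_device:MY_DEVICE:0", where "MY_DEVICE"
    // stands in for the actual device_type_ (illustrative).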
    devices->push_back(device_name);
  }

  return OkStatus();
}

Status PluggableDeviceFactory::GetDeviceDetails(
    int device_index, std::unordered_map<string, string>* details) {
  TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
  se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
  if (platform == nullptr) {
    return OkStatus();
  }

  int device_count = platform->VisibleDeviceCount();
  if (device_index < 0 || device_index >= device_count) {
    return errors::Internal("Invalid device index: ", device_index);
  }

  auto desc_status = platform->DescriptionForDevice(device_index);
  if (!desc_status.ok()) {
    return desc_status.status();
  }

  auto desc = std::move(desc_status).value();
  (*details)["device_name"] = desc->name();
  return OkStatus();
}

Status PluggableDeviceFactory::CreateDevices(
    const SessionOptions& options, const string& name_prefix,
    std::vector<std::unique_ptr<Device>>* devices) {
  TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
  se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
  if (platform == nullptr) {
    return OkStatus();
  }

  if (platform->VisibleDeviceCount() <= 0) {
    return OkStatus();
  }

  size_t num_tf_devices = INT_MAX;
  auto iter = options.config.device_count().find(device_type_);
  if (iter != options.config.device_count().end()) {
    num_tf_devices = iter->second;
  }
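  // (Illustrative) this cap is the per-type entry in ConfigProto.device_count
  // that a client can set before session creation, e.g.:
  //   (*config.mutable_device_count())["MY_DEVICE"] = 2;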
  const auto& device_options = options.config.gpu_options();
  std::vector<PlatformDeviceId> visible_device_order;

  if (num_tf_devices > 0) {
    TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
        device_options.visible_device_list(), platform->VisibleDeviceCount(),
        &visible_device_order));
  }
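  // (Illustrative) a visible_device_list of "2,0" exposes platform device 2
  // as TF device 0 and platform device 0 as TF device 1; an empty list keeps
  // all visible devices in platform order.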
  if (num_tf_devices > visible_device_order.size()) {
    num_tf_devices = visible_device_order.size();
  }

  const auto& virtual_devices = device_options.experimental().virtual_devices();
  if (!virtual_devices.empty())
    VLOG(2) << "Pluggable device does not support virtual device setting yet";
  std::vector<int64_t> memory_limit_bytes;
  for (int i = 0; i < num_tf_devices; ++i) {
    const PlatformDeviceId platform_device_id = visible_device_order[i];
    int64_t single_virtual_device_memory_limit = 0;
    TF_RETURN_IF_ERROR(SingleVirtualDeviceMemoryLimit(
        platform_name_, device_options, platform_device_id,
        &single_virtual_device_memory_limit));
    memory_limit_bytes.push_back(single_virtual_device_memory_limit);
    TfDeviceId tf_device_id(i);
    TF_RETURN_IF_ERROR(DeviceIdManager::InsertTfPlatformDeviceIdPair(
        DeviceType(device_type_), tf_device_id, platform_device_id));
  }

  std::vector<DeviceLocality> device_localities;
  TF_RETURN_IF_ERROR(GetDeviceLocalities(num_tf_devices, &device_localities));

  // Build the PluggableDevices.
  for (int di = 0; di < num_tf_devices; ++di) {
    TfDeviceId tf_device_id(di);
    int64_t bytes = memory_limit_bytes[di];
    TF_RETURN_IF_ERROR(CreatePluggableDevice(options, name_prefix, tf_device_id,
                                             bytes, device_localities[di],
                                             devices));
  }
  return OkStatus();
}

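// Returns a short, human-readable device summary, e.g. (illustrative):
//   "device: 0, name: ExampleDevice, pci bus id: 0000:03:00.0"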
static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
                                        const se::DeviceDescription& desc) {
  return strings::StrCat("device: ", platform_device_id.value(),
                         ", name: ", desc.name(),
                         ", pci bus id: ", desc.pci_bus_id());
}

Status PluggableDeviceFactory::CreatePluggableDevice(
    const SessionOptions& options, const string& name_prefix,
    TfDeviceId tf_device_id, int64_t memory_limit,
    const DeviceLocality& dev_locality,
    std::vector<std::unique_ptr<Device>>* devices) {
  DCHECK_GE(tf_device_id.value(), 0);
  const string device_name = strings::StrCat(
      name_prefix, "/device:", device_type_, ":", tf_device_id.value());

  se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
  DeviceIdUtil::CheckValidTfDeviceId(DeviceType(device_type_), platform,
                                     tf_device_id);
  PlatformDeviceId platform_device_id;
  TF_RETURN_IF_ERROR(DeviceIdManager::TfToPlatformDeviceId(
      DeviceType(device_type_), tf_device_id, &platform_device_id));
  int numa_node = dev_locality.numa_node();

  auto desc_status = platform->DescriptionForDevice(platform_device_id.value());
  if (!desc_status.ok()) {
    return desc_status.status();
  }
  auto desc = std::move(desc_status).value();
  PluggableDeviceProcessState* process_state =
      PluggableDeviceProcessState::singleton(device_type_, platform_name_);
  Allocator* device_allocator = process_state->GetPluggableDeviceAllocator(
      options.config.gpu_options(), tf_device_id, memory_limit);
  if (device_allocator == nullptr) {
    return errors::Internal(
        "Failed to get memory allocator for TF PluggableDevice ",
        tf_device_id.value(), " with ", memory_limit, " bytes of memory.");
  }
  absl::optional<AllocatorStats> stats = device_allocator->GetStats();
  if (!stats) {
    return errors::Internal("No allocator statistics");
  }
  // 'memory_limit' is the requested memory size, but if an allocator for the
  // given 'tf_device_id' was created earlier, we reuse it instead of creating
  // a new one (a TF device is a shared resource), in which case the actual
  // memory limit reported by 'stats.bytes_limit' for that allocator may be
  // different (a mismatch that should be treated as an error).
  int64_t bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
  auto pluggable_device = absl::make_unique<PluggableDevice>(
      options, device_name, device_type_, platform_name_,
      static_cast<Bytes>(bytes_limit), dev_locality, tf_device_id,
      GetShortDeviceDescription(platform_device_id, *desc), device_allocator,
      ProcessState::singleton()->GetCPUAllocator(numa_node),
      false /*sync every op*/);
  LOG(INFO) << "Created TensorFlow device (" << device_name << " with "
            << (bytes_limit >> 20)
            << " MiB memory) -> physical PluggableDevice ("
            << GetShortDeviceDescription(platform_device_id, *desc) << ")";
  TF_RETURN_IF_ERROR(pluggable_device->Init(options));
  devices->push_back(std::move(pluggable_device));
  return OkStatus();
}

Status PluggableDeviceFactory::GetDeviceLocalities(
    int num_tf_devices, std::vector<DeviceLocality>* device_localities) {
  for (int i = 0; i < num_tf_devices; ++i) {
    TfDeviceId tf_device_id(i);
    PlatformDeviceId platform_device_id;
    TF_RETURN_IF_ERROR(DeviceIdManager::TfToPlatformDeviceId(
        DeviceType(device_type_), tf_device_id, &platform_device_id));
    // Get the PluggableDevice bus_id from its reported NUMA affinity. Because
    // devices are virtualized in some environments, we can't just use the
    // device id. NUMA locales are indexed from 0, buses are indexed from 1.
    se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
    auto desc_status =
        platform->DescriptionForDevice(platform_device_id.value());
    if (!desc_status.ok()) {
      return desc_status.status();
    }
    auto desc = std::move(desc_status).value();
    int numa_node = desc->numa_node();
    if (numa_node < 0) {
      // For some reason the StreamExecutor couldn't get the NUMA
      // affinity of the device. If this is not a multi-socket motherboard
      // with devices local to different buses, it doesn't matter. If it is,
      // we may run into trouble later with data transfer operations.
      // The trouble may manifest as slower-than-expected performance,
      // or outright failures.
      LOG(INFO) << "Could not identify NUMA node of platform " << device_type_
                << " ID " << platform_device_id
                << ", defaulting to 0. Your kernel may not have been built "
                << "with NUMA support.";
      numa_node = 0;
    }
    DeviceLocality dev_locality;
    dev_locality.set_numa_node(numa_node);
    dev_locality.set_bus_id(numa_node + 1);
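    // e.g. (illustrative) a device on NUMA node 0 gets bus_id 1, per the
    // 0-indexed-NUMA / 1-indexed-bus convention noted above.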
    device_localities->push_back(dev_locality);
    VLOG(1) << "PluggableDevice PlatformDeviceId " << platform_device_id
            << " TfDeviceId " << tf_device_id << " on bus "
            << dev_locality.bus_id() << " numa: " << numa_node
            << " DeviceLocality: " << dev_locality.DebugString();
  }
  return OkStatus();
}

}  // namespace tensorflow