1 /* Copyright 2021 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.h"
17
18 #include <stdlib.h>
19 #include <string.h>
20
21 #include <algorithm>
22 #include <list>
23 #include <map>
24 #include <tuple>
25 #include <utility>
26 #include <vector>
27
28 #include "tensorflow/core/common_runtime/device/device_id.h"
29 #include "tensorflow/core/common_runtime/device/device_id_manager.h"
30 #include "tensorflow/core/common_runtime/device/device_id_utils.h"
31 #include "tensorflow/core/common_runtime/device_factory.h"
32 #include "tensorflow/core/common_runtime/pluggable_device/pluggable_device.h"
33 #include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_init.h"
34 #include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_process_state.h"
35 #include "tensorflow/core/common_runtime/pluggable_device/pluggable_device_util.h"
36 #include "tensorflow/core/framework/allocator.h"
37
38 namespace tensorflow {
39 namespace {
40
MinSystemMemory(int64_t available_memory)41 int64_t MinSystemMemory(int64_t available_memory) {
42 // We use the following heuristic for now:
43 //
44 // If the available_memory is < 2GiB, we allocate 225MiB to system memory,
45 // Otherwise, allocate max(300MiB, kMinSystemMemoryFraction *
46 // available_memory) to system memory.
47 //
48 // In the future we could be more sophisticated by using a table of devices.
49 int64_t min_system_memory;
50 constexpr float kMinSystemMemoryFraction = 0.06;
51 if (available_memory < (1LL << 31)) {
52 // 225MiB
53 min_system_memory = 255 * 1024 * 1024;
54 } else {
55 // max(300 MiB, kMinSystemMemoryFraction * available_memory)
56 min_system_memory = std::max(
57 int64_t{314572800},
58 static_cast<int64_t>(available_memory * kMinSystemMemoryFraction));
59 }
60 #if defined(__GNUC__) && defined(__OPTIMIZE__)
61 // Do nothing
62 #elif !defined(__GNUC__) && defined(NDEBUG)
63 // Do nothing
64 #else
65 // Double the amount of available PluggableDevice memory in non-opt builds
66 // (debug builds in windows); because in non-opt builds more system memory is
67 // necessary.
68 min_system_memory *= 2;
69 #endif
70 VLOG(5) << "available_memory = " << available_memory;
71 VLOG(5) << "min_system_memory = " << min_system_memory;
72 return min_system_memory;
73 }
74
// Get the memory limit for the virtual device being created on PluggableDevice
// with 'platform_device_id', when that virtual device is the only
// virtual device being created on that PluggableDevice.
//
// On success, writes the byte limit to '*memory_limit'. Returns Unknown if
// the executor cannot report memory usage, and Internal if unified memory or
// an over-subscription fraction (> 1.0) is requested, neither of which is
// supported for pluggable devices yet.
Status SingleVirtualDeviceMemoryLimit(const string& platform_name,
                                      const GPUOptions& device_options,
                                      PlatformDeviceId platform_device_id,
                                      int64_t* memory_limit) {
  int64_t total_memory = 0;
  int64_t available_memory = 0;
  se::Platform* platform = PluggableDeviceMachineManager(platform_name);
  // ValueOrDie: executor creation is expected to succeed for a validated
  // platform/device id pair; failure here aborts the process.
  se::StreamExecutor* se =
      DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_device_id)
          .ValueOrDie();
  if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
    return errors::Unknown(
        "Failed to query available memory for PluggableDevice ",
        platform_device_id.value());
  }

  int64_t allocated_memory = 0;
  const double per_process_device_memory_fraction =
      device_options.per_process_gpu_memory_fraction();
  // A fraction > 1.0 implies unified (oversubscribed) memory, which — like an
  // explicit unified-memory request — is not supported for pluggable devices.
  if (per_process_device_memory_fraction > 1.0 ||
      device_options.experimental().use_unified_memory()) {
    return errors::Internal("Unified memory is not supported yet.");
  }

  if (per_process_device_memory_fraction == 0) {
    // Default (fraction unset): take all currently-available memory, minus a
    // reserve for the system computed by MinSystemMemory().
    allocated_memory = available_memory;
    const int64_t min_system_memory = MinSystemMemory(available_memory);
    if (min_system_memory < allocated_memory) {
      allocated_memory -= min_system_memory;
    }
  } else {
    // Explicit fraction: note it is applied to *total* memory, not to the
    // memory currently available.
    allocated_memory = total_memory * per_process_device_memory_fraction;
  }
  *memory_limit = allocated_memory;
  return OkStatus();
}
114 } // namespace
115
// Constructs a factory producing devices of 'device_type' backed by the
// StreamExecutor platform registered under 'platform_name'.
PluggableDeviceFactory::PluggableDeviceFactory(const string& device_type,
                                               const string& platform_name)
    : device_type_(device_type), platform_name_(platform_name) {}
119
ListPhysicalDevices(std::vector<string> * devices)120 Status PluggableDeviceFactory::ListPhysicalDevices(
121 std::vector<string>* devices) {
122 TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
123 se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
124
125 int device_count = platform->VisibleDeviceCount();
126 for (int i = 0; i < device_count; ++i) {
127 const string device_name =
128 strings::StrCat("/physical_device:", device_type_, ":", i);
129 devices->push_back(device_name);
130 }
131
132 return OkStatus();
133 }
134
GetDeviceDetails(int device_index,std::unordered_map<string,string> * details)135 Status PluggableDeviceFactory::GetDeviceDetails(
136 int device_index, std::unordered_map<string, string>* details) {
137 TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
138 se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
139 if (platform == nullptr) {
140 return OkStatus();
141 }
142
143 int device_count = platform->VisibleDeviceCount();
144 if (device_index < 0 || device_index >= device_count) {
145 return errors::Internal("Invalid device index: ", device_index);
146 }
147
148 auto desc_status = platform->DescriptionForDevice(device_index);
149 if (!desc_status.ok()) {
150 return desc_status.status();
151 }
152
153 auto desc = std::move(desc_status).value();
154 (*details)["device_name"] = desc->name();
155 return OkStatus();
156 }
157
// Creates up to config.device_count[device_type_] PluggableDevices (default:
// one per visible physical device), registering a TfDeviceId -> 
// PlatformDeviceId mapping for each, and appends them to 'devices'.
Status PluggableDeviceFactory::CreateDevices(
    const SessionOptions& options, const string& name_prefix,
    std::vector<std::unique_ptr<Device>>* devices) {
  TF_RETURN_IF_ERROR(ValidatePluggableDeviceMachineManager(platform_name_));
  se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
  if (platform == nullptr) {
    // No platform registered: nothing to create, but not an error.
    return OkStatus();
  }

  if (platform->VisibleDeviceCount() <= 0) {
    // No physical devices present.
    return OkStatus();
  }

  // Requested device count; INT_MAX means "no explicit limit" and will be
  // clamped to the number of visible devices below.
  size_t num_tf_devices = INT_MAX;
  auto iter = options.config.device_count().find(device_type_);
  if (iter != options.config.device_count().end()) {
    num_tf_devices = iter->second;
  }
  // PluggableDevices reuse the GPUOptions proto for their configuration.
  const auto& device_options = options.config.gpu_options();
  std::vector<PlatformDeviceId> visible_device_order;

  if (num_tf_devices > 0) {
    TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
        device_options.visible_device_list(), platform->VisibleDeviceCount(),
        &visible_device_order));
  }
  // Clamp the requested count to the number of visible devices (this also
  // handles the INT_MAX "unlimited" default).
  if (num_tf_devices > visible_device_order.size()) {
    num_tf_devices = visible_device_order.size();
  }

  const auto& virtual_devices = device_options.experimental().virtual_devices();
  if (!virtual_devices.empty())
    VLOG(2) << "Pluggable device does not support virtual device setting yet";
  // Compute each device's memory limit and register its TF <-> platform id
  // mapping before any device is built.
  std::vector<int64_t> memory_limit_bytes;
  for (int i = 0; i < num_tf_devices; ++i) {
    const PlatformDeviceId platform_device_id = visible_device_order[i];
    int64_t single_virtual_device_memory_limit = 0;
    TF_RETURN_IF_ERROR(SingleVirtualDeviceMemoryLimit(
        platform_name_, device_options, platform_device_id,
        &single_virtual_device_memory_limit));
    memory_limit_bytes.push_back(single_virtual_device_memory_limit);
    TfDeviceId tf_device_id(i);
    TF_RETURN_IF_ERROR(DeviceIdManager::InsertTfPlatformDeviceIdPair(
        DeviceType(device_type_), tf_device_id, platform_device_id));
  }

  // NUMA/bus localities require the id mappings registered above.
  std::vector<DeviceLocality> device_localities;
  TF_RETURN_IF_ERROR(GetDeviceLocalities(num_tf_devices, &device_localities));

  // Build the PluggableDevices.
  for (int di = 0; di < num_tf_devices; ++di) {
    TfDeviceId tf_device_id(di);
    int64_t bytes = memory_limit_bytes[di];
    TF_RETURN_IF_ERROR(CreatePluggableDevice(options, name_prefix, tf_device_id,
                                             bytes, device_localities[di],
                                             devices));
  }
  return OkStatus();
}
217
GetShortDeviceDescription(PlatformDeviceId platform_device_id,const se::DeviceDescription & desc)218 static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
219 const se::DeviceDescription& desc) {
220 return strings::StrCat("device: ", platform_device_id.value(),
221 ", name: ", desc.name(),
222 ", pci bus id: ", desc.pci_bus_id());
223 }
224
CreatePluggableDevice(const SessionOptions & options,const string & name_prefix,TfDeviceId tf_device_id,int64_t memory_limit,const DeviceLocality & dev_locality,std::vector<std::unique_ptr<Device>> * devices)225 Status PluggableDeviceFactory::CreatePluggableDevice(
226 const SessionOptions& options, const string& name_prefix,
227 TfDeviceId tf_device_id, int64_t memory_limit,
228 const DeviceLocality& dev_locality,
229 std::vector<std::unique_ptr<Device>>* devices) {
230 DCHECK_GE(tf_device_id.value(), 0);
231 const string device_name = strings::StrCat(
232 name_prefix, "/device:", device_type_, ":", tf_device_id.value());
233
234 se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
235 DeviceIdUtil::CheckValidTfDeviceId(DeviceType(device_type_), platform,
236 tf_device_id);
237 PlatformDeviceId platform_device_id;
238 TF_RETURN_IF_ERROR(DeviceIdManager::TfToPlatformDeviceId(
239 DeviceType(device_type_), tf_device_id, &platform_device_id));
240 int numa_node = dev_locality.numa_node();
241
242 auto desc_status = platform->DescriptionForDevice(platform_device_id.value());
243 if (!desc_status.ok()) {
244 return desc_status.status();
245 }
246 auto desc = std::move(desc_status).value();
247 PluggableDeviceProcessState* process_state =
248 PluggableDeviceProcessState::singleton(device_type_, platform_name_);
249 Allocator* device_allocator = process_state->GetPluggableDeviceAllocator(
250 options.config.gpu_options(), tf_device_id, memory_limit);
251 if (device_allocator == nullptr) {
252 return errors::Internal(
253 "Failed to get memory allocator for TF PluggableDevice ",
254 tf_device_id.value(), " with", memory_limit, " bytes of memory. ");
255 }
256 absl::optional<AllocatorStats> stats = device_allocator->GetStats();
257 if (!stats) {
258 return errors::Internal("No allocator statistics");
259 }
260 // 'memory_limit' is the required memory size, but if the allocator with
261 // given 'tf_device_id' was created before, we'll use it instead of creating
262 // a new one (as TF Device is a shared resource), in which case the actual
263 // memory limit represented by 'stats.bytes_limit' used by that allocator
264 // may be different (which should be an error).
265 int64_t bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
266 auto pluggable_device = absl::make_unique<PluggableDevice>(
267 options, device_name, device_type_, platform_name_,
268 static_cast<Bytes>(bytes_limit), dev_locality, tf_device_id,
269 GetShortDeviceDescription(platform_device_id, *desc), device_allocator,
270 ProcessState::singleton()->GetCPUAllocator(numa_node),
271 false /*sync every op*/);
272 LOG(INFO) << "Created TensorFlow device (" << device_name << " with "
273 << (bytes_limit >> 20)
274 << " MB memory) -> physical PluggableDevice ("
275 << GetShortDeviceDescription(platform_device_id, *desc) << ")";
276 TF_RETURN_IF_ERROR(pluggable_device->Init(options));
277 devices->push_back(std::move(pluggable_device));
278 return OkStatus();
279 }
280
GetDeviceLocalities(int num_tf_devices,std::vector<DeviceLocality> * device_localities)281 Status PluggableDeviceFactory::GetDeviceLocalities(
282 int num_tf_devices, std::vector<DeviceLocality>* device_localities) {
283 for (int i = 0; i < num_tf_devices; ++i) {
284 TfDeviceId tf_device_id(i);
285 PlatformDeviceId platform_device_id;
286 TF_RETURN_IF_ERROR(DeviceIdManager::TfToPlatformDeviceId(
287 DeviceType(device_type_), tf_device_id, &platform_device_id));
288 // Get PluggableDevice bus_id from its reported NUMA affinity. Because
289 // devices are virtualized in some environment, we can't just use the device
290 // id. NUMA locales are indexed from 0, buses are indexed from 1.
291 se::Platform* platform = PluggableDeviceMachineManager(platform_name_);
292 auto desc_status =
293 platform->DescriptionForDevice(platform_device_id.value());
294 if (!desc_status.ok()) {
295 return desc_status.status();
296 }
297 auto desc = std::move(desc_status).value();
298 int numa_node = desc->numa_node();
299 if (numa_node < 0) {
300 // For some reason the StreamExecutor couldn't get the NUMA
301 // affinity of the device. If this is not a multi-socket mobo with
302 // devices local to different buses, it doesn't matter. If it is,
303 // we may run into trouble later with data transfer operations.
304 // The trouble may manifest as slower than expected performance,
305 // or outright failures.
306 LOG(INFO) << "Could not identify NUMA node of platform " << device_type_
307 << " ID " << platform_device_id
308 << ", defaulting to 0. Your kernel may not have been built "
309 << "with NUMA support.";
310 numa_node = 0;
311 }
312 DeviceLocality dev_locality;
313 dev_locality.set_numa_node(numa_node);
314 dev_locality.set_bus_id(numa_node + 1);
315 device_localities->push_back(dev_locality);
316 VLOG(1) << "PluggableDevice PlatformDeviceId " << platform_device_id
317 << " TfDeviceId " << tf_device_id << " on bus "
318 << dev_locality.bus_id() << " numa: " << numa_node
319 << "DeviceLocality: " << dev_locality.DebugString();
320 }
321 return OkStatus();
322 }
323
324 } // namespace tensorflow
325