/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/grappler/clusters/single_machine.h"

#include <atomic>
#include <memory>

#include "tensorflow/cc/training/queue_runner.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/grappler/clusters/utils.h"
#include "tensorflow/core/grappler/utils.h"
#include "tensorflow/core/kernels/ops_util.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/notification.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session.h"

namespace tensorflow {
namespace grappler {

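// Process-wide guard: TF variables live in process-global state, so at most
// one SingleMachine cluster may own a live session at a time (see the
// comment in Provision() below).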
static std::atomic<bool> already_provisioned(false);

SingleMachine::SingleMachine(int timeout_s, int num_cpu_cores, int num_gpus)
    : Cluster(timeout_s), expected_init_time_s_(0), closing_(false) {
  VLOG(1) << "Number of CPU cores: " << num_cpu_cores
          << " Number of GPUs: " << num_gpus;
  thread_pool_.reset(new thread::ThreadPool(
      Env::Default(), SanitizeThreadSuffix("single_machine"), 2));

  (*options_.config.mutable_device_count())["CPU"] = 1;
  if (num_gpus > 0) {
    (*options_.config.mutable_device_count())["GPU"] = num_gpus;
  }
  CHECK_GE(num_cpu_cores, 1);
  options_.config.set_intra_op_parallelism_threads(num_cpu_cores);
  // Create a session-specific thread pool to ensure the threads are reset
  // when the session is reset.
  options_.config.add_session_inter_op_thread_pool()->set_num_threads(
      num_cpu_cores);
  if (timeout_s > 0) {
    options_.config.set_operation_timeout_in_ms(timeout_s * 1000);
  }
}
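
// Typical usage, as a minimal sketch (the real call sites are in grappler's
// measurement-based cost estimation; the argument values are illustrative):
//
//   SingleMachine cluster(/*timeout_s=*/60, /*num_cpu_cores=*/4,
//                         /*num_gpus=*/1);
//   TF_RETURN_IF_ERROR(cluster.Provision());
//   TF_RETURN_IF_ERROR(cluster.Initialize(item));
//   RunMetadata metadata;
//   TF_RETURN_IF_ERROR(
//       cluster.Run(item.graph, item.feed, item.fetch, &metadata));
//   TF_RETURN_IF_ERROR(cluster.Shutdown());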

SingleMachine::~SingleMachine() {
  CloseSession(false /*use_timeout*/).IgnoreError();

  // Reset the thread pool so that there are no outstanding Session::Run(...)s
  // when we delete the session.
  thread_pool_.reset();
}

Status SingleMachine::Provision() {
  // This is really ugly: to avoid leaking variables, we need to reset the tf
  // session every time we're done processing a grappler item. However,
  // variables are global, and therefore we can't have more than 1 session
  // alive at a time. This check detects when more than one cluster is
  // provisioned.
  if (already_provisioned) {
    return errors::Unavailable(
        "Can't provision more than one single cluster at a time");
  }

  TF_RETURN_IF_ERROR(ResetSession());

  std::vector<DeviceAttributes> devices;
  TF_RETURN_IF_ERROR(session_->ListDevices(&devices));
  for (const auto& dev : devices) {
    DeviceProperties attr;
    if (dev.device_type() == "CPU") {
      attr = GetLocalCPUInfo();
    } else if (dev.device_type() == "GPU") {
      DeviceNameUtils::ParsedName parsed;
      if (!DeviceNameUtils::ParseFullName(dev.name(), &parsed)) {
        return errors::InvalidArgument(
            strings::StrCat("Not able to parse GPU device name: ", dev.name()));
      }
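      // The TF-assigned GPU ordinal (the N in "/device:GPU:N") can differ
      // from the platform (CUDA) ordinal, e.g. when
      // GPUOptions.visible_device_list reorders or hides devices, so map it
      // before querying the hardware properties.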
      TfGpuId tf_gpu_id(parsed.id);
      PlatformGpuId platform_gpu_id;
      Status s = GpuIdManager::TfToPlatformGpuId(tf_gpu_id, &platform_gpu_id);
      if (!s.ok()) {
        return errors::Unavailable("Unknown TF GPU device with id ",
                                   tf_gpu_id.value(), ": ", s.ToString());
      }
      attr = GetLocalGPUInfo(platform_gpu_id);
    } else if (dev.device_type().find("XLA") == string::npos) {
      // Filter out the fake XLA devices to avoid double counting the actual
      // hardware resources that are available.
      attr.set_type(dev.device_type());
    }
    // Overwrite the memory size since users might have requested to use only
    // a fraction of the available device memory.
    attr.set_memory_size(dev.memory_limit());
    devices_[dev.name()] = attr;
  }
  already_provisioned = true;

  // Clear the high-water-mark stats of all local allocators.
  if (cpu_allocator_stats_enabled_) {
    TF_RETURN_IF_ERROR(ClearAllocatorStats());
  }
  return Status::OK();
}

Status SingleMachine::Initialize(const GrapplerItem& item) {
  mutex_lock l(this->last_graph_mu_);
  if (last_graph_ != &item.graph || last_graph_id_ != item.id) {
    init_ops_ = item.init_ops;
    expected_init_time_s_ = item.expected_init_time;
    // Clearing last_graph_ forces the next call to Run() to recreate the
    // session and rerun the init ops and queue runners.
    last_graph_ = nullptr;
    queue_runner_defs_ = item.queue_runners;
    last_graph_id_ = item.id;
  }
  return Status::OK();
}

Status SingleMachine::Shutdown() {
  TF_RETURN_IF_ERROR(ShutdownSession());

  mutex_lock l(this->last_graph_mu_);
  last_graph_ = nullptr;
  already_provisioned = false;

  return Status::OK();
}

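// Runs the given graph, caching it across calls: the expensive setup
// (session creation, init ops, queue runners, warmup) is redone only when
// the graph changes. Note that the cache key is the GraphDef's address, so
// mutating the same GraphDef object in place will not trigger a reset.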
Status SingleMachine::Run(const GraphDef& graph_def,
                          const std::vector<std::pair<string, Tensor>>& feed,
                          const std::vector<string>& fetch,
                          RunMetadata* metadata) {
  mutex_lock l(this->last_graph_mu_);
  if (last_graph_ != &graph_def) {
    TF_RETURN_IF_ERROR(ResetSession());
    TF_RETURN_IF_ERROR(session_->Create(graph_def));
    if (!init_ops_.empty()) {
      init_metadata_ = RunMetadata();
      int64 timeout_s = timeout_s_ + expected_init_time_s_;
      TF_RETURN_IF_ERROR(
          RunWithTimeout({}, init_ops_, &init_metadata_, timeout_s));
      // The compute cost for init ops is likely to be pessimistic since init
      // ops are run only once before warmup. Therefore we only keep their
      // memory costs.
      for (auto& node : *init_metadata_.mutable_cost_graph()->mutable_node()) {
        node.clear_compute_cost();
      }
      // Also clear the timeline to save memory.
      init_metadata_.clear_step_stats();
    }
    // We can have at most one hardware trace. Use it for the main graph, and
    // downgrade tracing of the queue runners to a software trace.
    RunOptions queue_options = run_options_;
    if (queue_options.trace_level() >= RunOptions::HARDWARE_TRACE) {
      queue_options.set_trace_level(RunOptions::SOFTWARE_TRACE);
    }
    for (size_t i = 0; i < queue_runner_defs_.size(); ++i) {
      std::unique_ptr<QueueRunner> queue_runner;
      TF_RETURN_IF_ERROR(QueueRunner::New(queue_runner_defs_[i],
                                          coordinator_.get(), &queue_runner));

      TF_RETURN_IF_ERROR(queue_runner->StartAndCollectCostGraph(
          session_.get(), queue_options));
      TF_RETURN_IF_ERROR(
          coordinator_->RegisterRunner(std::move(queue_runner)));
      TF_RETURN_IF_ERROR(coordinator_->GetStatus());
    }

    // Warm up TensorFlow if needed.
    for (int i = 0; i < NumWarmupSteps(); ++i) {
      TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, nullptr));
    }
  }

  if (metadata) {
    TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, metadata));
    // Merge the costs of the initialization and the queue runners.
    CostGraphDef queue_costs;
    TF_RETURN_IF_ERROR(coordinator_->ExportCostGraph(&queue_costs));
    MergeCosts(metadata->mutable_cost_graph(), init_metadata_.cost_graph(),
               queue_costs);
  } else {
    TF_RETURN_IF_ERROR(RunWithTimeout(feed, fetch, nullptr));
  }

  last_graph_ = &graph_def;

  return Status::OK();
}

Status SingleMachine::EnablePeakMemoryStats() {
  EnableCPUAllocatorStats();
  cpu_allocator_stats_enabled_ = true;
  // No need to enable GPU allocator stats since they are always collected.
  return Status::OK();
}

Status SingleMachine::GetPeakMemoryUsage(
    std::unordered_map<string, uint64>* device_peak_memory) const {
  // Note that cpu_allocator->TracksAllocationSizes() returning true doesn't
  // always mean that AllocatorStats are being collected.
  if (!cpu_allocator_stats_enabled_) {
    return Status(error::INVALID_ARGUMENT,
                  "Tracking allocation for CPU is not enabled.");
  }

  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  std::vector<Device*> devices = device_mgr->ListDevices();

  device_peak_memory->clear();
  for (Device* device : devices) {
    auto* allocator = device->GetAllocator(AllocatorAttributes());
    if (!allocator->TracksAllocationSizes()) {
      return Status(error::INVALID_ARGUMENT,
                    "Tracking allocation is not enabled.");
    }
    absl::optional<AllocatorStats> stats = allocator->GetStats();
    (*device_peak_memory)[device->name()] =
        (stats ? stats->peak_bytes_in_use : 0);
  }

  return Status::OK();
}

Status SingleMachine::RunWithTimeout(
    const std::vector<std::pair<string, Tensor>>& feed,
    const std::vector<string>& fetch, RunMetadata* run_metadata) {
  return RunWithTimeout(feed, fetch, run_metadata, timeout_s_);
}

Status SingleMachine::RunWithTimeout(
    const std::vector<std::pair<string, Tensor>>& feed,
    const std::vector<string>& fetch, RunMetadata* run_metadata,
    int64 timeout_s) {
  // We shouldn't be running or closing the session at this point.
  {
    mutex_lock l(close_mu_);
    CHECK(!closing_);
  }

  auto status = std::make_shared<Status>();
  auto local_metadata = std::make_shared<RunMetadata>();
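  // status and local_metadata are shared_ptrs captured by value: if the
  // deadline fires, ExecuteWithTimeout() returns while the closure may still
  // be running on the thread pool, and shared ownership keeps its outputs
  // alive instead of leaving them dangling off this unwound stack frame.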
  const bool executed_in_time = ExecuteWithTimeout(
      [this, status, local_metadata, feed, fetch]() {
        *status = session_->Run(run_options_, feed, {}, fetch, nullptr,
                                local_metadata.get());
      },
      timeout_s * 1000, thread_pool_.get());
  if (!executed_in_time) {
    return errors::DeadlineExceeded("Failed to run the graph after ", timeout_s,
                                    " seconds, aborting");
  } else if (run_metadata && status->ok()) {
    *run_metadata = *local_metadata;
  }
  return *status;
}

Status SingleMachine::CloseSession(bool use_timeout) {
  if (!session_ || !thread_pool_) {
    return Status::OK();
  }

  {
    mutex_lock l(close_mu_);

    if (!closing_) {
      closing_ = true;
    }
  }

  const bool executed_in_time = ExecuteWithTimeout(
      [&]() {
        if (this->coordinator_) {
          this->coordinator_->RequestStop().IgnoreError();
          // Wait for all the runners to have closed their queues.
          while (!this->coordinator_->AllRunnersStopped()) {
            sleep(1);
          }
          // Now we can close the session. This should cancel any pending I/O
          // operation.
          this->session_->Close().IgnoreError();
          // Last but not least, we can delete the coordinator.
          this->coordinator_.reset();
        } else {
          this->session_->Close().IgnoreError();
        }

        mutex_lock l2(close_mu_);
        closing_ = false;
      },
      use_timeout ? timeout_s_ * 1000 : -1, thread_pool_.get());

  if (!executed_in_time) {
    // Let the caller know that we can't shut down the session, and therefore
    // can't process anything further.
    return errors::Unavailable("Failed to close the previous session after ",
                               timeout_s_, " seconds, aborting");
  }

  return Status::OK();
}

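// Unlike CloseSession(), ShutdownSession() also tears down the thread pool,
// which drains any closures still pending on it before we return.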
Status SingleMachine::ShutdownSession() {
  TF_RETURN_IF_ERROR(CloseSession(true /*use_timeout*/));

  // Delete the thread pool: this ensures that all the pending closures
  // complete before we return. Note that if TF deadlocked on us, the closures
  // will never complete, and the call to thread_pool_.reset() would never
  // return: therefore we delete the thread pool from a background thread.
  // That thread itself will also never complete, so the user should abort
  // the process to avoid leaking too many resources.
  auto n = std::make_shared<Notification>();
  Env::Default()->SchedClosure([this, n]() {
    thread_pool_.reset();
    n->Notify();
  });
  int64 timeout_us = 1000000ll * timeout_s_;
  const bool notified = WaitForNotificationWithTimeout(n.get(), timeout_us);
  if (!notified) {
    // Let the caller know that we can't shut down the session properly since
    // there are still calls to Session::Run() in flight.
    return errors::Unavailable("The session is still running graphs after ",
                               timeout_s_, " seconds");
  }

  return Status::OK();
}

Status SingleMachine::ResetSession() {
  if (session_) {
    LOG(INFO) << "Cleaning up previous session";

    // Make sure the session is properly closed.
    TF_RETURN_IF_ERROR(ShutdownSession());

    // Destroying the session object deletes all its variables as well. This
    // is only true for DirectSession.
    session_.reset();
  }

  LOG(INFO) << "Starting new session";

  // Create a new thread pool.
  thread_pool_.reset(new thread::ThreadPool(
      Env::Default(), SanitizeThreadSuffix("single_machine"), 2));

  session_.reset(NewSession(options_));
  if (!session_) {
    return errors::Unknown("Failed to create session");
  }
  coordinator_.reset(new Coordinator());

  // Build the DeviceSet.
  device_set_.reset(new DeviceSet);
  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  for (auto d : device_mgr->ListDevices()) {
    device_set_->AddDevice(d);
    // We currently don't care about the client device.
  }

  return Status::OK();
}

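// Node ids in a CostGraphDef must be unique, so when the three cost graphs
// are merged, the queue-runner and init nodes are renumbered past the ids
// already in use. As an illustrative example: if the main graph uses ids
// 0..9 and there are 2 queue-runner nodes, those get ids 10..11 and init
// nodes start at 12; the input/control references inside each copied node
// are shifted by the same offset so they still point at the copied nodes.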
void SingleMachine::MergeCosts(CostGraphDef* graph_costs,
                               const CostGraphDef& init_costs,
                               const CostGraphDef& queue_costs) {
  graph_costs->mutable_node()->Reserve(graph_costs->node_size() +
                                       init_costs.node_size() +
                                       queue_costs.node_size());
  std::unordered_set<string> nodes_seen;
  int queue_costs_id_offset = graph_costs->node_size();
  for (const auto& node : graph_costs->node()) {
    nodes_seen.insert(node.name());
    if (node.id() >= queue_costs_id_offset) {
      queue_costs_id_offset = node.id() + 1;
    }
  }

  int init_costs_id_offset = queue_costs_id_offset + queue_costs.node_size();
  // The costs obtained by running the main graph could be more stable than
  // the ones we get from the queue runners, since the queue runners run
  // asynchronously.
  for (const auto& node : queue_costs.node()) {
    if (nodes_seen.find(node.name()) != nodes_seen.end()) {
      continue;
    }

    auto* new_node = graph_costs->add_node();
    new_node->MergeFrom(node);

    new_node->set_id(node.id() + queue_costs_id_offset);
    if (new_node->id() >= init_costs_id_offset) {
      init_costs_id_offset = new_node->id() + 1;
    }

    for (auto& input_info : *new_node->mutable_input_info()) {
      input_info.set_preceding_node(input_info.preceding_node() +
                                    queue_costs_id_offset);
    }
    for (auto& control_input : *new_node->mutable_control_input()) {
      control_input += queue_costs_id_offset;
    }
  }

  // Don't overwrite the existing costs with those gathered during
  // initialization, since the latter are possibly outdated.
  for (const auto& node : init_costs.node()) {
    if (nodes_seen.find(node.name()) != nodes_seen.end()) {
      continue;
    }

    auto* new_node = graph_costs->add_node();
    new_node->MergeFrom(node);

    new_node->set_id(node.id() + init_costs_id_offset);
    for (auto& input_info : *new_node->mutable_input_info()) {
      input_info.set_preceding_node(input_info.preceding_node() +
                                    init_costs_id_offset);
    }
    for (auto& control_input : *new_node->mutable_control_input()) {
      control_input += init_costs_id_offset;
    }
  }
}

Status SingleMachine::ClearAllocatorStats() const {
  // Note that cpu_allocator->TracksAllocationSizes() returning true doesn't
  // always mean that AllocatorStats are being collected.
  if (!cpu_allocator_stats_enabled_) {
    return Status(error::INVALID_ARGUMENT,
                  "Tracking allocation for CPU is not enabled.");
  }

  const DeviceMgr* device_mgr;
  TF_RETURN_IF_ERROR(session_->LocalDeviceManager(&device_mgr));
  std::vector<Device*> devices = device_mgr->ListDevices();

  for (Device* device : devices) {
    auto* allocator = device->GetAllocator(AllocatorAttributes());
    if (!allocator->TracksAllocationSizes()) {
      return Status(error::INVALID_ARGUMENT,
                    "Tracking allocation is not enabled.");
    }
    allocator->ClearStats();
  }
  return Status::OK();
}

}  // namespace grappler
}  // namespace tensorflow