/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#define EIGEN_USE_THREADS

#include "tensorflow/core/common_runtime/local_device.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/eigen_thread_pool.h"
#include "tensorflow/core/common_runtime/process_state.h"
#include "tensorflow/core/common_runtime/process_util.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_feature_guard.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/numa.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"
#include "tensorflow/core/util/env_var.h"

namespace tensorflow {
namespace {

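// Returns the cached value of TF_OVERRIDE_GLOBAL_THREADPOOL, read once from
// the environment. A hypothetical invocation (assuming a binary that creates
// LocalDevices):
//
//   TF_OVERRIDE_GLOBAL_THREADPOOL=1 ./trainer
//
// makes each LocalDevice build its own Eigen threadpool below instead of
// sharing the process-global one.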
bool OverrideGlobalThreadPoolFromEnvironment() {
  static const bool override_global_threadpool = [] {
    bool flag;
    auto status = ReadBoolFromEnvVar("TF_OVERRIDE_GLOBAL_THREADPOOL",
                                     /*default_val=*/false, &flag);
    if (!status.ok()) {
      LOG(ERROR) << "OverrideGlobalThreadPool: " << status.error_message();
      return false;
    }
    return flag;
  }();
  return override_global_threadpool;
}

}  // namespace

/* static */
bool LocalDevice::use_global_threadpool_ = true;
mutex LocalDevice::global_tp_mu_;
gtl::InlinedVector<LocalDevice::EigenThreadPoolInfo*, 4>
    LocalDevice::global_tp_info_;

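// Bundles the Eigen threadpool, its Eigen wrapper, the ThreadPoolDevice, and
// the allocator backing a LocalDevice's numerical work. Instances are either
// shared process-wide (one per NUMA node when NUMA affinity is enabled) or
// owned by a single device.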
struct LocalDevice::EigenThreadPoolInfo {
  // Wrapper so we can provide the CPUAllocator to Eigen for use
  // when ops need extra tmp memory.
  class EigenAllocator : public Eigen::Allocator {
   public:
    explicit EigenAllocator(tensorflow::Allocator* a) : allocator_(a) {}
    void* allocate(size_t num_bytes) const override {
      // 64-byte alignment covers Eigen's strictest vectorization
      // requirement (e.g. AVX-512) as well as a typical cache line.
      return allocator_->AllocateRaw(64, num_bytes);
    }
    void deallocate(void* buffer) const override {
      allocator_->DeallocateRaw(buffer);
    }
    tensorflow::Allocator* allocator_;
  };

  explicit EigenThreadPoolInfo(const SessionOptions& options, int numa_node,
                               Allocator* allocator) {
    // Use session setting if specified.
    int32 intra_op_parallelism_threads =
        options.config.intra_op_parallelism_threads();
    // If no session setting, use environment setting.
    if (intra_op_parallelism_threads == 0) {
      static int env_num_threads = NumIntraOpThreadsFromEnvironment();
      intra_op_parallelism_threads = env_num_threads;
      // If no session setting or environment, compute a reasonable default.
      if (intra_op_parallelism_threads == 0) {
        intra_op_parallelism_threads = port::NumSchedulableCPUs();
        if (numa_node != port::kNUMANoAffinity) {
          // Assume that CPUs are equally distributed over available NUMA
          // nodes. This may not be true, but there isn't currently a better
          // way of determining the number of CPUs specific to the requested
          // node.
          intra_op_parallelism_threads /= port::NUMANumNodes();
        }
      }
    }
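    // E.g., with no session or environment override on a 16-CPU host with
    // 2 NUMA nodes, a NUMA-pinned pool ends up with 16 / 2 = 8 threads.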
    ThreadOptions thread_opts;
    thread_opts.numa_node = numa_node;
    eigen_worker_threads_.num_threads = intra_op_parallelism_threads;
    eigen_worker_threads_.workers = new thread::ThreadPool(
        options.env, thread_opts, strings::StrCat("numa_", numa_node, "_Eigen"),
        intra_op_parallelism_threads);
    eigen_threadpool_wrapper_.reset(
        new EigenThreadPoolWrapper(eigen_worker_threads_.workers));
    if (allocator) {
      eigen_allocator_.reset(new EigenAllocator(allocator));
    }
    eigen_device_.reset(new Eigen::ThreadPoolDevice(
        eigen_threadpool_wrapper_.get(), eigen_worker_threads_.num_threads,
        eigen_allocator_.get()));
  }

  ~EigenThreadPoolInfo() {
    // Destroy the threadpool wrapper and Eigen device before deleting the
    // underlying worker threads they reference.
    eigen_threadpool_wrapper_.reset();
    eigen_device_.reset();
    delete eigen_worker_threads_.workers;
  }

  DeviceBase::CpuWorkerThreads eigen_worker_threads_;
  std::unique_ptr<Eigen::ThreadPoolInterface> eigen_threadpool_wrapper_;
  std::unique_ptr<Eigen::ThreadPoolDevice> eigen_device_;
  std::unique_ptr<EigenAllocator> eigen_allocator_;
};
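
// The Eigen device installed below is what CPU kernels ultimately run on. A
// minimal sketch of the consumer side (hypothetical OpKernel::Compute body,
// names assumed):
//
//   const Eigen::ThreadPoolDevice& d = context->eigen_cpu_device();
//   output->flat<float>().device(d) =
//       input_a.flat<float>() + input_b.flat<float>();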

LocalDevice::LocalDevice(const SessionOptions& options,
                         const DeviceAttributes& attributes)
    : Device(options.env, attributes), owned_tp_info_(nullptr) {
  // Log info messages if TensorFlow is not compiled with instructions that
  // could speed up performance and are available on the current CPU.
  port::InfoAboutUnusedCPUFeatures();
  LocalDevice::EigenThreadPoolInfo* tp_info;

  if (OverrideGlobalThreadPoolFromEnvironment()) {
    set_use_global_threadpool(false);
  }

  if (use_global_threadpool_) {
    mutex_lock l(global_tp_mu_);
    if (options.config.experimental().use_numa_affinity()) {
      int numa_node = attributes.locality().numa_node();
      int num_numa_nodes = port::NUMANumNodes();
      DCHECK_LT(numa_node, num_numa_nodes);
      Allocator* numa_allocator =
          ProcessState::singleton()->GetCPUAllocator(numa_node);
      while (numa_node >= global_tp_info_.size()) {
        global_tp_info_.push_back(nullptr);
      }
      if (!global_tp_info_[numa_node]) {
        global_tp_info_[numa_node] = new LocalDevice::EigenThreadPoolInfo(
            options, numa_node, numa_allocator);
      }
      tp_info = global_tp_info_[numa_node];
    } else {
      if (global_tp_info_.empty()) {
        global_tp_info_.push_back(new LocalDevice::EigenThreadPoolInfo(
            options, port::kNUMANoAffinity, nullptr));
      }
      tp_info = global_tp_info_[0];
    }
  } else {
    // Each LocalDevice owns a separate ThreadPoolDevice for numerical
    // computations.
    // TODO(tucker): NUMA for these too?
    owned_tp_info_.reset(new LocalDevice::EigenThreadPoolInfo(
        options, port::kNUMANoAffinity, nullptr));
    tp_info = owned_tp_info_.get();
  }
  set_tensorflow_cpu_worker_threads(&tp_info->eigen_worker_threads_);
  set_eigen_cpu_device(tp_info->eigen_device_.get());
}
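
// Note: entries in global_tp_info_ are never deleted here; shared threadpools
// persist for the life of the process. A per-device pool, when used, is owned
// by owned_tp_info_ and released with the device.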
LocalDevice::~LocalDevice() {}

}  // namespace tensorflow