/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/process_util.h"

#ifdef INTEL_MKL
#ifdef _OPENMP
#include <omp.h>
#endif  // _OPENMP
#endif  // INTEL_MKL
#include <string.h>

#include <algorithm>
#include <cstdlib>

#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/util.h"

namespace tensorflow {

namespace {

// Use environment setting if specified (init once)
int32 GetEnvNumInterOpThreads() {
  static int32 env_num_threads = NumInterOpThreadsFromEnvironment();
  return env_num_threads;
}

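// Default inter-op pool size: TF_NUM_INTEROP_THREADS if set, otherwise the
// maximum parallelism available to the process (1 on Android, see below).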
int32 DefaultNumInterOpThreads() {
#ifndef __ANDROID__
  int32 env_num_threads = GetEnvNumInterOpThreads();
  if (env_num_threads > 0) {
    return env_num_threads;
  }

  // Default to the maximum parallelism for the current process.
  return port::MaxParallelism();
#else
  // Historically, -D__ANDROID__ resulted in the inter-op threadpool not being
  // used (regardless of what was chosen here); instead, all work was done on
  // the thread(s) calling Session::Run. That's no longer the case, but we'd
  // like to avoid suddenly higher concurrency and peak resource usage (for the
  // same device shape, graph, and options) versus prior versions - as best we
  // can:
  //
  // - Single Session::Run (none concurrent), and default options:
  //   Behavior is mostly the same as before.
  //
  // - Concurrent Session::Runs, and default options:
  //   Reduced concurrency versus before.
  //
  // - Thread-pool size set explicitly (>1):
  //   Increased concurrency versus before.
  //
  // (We assume the first case is the most common)
  return 1;
#endif
}

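// Creates the process-wide "Compute" thread pool from the given options,
// falling back to DefaultNumInterOpThreads() when
// inter_op_parallelism_threads is unset (0).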
static thread::ThreadPool* InitComputePool(const SessionOptions& options) {
  int32 inter_op_parallelism_threads =
      options.config.inter_op_parallelism_threads();
  if (inter_op_parallelism_threads == 0) {
    inter_op_parallelism_threads = DefaultNumInterOpThreads();
  }
  return new thread::ThreadPool(
      Env::Default(), ThreadOptions(), "Compute", inter_op_parallelism_threads,
      !options.config.experimental().disable_thread_spinning(),
      /*allocator=*/nullptr);
}

}  // namespace

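// Returns the shared compute pool; it is created lazily from the first
// SessionOptions passed in and reused for the lifetime of the process.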
thread::ThreadPool* ComputePool(const SessionOptions& options) {
  static thread::ThreadPool* compute_pool = InitComputePool(options);
  return compute_pool;
}

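// Reads TF_NUM_INTEROP_THREADS; returns 0 if it is unset or unparseable.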
int32 NumInterOpThreadsFromEnvironment() {
  int32 num;
  const char* val = std::getenv("TF_NUM_INTEROP_THREADS");
  return (val && strings::safe_strto32(val, &num)) ? num : 0;
}

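// Reads TF_NUM_INTRAOP_THREADS; returns 0 if it is unset or unparseable.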
int32 NumIntraOpThreadsFromEnvironment() {
  int32 num;
  const char* val = std::getenv("TF_NUM_INTRAOP_THREADS");
  return (val && strings::safe_strto32(val, &num)) ? num : 0;
}

#ifdef INTEL_MKL
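// Reads OMP_NUM_THREADS; returns 0 if it is unset or unparseable.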
int32 OMPThreadsFromEnvironment() {
  // 1) std::getenv is thread-safe (as long as no other function modifies the
  //    host env) from C++11 onward.
  // 2) Most of TF code (except tests and experimental code) doesn't call
  //    setenv or unsetenv.
  int32 num;
  const char* val = std::getenv("OMP_NUM_THREADS");
  return (val && strings::safe_strto32(val, &num)) ? num : 0;
}

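// Default intra-op pool size when the session does not specify one:
// TF_NUM_INTRAOP_THREADS if set, otherwise the maximum parallelism for the
// current process.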
int32 DefaultNumIntraOpThreads() {
  // Use environment setting if specified (init once)
  static int env_num_threads = NumIntraOpThreadsFromEnvironment();
  if (env_num_threads > 0) {
    return env_num_threads;
  }

  // Default to the maximum parallelism for the current process.
  return port::MaxParallelism();
}
#endif  // INTEL_MKL

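// Determines the inter-op pool size for a session. Precedence: an explicit
// inter_op_parallelism_threads in SessionOptions, then TF_NUM_INTEROP_THREADS,
// then (with MKL enabled) a value sized so that
// mkl_inter_op * mkl_intra_op <= NumCores, and finally
// DefaultNumInterOpThreads().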
int32 NumInterOpThreadsFromSessionOptions(const SessionOptions& options) {
  const int32 inter_op = options.config.inter_op_parallelism_threads();
  if (inter_op > 0) return inter_op;
  const int32 env_inter_op = GetEnvNumInterOpThreads();
  if (env_inter_op > 0) return env_inter_op;

#ifdef INTEL_MKL
  if (!DisableMKL()) {
    // MKL library executes ops in parallel using OMP threads.
    // Setting inter_op conservatively to avoid thread oversubscription that
    // could lead to severe perf degradations and OMP resource exhaustion.
    // Inter ops are set such that mkl_inter_op * mkl_intra_op <= NumCores.
    const int32 intra_op = options.config.intra_op_parallelism_threads();
    const int32 omp_max_threads = OMPThreadsFromEnvironment();
    const int32 mkl_intra_op =
        (omp_max_threads > 0)
            ? omp_max_threads
            : (intra_op > 0) ? intra_op : DefaultNumIntraOpThreads();
    DCHECK_GE(mkl_intra_op, 1);
    const int32 mkl_inter_op = std::max(
        (DefaultNumInterOpThreads() + mkl_intra_op - 1) / mkl_intra_op, 2);
    VLOG(0)
        << "Creating new thread pool with default inter op setting: "
        << mkl_inter_op
        << ". Tune using inter_op_parallelism_threads for best performance.";
    return mkl_inter_op;
  }
#endif  // INTEL_MKL
  return DefaultNumInterOpThreads();
}

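// Builds a new "Compute" thread pool sized by
// NumInterOpThreadsFromSessionOptions(), using the Env from the options.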
thread::ThreadPool* NewThreadPoolFromSessionOptions(
    const SessionOptions& options) {
  const int32 num_threads = NumInterOpThreadsFromSessionOptions(options);
  VLOG(1) << "Direct session inter op parallelism threads: " << num_threads;
  return new thread::ThreadPool(
      options.env, ThreadOptions(), "Compute", num_threads,
      !options.config.experimental().disable_thread_spinning(),
      /*allocator=*/nullptr);
}

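// Schedules the closure on the default Env's thread pool. When the tracing
// event collector is enabled, the schedule and the run are recorded under a
// shared id so they can be correlated.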
void SchedClosure(std::function<void()> closure) {
  if (!tracing::EventCollector::IsEnabled()) {
    return Env::Default()->SchedClosure(std::move(closure));
  }
  uint64 id = tracing::GetUniqueArg();
  tracing::RecordEvent(tracing::EventCategory::kScheduleClosure, id);

  Env::Default()->SchedClosure([id, closure = std::move(closure)]() {
    tracing::ScopedRegion region(tracing::EventCategory::kRunClosure, id);
    closure();
  });
}

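// Schedules the closure to run after the given delay in microseconds, without
// blocking the caller.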
void SchedNonBlockingClosureAfter(int64 micros,
                                  std::function<void()> closure) {
  Env::Default()->SchedClosureAfter(micros, std::move(closure));
}

}  // namespace tensorflow