1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/common_runtime/process_util.h"
17
18 #ifdef INTEL_MKL
19 #ifdef _OPENMP
20 #include <omp.h>
21 #endif // _OPENMP
22 #endif // INTEL_MKL
23 #include <string.h>
24
25 #include "tensorflow/core/lib/core/threadpool.h"
26 #include "tensorflow/core/platform/byte_order.h"
27 #include "tensorflow/core/platform/cpu_info.h"
28 #include "tensorflow/core/platform/logging.h"
29 #include "tensorflow/core/platform/tracing.h"
30 #include "tensorflow/core/platform/types.h"
31 #include "tensorflow/core/util/util.h"
32
33 namespace tensorflow {
34
35 namespace {
36
37 // Use environment setting if specified (init once)
GetEnvNumInterOpThreads()38 int32 GetEnvNumInterOpThreads() {
39 static int32 env_num_threads = NumInterOpThreadsFromEnvironment();
40 return env_num_threads;
41 }
42
DefaultNumInterOpThreads()43 int32 DefaultNumInterOpThreads() {
44 #ifndef __ANDROID__
45 int32 env_num_threads = GetEnvNumInterOpThreads();
46 if (env_num_threads > 0) {
47 return env_num_threads;
48 }
49
50 // Default to the maximum parallelism for the current process.
51 return port::MaxParallelism();
52 #else
53 // Historically, -D__ANDROID__ resulted in the inter-op threadpool not being
54 // used (regardless of what was chosen here); instead, all work was done on
55 // the thread(s) calling Session::Run. That's no longer the case, but we'd
56 // like to avoid suddenly higher concurrency and peak resource usage (for the
57 // same device shape, graph, and options) versus prior versions - as best we
58 // can:
59 //
60 // - Single Session::Run (none concurrent), and default options:
61 // Behavior is mostly the same as before.
62 //
63 // - Concurrent Session::Runs, and default options:
64 // Reduced concurrency versus before.
65 //
66 // - Thread-pool size set explicitly (>1):
67 // Increased concurrency versus before.
68 //
69 // (We assume the first case is the most common)
70 return 1;
71 #endif
72 }
73
InitComputePool(const SessionOptions & options)74 static thread::ThreadPool* InitComputePool(const SessionOptions& options) {
75 int32 inter_op_parallelism_threads =
76 options.config.inter_op_parallelism_threads();
77 if (inter_op_parallelism_threads == 0) {
78 inter_op_parallelism_threads = DefaultNumInterOpThreads();
79 }
80 return new thread::ThreadPool(
81 Env::Default(), ThreadOptions(), "Compute", inter_op_parallelism_threads,
82 !options.config.experimental().disable_thread_spinning(),
83 /*allocator=*/nullptr);
84 }
85
86 } // namespace
87
ComputePool(const SessionOptions & options)88 thread::ThreadPool* ComputePool(const SessionOptions& options) {
89 static thread::ThreadPool* compute_pool = InitComputePool(options);
90 return compute_pool;
91 }
92
NumInterOpThreadsFromEnvironment()93 int32 NumInterOpThreadsFromEnvironment() {
94 int32 num;
95 const char* val = std::getenv("TF_NUM_INTEROP_THREADS");
96 return (val && strings::safe_strto32(val, &num)) ? num : 0;
97 }
98
NumIntraOpThreadsFromEnvironment()99 int32 NumIntraOpThreadsFromEnvironment() {
100 int32 num;
101 const char* val = std::getenv("TF_NUM_INTRAOP_THREADS");
102 return (val && strings::safe_strto32(val, &num)) ? num : 0;
103 }
104 #if !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
OMPThreadsFromEnvironment()105 int32 OMPThreadsFromEnvironment() {
106 // 1) std::getenv is thread-safe (as long as no other function modifies the
107 // host env) from C++11 onward. 2) Most of TF code (except tests and
108 // experimental code) doesn't call setenv and unsetenv
109 int32 num;
110 const char* val = std::getenv("OMP_NUM_THREADS");
111 return (val && strings::safe_strto32(val, &num)) ? num : 0;
112 }
113
DefaultNumIntraOpThreads()114 int32 DefaultNumIntraOpThreads() {
115 // Use environment setting if specified (init once)
116 static int env_num_threads = NumIntraOpThreadsFromEnvironment();
117 if (env_num_threads > 0) {
118 return env_num_threads;
119 }
120
121 // Default to the maximum parallelism for the current process.
122 return port::MaxParallelism();
123 }
124 #endif // !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
NumInterOpThreadsFromSessionOptions(const SessionOptions & options)125 int32 NumInterOpThreadsFromSessionOptions(const SessionOptions& options) {
126 const int32 inter_op = options.config.inter_op_parallelism_threads();
127 if (inter_op > 0) return inter_op;
128 const int32 env_inter_op = GetEnvNumInterOpThreads();
129 if (env_inter_op > 0) return env_inter_op;
130
131 #if !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
132 if (!DisableMKL()) {
133 // MKL library executes ops in parallel using OMP threads.
134 // Setting inter_op conservatively to avoid thread oversubscription that
135 // could lead to severe perf degradations and OMP resource exhaustion.
136 // Inter ops are set such that mkl_inter_op * mkl_intra_op <= NumCores.
137 const int32 intra_op = options.config.intra_op_parallelism_threads();
138 const int32 omp_max_threads = OMPThreadsFromEnvironment();
139 const int32 mkl_intra_op =
140 (omp_max_threads > 0)
141 ? omp_max_threads
142 : (intra_op > 0) ? intra_op : DefaultNumIntraOpThreads();
143 DCHECK_GE(mkl_intra_op, 1);
144 const int32 mkl_inter_op = std::max(
145 (DefaultNumInterOpThreads() + mkl_intra_op - 1) / mkl_intra_op, 2);
146 VLOG(0)
147 << "Creating new thread pool with default inter op setting: "
148 << mkl_inter_op
149 << ". Tune using inter_op_parallelism_threads for best performance.";
150 return mkl_inter_op;
151 }
152 #endif // !defined(ENABLE_MKLDNN_THREADPOOL) && defined(INTEL_MKL)
153 return DefaultNumInterOpThreads();
154 }
155
NewThreadPoolFromSessionOptions(const SessionOptions & options)156 thread::ThreadPool* NewThreadPoolFromSessionOptions(
157 const SessionOptions& options) {
158 const int32 num_threads = NumInterOpThreadsFromSessionOptions(options);
159 VLOG(1) << "Direct session inter op parallelism threads: " << num_threads;
160 return new thread::ThreadPool(
161 options.env, ThreadOptions(), "Compute", num_threads,
162 !options.config.experimental().disable_thread_spinning(),
163 /*allocator=*/nullptr);
164 }
165
SchedClosure(std::function<void ()> closure)166 void SchedClosure(std::function<void()> closure) {
167 if (!tracing::EventCollector::IsEnabled()) {
168 return Env::Default()->SchedClosure(std::move(closure));
169 }
170 uint64 id = tracing::GetUniqueArg();
171 tracing::RecordEvent(tracing::EventCategory::kScheduleClosure, id);
172
173 Env::Default()->SchedClosure([id, closure = std::move(closure)]() {
174 tracing::ScopedRegion region(tracing::EventCategory::kRunClosure, id);
175 closure();
176 });
177 }
178
SchedNonBlockingClosureAfter(int64 micros,std::function<void ()> closure)179 void SchedNonBlockingClosureAfter(int64 micros, std::function<void()> closure) {
180 Env::Default()->SchedClosureAfter(micros, std::move(closure));
181 }
182
183 } // namespace tensorflow
184