/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/process_util.h"

#ifdef INTEL_MKL
#ifdef _OPENMP
#include <omp.h>
#endif  // _OPENMP
#endif  // INTEL_MKL
#include <string.h>

#include <algorithm>
#include <cstdlib>

#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/strings/numbers.h"
#include "tensorflow/core/platform/byte_order.h"
#include "tensorflow/core/platform/cpu_info.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/util.h"

namespace tensorflow {

namespace {

// Use environment setting if specified (init once)
int32 GetEnvNumInterOpThreads() {
  static int32 env_num_threads = NumInterOpThreadsFromEnvironment();
  return env_num_threads;
}

int32 DefaultNumInterOpThreads() {
#ifndef __ANDROID__
  int32 env_num_threads = GetEnvNumInterOpThreads();
  if (env_num_threads > 0) {
    return env_num_threads;
  }

  // Default to the maximum parallelism for the current process.
  return port::MaxParallelism();
#else
  // Historically, -D__ANDROID__ resulted in the inter-op threadpool not being
  // used (regardless of what was chosen here); instead, all work was done on
  // the thread(s) calling Session::Run. That's no longer the case, but we'd
  // like to avoid suddenly higher concurrency and peak resource usage (for the
  // same device shape, graph, and options) versus prior versions - as best we
  // can:
  //
  //   - Single Session::Run (none concurrent), and default options:
  //     Behavior is mostly the same as before.
  //
  //   - Concurrent Session::Runs, and default options:
  //     Reduced concurrency versus before.
  //
  //   - Thread-pool size set explicitly (>1):
  //     Increased concurrency versus before.
  //
  // (We assume the first case is the most common)
  return 1;
#endif
}

static thread::ThreadPool* InitComputePool(const SessionOptions& options) {
  int32 inter_op_parallelism_threads =
      options.config.inter_op_parallelism_threads();
  if (inter_op_parallelism_threads == 0) {
    inter_op_parallelism_threads = DefaultNumInterOpThreads();
  }
  return new thread::ThreadPool(
      Env::Default(), ThreadOptions(), "Compute", inter_op_parallelism_threads,
      !options.config.experimental().disable_thread_spinning(),
      /*allocator=*/nullptr);
}

}  // namespace

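// Returns the process-wide compute pool. The pool is created lazily on the
// first call and cached in a function-local static, so only the options passed
// to that first call determine its size.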
ComputePool(const SessionOptions & options)88 thread::ThreadPool* ComputePool(const SessionOptions& options) {
89   static thread::ThreadPool* compute_pool = InitComputePool(options);
90   return compute_pool;
91 }
92 
NumInterOpThreadsFromEnvironment()93 int32 NumInterOpThreadsFromEnvironment() {
94   int32 num;
95   const char* val = std::getenv("TF_NUM_INTEROP_THREADS");
96   return (val && strings::safe_strto32(val, &num)) ? num : 0;
97 }
98 
NumIntraOpThreadsFromEnvironment()99 int32 NumIntraOpThreadsFromEnvironment() {
100   int32 num;
101   const char* val = std::getenv("TF_NUM_INTRAOP_THREADS");
102   return (val && strings::safe_strto32(val, &num)) ? num : 0;
103 }
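
// Illustrative example: with the environment configured as
//   export TF_NUM_INTEROP_THREADS=4
//   export TF_NUM_INTRAOP_THREADS=2
// the two functions above return 4 and 2 respectively. If a variable is unset
// or does not parse as an int32, they return 0 and callers fall back to other
// defaults.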

#ifdef INTEL_MKL
int32 OMPThreadsFromEnvironment() {
  // Note: (1) std::getenv is thread-safe from C++11 onward, as long as no
  // other function modifies the host environment; (2) most TensorFlow code
  // (except tests and experimental code) does not call setenv or unsetenv.
  int32 num;
  const char* val = std::getenv("OMP_NUM_THREADS");
  return (val && strings::safe_strto32(val, &num)) ? num : 0;
}

int32 DefaultNumIntraOpThreads() {
  // Use environment setting if specified (init once)
  static int env_num_threads = NumIntraOpThreadsFromEnvironment();
  if (env_num_threads > 0) {
    return env_num_threads;
  }

  // Default to the maximum parallelism for the current process.
  return port::MaxParallelism();
}
#endif  // INTEL_MKL
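
// Decides the number of inter-op threads for a session. Precedence: an
// explicit inter_op_parallelism_threads in the SessionOptions wins, then the
// TF_NUM_INTEROP_THREADS environment variable, then (when built with MKL and
// MKL is not disabled) a heuristic that avoids oversubscribing the OpenMP
// threads, and finally the process-wide default.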
int32 NumInterOpThreadsFromSessionOptions(const SessionOptions& options) {
  const int32 inter_op = options.config.inter_op_parallelism_threads();
  if (inter_op > 0) return inter_op;
  const int32 env_inter_op = GetEnvNumInterOpThreads();
  if (env_inter_op > 0) return env_inter_op;

#ifdef INTEL_MKL
  if (!DisableMKL()) {
    // The MKL library executes ops in parallel using OMP threads. Set inter_op
    // conservatively to avoid thread oversubscription, which could lead to
    // severe performance degradation and OMP resource exhaustion. inter_op is
    // chosen such that mkl_inter_op * mkl_intra_op <= NumCores.
    const int32 intra_op = options.config.intra_op_parallelism_threads();
    const int32 omp_max_threads = OMPThreadsFromEnvironment();
    const int32 mkl_intra_op =
        (omp_max_threads > 0)
            ? omp_max_threads
            : (intra_op > 0) ? intra_op : DefaultNumIntraOpThreads();
    DCHECK_GE(mkl_intra_op, 1);
    const int32 mkl_inter_op = std::max(
        (DefaultNumInterOpThreads() + mkl_intra_op - 1) / mkl_intra_op, 2);
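    // Worked example (assuming a 16-core machine and no TF_NUM_INTEROP_THREADS
    // override): with OMP_NUM_THREADS=4, mkl_intra_op is 4 and mkl_inter_op is
    // max((16 + 3) / 4, 2) = 4, so 4 inter-op threads * 4 OMP threads fit
    // within the 16 cores.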
    VLOG(0)
        << "Creating new thread pool with default inter op setting: "
        << mkl_inter_op
        << ". Tune using inter_op_parallelism_threads for best performance.";
    return mkl_inter_op;
  }
#endif  // INTEL_MKL
  return DefaultNumInterOpThreads();
}

thread::ThreadPool* NewThreadPoolFromSessionOptions(
    const SessionOptions& options) {
  const int32 num_threads = NumInterOpThreadsFromSessionOptions(options);
  VLOG(1) << "Direct session inter op parallelism threads: " << num_threads;
  return new thread::ThreadPool(
      options.env, ThreadOptions(), "Compute", num_threads,
      !options.config.experimental().disable_thread_spinning(),
      /*allocator=*/nullptr);
}

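// Schedules `closure` on the default Env. When the tracing EventCollector is
// disabled this is a plain Env::Default()->SchedClosure(); otherwise a
// kScheduleClosure event is recorded and the closure runs inside a kRunClosure
// ScopedRegion so the schedule and run events can be correlated by id.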
void SchedClosure(std::function<void()> closure) {
  if (!tracing::EventCollector::IsEnabled()) {
    return Env::Default()->SchedClosure(std::move(closure));
  }
  uint64 id = tracing::GetUniqueArg();
  tracing::RecordEvent(tracing::EventCategory::kScheduleClosure, id);

  Env::Default()->SchedClosure([id, closure = std::move(closure)]() {
    tracing::ScopedRegion region(tracing::EventCategory::kRunClosure, id);
    closure();
  });
}

void SchedNonBlockingClosureAfter(int64 micros, std::function<void()> closure) {
  Env::Default()->SchedClosureAfter(micros, std::move(closure));
}

}  // namespace tensorflow