/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_
#define TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/types/optional.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/threadpool_interface.h"
#include "tensorflow/core/platform/types.h"

namespace Eigen {
class Allocator;
class ThreadPoolInterface;
struct ThreadPoolDevice;

template <typename Environment>
class ThreadPoolTempl;
}  // namespace Eigen

namespace tensorflow {
namespace thread {

struct EigenEnvironment;

class ThreadPool {
 public:
  // Scheduling strategies for ParallelFor. The strategy governs how the given
  // units of work are distributed among the available threads in the
  // threadpool.
  enum class SchedulingStrategy {
    // The Adaptive scheduling strategy adaptively chooses the shard sizes
    // based on the cost of each unit of work, and the cost model of the
    // underlying threadpool device.
    //
    // The 'cost_per_unit' is an estimate of the number of CPU cycles (or
    // nanoseconds if not CPU-bound) to complete a unit of work. Overestimating
    // creates too many shards and CPU time will be dominated by per-shard
    // overhead, such as Context creation. Underestimating may not fully make
    // use of the specified parallelism, and may also cause inefficiencies due
    // to load balancing issues and stragglers.
    kAdaptive,
    // The Fixed Block Size scheduling strategy shards the given units of work
    // into shards of fixed size. In case the total number of units is not
    // evenly divisible by 'block_size', at most one of the shards may be of
    // smaller size. The exact number of shards may be found by a call to
    // NumShardsUsedByFixedBlockSizeScheduling.
    //
    // Each shard may be executed on a different thread in parallel, depending
    // on the number of threads available in the pool. Note that when there
    // aren't enough threads in the pool to achieve full parallelism, function
    // calls will be automatically queued.
    kFixedBlockSize
  };
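
  // Example (illustrative sketch): populating the strategy-specific arguments
  // via the SchedulingParams class defined below. The numeric values are
  // made-up placeholders, not tuned recommendations.
  //
  //   // Adaptive: supply a cost estimate; block_size is unused.
  //   SchedulingParams adaptive(SchedulingStrategy::kAdaptive,
  //                             /*cost_per_unit=*/10000,
  //                             /*block_size=*/absl::nullopt);
  //
  //   // Fixed block size: supply a block size; cost_per_unit is unused.
  //   SchedulingParams fixed(SchedulingStrategy::kFixedBlockSize,
  //                          /*cost_per_unit=*/absl::nullopt,
  //                          /*block_size=*/1024);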

  // Contains additional parameters for either the Adaptive or the Fixed Block
  // Size scheduling strategy.
  class SchedulingParams {
   public:
    explicit SchedulingParams(SchedulingStrategy strategy,
                              absl::optional<int64> cost_per_unit,
                              absl::optional<int64> block_size)
        : strategy_(strategy),
          cost_per_unit_(cost_per_unit),
          block_size_(block_size) {}

    SchedulingStrategy strategy() const { return strategy_; }
    absl::optional<int64> cost_per_unit() const { return cost_per_unit_; }
    absl::optional<int64> block_size() const { return block_size_; }

   private:
    // The underlying Scheduling Strategy for which this instance contains
    // additional parameters.
    SchedulingStrategy strategy_;

    // The estimated cost per unit of work in number of CPU cycles (or
    // nanoseconds if not CPU-bound). Only applicable for the Adaptive
    // scheduling strategy.
    absl::optional<int64> cost_per_unit_;

    // The block size of each shard. Only applicable for the Fixed Block Size
    // scheduling strategy.
    absl::optional<int64> block_size_;
  };

  // Constructs a pool that contains "num_threads" threads with specified
  // "name". env->StartThread() is used to create individual threads with the
  // given ThreadOptions. If "low_latency_hint" is true the thread pool
  // implementation may use it as a hint that lower latency is preferred at the
  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
  // wait. Conversely, if the threadpool is used to schedule high-latency
  // operations like I/O the hint should be set to false.
  //
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads, bool low_latency_hint,
             Eigen::Allocator* allocator = nullptr);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const std::string& name, int num_threads);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with specified "name". env->StartThread() is used to create individual
  // threads with the given ThreadOptions.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options,
             const std::string& name, int num_threads);

  // Constructs a pool that wraps around the thread::ThreadPoolInterface
  // instance provided by the caller. Caller retains ownership of
  // `user_threadpool` and must ensure its lifetime is longer than the
  // ThreadPool instance.
  explicit ThreadPool(thread::ThreadPoolInterface* user_threadpool);

  // Waits until all scheduled work has finished and then destroys the
  // set of threads.
  ~ThreadPool();

  // Schedules fn() for execution in the pool of threads.
  void Schedule(std::function<void()> fn);

  // Sets the per-thread work-stealing partitions of the underlying Eigen
  // threadpool.
  void SetStealPartitions(
      const std::vector<std::pair<unsigned, unsigned>>& partitions);

  // Schedules fn() for execution in the pool of threads, hinting that threads
  // with ids in the half-open range [start, limit) are preferred.
  void ScheduleWithHint(std::function<void()> fn, int start, int limit);

  // Returns the number of shards used by ParallelForFixedBlockSizeScheduling
  // with these parameters.
  int NumShardsUsedByFixedBlockSizeScheduling(const int64 total,
                                              const int64 block_size);
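
  // Example (illustrative sketch): constructing a pool and scheduling work.
  // The pool name and sizes are made-up values, not recommendations.
  //
  //   ThreadPool pool(Env::Default(), "demo_pool", /*num_threads=*/4);
  //   pool.Schedule([]() { /* runs on some thread in the pool */ });
  //   // 10 units in blocks of 3 yield 4 shards: 3 + 3 + 3 + 1.
  //   const int num_shards = pool.NumShardsUsedByFixedBlockSizeScheduling(
  //       /*total=*/10, /*block_size=*/3);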

  // Returns the number of shards used by calling TransformRangeConcurrently
  // with these parameters.
  // Deprecated. Use NumShardsUsedByFixedBlockSizeScheduling.
  int NumShardsUsedByTransformRangeConcurrently(const int64 block_size,
                                                const int64 total);

  // ParallelFor shards the "total" units of work, assuming each unit of work
  // has roughly "cost_per_unit" cost, in cycles. Each unit of work is indexed
  // 0, 1, ..., total - 1. Each shard contains 1 or more units of work and the
  // total cost of each shard is roughly the same.
  //
  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
  // if not CPU-bound) to complete a unit of work. Overestimating creates too
  // many shards and CPU time will be dominated by per-shard overhead, such as
  // Context creation. Underestimating may not fully make use of the specified
  // parallelism, and may also cause inefficiencies due to load balancing
  // issues and stragglers.
  void ParallelFor(int64 total, int64 cost_per_unit,
                   const std::function<void(int64, int64)>& fn);

  // Similar to ParallelFor above, but takes the specified scheduling strategy
  // into account.
  void ParallelFor(int64 total, const SchedulingParams& scheduling_params,
                   const std::function<void(int64, int64)>& fn);

  // Same as ParallelFor with the Fixed Block Size scheduling strategy.
  // Deprecated. Prefer ParallelFor with a SchedulingStrategy argument.
  void TransformRangeConcurrently(const int64 block_size, const int64 total,
                                  const std::function<void(int64, int64)>& fn);

  // Shards the "total" units of work. For more details, see "ParallelFor".
  //
  // The function is passed a thread_id between 0 and NumThreads() *inclusive*.
  // This is because some work can happen on the caller thread while the
  // threads in the pool are also being used.
  //
  // The caller can allocate NumThreads() + 1 separate buffers, one for each
  // thread. Each thread can safely write to the buffer given by its id without
  // synchronization. However, the worker fn may be called multiple times
  // sequentially with the same id.
  //
  // At most NumThreads() unique ids will actually be used, and only a few may
  // be used for small workloads. If each buffer is expensive, the buffers
  // should be stored in an array initially filled with null, and a buffer
  // should be allocated by fn the first time that the id is used.
  void ParallelForWithWorkerId(
      int64 total, int64 cost_per_unit,
      const std::function<void(int64, int64, int)>& fn);

  // Similar to ParallelForWithWorkerId above, but takes the specified
  // scheduling strategy into account.
  void ParallelForWithWorkerId(
      int64 total, const SchedulingParams& scheduling_params,
      const std::function<void(int64, int64, int)>& fn);

  // Returns the number of threads in the pool.
  int NumThreads() const;

  // Returns the current thread id between 0 and NumThreads() - 1, if called
  // from a thread in the pool. Returns -1 otherwise.
  int CurrentThreadId() const;

  // If the ThreadPool implementation is compatible with
  // Eigen::ThreadPoolInterface, returns a non-null pointer. The caller does
  // not own the object the returned pointer points to, and should not attempt
  // to delete it.
  Eigen::ThreadPoolInterface* AsEigenThreadPool() const;
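
  // Example (illustrative sketch; 'pool' is a ThreadPool and 'Process' is a
  // hypothetical per-element function):
  //
  //   // Adaptive sharding driven by a per-unit cost estimate of ~5000 cycles.
  //   pool.ParallelFor(1000000, /*cost_per_unit=*/5000,
  //                    [](int64 start, int64 limit) {
  //                      for (int64 i = start; i < limit; ++i) Process(i);
  //                    });
  //
  //   // The same range with an explicit block size; the extra 'id' argument
  //   // can index one of NumThreads() + 1 preallocated scratch buffers.
  //   pool.ParallelForWithWorkerId(
  //       1000000,
  //       SchedulingParams(SchedulingStrategy::kFixedBlockSize,
  //                        /*cost_per_unit=*/absl::nullopt,
  //                        /*block_size=*/4096),
  //       [](int64 start, int64 limit, int id) {
  //         for (int64 i = start; i < limit; ++i) Process(i);
  //       });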

 private:
  // Divides the work represented by the range [0, total) into k shards.
  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
  // Each shard may be executed on a different thread in parallel, depending on
  // the number of threads available in the pool.
  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
  // Here, k = NumShardsUsedByFixedBlockSizeScheduling(total, block_size).
  // Requires 0 < block_size <= total.
  void ParallelForFixedBlockSizeScheduling(
      const int64 total, const int64 block_size,
      const std::function<void(int64, int64)>& fn);

  // underlying_threadpool_ is the user_threadpool if user_threadpool is
  // provided in the constructor. Otherwise it is the eigen_threadpool_.
  Eigen::ThreadPoolInterface* underlying_threadpool_;
  // eigen_threadpool_ is instantiated and owned by thread::ThreadPool if
  // user_threadpool is not provided in the constructor.
  std::unique_ptr<Eigen::ThreadPoolTempl<EigenEnvironment>> eigen_threadpool_;
  std::unique_ptr<Eigen::ThreadPoolDevice> threadpool_device_;
  TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

}  // namespace thread
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_PLATFORM_THREADPOOL_H_