/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
#define TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_

#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/types.h"

namespace Eigen {
class Allocator;
}  // namespace Eigen
namespace tensorflow {
namespace thread {

class ThreadPool {
 public:
  // Constructs a pool that contains "num_threads" threads with the specified
  // "name". env->StartThread() is used to create individual threads with the
  // given ThreadOptions. If "low_latency_hint" is true, the thread pool
  // implementation may use it as a hint that lower latency is preferred at the
  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
  // wait. Conversely, if the threadpool is used to schedule high-latency
  // operations like I/O, the hint should be set to false.
  //
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options, const string& name,
             int num_threads, bool low_latency_hint,
             Eigen::Allocator* allocator = nullptr);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with the specified "name". env->StartThread() is used to create individual
  // threads.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const string& name, int num_threads);

  // Constructs a pool for low-latency ops that contains "num_threads" threads
  // with the specified "name". env->StartThread() is used to create individual
  // threads with the given ThreadOptions.
  // REQUIRES: num_threads > 0
  ThreadPool(Env* env, const ThreadOptions& thread_options, const string& name,
             int num_threads);

  // Waits until all scheduled work has finished and then destroys the
  // set of threads.
  ~ThreadPool();

  // Schedules fn() for execution in the pool of threads.
  void Schedule(std::function<void()> fn);

  // Sets the work-stealing partitions used by the underlying threadpool.
  // Each pair is a [start, limit) range of thread indices.
  void SetStealPartitions(
      const std::vector<std::pair<unsigned, unsigned>>& partitions);

  // Schedules fn() for execution in the pool, with a hint that it should run
  // on a thread in the range [start, limit).
  void ScheduleWithHint(std::function<void()> fn, int start, int limit);

  // Requires 0 < block_size <= total.
  // Spawns k threads and calls fn(i*block_size, (i+1)*block_size) from the
  // ith thread (i>=0). When (i+1)*block_size > total, fn(i*block_size, total)
  // is called instead. k = NumShardsUsedByTransformRangeConcurrently(...).
  // Note that when there aren't enough threads in the pool to achieve full
  // parallelism, function calls will be automatically queued.
  void TransformRangeConcurrently(const int64 block_size, const int64 total,
                                  const std::function<void(int64, int64)>& fn);

  // Returns the number of threads spawned by calling
  // TransformRangeConcurrently with these parameters.
  int NumShardsUsedByTransformRangeConcurrently(const int64 block_size,
                                                const int64 total);

  // ParallelFor shards the "total" units of work, assuming each unit of work
  // has roughly "cost_per_unit" cost, in cycles. Each unit of work is indexed
  // 0, 1, ..., total - 1. Each shard contains 1 or more units of work, and the
  // total cost of each shard is roughly the same.
  //
  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
  // if not CPU-bound) to complete a unit of work. Overestimating creates too
  // many shards, and CPU time will be dominated by per-shard overhead, such as
  // Context creation. Underestimating may not fully make use of the specified
  // parallelism.
  void ParallelFor(int64 total, int64 cost_per_unit,
                   std::function<void(int64, int64)> fn);

  // Shards the "total" units of work. For more details, see "ParallelFor".
  //
  // The function is passed a thread_id between 0 and NumThreads() *inclusive*.
  // This is because some work can happen on the caller thread while the
  // threads in the pool are also being used.
  //
  // The caller can allocate NumThreads() + 1 separate buffers, one for each
  // thread. Each thread can safely write to the buffer given by its id without
  // synchronization. However, the worker fn may be called multiple times
  // sequentially with the same id.
  //
  // At most NumThreads() unique ids will actually be used, and only a few may
  // be used for small workloads. If each buffer is expensive, the buffers
  // should be stored in an array initially filled with null, and a buffer
  // should be allocated by fn the first time that the id is used.
  void ParallelForWithWorkerId(
      int64 total, int64 cost_per_unit,
      const std::function<void(int64, int64, int)>& fn);

  // Returns the number of threads in the pool.
  int NumThreads() const;

  // Returns the current thread id between 0 and NumThreads() - 1, if called
  // from a thread in the pool. Returns -1 otherwise.
  int CurrentThreadId() const;

  struct Impl;

 private:
  std::unique_ptr<Impl> impl_;
  TF_DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

}  // namespace thread
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_LIB_CORE_THREADPOOL_H_
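
// Example usage (a minimal sketch, not part of the original header). It only
// uses the constructor and methods declared above, plus Env::Default(); the
// pool name, sizes, and cost estimate below are illustrative assumptions.
//
//   #include "tensorflow/core/lib/core/threadpool.h"
//
//   tensorflow::thread::ThreadPool pool(tensorflow::Env::Default(),
//                                       "example_pool", /*num_threads=*/4);
//
//   // Schedule a single closure for asynchronous execution.
//   pool.Schedule([] { /* do one piece of work */ });
//
//   // Shard 1000 units of work, each estimated at ~5000 cycles. fn receives
//   // disjoint [start, limit) ranges that together cover [0, 1000).
//   pool.ParallelFor(1000, 5000,
//                    [](tensorflow::int64 start, tensorflow::int64 limit) {
//                      for (tensorflow::int64 i = start; i < limit; ++i) {
//                        // process unit i
//                      }
//                    });
//
//   // The destructor waits for all scheduled work to finish before joining
//   // the threads.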