1 /* Copyright 2019 Google LLC. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 // This file is a fork of gemmlowp's multi_thread_gemm.h, under Apache 2.0 17 // license. 18 19 #ifndef RUY_RUY_THREAD_POOL_H_ 20 #define RUY_RUY_THREAD_POOL_H_ 21 22 #include <vector> 23 24 #include "ruy/blocking_counter.h" 25 #include "ruy/time.h" 26 27 namespace ruy { 28 29 // A workload for a thread. 30 struct Task { ~TaskTask31 virtual ~Task() {} 32 virtual void Run() = 0; 33 }; 34 35 class Thread; 36 37 // A simple pool of threads, that only allows the very 38 // specific parallelization pattern that we use here: 39 // One thread, which we call the 'main thread', calls Execute, distributing 40 // a Task each to N threads, being N-1 'worker threads' and the main thread 41 // itself. After the main thread has completed its own Task, it waits for 42 // the worker threads to have all completed. That is the only synchronization 43 // performed by this ThreadPool. 44 // 45 // In particular, there is a naive 1:1 mapping of Tasks to threads. 46 // This ThreadPool considers it outside of its own scope to try to work 47 // with fewer threads than there are Tasks. The idea is that such N:M mappings 48 // of tasks to threads can be implemented as a higher-level feature on top of 49 // the present low-level 1:1 threadpool. For example, a user might have a 50 // Task subclass referencing a shared atomic counter indexing into a vector of 51 // finer-granularity subtasks. Different threads would then concurrently 52 // increment this atomic counter, getting each their own subtasks to work on. 53 // That approach is the one used in ruy's multi-thread matrix multiplication 54 // implementation --- see ruy's TrMulTask. 55 class ThreadPool { 56 public: ThreadPool()57 ThreadPool() {} 58 59 ~ThreadPool(); 60 61 // Executes task_count tasks on task_count threads. 62 // Grows the threadpool as needed to have at least (task_count-1) threads. 63 // The 0-th task is run on the thread on which Execute is called: that 64 // is by definition what we call the "main thread". Synchronization of all 65 // threads is performed before this function returns. 66 // 67 // As explained in the class comment, there is a 1:1 mapping of tasks to 68 // threads. If you need something smarter than that, for instance if you 69 // want to run an unbounded number of tasks on a bounded number of threads, 70 // then you need something higher-level than this ThreadPool, that can 71 // be layered on top of it by appropriately subclassing Tasks. 72 // 73 // TaskType must be a subclass of ruy::Task. That is implicitly guarded by 74 // the static_cast in this inline implementation. 75 template <typename TaskType> Execute(int task_count,TaskType * tasks)76 void Execute(int task_count, TaskType* tasks) { 77 ExecuteImpl(task_count, sizeof(TaskType), static_cast<Task*>(tasks)); 78 } 79 set_spin_milliseconds(float milliseconds)80 void set_spin_milliseconds(float milliseconds) { 81 spin_duration_ = DurationFromMilliseconds(milliseconds); 82 } 83 spin_milliseconds()84 float spin_milliseconds() const { 85 return ToFloatMilliseconds(spin_duration_); 86 } 87 88 private: 89 // Ensures that the pool has at least the given count of threads. 90 // If any new thread has to be created, this function waits for it to 91 // be ready. 92 void CreateThreads(int threads_count); 93 94 // Non-templatized implementation of the public Execute method. 95 // See the inline implementation of Execute for how this is used. 96 void ExecuteImpl(int task_count, int stride, Task* tasks); 97 98 // copy construction disallowed 99 ThreadPool(const ThreadPool&) = delete; 100 101 // The threads in this pool. They are owned by the pool: 102 // the pool creates threads and destroys them in its destructor. 103 std::vector<Thread*> threads_; 104 105 // The BlockingCounter used to wait for the threads. 106 BlockingCounter counter_to_decrement_when_ready_; 107 108 // This value was empirically derived with some microbenchmark, we don't have 109 // high confidence in it. 110 // 111 // That this value means that we may be sleeping substantially longer 112 // than a scheduler timeslice's duration is not necessarily surprising. The 113 // idea is to pick up quickly new work after having finished the previous 114 // workload. When it's new work within the same GEMM as the previous work, the 115 // time interval that we might be busy-waiting is very small, so for that 116 // purpose it would be more than enough to sleep for 1 ms. 117 // That is all what we would observe on a GEMM benchmark. However, in a real 118 // application, after having finished a GEMM, we might do unrelated work for 119 // a little while, then start on a new GEMM. In that case the wait interval 120 // may be a little longer. There may also not be another GEMM for a long time, 121 // in which case we'll end up passively waiting below. 122 Duration spin_duration_ = DurationFromMilliseconds(2); 123 }; 124 125 } // namespace ruy 126 127 #endif // RUY_RUY_THREAD_POOL_H_ 128