1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H 20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <grpc/grpc.h> 25 26 #include "src/core/lib/gprpp/thd.h" 27 #include "src/core/lib/iomgr/executor/mpmcqueue.h" 28 29 namespace grpc_core { 30 31 // A base abstract base class for threadpool. 32 // Threadpool is an executor that maintains a pool of threads sitting around 33 // and waiting for closures. A threadpool also maintains a queue of pending 34 // closures, when closures appearing in the queue, the threads in pool will 35 // pull them out and execute them. 36 class ThreadPoolInterface { 37 public: 38 // Waits for all pending closures to complete, then shuts down thread pool. ~ThreadPoolInterface()39 virtual ~ThreadPoolInterface() {} 40 41 // Schedules a given closure for execution later. 42 // Depending on specific subclass implementation, this routine might cause 43 // current thread to be blocked (in case of unable to schedule). 44 // Closure should contain a function pointer and arguments it will take, more 45 // details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h 46 virtual void Add(grpc_experimental_completion_queue_functor* closure) = 0; 47 48 // Returns the current number of pending closures 49 virtual int num_pending_closures() const = 0; 50 51 // Returns the capacity of pool (number of worker threads in pool) 52 virtual int pool_capacity() const = 0; 53 54 // Thread option accessor 55 virtual const Thread::Options& thread_options() const = 0; 56 57 // Returns the thread name for threads in this ThreadPool. 58 virtual const char* thread_name() const = 0; 59 }; 60 61 // Worker thread for threadpool. Executes closures in the queue, until getting a 62 // NULL closure. 63 class ThreadPoolWorker { 64 public: ThreadPoolWorker(const char * thd_name,MPMCQueueInterface * queue,Thread::Options & options,int index)65 ThreadPoolWorker(const char* thd_name, MPMCQueueInterface* queue, 66 Thread::Options& options, int index) 67 : queue_(queue), thd_name_(thd_name), index_(index) { 68 thd_ = Thread( 69 thd_name, [](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); }, 70 this, nullptr, options); 71 } 72 ~ThreadPoolWorker()73 ~ThreadPoolWorker() {} 74 Start()75 void Start() { thd_.Start(); } Join()76 void Join() { thd_.Join(); } 77 78 private: 79 // struct for tracking stats of thread 80 struct Stats { 81 gpr_timespec sleep_time; StatsStats82 Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); } 83 }; 84 85 void Run(); // Pulls closures from queue and executes them 86 87 MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from 88 Thread thd_; // Thread wrapped in 89 Stats stats_; // Stats to be collected in run time 90 const char* thd_name_; // Name of thread 91 int index_; // Index in thread pool 92 }; 93 94 // A fixed size thread pool implementation of abstract thread pool interface. 95 // In this implementation, the number of threads in pool is fixed, but the 96 // capacity of closure queue is unlimited. 97 class ThreadPool : public ThreadPoolInterface { 98 public: 99 // Creates a thread pool with size of "num_threads", with default thread name 100 // "ThreadPoolWorker" and all thread options set to default. If the given size 101 // is 0 or less, there will be 1 worker thread created inside pool. 102 explicit ThreadPool(int num_threads); 103 104 // Same as ThreadPool(int num_threads) constructor, except 105 // that it also sets "thd_name" as the name of all threads in the thread pool. 106 ThreadPool(int num_threads, const char* thd_name); 107 108 // Same as ThreadPool(const char *thd_name, int num_threads) constructor, 109 // except that is also set thread_options for threads. 110 // Notes for stack size: 111 // If the stack size field of the passed in Thread::Options is set to default 112 // value 0, default ThreadPool stack size will be used. The current default 113 // stack size of this implementation is 1952K for mobile platform and 64K for 114 // all others. 115 ThreadPool(int num_threads, const char* thd_name, 116 const Thread::Options& thread_options); 117 118 // Waits for all pending closures to complete, then shuts down thread pool. 119 ~ThreadPool() override; 120 121 // Adds given closure into pending queue immediately. Since closure queue has 122 // infinite length, this routine will not block. 123 void Add(grpc_experimental_completion_queue_functor* closure) override; 124 125 int num_pending_closures() const override; 126 int pool_capacity() const override; 127 const Thread::Options& thread_options() const override; 128 const char* thread_name() const override; 129 130 private: 131 int num_threads_ = 0; 132 const char* thd_name_ = nullptr; 133 Thread::Options thread_options_; 134 ThreadPoolWorker** threads_ = nullptr; // Array of worker threads 135 MPMCQueueInterface* queue_ = nullptr; // Closure queue 136 137 Atomic<bool> shut_down_{false}; // Destructor has been called if set to true 138 139 void SharedThreadPoolConstructor(); 140 // For ThreadPool, default stack size for mobile platform is 1952K. for other 141 // platforms is 64K. 142 size_t DefaultStackSize(); 143 // Internal Use Only for debug checking. 144 void AssertHasNotBeenShutDown(); 145 }; 146 147 } // namespace grpc_core 148 149 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */ 150