1 // Copyright 2023 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H 16 17 #include <grpc/support/cpu.h> 18 #include <grpc/support/port_platform.h> 19 20 #include <atomic> 21 #include <cstddef> 22 #include <cstdint> 23 #include <numeric> 24 #include <utility> 25 #include <vector> 26 27 #include "absl/base/thread_annotations.h" 28 #include "src/core/util/sync.h" 29 #include "src/core/util/time.h" 30 #include "src/core/util/useful.h" 31 32 namespace grpc_event_engine { 33 namespace experimental { 34 35 // Tracks counts across some fixed number of shards. 36 // It is intended for fast increment/decrement operations, but a slower overall 37 // count operation. 38 class BusyThreadCount { 39 public: 40 // Increments a per-shard counter on construction, decrements on destruction. 41 class AutoThreadCounter { 42 public: AutoThreadCounter(BusyThreadCount * counter,size_t idx)43 AutoThreadCounter(BusyThreadCount* counter, size_t idx) 44 : counter_(counter), idx_(idx) { 45 counter_->Increment(idx_); 46 } ~AutoThreadCounter()47 ~AutoThreadCounter() { 48 if (counter_ != nullptr) counter_->Decrement(idx_); 49 } 50 // not copyable 51 AutoThreadCounter(const AutoThreadCounter&) = delete; 52 AutoThreadCounter& operator=(const AutoThreadCounter&) = delete; 53 // moveable AutoThreadCounter(AutoThreadCounter && other)54 AutoThreadCounter(AutoThreadCounter&& other) noexcept { 55 counter_ = std::exchange(other.counter_, nullptr); 56 idx_ = other.idx_; 57 } 58 AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept { 59 counter_ = std::exchange(other.counter_, nullptr); 60 idx_ = other.idx_; 61 return *this; 62 } 63 64 private: 65 BusyThreadCount* counter_; 66 size_t idx_; 67 }; 68 BusyThreadCount()69 BusyThreadCount() : shards_(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 64u)) {} MakeAutoThreadCounter(size_t idx)70 AutoThreadCounter MakeAutoThreadCounter(size_t idx) { 71 return AutoThreadCounter(this, idx); 72 }; Increment(size_t idx)73 void Increment(size_t idx) { 74 shards_[idx].busy_count.fetch_add(1, std::memory_order_relaxed); 75 } Decrement(size_t idx)76 void Decrement(size_t idx) { 77 shards_[idx].busy_count.fetch_sub(1, std::memory_order_relaxed); 78 } count()79 size_t count() { 80 return std::accumulate( 81 shards_.begin(), shards_.end(), 0, [](size_t running, ShardedData& d) { 82 return running + d.busy_count.load(std::memory_order_relaxed); 83 }); 84 } 85 // Returns some valid index into the per-shard data, which is rotated on every 86 // call to distribute load and reduce contention. NextIndex()87 size_t NextIndex() { return next_idx_.fetch_add(1) % shards_.size(); } 88 89 private: 90 // We want to ensure that this data structure lands on different cachelines per 91 // cpu. With C++17 we can do so explicitly with an `alignas` specifier. Prior 92 // versions we can at best approximate it by padding the structure. It'll 93 // probably work out ok, but it's not guaranteed across allocators. 94 // TODO(ctiller): When we move to C++17 delete the duplicate definition. 95 #if __cplusplus >= 201703L 96 struct ShardedData { 97 std::atomic<size_t> busy_count{0}; 98 } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); 99 #else 100 struct ShardedDataHeader { 101 std::atomic<size_t> busy_count{0}; 102 }; 103 struct ShardedData : public ShardedDataHeader { 104 uint8_t padding[GPR_CACHELINE_SIZE - sizeof(ShardedDataHeader)]; 105 }; 106 #endif 107 108 std::vector<ShardedData> shards_; 109 std::atomic<size_t> next_idx_{0}; 110 }; 111 112 // Tracks the number of living threads. It is intended for a fast count 113 // operation, with relatively slower increment/decrement operations. 114 class LivingThreadCount { 115 public: 116 // Increments the global counter on construction, decrements on destruction. 117 class AutoThreadCounter { 118 public: AutoThreadCounter(LivingThreadCount * counter)119 explicit AutoThreadCounter(LivingThreadCount* counter) : counter_(counter) { 120 counter_->Increment(); 121 } ~AutoThreadCounter()122 ~AutoThreadCounter() { 123 if (counter_ != nullptr) counter_->Decrement(); 124 } 125 // not copyable 126 AutoThreadCounter(const AutoThreadCounter&) = delete; 127 AutoThreadCounter& operator=(const AutoThreadCounter&) = delete; 128 // moveable AutoThreadCounter(AutoThreadCounter && other)129 AutoThreadCounter(AutoThreadCounter&& other) noexcept { 130 counter_ = std::exchange(other.counter_, nullptr); 131 } 132 AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept { 133 counter_ = std::exchange(other.counter_, nullptr); 134 return *this; 135 } 136 137 private: 138 LivingThreadCount* counter_; 139 }; 140 MakeAutoThreadCounter()141 AutoThreadCounter MakeAutoThreadCounter() { return AutoThreadCounter(this); }; Increment()142 void Increment() ABSL_LOCKS_EXCLUDED(mu_) { 143 grpc_core::MutexLock lock(&mu_); 144 ++living_count_; 145 cv_.SignalAll(); 146 } Decrement()147 void Decrement() ABSL_LOCKS_EXCLUDED(mu_) { 148 grpc_core::MutexLock lock(&mu_); 149 --living_count_; 150 cv_.SignalAll(); 151 } 152 // Blocks the calling thread until the desired number of threads is reached. 153 // If the thread count does not change for some given `stuck_timeout` 154 // duration, this method returns error. If the thread count does change, the 155 // timeout clock is reset. 156 absl::Status BlockUntilThreadCount(size_t desired_threads, const char* why, 157 grpc_core::Duration stuck_timeout) 158 ABSL_LOCKS_EXCLUDED(mu_); count()159 size_t count() ABSL_LOCKS_EXCLUDED(mu_) { 160 grpc_core::MutexLock lock(&mu_); 161 return CountLocked(); 162 } 163 164 private: CountLocked()165 size_t CountLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 166 return living_count_; 167 } 168 size_t WaitForCountChange(size_t desired_threads, 169 grpc_core::Duration timeout); 170 171 grpc_core::Mutex mu_; 172 grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_); 173 size_t living_count_ ABSL_GUARDED_BY(mu_) = 0; 174 }; 175 176 } // namespace experimental 177 } // namespace grpc_event_engine 178 179 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H 180