• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
16 
#include <grpc/support/cpu.h>
#include <grpc/support/port_platform.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "src/core/util/sync.h"
#include "src/core/util/time.h"
#include "src/core/util/useful.h"
31 
32 namespace grpc_event_engine {
33 namespace experimental {
34 
35 // Tracks counts across some fixed number of shards.
36 // It is intended for fast increment/decrement operations, but a slower overall
37 // count operation.
38 class BusyThreadCount {
39  public:
40   // Increments a per-shard counter on construction, decrements on destruction.
41   class AutoThreadCounter {
42    public:
AutoThreadCounter(BusyThreadCount * counter,size_t idx)43     AutoThreadCounter(BusyThreadCount* counter, size_t idx)
44         : counter_(counter), idx_(idx) {
45       counter_->Increment(idx_);
46     }
~AutoThreadCounter()47     ~AutoThreadCounter() {
48       if (counter_ != nullptr) counter_->Decrement(idx_);
49     }
50     // not copyable
51     AutoThreadCounter(const AutoThreadCounter&) = delete;
52     AutoThreadCounter& operator=(const AutoThreadCounter&) = delete;
53     // moveable
AutoThreadCounter(AutoThreadCounter && other)54     AutoThreadCounter(AutoThreadCounter&& other) noexcept {
55       counter_ = std::exchange(other.counter_, nullptr);
56       idx_ = other.idx_;
57     }
58     AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept {
59       counter_ = std::exchange(other.counter_, nullptr);
60       idx_ = other.idx_;
61       return *this;
62     }
63 
64    private:
65     BusyThreadCount* counter_;
66     size_t idx_;
67   };
68 
BusyThreadCount()69   BusyThreadCount() : shards_(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 64u)) {}
MakeAutoThreadCounter(size_t idx)70   AutoThreadCounter MakeAutoThreadCounter(size_t idx) {
71     return AutoThreadCounter(this, idx);
72   };
Increment(size_t idx)73   void Increment(size_t idx) {
74     shards_[idx].busy_count.fetch_add(1, std::memory_order_relaxed);
75   }
Decrement(size_t idx)76   void Decrement(size_t idx) {
77     shards_[idx].busy_count.fetch_sub(1, std::memory_order_relaxed);
78   }
count()79   size_t count() {
80     return std::accumulate(
81         shards_.begin(), shards_.end(), 0, [](size_t running, ShardedData& d) {
82           return running + d.busy_count.load(std::memory_order_relaxed);
83         });
84   }
85   // Returns some valid index into the per-shard data, which is rotated on every
86   // call to distribute load and reduce contention.
NextIndex()87   size_t NextIndex() { return next_idx_.fetch_add(1) % shards_.size(); }
88 
89  private:
90 // We want to ensure that this data structure lands on different cachelines per
91 // cpu. With C++17 we can do so explicitly with an `alignas` specifier. Prior
92 // versions we can at best approximate it by padding the structure. It'll
93 // probably work out ok, but it's not guaranteed across allocators.
94 // TODO(ctiller): When we move to C++17 delete the duplicate definition.
95 #if __cplusplus >= 201703L
96   struct ShardedData {
97     std::atomic<size_t> busy_count{0};
98   } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);
99 #else
100   struct ShardedDataHeader {
101     std::atomic<size_t> busy_count{0};
102   };
103   struct ShardedData : public ShardedDataHeader {
104     uint8_t padding[GPR_CACHELINE_SIZE - sizeof(ShardedDataHeader)];
105   };
106 #endif
107 
108   std::vector<ShardedData> shards_;
109   std::atomic<size_t> next_idx_{0};
110 };
111 
112 // Tracks the number of living threads. It is intended for a fast count
113 // operation, with relatively slower increment/decrement operations.
114 class LivingThreadCount {
115  public:
116   // Increments the global counter on construction, decrements on destruction.
117   class AutoThreadCounter {
118    public:
AutoThreadCounter(LivingThreadCount * counter)119     explicit AutoThreadCounter(LivingThreadCount* counter) : counter_(counter) {
120       counter_->Increment();
121     }
~AutoThreadCounter()122     ~AutoThreadCounter() {
123       if (counter_ != nullptr) counter_->Decrement();
124     }
125     // not copyable
126     AutoThreadCounter(const AutoThreadCounter&) = delete;
127     AutoThreadCounter& operator=(const AutoThreadCounter&) = delete;
128     // moveable
AutoThreadCounter(AutoThreadCounter && other)129     AutoThreadCounter(AutoThreadCounter&& other) noexcept {
130       counter_ = std::exchange(other.counter_, nullptr);
131     }
132     AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept {
133       counter_ = std::exchange(other.counter_, nullptr);
134       return *this;
135     }
136 
137    private:
138     LivingThreadCount* counter_;
139   };
140 
MakeAutoThreadCounter()141   AutoThreadCounter MakeAutoThreadCounter() { return AutoThreadCounter(this); };
Increment()142   void Increment() ABSL_LOCKS_EXCLUDED(mu_) {
143     grpc_core::MutexLock lock(&mu_);
144     ++living_count_;
145     cv_.SignalAll();
146   }
Decrement()147   void Decrement() ABSL_LOCKS_EXCLUDED(mu_) {
148     grpc_core::MutexLock lock(&mu_);
149     --living_count_;
150     cv_.SignalAll();
151   }
152   // Blocks the calling thread until the desired number of threads is reached.
153   // If the thread count does not change for some given `stuck_timeout`
154   // duration, this method returns error. If the thread count does change, the
155   // timeout clock is reset.
156   absl::Status BlockUntilThreadCount(size_t desired_threads, const char* why,
157                                      grpc_core::Duration stuck_timeout)
158       ABSL_LOCKS_EXCLUDED(mu_);
count()159   size_t count() ABSL_LOCKS_EXCLUDED(mu_) {
160     grpc_core::MutexLock lock(&mu_);
161     return CountLocked();
162   }
163 
164  private:
CountLocked()165   size_t CountLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
166     return living_count_;
167   }
168   size_t WaitForCountChange(size_t desired_threads,
169                             grpc_core::Duration timeout);
170 
171   grpc_core::Mutex mu_;
172   grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_);
173   size_t living_count_ ABSL_GUARDED_BY(mu_) = 0;
174 };
175 
176 }  // namespace experimental
177 }  // namespace grpc_event_engine
178 
179 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
180