/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <benchmark/benchmark.h>
#include <grpc/grpc.h>

#include <condition_variable>
#include <mutex>

#include "src/core/lib/iomgr/executor/threadpool.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"

namespace grpc {
namespace testing {
// This helper class allows a thread to block until a pre-specified number of
// actions have completed. A BlockingCounter is initialized with a non-negative
// count. Each call to DecrementCount decreases the count by 1. A call to Wait
// blocks the calling thread while the count is greater than 0 and returns once
// the count reaches 0.
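// A minimal usage sketch (illustrative only, not part of the benchmarks
// below):
//
//   BlockingCounter done(2);
//   // ... each of two workers calls done.DecrementCount() when it finishes ...
//   done.Wait();  // returns only after the count has dropped to 0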
class BlockingCounter {
 public:
  BlockingCounter(int count) : count_(count) {}
  void DecrementCount() {
    std::lock_guard<std::mutex> l(mu_);
    count_--;
    if (count_ == 0) cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> l(mu_);
    while (count_ > 0) {
      cv_.wait(l);
    }
  }

 private:
  int count_;
  std::mutex mu_;
  std::condition_variable cv_;
};

// A functor (closure) class for the threadpool microbenchmark. When run, it
// adds another functor into the pool if its remaining count (num_add_) is
// still greater than 0; otherwise it decrements the counter to indicate that
// the chain of tasks has finished. The functor deletes itself at the end of
// Run, so the caller does not need to clean it up.
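// For example, a chain started with num_add == 3 runs the initial functor,
// which creates a successor with num_add == 2, which creates one with
// num_add == 1; that last functor decrements the counter instead of adding
// another, so num_add functors execute per chain in total.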
class AddAnotherFunctor : public grpc_experimental_completion_queue_functor {
 public:
  AddAnotherFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
                    int num_add)
      : pool_(pool), counter_(counter), num_add_(num_add) {
    functor_run = &AddAnotherFunctor::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }
  // When the functor is run by the thread pool, it receives itself as the
  // first argument and internal_success as the second.
  static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
    auto* callback = static_cast<AddAnotherFunctor*>(cb);
    if (--callback->num_add_ > 0) {
      callback->pool_->Add(new AddAnotherFunctor(
          callback->pool_, callback->counter_, callback->num_add_));
    } else {
      callback->counter_->DecrementCount();
    }
    // Deletes itself; no cleanup by the caller is needed.
    delete callback;
  }

 private:
  grpc_core::ThreadPool* pool_;
  BlockingCounter* counter_;
  int num_add_;
};

template <int kConcurrentFunctor>
static void ThreadPoolAddAnother(benchmark::State& state) {
  const int num_iterations = state.range(0);
  const int num_threads = state.range(1);
  // Number of adds done by each closure.
  const int num_add = num_iterations / kConcurrentFunctor;
  grpc_core::ThreadPool pool(num_threads);
  while (state.KeepRunningBatch(num_iterations)) {
    BlockingCounter counter(kConcurrentFunctor);
    for (int i = 0; i < kConcurrentFunctor; ++i) {
      pool.Add(new AddAnotherFunctor(&pool, &counter, num_add));
    }
    counter.Wait();
  }
  state.SetItemsProcessed(state.iterations());
}

// The first pair of arguments is the range for the number of iterations
// (num_iterations). The second pair is the range for the thread pool size
// (num_threads).
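// Note: both iteration bounds below are fixed at 524288, so RangePair
// effectively sweeps only the thread pool size; the benchmark library expands
// a range multiplicatively (default multiplier 8), so the generated thread
// counts should be roughly 1, 8, 64, 512, and 1024.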
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 1)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 4)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 8)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 16)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 32)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 64)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 128)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 512)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 2048)
    ->RangePair(524288, 524288, 1, 1024);

// A functor that deletes itself when it has finished running.
class SuicideFunctorForAdd : public grpc_experimental_completion_queue_functor {
 public:
  SuicideFunctorForAdd(BlockingCounter* counter) : counter_(counter) {
    functor_run = &SuicideFunctorForAdd::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }

  static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
    // When run, the first argument is the functor itself.
    auto* callback = static_cast<SuicideFunctorForAdd*>(cb);
    callback->counter_->DecrementCount();
    delete callback;
  }

 private:
  BlockingCounter* counter_;
};

// Benchmarks the scenario where external thread(s) add closures into the pool.
static void BM_ThreadPoolExternalAdd(benchmark::State& state) {
  static grpc_core::ThreadPool* external_add_pool = nullptr;
  // Setup for each run of the benchmark.
  if (state.thread_index == 0) {
    const int num_threads = state.range(1);
    external_add_pool = new grpc_core::ThreadPool(num_threads);
  }
  const int num_iterations = state.range(0) / state.threads;
  while (state.KeepRunningBatch(num_iterations)) {
    BlockingCounter counter(num_iterations);
    for (int i = 0; i < num_iterations; ++i) {
      external_add_pool->Add(new SuicideFunctorForAdd(&counter));
    }
    counter.Wait();
  }

  // Teardown at the end of each test run.
  if (state.thread_index == 0) {
    state.SetItemsProcessed(state.range(0));
    delete external_add_pool;
  }
}
BENCHMARK(BM_ThreadPoolExternalAdd)
    // First pair is range for number of iterations (num_iterations).
    // Second pair is range for thread pool size (num_threads).
    ->RangePair(524288, 524288, 1, 1024)
    ->ThreadRange(1, 256);  // Concurrent external thread(s) up to 256.

// A functor (closure) that repeatedly re-adds itself into the pool. Because it
// re-adds itself rather than allocating a new functor each time, the per-task
// overhead stays low, so the cost of Add can be measured more accurately.
class AddSelfFunctor : public grpc_experimental_completion_queue_functor {
 public:
  AddSelfFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
                 int num_add)
      : pool_(pool), counter_(counter), num_add_(num_add) {
    functor_run = &AddSelfFunctor::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }
  // When the functor is run by the thread pool, it receives itself as the
  // first argument and internal_success as the second.
  static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
    auto* callback = static_cast<AddSelfFunctor*>(cb);
    if (--callback->num_add_ > 0) {
      callback->pool_->Add(cb);
    } else {
      callback->counter_->DecrementCount();
      // Deletes itself once the chain is finished; no cleanup by the caller.
      delete callback;
    }
  }

 private:
  grpc_core::ThreadPool* pool_;
  BlockingCounter* counter_;
  int num_add_;
};

template <int kConcurrentFunctor>
static void ThreadPoolAddSelf(benchmark::State& state) {
  const int num_iterations = state.range(0);
  const int num_threads = state.range(1);
  // Number of adds done by each closure.
  const int num_add = num_iterations / kConcurrentFunctor;
  grpc_core::ThreadPool pool(num_threads);
  while (state.KeepRunningBatch(num_iterations)) {
    BlockingCounter counter(kConcurrentFunctor);
    for (int i = 0; i < kConcurrentFunctor; ++i) {
      pool.Add(new AddSelfFunctor(&pool, &counter, num_add));
    }
    counter.Wait();
  }
  state.SetItemsProcessed(state.iterations());
}

// The first pair of arguments is the range for the number of iterations
// (num_iterations). The second pair is the range for the thread pool size
// (num_threads).
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 1)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 4)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 8)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 16)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 32)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 64)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 128)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 512)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 2048)->RangePair(524288, 524288, 1, 1024);

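// Pick a cache line size for the current architecture. ShortWorkFunctorForAdd
// below pads itself to CACHELINE_SIZE, presumably so adjacent functors stored
// contiguously in a vector do not share a cache line (avoiding false sharing).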
#if defined(__GNUC__) && !defined(SWIG)
#if defined(__i386__) || defined(__x86_64__)
#define CACHELINE_SIZE 64
#elif defined(__powerpc64__)
#define CACHELINE_SIZE 128
#elif defined(__aarch64__)
#define CACHELINE_SIZE 64
#elif defined(__arm__)
#if defined(__ARM_ARCH_5T__)
#define CACHELINE_SIZE 32
#elif defined(__ARM_ARCH_7A__)
#define CACHELINE_SIZE 64
#endif
#endif
#ifndef CACHELINE_SIZE
#define CACHELINE_SIZE 64
#endif
#endif

// A functor (closure) that simulates a closure doing a small but non-trivial
// amount of work.
class ShortWorkFunctorForAdd
    : public grpc_experimental_completion_queue_functor {
 public:
  BlockingCounter* counter_;

  ShortWorkFunctorForAdd() {
    functor_run = &ShortWorkFunctorForAdd::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
    val_ = 0;
  }
  static void Run(grpc_experimental_completion_queue_functor* cb, int /*ok*/) {
    auto* callback = static_cast<ShortWorkFunctorForAdd*>(cb);
    // Touch pad so the compiler does not complain about an unused variable.
    callback->pad[0] = 0;
    for (int i = 0; i < 1000; ++i) {
      callback->val_++;
    }
    callback->counter_->DecrementCount();
  }

 private:
  char pad[CACHELINE_SIZE];
  volatile int val_;
};

// Simulates workloads where many short-running callbacks are added to the
// threadpool. The callbacks are not enough to keep all the workers busy
// continuously, so the number of running workers changes over time.
//
// In effect this tests how well the threadpool avoids spurious wakeups.
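// For example, with num_threads == 4 each spike submits batch_size == 12
// callbacks (3 * num_threads) and waits for all of them to finish before the
// next spike starts.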
static void BM_SpikyLoad(benchmark::State& state) {
  const int num_threads = state.range(0);

  const int kNumSpikes = 1000;
  const int batch_size = 3 * num_threads;
  std::vector<ShortWorkFunctorForAdd> work_vector(batch_size);
  grpc_core::ThreadPool pool(num_threads);
  while (state.KeepRunningBatch(kNumSpikes * batch_size)) {
    for (int i = 0; i != kNumSpikes; ++i) {
      BlockingCounter counter(batch_size);
      for (auto& w : work_vector) {
        w.counter_ = &counter;
        pool.Add(&w);
      }
      counter.Wait();
    }
  }
  state.SetItemsProcessed(state.iterations() * batch_size);
}
BENCHMARK(BM_SpikyLoad)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16);

}  // namespace testing
}  // namespace grpc

// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark

int main(int argc, char* argv[]) {
  grpc::testing::TestEnvironment env(argc, argv);
  LibraryInitializer libInit;
  ::benchmark::Initialize(&argc, argv);
  ::grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}