• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *is % allowed in string
17  */
18 
#include <inttypes.h>
#include <cstdlib>
#include <ctime>
#include <memory>
#include <string>

#include <gflags/gflags.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpcpp/grpcpp.h>

#include "src/cpp/thread_manager/thread_manager.h"
#include "test/cpp/util/test_config.h"
31 
32 namespace grpc {
33 
// Knobs that configure a ThreadManagerTest instance. This is a plain
// aggregate: the tests in this file brace-initialize it positionally, so the
// member order below must not change.
struct ThreadManagerTestSettings {
  // Lower bound on pollers; passed to ThreadManager as its min_pollers.
  int min_pollers;
  // Upper bound on pollers; passed to ThreadManager as its max_pollers.
  int max_pollers;
  // How long (in milliseconds) PollForWork() sleeps to imitate polling.
  int poll_duration_ms;
  // How long (in milliseconds) DoWork() sleeps to imitate doing work.
  int work_duration_ms;
  // Number of PollForWork() calls allowed before the manager shuts down.
  int max_poll_calls;
};
46 
47 class ThreadManagerTest final : public grpc::ThreadManager {
48  public:
ThreadManagerTest(const char * name,grpc_resource_quota * rq,const ThreadManagerTestSettings & settings)49   ThreadManagerTest(const char* name, grpc_resource_quota* rq,
50                     const ThreadManagerTestSettings& settings)
51       : ThreadManager(name, rq, settings.min_pollers, settings.max_pollers),
52         settings_(settings),
53         num_do_work_(0),
54         num_poll_for_work_(0),
55         num_work_found_(0) {}
56 
57   grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
58   void DoWork(void* tag, bool ok, bool resources) override;
59 
60   // Get number of times PollForWork() returned WORK_FOUND
61   int GetNumWorkFound();
62   // Get number of times DoWork() was called
63   int GetNumDoWork();
64 
65  private:
66   void SleepForMs(int sleep_time_ms);
67 
68   ThreadManagerTestSettings settings_;
69 
70   // Counters
71   gpr_atm num_do_work_;        // Number of calls to DoWork
72   gpr_atm num_poll_for_work_;  // Number of calls to PollForWork
73   gpr_atm num_work_found_;     // Number of times WORK_FOUND was returned
74 };
75 
SleepForMs(int duration_ms)76 void ThreadManagerTest::SleepForMs(int duration_ms) {
77   gpr_timespec sleep_time =
78       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
79                    gpr_time_from_millis(duration_ms, GPR_TIMESPAN));
80   gpr_sleep_until(sleep_time);
81 }
82 
PollForWork(void ** tag,bool * ok)83 grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
84                                                                bool* ok) {
85   int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1);
86   if (call_num >= settings_.max_poll_calls) {
87     Shutdown();
88     return SHUTDOWN;
89   }
90 
91   SleepForMs(settings_.poll_duration_ms);  // Simulate "polling" duration
92   *tag = nullptr;
93   *ok = true;
94 
95   // Return timeout roughly 1 out of every 3 calls just to make the test a bit
96   // more interesting
97   if (call_num % 3 == 0) {
98     return TIMEOUT;
99   }
100 
101   gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
102   return WORK_FOUND;
103 }
104 
DoWork(void * tag,bool ok,bool resources)105 void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) {
106   gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
107   SleepForMs(settings_.work_duration_ms);  // Simulate work by sleeping
108 }
109 
GetNumWorkFound()110 int ThreadManagerTest::GetNumWorkFound() {
111   return static_cast<int>(gpr_atm_no_barrier_load(&num_work_found_));
112 }
113 
GetNumDoWork()114 int ThreadManagerTest::GetNumDoWork() {
115   return static_cast<int>(gpr_atm_no_barrier_load(&num_do_work_));
116 }
117 }  // namespace grpc
118 
119 // Test that the number of times DoWork() is called is equal to the number of
120 // times PollForWork() returned WORK_FOUND
TestPollAndWork()121 static void TestPollAndWork() {
122   grpc_resource_quota* rq = grpc_resource_quota_create("Test-poll-and-work");
123   grpc::ThreadManagerTestSettings settings = {
124       2 /* min_pollers */, 10 /* max_pollers */, 10 /* poll_duration_ms */,
125       1 /* work_duration_ms */, 50 /* max_poll_calls */};
126 
127   grpc::ThreadManagerTest test_thread_mgr("TestThreadManager", rq, settings);
128   grpc_resource_quota_unref(rq);
129 
130   test_thread_mgr.Initialize();  // Start the thread manager
131   test_thread_mgr.Wait();        // Wait for all threads to finish
132 
133   // Verify that The number of times DoWork() was called is equal to the number
134   // of times WORK_FOUND was returned
135   gpr_log(GPR_DEBUG, "DoWork() called %d times",
136           test_thread_mgr.GetNumDoWork());
137   GPR_ASSERT(test_thread_mgr.GetNumDoWork() ==
138              test_thread_mgr.GetNumWorkFound());
139 }
140 
TestThreadQuota()141 static void TestThreadQuota() {
142   const int kMaxNumThreads = 3;
143   grpc_resource_quota* rq = grpc_resource_quota_create("Test-thread-quota");
144   grpc_resource_quota_set_max_threads(rq, kMaxNumThreads);
145 
146   // Set work_duration_ms to be much greater than poll_duration_ms. This way,
147   // the thread manager will be forced to create more 'polling' threads to
148   // honor the min_pollers guarantee
149   grpc::ThreadManagerTestSettings settings = {
150       1 /* min_pollers */, 1 /* max_pollers */, 1 /* poll_duration_ms */,
151       10 /* work_duration_ms */, 50 /* max_poll_calls */};
152 
153   // Create two thread managers (but with same resource quota). This means
154   // that the max number of active threads across BOTH the thread managers
155   // cannot be greater than kMaxNumthreads
156   grpc::ThreadManagerTest test_thread_mgr_1("TestThreadManager-1", rq,
157                                             settings);
158   grpc::ThreadManagerTest test_thread_mgr_2("TestThreadManager-2", rq,
159                                             settings);
160   // It is ok to unref resource quota before starting thread managers.
161   grpc_resource_quota_unref(rq);
162 
163   // Start both thread managers
164   test_thread_mgr_1.Initialize();
165   test_thread_mgr_2.Initialize();
166 
167   // Wait for both to finish
168   test_thread_mgr_1.Wait();
169   test_thread_mgr_2.Wait();
170 
171   // Now verify that the total number of active threads in either thread manager
172   // never exceeds kMaxNumThreads
173   //
174   // NOTE: Actually the total active threads across *both* thread managers at
175   // any point of time never exceeds kMaxNumThreads but unfortunately there is
176   // no easy way to verify it (i.e we can't just do (max1 + max2 <= k))
177   // Its okay to not test this case here. The resource quota c-core tests
178   // provide enough coverage to resource quota object with multiple resource
179   // users
180   int max1 = test_thread_mgr_1.GetMaxActiveThreadsSoFar();
181   int max2 = test_thread_mgr_2.GetMaxActiveThreadsSoFar();
182   gpr_log(
183       GPR_DEBUG,
184       "MaxActiveThreads in TestThreadManager_1: %d, TestThreadManager_2: %d",
185       max1, max2);
186   GPR_ASSERT(max1 <= kMaxNumThreads && max2 <= kMaxNumThreads);
187 }
188 
main(int argc,char ** argv)189 int main(int argc, char** argv) {
190   std::srand(std::time(nullptr));
191   grpc::testing::InitTest(&argc, &argv, true);
192   grpc_init();
193 
194   TestPollAndWork();
195   TestThreadQuota();
196 
197   grpc_shutdown();
198   return 0;
199 }
200