• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *is % allowed in string
17  */
18 
19 #include <atomic>
20 #include <chrono>
21 #include <climits>
22 #include <memory>
23 #include <thread>
24 
25 #include <gflags/gflags.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/port_platform.h>
28 #include <grpcpp/grpcpp.h>
29 
30 #include "src/cpp/thread_manager/thread_manager.h"
31 #include "test/core/util/test_config.h"
32 
33 #include <gtest/gtest.h>
34 
35 namespace grpc {
36 namespace {
37 
// Knobs describing a single ThreadManager test scenario.
//
// This is a plain aggregate that is brace-initialized positionally elsewhere
// in this file, so the order of the fields below must be preserved.
struct TestThreadManagerSettings {
  // Lower bound on the number of pollers that should be active in the
  // ThreadManager.
  int min_pollers;

  // Upper bound on the number of pollers that may be active in the
  // ThreadManager.
  int max_pollers;

  // Milliseconds PollForWork() sleeps for in order to simulate "polling".
  int poll_duration_ms;

  // Milliseconds DoWork() sleeps for in order to simulate "work".
  int work_duration_ms;

  // Number of PollForWork() calls allowed before the manager shuts down.
  int max_poll_calls;

  // Thread limit applied to the resource quota (only applied when > 0).
  int thread_limit;

  // Number of thread managers to instantiate for the scenario.
  int thread_manager_count;
};
60 
61 class TestThreadManager final : public grpc::ThreadManager {
62  public:
TestThreadManager(const char * name,grpc_resource_quota * rq,const TestThreadManagerSettings & settings)63   TestThreadManager(const char* name, grpc_resource_quota* rq,
64                     const TestThreadManagerSettings& settings)
65       : ThreadManager(name, rq, settings.min_pollers, settings.max_pollers),
66         settings_(settings),
67         num_do_work_(0),
68         num_poll_for_work_(0),
69         num_work_found_(0) {}
70 
71   grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
DoWork(void *,bool,bool)72   void DoWork(void* /* tag */, bool /*ok*/, bool /*resources*/) override {
73     num_do_work_.fetch_add(1, std::memory_order_relaxed);
74 
75     // Simulate work by sleeping
76     std::this_thread::sleep_for(
77         std::chrono::milliseconds(settings_.work_duration_ms));
78   }
79 
80   // Get number of times PollForWork() was called
num_poll_for_work() const81   int num_poll_for_work() const {
82     return num_poll_for_work_.load(std::memory_order_relaxed);
83   }
84   // Get number of times PollForWork() returned WORK_FOUND
num_work_found() const85   int num_work_found() const {
86     return num_work_found_.load(std::memory_order_relaxed);
87   }
88   // Get number of times DoWork() was called
num_do_work() const89   int num_do_work() const {
90     return num_do_work_.load(std::memory_order_relaxed);
91   }
92 
93  private:
94   TestThreadManagerSettings settings_;
95 
96   // Counters
97   std::atomic_int num_do_work_;        // Number of calls to DoWork
98   std::atomic_int num_poll_for_work_;  // Number of calls to PollForWork
99   std::atomic_int num_work_found_;  // Number of times WORK_FOUND was returned
100 };
101 
PollForWork(void ** tag,bool * ok)102 grpc::ThreadManager::WorkStatus TestThreadManager::PollForWork(void** tag,
103                                                                bool* ok) {
104   int call_num = num_poll_for_work_.fetch_add(1, std::memory_order_relaxed);
105   if (call_num >= settings_.max_poll_calls) {
106     Shutdown();
107     return SHUTDOWN;
108   }
109 
110   // Simulate "polling" duration
111   std::this_thread::sleep_for(
112       std::chrono::milliseconds(settings_.poll_duration_ms));
113   *tag = nullptr;
114   *ok = true;
115 
116   // Return timeout roughly 1 out of every 3 calls just to make the test a bit
117   // more interesting
118   if (call_num % 3 == 0) {
119     return TIMEOUT;
120   }
121 
122   num_work_found_.fetch_add(1, std::memory_order_relaxed);
123   return WORK_FOUND;
124 }
125 
126 class ThreadManagerTest
127     : public ::testing::TestWithParam<TestThreadManagerSettings> {
128  protected:
SetUp()129   void SetUp() override {
130     grpc_resource_quota* rq = grpc_resource_quota_create("Thread manager test");
131     if (GetParam().thread_limit > 0) {
132       grpc_resource_quota_set_max_threads(rq, GetParam().thread_limit);
133     }
134     for (int i = 0; i < GetParam().thread_manager_count; i++) {
135       thread_manager_.emplace_back(
136           new TestThreadManager("TestThreadManager", rq, GetParam()));
137     }
138     grpc_resource_quota_unref(rq);
139     for (auto& tm : thread_manager_) {
140       tm->Initialize();
141     }
142     for (auto& tm : thread_manager_) {
143       tm->Wait();
144     }
145   }
146 
147   std::vector<std::unique_ptr<TestThreadManager>> thread_manager_;
148 };
149 
150 TestThreadManagerSettings scenarios[] = {
151     {2 /* min_pollers */, 10 /* max_pollers */, 10 /* poll_duration_ms */,
152      1 /* work_duration_ms */, 50 /* max_poll_calls */,
153      INT_MAX /* thread_limit */, 1 /* thread_manager_count */},
154     {1 /* min_pollers */, 1 /* max_pollers */, 1 /* poll_duration_ms */,
155      10 /* work_duration_ms */, 50 /* max_poll_calls */, 3 /* thread_limit */,
156      2 /* thread_manager_count */}};
157 
158 INSTANTIATE_TEST_SUITE_P(ThreadManagerTest, ThreadManagerTest,
159                          ::testing::ValuesIn(scenarios));
160 
// For every manager run by the fixture: PollForWork() must have been called
// at least max_poll_calls times, and DoWork() must have been called exactly
// once for each WORK_FOUND that PollForWork() returned.
TEST_P(ThreadManagerTest, TestPollAndWork) {
  for (auto& tm : thread_manager_) {
    const int do_work_calls = tm->num_do_work();
    gpr_log(GPR_DEBUG, "DoWork() called %d times", do_work_calls);

    // The manager only shuts down after the poll budget is used up.
    EXPECT_GE(tm->num_poll_for_work(), GetParam().max_poll_calls);
    // Each WORK_FOUND result is matched by one DoWork() call.
    EXPECT_EQ(tm->num_do_work(), tm->num_work_found());
  }
}
170 
// When the scenario sets a positive thread limit, no manager may have reported
// more active threads than that limit (and each must still have used up its
// poll budget). Scenarios without a positive limit check nothing.
TEST_P(ThreadManagerTest, TestThreadQuota) {
  if (GetParam().thread_limit <= 0) {
    return;
  }

  for (auto& tm : thread_manager_) {
    EXPECT_GE(tm->num_poll_for_work(), GetParam().max_poll_calls);
    EXPECT_LE(tm->GetMaxActiveThreadsSoFar(), GetParam().thread_limit);
  }
}
179 
180 }  // namespace
181 }  // namespace grpc
182 
main(int argc,char ** argv)183 int main(int argc, char** argv) {
184   std::srand(std::time(nullptr));
185   grpc::testing::TestEnvironment env(argc, argv);
186   ::testing::InitGoogleTest(&argc, argv);
187 
188   grpc_init();
189   auto ret = RUN_ALL_TESTS();
190   grpc_shutdown();
191 
192   return ret;
193 }
194