1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 // is % allowed in string
17 //
18
#include "src/cpp/thread_manager/thread_manager.h"

#include <grpc/support/port_platform.h>
#include <grpcpp/grpcpp.h>
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <climits>
#include <cstdlib>
#include <ctime>
#include <memory>
#include <thread>
#include <vector>

#include "absl/log/log.h"
#include "src/core/util/crash.h"
#include "test/core/test_util/test_config.h"
34
35 namespace grpc {
36 namespace {
37
// Knobs describing one parameterized test scenario. This is a plain
// aggregate: the scenarios table in this file initializes it positionally, so
// the declaration order of the fields must not change.
struct TestThreadManagerSettings {
  int min_pollers;  // Min number of pollers that SHOULD be active in
                    // ThreadManager.
  int max_pollers;  // Max number of pollers that could be active in
                    // ThreadManager.
  int poll_duration_ms;  // Sleep duration in PollForWork(), simulating
                         // "polling".
  int work_duration_ms;  // Sleep duration in DoWork(), simulating "work".
  int max_poll_calls;    // Max number of PollForWork() calls before shutting
                         // down.
  int thread_limit;      // Thread limit (for use in the resource quota).
  int thread_manager_count;  // How many thread managers should be
                             // instantiated.
};
60
61 class TestThreadManager final : public grpc::ThreadManager {
62 public:
TestThreadManager(const char * name,grpc_resource_quota * rq,const TestThreadManagerSettings & settings)63 TestThreadManager(const char* name, grpc_resource_quota* rq,
64 const TestThreadManagerSettings& settings)
65 : ThreadManager(name, rq, settings.min_pollers, settings.max_pollers),
66 settings_(settings),
67 num_do_work_(0),
68 num_poll_for_work_(0),
69 num_work_found_(0) {}
70
71 grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
DoWork(void *,bool,bool)72 void DoWork(void* /* tag */, bool /*ok*/, bool /*resources*/) override {
73 num_do_work_.fetch_add(1, std::memory_order_relaxed);
74
75 // Simulate work by sleeping
76 std::this_thread::sleep_for(
77 std::chrono::milliseconds(settings_.work_duration_ms));
78 }
79
80 // Get number of times PollForWork() was called
num_poll_for_work() const81 int num_poll_for_work() const {
82 return num_poll_for_work_.load(std::memory_order_relaxed);
83 }
84 // Get number of times PollForWork() returned WORK_FOUND
num_work_found() const85 int num_work_found() const {
86 return num_work_found_.load(std::memory_order_relaxed);
87 }
88 // Get number of times DoWork() was called
num_do_work() const89 int num_do_work() const {
90 return num_do_work_.load(std::memory_order_relaxed);
91 }
92
93 private:
94 TestThreadManagerSettings settings_;
95
96 // Counters
97 std::atomic_int num_do_work_; // Number of calls to DoWork
98 std::atomic_int num_poll_for_work_; // Number of calls to PollForWork
99 std::atomic_int num_work_found_; // Number of times WORK_FOUND was returned
100 };
101
PollForWork(void ** tag,bool * ok)102 grpc::ThreadManager::WorkStatus TestThreadManager::PollForWork(void** tag,
103 bool* ok) {
104 int call_num = num_poll_for_work_.fetch_add(1, std::memory_order_relaxed);
105 if (call_num >= settings_.max_poll_calls) {
106 Shutdown();
107 return SHUTDOWN;
108 }
109
110 // Simulate "polling" duration
111 std::this_thread::sleep_for(
112 std::chrono::milliseconds(settings_.poll_duration_ms));
113 *tag = nullptr;
114 *ok = true;
115
116 // Return timeout roughly 1 out of every 3 calls just to make the test a bit
117 // more interesting
118 if (call_num % 3 == 0) {
119 return TIMEOUT;
120 }
121
122 num_work_found_.fetch_add(1, std::memory_order_relaxed);
123 return WORK_FOUND;
124 }
125
126 class ThreadManagerTest
127 : public ::testing::TestWithParam<TestThreadManagerSettings> {
128 protected:
SetUp()129 void SetUp() override {
130 grpc_resource_quota* rq = grpc_resource_quota_create("Thread manager test");
131 if (GetParam().thread_limit > 0) {
132 grpc_resource_quota_set_max_threads(rq, GetParam().thread_limit);
133 }
134 for (int i = 0; i < GetParam().thread_manager_count; i++) {
135 thread_manager_.emplace_back(
136 new TestThreadManager("TestThreadManager", rq, GetParam()));
137 }
138 grpc_resource_quota_unref(rq);
139 for (auto& tm : thread_manager_) {
140 tm->Initialize();
141 }
142 for (auto& tm : thread_manager_) {
143 tm->Wait();
144 }
145 }
146
147 std::vector<std::unique_ptr<TestThreadManager>> thread_manager_;
148 };
149
150 TestThreadManagerSettings scenarios[] = {
151 {2 /* min_pollers */, 10 /* max_pollers */, 10 /* poll_duration_ms */,
152 1 /* work_duration_ms */, 50 /* max_poll_calls */,
153 INT_MAX /* thread_limit */, 1 /* thread_manager_count */},
154 {1 /* min_pollers */, 1 /* max_pollers */, 1 /* poll_duration_ms */,
155 10 /* work_duration_ms */, 50 /* max_poll_calls */, 3 /* thread_limit */,
156 2 /* thread_manager_count */}};
157
158 INSTANTIATE_TEST_SUITE_P(ThreadManagerTest, ThreadManagerTest,
159 ::testing::ValuesIn(scenarios));
160
// For every manager: PollForWork() must have been called at least
// max_poll_calls times, and DoWork() must have been called exactly as many
// times as PollForWork() returned WORK_FOUND.
TEST_P(ThreadManagerTest, TestPollAndWork) {
  const int min_expected_polls = GetParam().max_poll_calls;
  for (const auto& manager : thread_manager_) {
    const TestThreadManager& tm = *manager;
    VLOG(2) << "DoWork() called " << tm.num_do_work() << " times";
    EXPECT_GE(tm.num_poll_for_work(), min_expected_polls);
    EXPECT_EQ(tm.num_do_work(), tm.num_work_found());
  }
}
170
// When the scenario sets a positive thread limit, no manager may have had
// more than thread_limit threads active at once, and each must still have
// polled at least max_poll_calls times.
TEST_P(ThreadManagerTest, TestThreadQuota) {
  const TestThreadManagerSettings& settings = GetParam();
  if (settings.thread_limit <= 0) {
    return;  // No positive limit configured; nothing to check.
  }
  for (const auto& manager : thread_manager_) {
    EXPECT_GE(manager->num_poll_for_work(), settings.max_poll_calls);
    EXPECT_LE(manager->GetMaxActiveThreadsSoFar(), settings.thread_limit);
  }
}
179
180 } // namespace
181 } // namespace grpc
182
main(int argc,char ** argv)183 int main(int argc, char** argv) {
184 std::srand(std::time(nullptr));
185 grpc::testing::TestEnvironment env(&argc, argv);
186 ::testing::InitGoogleTest(&argc, argv);
187
188 grpc_init();
189 auto ret = RUN_ALL_TESTS();
190 grpc_shutdown();
191
192 return ret;
193 }
194