1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <cinttypes>
20 #include <mutex>
21 #include <thread>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/time.h>
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/resource_quota.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32
33 #include "src/core/lib/surface/api_trace.h"
34 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38
39 #include <gtest/gtest.h>
40
41 using grpc::testing::EchoRequest;
42 using grpc::testing::EchoResponse;
43 using std::chrono::system_clock;
44
// Sizing knobs for the stress tests in this file.
constexpr int kNumThreads = 100;  // Client threads in the sync-client test
constexpr int kNumAsyncSendThreads = 2;      // Async-client sender threads
constexpr int kNumAsyncReceiveThreads = 50;  // Async-client completion threads
constexpr int kNumAsyncServerThreads = 50;   // Async-server polling threads
constexpr int kNumRpcs = 1000;  // Number of RPCs issued by each sending thread
50
51 namespace grpc {
52 namespace testing {
53
54 class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
55 public:
TestServiceImpl()56 TestServiceImpl() {}
57
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)58 Status Echo(ServerContext* context, const EchoRequest* request,
59 EchoResponse* response) override {
60 response->set_message(request->message());
61 return Status::OK;
62 }
63 };
64
65 template <class Service>
66 class CommonStressTest {
67 public:
CommonStressTest()68 CommonStressTest() : kMaxMessageSize_(8192) {}
~CommonStressTest()69 virtual ~CommonStressTest() {}
70 virtual void SetUp() = 0;
71 virtual void TearDown() = 0;
72 virtual void ResetStub() = 0;
73 virtual bool AllowExhaustion() = 0;
GetStub()74 grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
75
76 protected:
77 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
78 std::unique_ptr<Server> server_;
79
80 virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0;
SetUpStartCommon(ServerBuilder * builder,Service * service)81 void SetUpStartCommon(ServerBuilder* builder, Service* service) {
82 builder->RegisterService(service);
83 builder->SetMaxMessageSize(
84 kMaxMessageSize_); // For testing max message size.
85 }
SetUpEnd(ServerBuilder * builder)86 void SetUpEnd(ServerBuilder* builder) { server_ = builder->BuildAndStart(); }
TearDownStart()87 void TearDownStart() { server_->Shutdown(); }
TearDownEnd()88 void TearDownEnd() {}
89
90 private:
91 const int kMaxMessageSize_;
92 };
93
94 template <class Service>
95 class CommonStressTestInsecure : public CommonStressTest<Service> {
96 public:
ResetStub()97 void ResetStub() override {
98 std::shared_ptr<Channel> channel =
99 CreateChannel(server_address_.str(), InsecureChannelCredentials());
100 this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
101 }
AllowExhaustion()102 bool AllowExhaustion() override { return false; }
103
104 protected:
SetUpStart(ServerBuilder * builder,Service * service)105 void SetUpStart(ServerBuilder* builder, Service* service) override {
106 int port = grpc_pick_unused_port_or_die();
107 this->server_address_ << "localhost:" << port;
108 // Setup server
109 builder->AddListeningPort(server_address_.str(),
110 InsecureServerCredentials());
111 this->SetUpStartCommon(builder, service);
112 }
113
114 private:
115 std::ostringstream server_address_;
116 };
117
118 template <class Service, bool allow_resource_exhaustion>
119 class CommonStressTestInproc : public CommonStressTest<Service> {
120 public:
ResetStub()121 void ResetStub() override {
122 ChannelArguments args;
123 std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
124 this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
125 }
AllowExhaustion()126 bool AllowExhaustion() override { return allow_resource_exhaustion; }
127
128 protected:
SetUpStart(ServerBuilder * builder,Service * service)129 void SetUpStart(ServerBuilder* builder, Service* service) override {
130 this->SetUpStartCommon(builder, service);
131 }
132 };
133
134 template <class BaseClass>
135 class CommonStressTestSyncServer : public BaseClass {
136 public:
SetUp()137 void SetUp() override {
138 ServerBuilder builder;
139 this->SetUpStart(&builder, &service_);
140 this->SetUpEnd(&builder);
141 }
TearDown()142 void TearDown() override {
143 this->TearDownStart();
144 this->TearDownEnd();
145 }
146
147 private:
148 TestServiceImpl service_;
149 };
150
151 template <class BaseClass>
152 class CommonStressTestSyncServerLowThreadCount : public BaseClass {
153 public:
SetUp()154 void SetUp() override {
155 ServerBuilder builder;
156 ResourceQuota quota;
157 this->SetUpStart(&builder, &service_);
158 quota.SetMaxThreads(4);
159 builder.SetResourceQuota(quota);
160 this->SetUpEnd(&builder);
161 }
TearDown()162 void TearDown() override {
163 this->TearDownStart();
164 this->TearDownEnd();
165 }
166
167 private:
168 TestServiceImpl service_;
169 };
170
171 template <class BaseClass>
172 class CommonStressTestAsyncServer : public BaseClass {
173 public:
CommonStressTestAsyncServer()174 CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
SetUp()175 void SetUp() override {
176 shutting_down_ = false;
177 ServerBuilder builder;
178 this->SetUpStart(&builder, &service_);
179 cq_ = builder.AddCompletionQueue();
180 this->SetUpEnd(&builder);
181 for (int i = 0; i < kNumAsyncServerThreads * 100; i++) {
182 RefreshContext(i);
183 }
184 for (int i = 0; i < kNumAsyncServerThreads; i++) {
185 server_threads_.emplace_back(&CommonStressTestAsyncServer::ProcessRpcs,
186 this);
187 }
188 }
TearDown()189 void TearDown() override {
190 {
191 std::unique_lock<std::mutex> l(mu_);
192 this->TearDownStart();
193 shutting_down_ = true;
194 cq_->Shutdown();
195 }
196
197 for (int i = 0; i < kNumAsyncServerThreads; i++) {
198 server_threads_[i].join();
199 }
200
201 void* ignored_tag;
202 bool ignored_ok;
203 while (cq_->Next(&ignored_tag, &ignored_ok))
204 ;
205 this->TearDownEnd();
206 }
207
208 private:
ProcessRpcs()209 void ProcessRpcs() {
210 void* tag;
211 bool ok;
212 while (cq_->Next(&tag, &ok)) {
213 if (ok) {
214 int i = static_cast<int>(reinterpret_cast<intptr_t>(tag));
215 switch (contexts_[i].state) {
216 case Context::READY: {
217 contexts_[i].state = Context::DONE;
218 EchoResponse send_response;
219 send_response.set_message(contexts_[i].recv_request.message());
220 contexts_[i].response_writer->Finish(send_response, Status::OK,
221 tag);
222 break;
223 }
224 case Context::DONE:
225 RefreshContext(i);
226 break;
227 }
228 }
229 }
230 }
RefreshContext(int i)231 void RefreshContext(int i) {
232 std::unique_lock<std::mutex> l(mu_);
233 if (!shutting_down_) {
234 contexts_[i].state = Context::READY;
235 contexts_[i].srv_ctx.reset(new ServerContext);
236 contexts_[i].response_writer.reset(
237 new grpc::ServerAsyncResponseWriter<EchoResponse>(
238 contexts_[i].srv_ctx.get()));
239 service_.RequestEcho(contexts_[i].srv_ctx.get(),
240 &contexts_[i].recv_request,
241 contexts_[i].response_writer.get(), cq_.get(),
242 cq_.get(), (void*)static_cast<intptr_t>(i));
243 }
244 }
245 struct Context {
246 std::unique_ptr<ServerContext> srv_ctx;
247 std::unique_ptr<grpc::ServerAsyncResponseWriter<EchoResponse>>
248 response_writer;
249 EchoRequest recv_request;
250 enum { READY, DONE } state;
251 };
252 std::vector<Context> contexts_;
253 ::grpc::testing::EchoTestService::AsyncService service_;
254 std::unique_ptr<ServerCompletionQueue> cq_;
255 bool shutting_down_;
256 std::mutex mu_;
257 std::vector<std::thread> server_threads_;
258 };
259
260 template <class Common>
261 class End2endTest : public ::testing::Test {
262 protected:
End2endTest()263 End2endTest() {}
SetUp()264 void SetUp() override { common_.SetUp(); }
TearDown()265 void TearDown() override { common_.TearDown(); }
ResetStub()266 void ResetStub() { common_.ResetStub(); }
267
268 Common common_;
269 };
270
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool allow_exhaustion,gpr_atm * errors)271 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
272 bool allow_exhaustion, gpr_atm* errors) {
273 EchoRequest request;
274 EchoResponse response;
275 request.set_message("Hello");
276
277 for (int i = 0; i < num_rpcs; ++i) {
278 ClientContext context;
279 Status s = stub->Echo(&context, request, &response);
280 EXPECT_TRUE(s.ok() || (allow_exhaustion &&
281 s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
282 if (!s.ok()) {
283 if (!(allow_exhaustion &&
284 s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
285 gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
286 s.error_message().c_str());
287 }
288 gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
289 } else {
290 EXPECT_EQ(response.message(), request.message());
291 }
292 }
293 }
294
295 typedef ::testing::Types<
296 CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
297 CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
298 CommonStressTestSyncServerLowThreadCount<
299 CommonStressTestInproc<TestServiceImpl, true>>,
300 CommonStressTestAsyncServer<
301 CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
302 CommonStressTestAsyncServer<CommonStressTestInproc<
303 grpc::testing::EchoTestService::AsyncService, false>>>
304 CommonTypes;
305 TYPED_TEST_CASE(End2endTest, CommonTypes);
// Runs kNumThreads client threads against one shared stub, each sending
// kNumRpcs blocking RPCs, and then checks the combined error count.
TYPED_TEST(End2endTest, ThreadStress) {
  this->common_.ResetStub();

  // Shared counter of failed RPCs, bumped by the worker threads.
  gpr_atm errors;
  gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));

  std::vector<std::thread> threads;
  threads.reserve(kNumThreads);
  for (int t = 0; t < kNumThreads; ++t) {
    threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
                         this->common_.AllowExhaustion(), &errors);
  }
  for (std::thread& worker : threads) {
    worker.join();
  }

  const uint64_t error_cnt =
      static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
  if (error_cnt != 0) {
    gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
  }
  // If this test allows resource exhaustion, expect that it actually sees some
  if (this->common_.AllowExhaustion()) {
    EXPECT_GT(error_cnt, static_cast<uint64_t>(0));
  }
}
328
329 template <class Common>
330 class AsyncClientEnd2endTest : public ::testing::Test {
331 protected:
AsyncClientEnd2endTest()332 AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
333
SetUp()334 void SetUp() override { common_.SetUp(); }
TearDown()335 void TearDown() override {
336 void* ignored_tag;
337 bool ignored_ok;
338 while (cq_.Next(&ignored_tag, &ignored_ok))
339 ;
340 common_.TearDown();
341 }
342
Wait()343 void Wait() {
344 std::unique_lock<std::mutex> l(mu_);
345 while (rpcs_outstanding_ != 0) {
346 cv_.wait(l);
347 }
348
349 cq_.Shutdown();
350 }
351
352 struct AsyncClientCall {
353 EchoResponse response;
354 ClientContext context;
355 Status status;
356 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
357 };
358
AsyncSendRpc(int num_rpcs)359 void AsyncSendRpc(int num_rpcs) {
360 for (int i = 0; i < num_rpcs; ++i) {
361 AsyncClientCall* call = new AsyncClientCall;
362 EchoRequest request;
363 request.set_message("Hello: " + grpc::to_string(i));
364 call->response_reader =
365 common_.GetStub()->AsyncEcho(&call->context, request, &cq_);
366 call->response_reader->Finish(&call->response, &call->status,
367 (void*)call);
368
369 std::unique_lock<std::mutex> l(mu_);
370 rpcs_outstanding_++;
371 }
372 }
373
AsyncCompleteRpc()374 void AsyncCompleteRpc() {
375 while (true) {
376 void* got_tag;
377 bool ok = false;
378 if (!cq_.Next(&got_tag, &ok)) break;
379 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
380 if (!ok) {
381 gpr_log(GPR_DEBUG, "Error: %d", call->status.error_code());
382 }
383 delete call;
384
385 bool notify;
386 {
387 std::unique_lock<std::mutex> l(mu_);
388 rpcs_outstanding_--;
389 notify = (rpcs_outstanding_ == 0);
390 }
391 if (notify) {
392 cv_.notify_all();
393 }
394 }
395 }
396
397 Common common_;
398 CompletionQueue cq_;
399 std::mutex mu_;
400 std::condition_variable cv_;
401 int rpcs_outstanding_;
402 };
403
TYPED_TEST_CASE(AsyncClientEnd2endTest, CommonTypes);

// Starts the completion threads first, then the sender threads. Once every
// sender has finished, waits for all outstanding RPCs to be reaped (which also
// shuts the queue down) and finally joins the completion threads.
TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) {
  using Fixture = AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>;

  this->common_.ResetStub();
  std::vector<std::thread> send_threads;
  std::vector<std::thread> completion_threads;

  for (int t = 0; t < kNumAsyncReceiveThreads; ++t) {
    completion_threads.emplace_back(&Fixture::AsyncCompleteRpc, this);
  }
  for (int t = 0; t < kNumAsyncSendThreads; ++t) {
    send_threads.emplace_back(&Fixture::AsyncSendRpc, this, kNumRpcs);
  }
  for (std::thread& sender : send_threads) {
    sender.join();
  }

  this->Wait();
  for (std::thread& reaper : completion_threads) {
    reaper.join();
  }
}
427
428 } // namespace testing
429 } // namespace grpc
430
main(int argc,char ** argv)431 int main(int argc, char** argv) {
432 grpc_test_init(argc, argv);
433 ::testing::InitGoogleTest(&argc, argv);
434 return RUN_ALL_TESTS();
435 }
436