1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <cinttypes>
20 #include <mutex>
21 #include <thread>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/time.h>
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/impl/codegen/sync.h>
29 #include <grpcpp/resource_quota.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33
34 #include "src/core/lib/gpr/env.h"
35 #include "src/core/lib/surface/api_trace.h"
36 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
37 #include "src/proto/grpc/testing/echo.grpc.pb.h"
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40
41 #include <gtest/gtest.h>
42
43 using grpc::testing::EchoRequest;
44 using grpc::testing::EchoResponse;
45 using std::chrono::system_clock;
46
47 const int kNumThreads = 100; // Number of threads
48 const int kNumAsyncSendThreads = 2;
49 const int kNumAsyncReceiveThreads = 50;
50 const int kNumAsyncServerThreads = 50;
51 const int kNumRpcs = 1000; // Number of RPCs per thread
52
53 namespace grpc {
54 namespace testing {
55
56 class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
57 public:
TestServiceImpl()58 TestServiceImpl() {}
59
Echo(ServerContext *,const EchoRequest * request,EchoResponse * response)60 Status Echo(ServerContext* /*context*/, const EchoRequest* request,
61 EchoResponse* response) override {
62 response->set_message(request->message());
63 return Status::OK;
64 }
65 };
66
67 template <class Service>
68 class CommonStressTest {
69 public:
CommonStressTest()70 CommonStressTest() : kMaxMessageSize_(8192) {
71 #if TARGET_OS_IPHONE
72 // Workaround Apple CFStream bug
73 gpr_setenv("grpc_cfstream", "0");
74 #endif
75 }
~CommonStressTest()76 virtual ~CommonStressTest() {}
77 virtual void SetUp() = 0;
78 virtual void TearDown() = 0;
79 virtual void ResetStub() = 0;
80 virtual bool AllowExhaustion() = 0;
GetStub()81 grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
82
83 protected:
84 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
85 std::unique_ptr<Server> server_;
86
87 virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0;
SetUpStartCommon(ServerBuilder * builder,Service * service)88 void SetUpStartCommon(ServerBuilder* builder, Service* service) {
89 builder->RegisterService(service);
90 builder->SetMaxMessageSize(
91 kMaxMessageSize_); // For testing max message size.
92 }
SetUpEnd(ServerBuilder * builder)93 void SetUpEnd(ServerBuilder* builder) { server_ = builder->BuildAndStart(); }
TearDownStart()94 void TearDownStart() { server_->Shutdown(); }
TearDownEnd()95 void TearDownEnd() {}
96
97 private:
98 const int kMaxMessageSize_;
99 };
100
101 template <class Service>
102 class CommonStressTestInsecure : public CommonStressTest<Service> {
103 public:
ResetStub()104 void ResetStub() override {
105 std::shared_ptr<Channel> channel = grpc::CreateChannel(
106 server_address_.str(), InsecureChannelCredentials());
107 this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
108 }
AllowExhaustion()109 bool AllowExhaustion() override { return false; }
110
111 protected:
SetUpStart(ServerBuilder * builder,Service * service)112 void SetUpStart(ServerBuilder* builder, Service* service) override {
113 int port = grpc_pick_unused_port_or_die();
114 this->server_address_ << "localhost:" << port;
115 // Setup server
116 builder->AddListeningPort(server_address_.str(),
117 InsecureServerCredentials());
118 this->SetUpStartCommon(builder, service);
119 }
120
121 private:
122 std::ostringstream server_address_;
123 };
124
// Transport variant: in-process channel (no sockets). The template flag
// controls whether RESOURCE_EXHAUSTED failures are tolerated.
template <class Service, bool allow_resource_exhaustion>
class CommonStressTestInproc : public CommonStressTest<Service> {
 public:
  void ResetStub() override {
    ChannelArguments args;
    auto channel = this->server_->InProcessChannel(args);
    this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
  }
  bool AllowExhaustion() override { return allow_resource_exhaustion; }

 protected:
  // In-process transport needs no listening port — just common registration.
  void SetUpStart(ServerBuilder* builder, Service* service) override {
    this->SetUpStartCommon(builder, service);
  }
};
140
141 template <class BaseClass>
142 class CommonStressTestSyncServer : public BaseClass {
143 public:
SetUp()144 void SetUp() override {
145 ServerBuilder builder;
146 this->SetUpStart(&builder, &service_);
147 this->SetUpEnd(&builder);
148 }
TearDown()149 void TearDown() override {
150 this->TearDownStart();
151 this->TearDownEnd();
152 }
153
154 private:
155 TestServiceImpl service_;
156 };
157
158 template <class BaseClass>
159 class CommonStressTestSyncServerLowThreadCount : public BaseClass {
160 public:
SetUp()161 void SetUp() override {
162 ServerBuilder builder;
163 ResourceQuota quota;
164 this->SetUpStart(&builder, &service_);
165 quota.SetMaxThreads(4);
166 builder.SetResourceQuota(quota);
167 this->SetUpEnd(&builder);
168 }
TearDown()169 void TearDown() override {
170 this->TearDownStart();
171 this->TearDownEnd();
172 }
173
174 private:
175 TestServiceImpl service_;
176 };
177
178 template <class BaseClass>
179 class CommonStressTestAsyncServer : public BaseClass {
180 public:
CommonStressTestAsyncServer()181 CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
SetUp()182 void SetUp() override {
183 shutting_down_ = false;
184 ServerBuilder builder;
185 this->SetUpStart(&builder, &service_);
186 cq_ = builder.AddCompletionQueue();
187 this->SetUpEnd(&builder);
188 for (int i = 0; i < kNumAsyncServerThreads * 100; i++) {
189 RefreshContext(i);
190 }
191 for (int i = 0; i < kNumAsyncServerThreads; i++) {
192 server_threads_.emplace_back(&CommonStressTestAsyncServer::ProcessRpcs,
193 this);
194 }
195 }
TearDown()196 void TearDown() override {
197 {
198 grpc::internal::MutexLock l(&mu_);
199 this->TearDownStart();
200 shutting_down_ = true;
201 cq_->Shutdown();
202 }
203
204 for (int i = 0; i < kNumAsyncServerThreads; i++) {
205 server_threads_[i].join();
206 }
207
208 void* ignored_tag;
209 bool ignored_ok;
210 while (cq_->Next(&ignored_tag, &ignored_ok))
211 ;
212 this->TearDownEnd();
213 }
214
215 private:
ProcessRpcs()216 void ProcessRpcs() {
217 void* tag;
218 bool ok;
219 while (cq_->Next(&tag, &ok)) {
220 if (ok) {
221 int i = static_cast<int>(reinterpret_cast<intptr_t>(tag));
222 switch (contexts_[i].state) {
223 case Context::READY: {
224 contexts_[i].state = Context::DONE;
225 EchoResponse send_response;
226 send_response.set_message(contexts_[i].recv_request.message());
227 contexts_[i].response_writer->Finish(send_response, Status::OK,
228 tag);
229 break;
230 }
231 case Context::DONE:
232 RefreshContext(i);
233 break;
234 }
235 }
236 }
237 }
RefreshContext(int i)238 void RefreshContext(int i) {
239 grpc::internal::MutexLock l(&mu_);
240 if (!shutting_down_) {
241 contexts_[i].state = Context::READY;
242 contexts_[i].srv_ctx.reset(new ServerContext);
243 contexts_[i].response_writer.reset(
244 new grpc::ServerAsyncResponseWriter<EchoResponse>(
245 contexts_[i].srv_ctx.get()));
246 service_.RequestEcho(contexts_[i].srv_ctx.get(),
247 &contexts_[i].recv_request,
248 contexts_[i].response_writer.get(), cq_.get(),
249 cq_.get(), (void*)static_cast<intptr_t>(i));
250 }
251 }
252 struct Context {
253 std::unique_ptr<ServerContext> srv_ctx;
254 std::unique_ptr<grpc::ServerAsyncResponseWriter<EchoResponse>>
255 response_writer;
256 EchoRequest recv_request;
257 enum { READY, DONE } state;
258 };
259 std::vector<Context> contexts_;
260 ::grpc::testing::EchoTestService::AsyncService service_;
261 std::unique_ptr<ServerCompletionQueue> cq_;
262 bool shutting_down_;
263 grpc::internal::Mutex mu_;
264 std::vector<std::thread> server_threads_;
265 };
266
267 template <class Common>
268 class End2endTest : public ::testing::Test {
269 protected:
End2endTest()270 End2endTest() {}
SetUp()271 void SetUp() override { common_.SetUp(); }
TearDown()272 void TearDown() override { common_.TearDown(); }
ResetStub()273 void ResetStub() { common_.ResetStub(); }
274
275 Common common_;
276 };
277
SendRpc(grpc::testing::EchoTestService::Stub * stub,int num_rpcs,bool allow_exhaustion,gpr_atm * errors)278 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
279 bool allow_exhaustion, gpr_atm* errors) {
280 EchoRequest request;
281 EchoResponse response;
282 request.set_message("Hello");
283
284 for (int i = 0; i < num_rpcs; ++i) {
285 ClientContext context;
286 Status s = stub->Echo(&context, request, &response);
287 EXPECT_TRUE(s.ok() || (allow_exhaustion &&
288 s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
289 if (!s.ok()) {
290 if (!(allow_exhaustion &&
291 s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
292 gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
293 s.error_message().c_str());
294 }
295 gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
296 } else {
297 EXPECT_EQ(response.message(), request.message());
298 }
299 }
300 }
301
302 typedef ::testing::Types<
303 CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
304 CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
305 CommonStressTestSyncServerLowThreadCount<
306 CommonStressTestInproc<TestServiceImpl, true>>,
307 CommonStressTestAsyncServer<
308 CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
309 CommonStressTestAsyncServer<CommonStressTestInproc<
310 grpc::testing::EchoTestService::AsyncService, false>>>
311 CommonTypes;
312 TYPED_TEST_SUITE(End2endTest, CommonTypes);
// Hammer the server with kNumThreads concurrent clients, kNumRpcs RPCs each,
// and count failures in a shared atomic.
TYPED_TEST(End2endTest, ThreadStress) {
  this->common_.ResetStub();
  gpr_atm errors;
  gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
  std::vector<std::thread> workers;
  workers.reserve(kNumThreads);
  for (int i = 0; i < kNumThreads; ++i) {
    workers.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
                         this->common_.AllowExhaustion(), &errors);
  }
  for (auto& worker : workers) {
    worker.join();
  }
  const uint64_t error_cnt =
      static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
  if (error_cnt != 0) {
    gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
  }
  // If this test allows resource exhaustion, expect that it actually sees some
  if (this->common_.AllowExhaustion()) {
    EXPECT_GT(error_cnt, static_cast<uint64_t>(0));
  }
}
335
336 template <class Common>
337 class AsyncClientEnd2endTest : public ::testing::Test {
338 protected:
AsyncClientEnd2endTest()339 AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
340
SetUp()341 void SetUp() override { common_.SetUp(); }
TearDown()342 void TearDown() override {
343 void* ignored_tag;
344 bool ignored_ok;
345 while (cq_.Next(&ignored_tag, &ignored_ok))
346 ;
347 common_.TearDown();
348 }
349
Wait()350 void Wait() {
351 grpc::internal::MutexLock l(&mu_);
352 while (rpcs_outstanding_ != 0) {
353 cv_.Wait(&mu_);
354 }
355
356 cq_.Shutdown();
357 }
358
359 struct AsyncClientCall {
360 EchoResponse response;
361 ClientContext context;
362 Status status;
363 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
364 };
365
AsyncSendRpc(int num_rpcs)366 void AsyncSendRpc(int num_rpcs) {
367 for (int i = 0; i < num_rpcs; ++i) {
368 AsyncClientCall* call = new AsyncClientCall;
369 EchoRequest request;
370 request.set_message("Hello: " + std::to_string(i));
371 call->response_reader =
372 common_.GetStub()->AsyncEcho(&call->context, request, &cq_);
373 call->response_reader->Finish(&call->response, &call->status,
374 (void*)call);
375
376 grpc::internal::MutexLock l(&mu_);
377 rpcs_outstanding_++;
378 }
379 }
380
AsyncCompleteRpc()381 void AsyncCompleteRpc() {
382 while (true) {
383 void* got_tag;
384 bool ok = false;
385 if (!cq_.Next(&got_tag, &ok)) break;
386 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
387 if (!ok) {
388 gpr_log(GPR_DEBUG, "Error: %d", call->status.error_code());
389 }
390 delete call;
391
392 bool notify;
393 {
394 grpc::internal::MutexLock l(&mu_);
395 rpcs_outstanding_--;
396 notify = (rpcs_outstanding_ == 0);
397 }
398 if (notify) {
399 cv_.Signal();
400 }
401 }
402 }
403
404 Common common_;
405 CompletionQueue cq_;
406 grpc::internal::Mutex mu_;
407 grpc::internal::CondVar cv_;
408 int rpcs_outstanding_;
409 };
410
TYPED_TEST_SUITE(AsyncClientEnd2endTest, CommonTypes);
// Async-client stress: start the completion-drain threads first, then the
// senders; join senders, wait for all RPCs to finish, then join drainers.
TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) {
  this->common_.ResetStub();
  std::vector<std::thread> send_threads;
  std::vector<std::thread> completion_threads;
  for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
    completion_threads.emplace_back(
        &AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncCompleteRpc,
        this);
  }
  for (int i = 0; i < kNumAsyncSendThreads; ++i) {
    send_threads.emplace_back(
        &AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncSendRpc,
        this, kNumRpcs);
  }
  for (auto& sender : send_threads) {
    sender.join();
  }

  this->Wait();
  for (auto& drainer : completion_threads) {
    drainer.join();
  }
}
434
435 } // namespace testing
436 } // namespace grpc
437
main(int argc,char ** argv)438 int main(int argc, char** argv) {
439 grpc::testing::TestEnvironment env(argc, argv);
440 ::testing::InitGoogleTest(&argc, argv);
441 return RUN_ALL_TESTS();
442 }
443