• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpcpp/create_channel.h>
20 #include <grpcpp/security/credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_builder.h>
23 #include <grpcpp/support/config.h>
24 #include <gtest/gtest.h>
25 
26 #include <thread>
27 
28 #include "absl/log/log.h"
29 #include "absl/strings/str_format.h"
30 #include "src/core/util/crash.h"
31 #include "src/proto/grpc/testing/echo.grpc.pb.h"
32 #include "test/core/test_util/port.h"
33 #include "test/core/test_util/test_config.h"
34 
35 namespace grpc {
36 namespace {
37 
TEST(ServerRequestCallTest,ShortDeadlineDoesNotCauseOkayFalse)38 TEST(ServerRequestCallTest, ShortDeadlineDoesNotCauseOkayFalse) {
39   std::mutex mu;
40   bool shutting_down = false;
41 
42   // grpc server config.
43   std::ostringstream s;
44   int p = grpc_pick_unused_port_or_die();
45   s << "[::1]:" << p;
46   const string address = s.str();
47   testing::EchoTestService::AsyncService service;
48   ServerBuilder builder;
49   builder.AddListeningPort(address, InsecureServerCredentials());
50   auto cq = builder.AddCompletionQueue();
51   builder.RegisterService(&service);
52   auto server = builder.BuildAndStart();
53 
54   // server thread.
55   std::thread t([address, &service, &cq, &mu, &shutting_down] {
56     for (int n = 0; true; n++) {
57       ServerContext ctx;
58       testing::EchoRequest req;
59       ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
60 
61       // if shutting down, don't enqueue a new request.
62       {
63         std::lock_guard<std::mutex> lock(mu);
64         if (!shutting_down) {
65           service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
66                               reinterpret_cast<void*>(1));
67         }
68       }
69 
70       bool ok;
71       void* tag;
72       if (!cq->Next(&tag, &ok)) {
73         break;
74       }
75 
76       EXPECT_EQ((void*)1, tag);
77       // If not shutting down, ok must be true for new requests.
78       {
79         std::lock_guard<std::mutex> lock(mu);
80         if (!shutting_down && !ok) {
81           grpc_core::Crash(absl::StrFormat("!ok on request %d", n));
82         }
83         if (shutting_down && !ok) {
84           // Failed connection due to shutdown, continue flushing the CQ.
85           continue;
86         }
87       }
88 
89       // Send a simple response after a small delay that would ensure the client
90       // deadline is exceeded.
91       LOG(INFO) << "Got request " << n;
92       testing::EchoResponse response;
93       response.set_message("foobar");
94       // A bit of sleep to make sure the deadline elapses.
95       gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
96                                    gpr_time_from_seconds(1, GPR_TIMESPAN)));
97       {
98         std::lock_guard<std::mutex> lock(mu);
99         if (shutting_down) {
100           LOG(INFO) << "shut down while processing call, not calling Finish()";
101           // Continue flushing the CQ.
102           continue;
103         }
104         LOG(INFO) << "Finishing request " << n;
105         responder.Finish(response, grpc::Status::OK,
106                          reinterpret_cast<void*>(2));
107         if (!cq->Next(&tag, &ok)) {
108           break;
109         }
110         EXPECT_EQ((void*)2, tag);
111       }
112     }
113   });
114 
115   auto stub = testing::EchoTestService::NewStub(
116       grpc::CreateChannel(address, InsecureChannelCredentials()));
117 
118   for (int i = 0; i < 100; i++) {
119     LOG(INFO) << "Sending " << i;
120     testing::EchoRequest request;
121 
122     /////////
123     // Comment out the following line to get ok=false due to invalid request.
124     // Otherwise, ok=false due to deadline being exceeded.
125     /////////
126     request.set_message("foobar");
127 
128     // A simple request with a short deadline. The server will always exceed the
129     // deadline, whether due to the sleep or because the server was unable to
130     // even fetch the request from the CQ before the deadline elapsed.
131     testing::EchoResponse response;
132     grpc::ClientContext ctx;
133     ctx.set_fail_fast(false);
134     ctx.set_deadline(std::chrono::system_clock::now() +
135                      std::chrono::milliseconds(1));
136     grpc::Status status = stub->Echo(&ctx, request, &response);
137     EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code());
138     LOG(INFO) << "Success.";
139   }
140   LOG(INFO) << "Done sending RPCs.";
141 
142   // Shut down everything properly.
143   LOG(INFO) << "Shutting down.";
144   {
145     std::lock_guard<std::mutex> lock(mu);
146     shutting_down = true;
147   }
148   server->Shutdown();
149   cq->Shutdown();
150   server->Wait();
151 
152   t.join();
153 }
154 
ServerFunction(ServerCompletionQueue * cq,std::atomic_bool * shutdown)155 void ServerFunction(ServerCompletionQueue* cq, std::atomic_bool* shutdown) {
156   for (;;) {
157     bool ok;
158     void* tag;
159     if (!cq->Next(&tag, &ok)) {
160       break;
161     }
162     if (shutdown->load()) {
163       break;
164     }
165     // For UnimplementedAsyncRequest, the server handles it internally and never
166     // returns from Next except when shutdown.
167     grpc_core::Crash("unreached");
168   }
169 }
170 
ClientFunction(testing::UnimplementedEchoService::Stub * stub)171 void ClientFunction(testing::UnimplementedEchoService::Stub* stub) {
172   constexpr int kNumRpcPerThreads = 5000;
173   for (int i = 0; i < kNumRpcPerThreads; i++) {
174     testing::EchoRequest request;
175     request.set_message("foobar");
176     testing::EchoResponse response;
177     grpc::ClientContext ctx;
178     grpc::Status status = stub->Unimplemented(&ctx, request, &response);
179     EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
180   }
181 }
182 
// Runs several client threads calling UnimplementedEchoService::Unimplemented
// against a server that only registers EchoTestService::AsyncService, while
// several server threads poll the completion queue. The polling threads
// (ServerFunction) crash the process if the queue delivers any event before
// shutdown is signalled; the client threads (ClientFunction) expect
// UNIMPLEMENTED for every call.
TEST(ServerRequestCallTest, MultithreadedUnimplementedService) {
  constexpr int kNumServerThreads = 2;
  constexpr int kNumClientThreads = 2;

  // Set just before the server is torn down; read by the polling threads.
  std::atomic_bool shutdown{false};

  // grpc server config.
  const int port = grpc_pick_unused_port_or_die();
  const std::string address = absl::StrFormat("[::1]:%d", port);
  testing::EchoTestService::AsyncService service;
  ServerBuilder builder;
  builder.AddListeningPort(address, InsecureServerCredentials());
  auto cq = builder.AddCompletionQueue();
  builder.RegisterService(&service);
  auto server = builder.BuildAndStart();

  // One outstanding Echo request is placed on the queue before any polling
  // thread starts.
  ServerContext server_ctx;
  testing::EchoRequest echo_request;
  ServerAsyncResponseWriter<testing::EchoResponse> responder(&server_ctx);
  service.RequestEcho(&server_ctx, &echo_request, &responder, cq.get(),
                      cq.get(), reinterpret_cast<void*>(1));

  // server threads
  std::vector<std::thread> pollers;
  pollers.reserve(kNumServerThreads);
  for (int started = 0; started != kNumServerThreads; ++started) {
    pollers.emplace_back(ServerFunction, cq.get(), &shutdown);
  }

  auto stub = testing::UnimplementedEchoService::NewStub(
      grpc::CreateChannel(address, InsecureChannelCredentials()));

  // client threads
  std::vector<std::thread> callers;
  callers.reserve(kNumClientThreads);
  for (int started = 0; started != kNumClientThreads; ++started) {
    callers.emplace_back(ClientFunction, stub.get());
  }
  // Wait for every client to finish all of its RPCs before tearing down.
  for (std::thread& caller : callers) {
    caller.join();
  }

  // Shut down everything properly.
  LOG(INFO) << "Shutting down.";
  shutdown.store(true);
  server->Shutdown();
  cq->Shutdown();
  server->Wait();

  for (std::thread& poller : pollers) {
    poller.join();
  }
}
236 
237 }  // namespace
238 }  // namespace grpc
239 
main(int argc,char ** argv)240 int main(int argc, char** argv) {
241   grpc::testing::TestEnvironment env(&argc, argv);
242   ::testing::InitGoogleTest(&argc, argv);
243   return RUN_ALL_TESTS();
244 }
245