1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpcpp/create_channel.h>
20 #include <grpcpp/security/credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_builder.h>
23 #include <grpcpp/support/config.h>
24 #include <gtest/gtest.h>
25
26 #include <thread>
27
28 #include "absl/log/log.h"
29 #include "absl/strings/str_format.h"
30 #include "src/core/util/crash.h"
31 #include "src/proto/grpc/testing/echo.grpc.pb.h"
32 #include "test/core/test_util/port.h"
33 #include "test/core/test_util/test_config.h"
34
35 namespace grpc {
36 namespace {
37
TEST(ServerRequestCallTest,ShortDeadlineDoesNotCauseOkayFalse)38 TEST(ServerRequestCallTest, ShortDeadlineDoesNotCauseOkayFalse) {
39 std::mutex mu;
40 bool shutting_down = false;
41
42 // grpc server config.
43 std::ostringstream s;
44 int p = grpc_pick_unused_port_or_die();
45 s << "[::1]:" << p;
46 const string address = s.str();
47 testing::EchoTestService::AsyncService service;
48 ServerBuilder builder;
49 builder.AddListeningPort(address, InsecureServerCredentials());
50 auto cq = builder.AddCompletionQueue();
51 builder.RegisterService(&service);
52 auto server = builder.BuildAndStart();
53
54 // server thread.
55 std::thread t([address, &service, &cq, &mu, &shutting_down] {
56 for (int n = 0; true; n++) {
57 ServerContext ctx;
58 testing::EchoRequest req;
59 ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
60
61 // if shutting down, don't enqueue a new request.
62 {
63 std::lock_guard<std::mutex> lock(mu);
64 if (!shutting_down) {
65 service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
66 reinterpret_cast<void*>(1));
67 }
68 }
69
70 bool ok;
71 void* tag;
72 if (!cq->Next(&tag, &ok)) {
73 break;
74 }
75
76 EXPECT_EQ((void*)1, tag);
77 // If not shutting down, ok must be true for new requests.
78 {
79 std::lock_guard<std::mutex> lock(mu);
80 if (!shutting_down && !ok) {
81 grpc_core::Crash(absl::StrFormat("!ok on request %d", n));
82 }
83 if (shutting_down && !ok) {
84 // Failed connection due to shutdown, continue flushing the CQ.
85 continue;
86 }
87 }
88
89 // Send a simple response after a small delay that would ensure the client
90 // deadline is exceeded.
91 LOG(INFO) << "Got request " << n;
92 testing::EchoResponse response;
93 response.set_message("foobar");
94 // A bit of sleep to make sure the deadline elapses.
95 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
96 gpr_time_from_seconds(1, GPR_TIMESPAN)));
97 {
98 std::lock_guard<std::mutex> lock(mu);
99 if (shutting_down) {
100 LOG(INFO) << "shut down while processing call, not calling Finish()";
101 // Continue flushing the CQ.
102 continue;
103 }
104 LOG(INFO) << "Finishing request " << n;
105 responder.Finish(response, grpc::Status::OK,
106 reinterpret_cast<void*>(2));
107 if (!cq->Next(&tag, &ok)) {
108 break;
109 }
110 EXPECT_EQ((void*)2, tag);
111 }
112 }
113 });
114
115 auto stub = testing::EchoTestService::NewStub(
116 grpc::CreateChannel(address, InsecureChannelCredentials()));
117
118 for (int i = 0; i < 100; i++) {
119 LOG(INFO) << "Sending " << i;
120 testing::EchoRequest request;
121
122 /////////
123 // Comment out the following line to get ok=false due to invalid request.
124 // Otherwise, ok=false due to deadline being exceeded.
125 /////////
126 request.set_message("foobar");
127
128 // A simple request with a short deadline. The server will always exceed the
129 // deadline, whether due to the sleep or because the server was unable to
130 // even fetch the request from the CQ before the deadline elapsed.
131 testing::EchoResponse response;
132 grpc::ClientContext ctx;
133 ctx.set_fail_fast(false);
134 ctx.set_deadline(std::chrono::system_clock::now() +
135 std::chrono::milliseconds(1));
136 grpc::Status status = stub->Echo(&ctx, request, &response);
137 EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, status.error_code());
138 LOG(INFO) << "Success.";
139 }
140 LOG(INFO) << "Done sending RPCs.";
141
142 // Shut down everything properly.
143 LOG(INFO) << "Shutting down.";
144 {
145 std::lock_guard<std::mutex> lock(mu);
146 shutting_down = true;
147 }
148 server->Shutdown();
149 cq->Shutdown();
150 server->Wait();
151
152 t.join();
153 }
154
ServerFunction(ServerCompletionQueue * cq,std::atomic_bool * shutdown)155 void ServerFunction(ServerCompletionQueue* cq, std::atomic_bool* shutdown) {
156 for (;;) {
157 bool ok;
158 void* tag;
159 if (!cq->Next(&tag, &ok)) {
160 break;
161 }
162 if (shutdown->load()) {
163 break;
164 }
165 // For UnimplementedAsyncRequest, the server handles it internally and never
166 // returns from Next except when shutdown.
167 grpc_core::Crash("unreached");
168 }
169 }
170
ClientFunction(testing::UnimplementedEchoService::Stub * stub)171 void ClientFunction(testing::UnimplementedEchoService::Stub* stub) {
172 constexpr int kNumRpcPerThreads = 5000;
173 for (int i = 0; i < kNumRpcPerThreads; i++) {
174 testing::EchoRequest request;
175 request.set_message("foobar");
176 testing::EchoResponse response;
177 grpc::ClientContext ctx;
178 grpc::Status status = stub->Unimplemented(&ctx, request, &response);
179 EXPECT_EQ(StatusCode::UNIMPLEMENTED, status.error_code());
180 }
181 }
182
TEST(ServerRequestCallTest,MultithreadedUnimplementedService)183 TEST(ServerRequestCallTest, MultithreadedUnimplementedService) {
184 std::atomic_bool shutdown(false);
185 // grpc server config.
186 std::ostringstream s;
187 int p = grpc_pick_unused_port_or_die();
188 s << "[::1]:" << p;
189 const string address = s.str();
190 testing::EchoTestService::AsyncService service;
191 ServerBuilder builder;
192 builder.AddListeningPort(address, InsecureServerCredentials());
193 auto cq = builder.AddCompletionQueue();
194 builder.RegisterService(&service);
195 auto server = builder.BuildAndStart();
196
197 ServerContext ctx;
198 testing::EchoRequest req;
199 ServerAsyncResponseWriter<testing::EchoResponse> responder(&ctx);
200 service.RequestEcho(&ctx, &req, &responder, cq.get(), cq.get(),
201 reinterpret_cast<void*>(1));
202
203 // server threads
204 constexpr int kNumServerThreads = 2;
205 std::vector<std::thread> server_threads;
206 server_threads.reserve(kNumServerThreads);
207 for (int i = 0; i < kNumServerThreads; i++) {
208 server_threads.emplace_back(ServerFunction, cq.get(), &shutdown);
209 }
210
211 auto stub = testing::UnimplementedEchoService::NewStub(
212 grpc::CreateChannel(address, InsecureChannelCredentials()));
213
214 // client threads
215 constexpr int kNumClientThreads = 2;
216 std::vector<std::thread> client_threads;
217 client_threads.reserve(kNumClientThreads);
218 for (int i = 0; i < kNumClientThreads; i++) {
219 client_threads.emplace_back(ClientFunction, stub.get());
220 }
221 for (auto& t : client_threads) {
222 t.join();
223 }
224
225 // Shut down everything properly.
226 LOG(INFO) << "Shutting down.";
227 shutdown.store(true);
228 server->Shutdown();
229 cq->Shutdown();
230 server->Wait();
231
232 for (auto& t : server_threads) {
233 t.join();
234 }
235 }
236
237 } // namespace
238 } // namespace grpc
239
main(int argc,char ** argv)240 int main(int argc, char** argv) {
241 grpc::testing::TestEnvironment env(&argc, argv);
242 ::testing::InitGoogleTest(&argc, argv);
243 return RUN_ALL_TESTS();
244 }
245