• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #include <arpa/inet.h>
24 #include <fcntl.h>
25 #include <gflags/gflags.h>
26 #include <inttypes.h>
27 #include <netinet/in.h>
28 #include <netinet/tcp.h>
29 #include <sys/wait.h>
30 #include <unistd.h>
31 #include <chrono>
32 #include <cstdlib>
33 #include <memory>
34 #include <string>
35 #include <thread>
36 
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39 #include <grpcpp/channel.h>
40 #include <grpcpp/client_context.h>
41 #include <grpcpp/grpcpp.h>
42 #include <grpcpp/support/channel_arguments.h>
43 
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/iomgr/socket_mutator.h"
46 #include "src/proto/grpc/testing/empty.pb.h"
47 #include "src/proto/grpc/testing/messages.pb.h"
48 #include "src/proto/grpc/testing/test.grpc.pb.h"
49 #include "src/proto/grpc/testing/test.pb.h"
50 
51 #include "test/cpp/util/test_config.h"
52 #include "test/cpp/util/test_credentials_provider.h"
53 
54 DEFINE_string(custom_credentials_type, "", "User provided credentials type.");
55 DEFINE_string(server_uri, "localhost:1000", "Server URI target");
56 DEFINE_string(unroute_lb_and_backend_addrs_cmd, "exit 1",
57               "Shell command used to make LB and backend addresses unroutable");
58 DEFINE_string(blackhole_lb_and_backend_addrs_cmd, "exit 1",
59               "Shell command used to make LB and backend addresses blackholed");
60 DEFINE_string(
61     test_case, "",
62     "Test case to run. Valid options are:\n\n"
63     "fast_fallback_before_startup : fallback before establishing connection to "
64     "LB;\n"
65     "fast_fallback_after_startup : fallback after startup due to LB/backend "
66     "addresses becoming unroutable;\n"
67     "slow_fallback_before_startup : fallback before startup due to LB address "
68     "being blackholed;\n"
69     "slow_fallback_after_startup : fallback after startup due to LB/backend "
70     "addresses becoming blackholed;\n");
71 
// Define SOCKET_SUPPORTS_TCP_USER_TIMEOUT only when LINUX_VERSION_CODE is
// available and reports at least version 2.6.37. The two checks stay nested so
// that KERNEL_VERSION(...) is never evaluated when LINUX_VERSION_CODE is
// absent.
#if defined(LINUX_VERSION_CODE)
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT
#endif  // LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
#endif  // defined(LINUX_VERSION_CODE)
77 
78 #ifdef SOCKET_SUPPORTS_TCP_USER_TIMEOUT
79 using grpc::testing::GrpclbRouteType;
80 using grpc::testing::SimpleRequest;
81 using grpc::testing::SimpleResponse;
82 using grpc::testing::TestService;
83 
84 namespace {
85 
// Selects whether DoRPCAndGetPath marks the RPC as wait-for-ready.
// Kept as a plain enum because the value is logged with "%d".
enum RpcMode {
  FailFast = 0,      // The ClientContext's wait-for-ready setting is untouched.
  WaitForReady = 1,  // set_wait_for_ready(true) is applied to the context.
};
90 
DoRPCAndGetPath(TestService::Stub * stub,int deadline_seconds,RpcMode rpc_mode)91 GrpclbRouteType DoRPCAndGetPath(TestService::Stub* stub, int deadline_seconds,
92                                 RpcMode rpc_mode) {
93   gpr_log(GPR_INFO, "DoRPCAndGetPath deadline_seconds:%d rpc_mode:%d",
94           deadline_seconds, rpc_mode);
95   SimpleRequest request;
96   SimpleResponse response;
97   grpc::ClientContext context;
98   if (rpc_mode == WaitForReady) {
99     context.set_wait_for_ready(true);
100   }
101   request.set_fill_grpclb_route_type(true);
102   std::chrono::system_clock::time_point deadline =
103       std::chrono::system_clock::now() + std::chrono::seconds(deadline_seconds);
104   context.set_deadline(deadline);
105   grpc::Status s = stub->UnaryCall(&context, request, &response);
106   if (!s.ok()) {
107     gpr_log(GPR_INFO, "DoRPCAndGetPath failed. status-message: %s",
108             s.error_message().c_str());
109     return GrpclbRouteType::GRPCLB_ROUTE_TYPE_UNKNOWN;
110   }
111   GPR_ASSERT(response.grpclb_route_type() ==
112                  GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND ||
113              response.grpclb_route_type() ==
114                  GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK);
115   gpr_log(GPR_INFO, "DoRPCAndGetPath done. grpclb_route_type:%d",
116           response.grpclb_route_type());
117   return response.grpclb_route_type();
118 }
119 
DoRPCAndGetPath(TestService::Stub * stub,int deadline_seconds)120 GrpclbRouteType DoRPCAndGetPath(TestService::Stub* stub, int deadline_seconds) {
121   return DoRPCAndGetPath(stub, deadline_seconds, FailFast);
122 }
123 
DoWaitForReadyRPCAndGetPath(TestService::Stub * stub,int deadline_seconds)124 GrpclbRouteType DoWaitForReadyRPCAndGetPath(TestService::Stub* stub,
125                                             int deadline_seconds) {
126   return DoRPCAndGetPath(stub, deadline_seconds, WaitForReady);
127 }
128 
TcpUserTimeoutMutateFd(int fd,grpc_socket_mutator *)129 bool TcpUserTimeoutMutateFd(int fd, grpc_socket_mutator* /*mutator*/) {
130   int timeout = 20000;  // 20 seconds
131   gpr_log(GPR_INFO, "Setting socket option TCP_USER_TIMEOUT on fd: %d", fd);
132   if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
133                       sizeof(timeout))) {
134     gpr_log(GPR_ERROR, "Failed to set socket option TCP_USER_TIMEOUT");
135     abort();
136   }
137   int newval;
138   socklen_t len = sizeof(newval);
139   if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len) ||
140       newval != timeout) {
141     gpr_log(GPR_ERROR, "Failed to get expected socket option TCP_USER_TIMEOUT");
142     abort();
143   }
144   return true;
145 }
146 
TcpUserTimeoutCompare(grpc_socket_mutator *,grpc_socket_mutator *)147 int TcpUserTimeoutCompare(grpc_socket_mutator* /*a*/,
148                           grpc_socket_mutator* /*b*/) {
149   return 0;
150 }
151 
TcpUserTimeoutDestroy(grpc_socket_mutator * mutator)152 void TcpUserTimeoutDestroy(grpc_socket_mutator* mutator) { gpr_free(mutator); }
153 
154 const grpc_socket_mutator_vtable kTcpUserTimeoutMutatorVtable =
155     grpc_socket_mutator_vtable{
156         .mutate_fd = TcpUserTimeoutMutateFd,
157         .compare = TcpUserTimeoutCompare,
158         .destroy = TcpUserTimeoutDestroy,
159     };
160 
CreateFallbackTestStub()161 std::unique_ptr<TestService::Stub> CreateFallbackTestStub() {
162   grpc::ChannelArguments channel_args;
163   grpc_socket_mutator* tcp_user_timeout_mutator =
164       static_cast<grpc_socket_mutator*>(
165           gpr_malloc(sizeof(tcp_user_timeout_mutator)));
166   grpc_socket_mutator_init(tcp_user_timeout_mutator,
167                            &kTcpUserTimeoutMutatorVtable);
168   channel_args.SetSocketMutator(tcp_user_timeout_mutator);
169   // Allow LB policy to be configured by service config
170   channel_args.SetInt(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 0);
171   std::shared_ptr<grpc::ChannelCredentials> channel_creds =
172       grpc::testing::GetCredentialsProvider()->GetChannelCredentials(
173           FLAGS_custom_credentials_type, &channel_args);
174   return TestService::NewStub(
175       grpc::CreateCustomChannel(FLAGS_server_uri, channel_creds, channel_args));
176 }
177 
RunCommand(const std::string & command)178 void RunCommand(const std::string& command) {
179   gpr_log(GPR_INFO, "RunCommand: |%s|", command.c_str());
180   int out = std::system(command.c_str());
181   if (WIFEXITED(out)) {
182     int code = WEXITSTATUS(out);
183     if (code != 0) {
184       gpr_log(GPR_ERROR, "RunCommand failed exit code:%d command:|%s|", code,
185               command.c_str());
186       abort();
187     }
188   } else {
189     gpr_log(GPR_ERROR, "RunCommand failed command:|%s|", command.c_str());
190     abort();
191   }
192 }
193 
RunFallbackBeforeStartupTest(const std::string & break_lb_and_backend_conns_cmd,int per_rpc_deadline_seconds)194 void RunFallbackBeforeStartupTest(
195     const std::string& break_lb_and_backend_conns_cmd,
196     int per_rpc_deadline_seconds) {
197   std::unique_ptr<TestService::Stub> stub = CreateFallbackTestStub();
198   RunCommand(break_lb_and_backend_conns_cmd);
199   for (size_t i = 0; i < 30; i++) {
200     GrpclbRouteType grpclb_route_type =
201         DoRPCAndGetPath(stub.get(), per_rpc_deadline_seconds);
202     if (grpclb_route_type != GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK) {
203       gpr_log(GPR_ERROR, "Expected grpclb route type: FALLBACK. Got: %d",
204               grpclb_route_type);
205       abort();
206     }
207     std::this_thread::sleep_for(std::chrono::seconds(1));
208   }
209 }
210 
DoFastFallbackBeforeStartup()211 void DoFastFallbackBeforeStartup() {
212   RunFallbackBeforeStartupTest(FLAGS_unroute_lb_and_backend_addrs_cmd, 9);
213 }
214 
DoSlowFallbackBeforeStartup()215 void DoSlowFallbackBeforeStartup() {
216   RunFallbackBeforeStartupTest(FLAGS_blackhole_lb_and_backend_addrs_cmd, 20);
217 }
218 
RunFallbackAfterStartupTest(const std::string & break_lb_and_backend_conns_cmd)219 void RunFallbackAfterStartupTest(
220     const std::string& break_lb_and_backend_conns_cmd) {
221   std::unique_ptr<TestService::Stub> stub = CreateFallbackTestStub();
222   GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub.get(), 20);
223   if (grpclb_route_type != GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND) {
224     gpr_log(GPR_ERROR, "Expected grpclb route type: BACKEND. Got: %d",
225             grpclb_route_type);
226     abort();
227   }
228   RunCommand(break_lb_and_backend_conns_cmd);
229   for (size_t i = 0; i < 40; i++) {
230     GrpclbRouteType grpclb_route_type =
231         DoWaitForReadyRPCAndGetPath(stub.get(), 1);
232     // Backends should be unreachable by now, otherwise the test is broken.
233     GPR_ASSERT(grpclb_route_type != GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND);
234     if (grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK) {
235       gpr_log(GPR_INFO,
236               "Made one successul RPC to a fallback. Now expect the same for "
237               "the rest.");
238       break;
239     } else {
240       gpr_log(GPR_ERROR, "Retryable RPC failure on iteration: %" PRIdPTR, i);
241     }
242   }
243   for (size_t i = 0; i < 30; i++) {
244     GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub.get(), 20);
245     if (grpclb_route_type != GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK) {
246       gpr_log(GPR_ERROR, "Expected grpclb route type: FALLBACK. Got: %d",
247               grpclb_route_type);
248       abort();
249     }
250     std::this_thread::sleep_for(std::chrono::seconds(1));
251   }
252 }
253 
DoFastFallbackAfterStartup()254 void DoFastFallbackAfterStartup() {
255   RunFallbackAfterStartupTest(FLAGS_unroute_lb_and_backend_addrs_cmd);
256 }
257 
DoSlowFallbackAfterStartup()258 void DoSlowFallbackAfterStartup() {
259   RunFallbackAfterStartupTest(FLAGS_blackhole_lb_and_backend_addrs_cmd);
260 }
261 }  // namespace
262 
main(int argc,char ** argv)263 int main(int argc, char** argv) {
264   grpc::testing::InitTest(&argc, &argv, true);
265   gpr_log(GPR_INFO, "Testing: %s", FLAGS_test_case.c_str());
266   if (FLAGS_test_case == "fast_fallback_before_startup") {
267     DoFastFallbackBeforeStartup();
268     gpr_log(GPR_INFO, "DoFastFallbackBeforeStartup done!");
269   } else if (FLAGS_test_case == "slow_fallback_before_startup") {
270     DoSlowFallbackBeforeStartup();
271     gpr_log(GPR_INFO, "DoSlowFallbackBeforeStartup done!");
272   } else if (FLAGS_test_case == "fast_fallback_after_startup") {
273     DoFastFallbackAfterStartup();
274     gpr_log(GPR_INFO, "DoFastFallbackAfterStartup done!");
275   } else if (FLAGS_test_case == "slow_fallback_after_startup") {
276     DoSlowFallbackAfterStartup();
277     gpr_log(GPR_INFO, "DoSlowFallbackAfterStartup done!");
278   } else {
279     gpr_log(GPR_ERROR, "Invalid test case: %s", FLAGS_test_case.c_str());
280     abort();
281   }
282 }
283 #else
main(int argc,char ** argv)284 int main(int argc, char** argv) {
285   grpc::testing::InitTest(&argc, &argv, true);
286   gpr_log(GPR_ERROR,
287           "This test requires TCP_USER_TIMEOUT, which isn't available");
288   abort();
289 }
290 #endif  // SOCKET_SUPPORTS_TCP_USER_TIMEOUT
291