• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <arpa/inet.h>
20 #include <fcntl.h>
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/grpcpp.h>
26 #include <grpcpp/support/channel_arguments.h>
27 #include <inttypes.h>
28 #include <netinet/in.h>
29 #include <netinet/tcp.h>
30 #include <sys/wait.h>
31 #include <unistd.h>
32 
33 #include <chrono>
34 #include <cstdlib>
35 #include <memory>
36 #include <string>
37 #include <thread>
38 
39 #include "absl/flags/flag.h"
40 #include "absl/log/check.h"
41 #include "absl/log/log.h"
42 #include "absl/strings/str_format.h"
43 #include "absl/time/time.h"
44 #include "src/core/lib/iomgr/port.h"
45 #include "src/core/lib/iomgr/socket_mutator.h"
46 #include "src/core/util/crash.h"
47 #include "src/core/util/string.h"
48 #include "src/proto/grpc/testing/empty.pb.h"
49 #include "src/proto/grpc/testing/messages.pb.h"
50 #include "src/proto/grpc/testing/test.grpc.pb.h"
51 #include "src/proto/grpc/testing/test.pb.h"
52 #include "test/cpp/util/test_config.h"
53 #include "test/cpp/util/test_credentials_provider.h"
54 
55 ABSL_FLAG(std::string, custom_credentials_type, "",
56           "User provided credentials type.");
57 ABSL_FLAG(std::string, server_uri, "localhost:1000", "Server URI target");
58 ABSL_FLAG(std::string, induce_fallback_cmd, "exit 1",
59           "Shell command to induce fallback, e.g. by unrouting addresses");
60 ABSL_FLAG(int, fallback_deadline_seconds, 1,
61           "Number of seconds to wait for fallback to occur after inducing it");
62 ABSL_FLAG(std::string, test_case, "",
63           "Test case to run. Valid options are:\n\n"
64           "fallback_before_startup : fallback before making RPCs to backends"
65           "fallback_after_startup : fallback after making RPCs to backends");
66 
// Feature gate: the test is only built for real when compiling against Linux
// kernel headers of version 2.6.37 or newer. The version check stays nested
// inside the defined() check so that KERNEL_VERSION is never expanded on
// platforms where the kernel version macros do not exist.
// NOTE(review): LINUX_VERSION_CODE / KERNEL_VERSION are presumably provided
// transitively by one of the includes above - confirm.
#if defined(LINUX_VERSION_CODE)
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT
#endif  // LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
#endif  // defined(LINUX_VERSION_CODE)
72 
73 #ifdef SOCKET_SUPPORTS_TCP_USER_TIMEOUT
74 using grpc::testing::GrpclbRouteType;
75 using grpc::testing::SimpleRequest;
76 using grpc::testing::SimpleResponse;
77 using grpc::testing::TestService;
78 
79 namespace {
80 
// How an RPC issued by DoRPCAndGetPath() is configured.
enum RpcMode {
  // The client context is left at its default wait-for-ready setting.
  FailFast = 0,
  // The client context is explicitly marked wait-for-ready.
  WaitForReady = 1,
};
85 
DoRPCAndGetPath(TestService::Stub * stub,int deadline_seconds,RpcMode rpc_mode)86 GrpclbRouteType DoRPCAndGetPath(TestService::Stub* stub, int deadline_seconds,
87                                 RpcMode rpc_mode) {
88   LOG(INFO) << "DoRPCAndGetPath deadline_seconds:" << deadline_seconds
89             << " rpc_mode:" << rpc_mode;
90   SimpleRequest request;
91   SimpleResponse response;
92   grpc::ClientContext context;
93   if (rpc_mode == WaitForReady) {
94     context.set_wait_for_ready(true);
95   }
96   request.set_fill_grpclb_route_type(true);
97   std::chrono::system_clock::time_point deadline =
98       std::chrono::system_clock::now() + std::chrono::seconds(deadline_seconds);
99   context.set_deadline(deadline);
100   grpc::Status s = stub->UnaryCall(&context, request, &response);
101   if (!s.ok()) {
102     LOG(INFO) << "DoRPCAndGetPath failed. status-message: "
103               << s.error_message();
104     return GrpclbRouteType::GRPCLB_ROUTE_TYPE_UNKNOWN;
105   }
106   CHECK(response.grpclb_route_type() ==
107             GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND ||
108         response.grpclb_route_type() ==
109             GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK);
110   LOG(INFO) << "DoRPCAndGetPath done. grpclb_route_type:"
111             << response.grpclb_route_type();
112   return response.grpclb_route_type();
113 }
114 
DoRPCAndGetPath(TestService::Stub * stub,int deadline_seconds)115 GrpclbRouteType DoRPCAndGetPath(TestService::Stub* stub, int deadline_seconds) {
116   return DoRPCAndGetPath(stub, deadline_seconds, FailFast);
117 }
118 
TcpUserTimeoutMutateFd(int fd,grpc_socket_mutator *)119 bool TcpUserTimeoutMutateFd(int fd, grpc_socket_mutator* /*mutator*/) {
120   int timeout = 20000;  // 20 seconds
121   LOG(INFO) << "Setting socket option TCP_USER_TIMEOUT on fd: " << fd;
122   if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
123                       sizeof(timeout))) {
124     grpc_core::Crash("Failed to set socket option TCP_USER_TIMEOUT");
125   }
126   int newval;
127   socklen_t len = sizeof(newval);
128   if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len) ||
129       newval != timeout) {
130     grpc_core::Crash("Failed to get expected socket option TCP_USER_TIMEOUT");
131   }
132   return true;
133 }
134 
TcpUserTimeoutCompare(grpc_socket_mutator *,grpc_socket_mutator *)135 int TcpUserTimeoutCompare(grpc_socket_mutator* /*a*/,
136                           grpc_socket_mutator* /*b*/) {
137   return 0;
138 }
139 
TcpUserTimeoutDestroy(grpc_socket_mutator * mutator)140 void TcpUserTimeoutDestroy(grpc_socket_mutator* mutator) { delete mutator; }
141 
142 const grpc_socket_mutator_vtable kTcpUserTimeoutMutatorVtable =
143     grpc_socket_mutator_vtable{TcpUserTimeoutMutateFd, TcpUserTimeoutCompare,
144                                TcpUserTimeoutDestroy, nullptr};
145 
CreateFallbackTestStub()146 std::unique_ptr<TestService::Stub> CreateFallbackTestStub() {
147   grpc::ChannelArguments channel_args;
148   grpc_socket_mutator* tcp_user_timeout_mutator = new grpc_socket_mutator();
149   grpc_socket_mutator_init(tcp_user_timeout_mutator,
150                            &kTcpUserTimeoutMutatorVtable);
151   channel_args.SetSocketMutator(tcp_user_timeout_mutator);
152   // Allow LB policy to be configured by service config
153   channel_args.SetInt(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 0);
154   std::shared_ptr<grpc::ChannelCredentials> channel_creds =
155       grpc::testing::GetCredentialsProvider()->GetChannelCredentials(
156           absl::GetFlag(FLAGS_custom_credentials_type), &channel_args);
157   return TestService::NewStub(grpc::CreateCustomChannel(
158       absl::GetFlag(FLAGS_server_uri), channel_creds, channel_args));
159 }
160 
RunCommand(const std::string & command)161 void RunCommand(const std::string& command) {
162   LOG(INFO) << "RunCommand: |" << command << "|";
163   int out = std::system(command.c_str());
164   if (WIFEXITED(out)) {
165     int code = WEXITSTATUS(out);
166     if (code != 0) {
167       grpc_core::Crash(
168           absl::StrFormat("RunCommand failed exit code:%d command:|%s|", code,
169                           command.c_str()));
170     }
171   } else {
172     grpc_core::Crash(
173         absl::StrFormat("RunCommand failed command:|%s|", command.c_str()));
174   }
175 }
176 
WaitForFallbackAndDoRPCs(TestService::Stub * stub)177 void WaitForFallbackAndDoRPCs(TestService::Stub* stub) {
178   int fallback_retry_count = 0;
179   bool fallback = false;
180   absl::Time fallback_deadline =
181       absl::Now() +
182       absl::Seconds(absl::GetFlag(FLAGS_fallback_deadline_seconds));
183   while (absl::Now() < fallback_deadline) {
184     GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub, 1);
185     if (grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND) {
186       LOG(ERROR) << "Got grpclb route type backend. Backends are "
187                     "supposed to be unreachable, so this test is broken";
188       CHECK(0);
189     }
190     if (grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK) {
191       LOG(INFO) << "Made one successful RPC to a fallback. Now expect the same "
192                    "for the rest.";
193       fallback = true;
194       break;
195     } else {
196       LOG(ERROR) << "Retryable RPC failure on iteration: "
197                  << fallback_retry_count;
198     }
199     fallback_retry_count++;
200   }
201   if (!fallback) {
202     LOG(ERROR) << "Didn't fall back within deadline";
203     CHECK(0);
204   }
205   for (int i = 0; i < 30; i++) {
206     GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub, 20);
207     CHECK(grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK);
208     std::this_thread::sleep_for(std::chrono::seconds(1));
209   }
210 }
211 
DoFallbackBeforeStartupTest()212 void DoFallbackBeforeStartupTest() {
213   std::unique_ptr<TestService::Stub> stub = CreateFallbackTestStub();
214   RunCommand(absl::GetFlag(FLAGS_induce_fallback_cmd));
215   WaitForFallbackAndDoRPCs(stub.get());
216 }
217 
DoFallbackAfterStartupTest()218 void DoFallbackAfterStartupTest() {
219   std::unique_ptr<TestService::Stub> stub = CreateFallbackTestStub();
220   GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub.get(), 20);
221   CHECK(grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND);
222   RunCommand(absl::GetFlag(FLAGS_induce_fallback_cmd));
223   WaitForFallbackAndDoRPCs(stub.get());
224 }
225 
226 }  // namespace
227 
main(int argc,char ** argv)228 int main(int argc, char** argv) {
229   grpc::testing::InitTest(&argc, &argv, true);
230   LOG(INFO) << "Testing: " << absl::GetFlag(FLAGS_test_case);
231   if (absl::GetFlag(FLAGS_test_case) == "fallback_before_startup") {
232     DoFallbackBeforeStartupTest();
233     LOG(INFO) << "DoFallbackBeforeStartup done!";
234   } else if (absl::GetFlag(FLAGS_test_case) == "fallback_after_startup") {
235     DoFallbackAfterStartupTest();
236     LOG(INFO) << "DoFallbackBeforeStartup done!";
237   } else {
238     grpc_core::Crash(absl::StrFormat("Invalid test case: %s",
239                                      absl::GetFlag(FLAGS_test_case).c_str()));
240   }
241 }
242 
243 #else
244 
main(int argc,char ** argv)245 int main(int argc, char** argv) {
246   grpc::testing::InitTest(&argc, &argv, true);
247   grpc_core::Crash(
248       "This test requires TCP_USER_TIMEOUT, which isn't available");
249 }
250 
251 #endif  // SOCKET_SUPPORTS_TCP_USER_TIMEOUT
252