1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <arpa/inet.h>
20 #include <fcntl.h>
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/grpcpp.h>
26 #include <grpcpp/support/channel_arguments.h>
27 #include <inttypes.h>
28 #include <netinet/in.h>
29 #include <netinet/tcp.h>
30 #include <sys/wait.h>
31 #include <unistd.h>
32
33 #include <chrono>
34 #include <cstdlib>
35 #include <memory>
36 #include <string>
37 #include <thread>
38
39 #include "absl/flags/flag.h"
40 #include "absl/log/check.h"
41 #include "absl/log/log.h"
42 #include "absl/strings/str_format.h"
43 #include "absl/time/time.h"
44 #include "src/core/lib/iomgr/port.h"
45 #include "src/core/lib/iomgr/socket_mutator.h"
46 #include "src/core/util/crash.h"
47 #include "src/core/util/string.h"
48 #include "src/proto/grpc/testing/empty.pb.h"
49 #include "src/proto/grpc/testing/messages.pb.h"
50 #include "src/proto/grpc/testing/test.grpc.pb.h"
51 #include "src/proto/grpc/testing/test.pb.h"
52 #include "test/cpp/util/test_config.h"
53 #include "test/cpp/util/test_credentials_provider.h"
54
55 ABSL_FLAG(std::string, custom_credentials_type, "",
56 "User provided credentials type.");
57 ABSL_FLAG(std::string, server_uri, "localhost:1000", "Server URI target");
58 ABSL_FLAG(std::string, induce_fallback_cmd, "exit 1",
59 "Shell command to induce fallback, e.g. by unrouting addresses");
60 ABSL_FLAG(int, fallback_deadline_seconds, 1,
61 "Number of seconds to wait for fallback to occur after inducing it");
62 ABSL_FLAG(std::string, test_case, "",
63 "Test case to run. Valid options are:\n\n"
64 "fallback_before_startup : fallback before making RPCs to backends"
65 "fallback_after_startup : fallback after making RPCs to backends");
66
// Define SOCKET_SUPPORTS_TCP_USER_TIMEOUT only when the build sees a Linux
// kernel version of at least 2.6.37. The two conditions stay nested so that
// KERNEL_VERSION is never expanded when LINUX_VERSION_CODE is not defined.
#if defined(LINUX_VERSION_CODE)
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT
#endif  // LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 37)
#endif  // defined(LINUX_VERSION_CODE)
72
73 #ifdef SOCKET_SUPPORTS_TCP_USER_TIMEOUT
74 using grpc::testing::GrpclbRouteType;
75 using grpc::testing::SimpleRequest;
76 using grpc::testing::SimpleResponse;
77 using grpc::testing::TestService;
78
79 namespace {
80
// Selects whether an RPC issued by DoRPCAndGetPath is marked wait-for-ready.
// Kept as an unscoped enum because the value is streamed directly to the log.
enum RpcMode {
  FailFast = 0,      // Wait-for-ready is not requested on the call.
  WaitForReady = 1,  // The call is marked wait-for-ready.
};
85
DoRPCAndGetPath(TestService::Stub * stub,int deadline_seconds,RpcMode rpc_mode)86 GrpclbRouteType DoRPCAndGetPath(TestService::Stub* stub, int deadline_seconds,
87 RpcMode rpc_mode) {
88 LOG(INFO) << "DoRPCAndGetPath deadline_seconds:" << deadline_seconds
89 << " rpc_mode:" << rpc_mode;
90 SimpleRequest request;
91 SimpleResponse response;
92 grpc::ClientContext context;
93 if (rpc_mode == WaitForReady) {
94 context.set_wait_for_ready(true);
95 }
96 request.set_fill_grpclb_route_type(true);
97 std::chrono::system_clock::time_point deadline =
98 std::chrono::system_clock::now() + std::chrono::seconds(deadline_seconds);
99 context.set_deadline(deadline);
100 grpc::Status s = stub->UnaryCall(&context, request, &response);
101 if (!s.ok()) {
102 LOG(INFO) << "DoRPCAndGetPath failed. status-message: "
103 << s.error_message();
104 return GrpclbRouteType::GRPCLB_ROUTE_TYPE_UNKNOWN;
105 }
106 CHECK(response.grpclb_route_type() ==
107 GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND ||
108 response.grpclb_route_type() ==
109 GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK);
110 LOG(INFO) << "DoRPCAndGetPath done. grpclb_route_type:"
111 << response.grpclb_route_type();
112 return response.grpclb_route_type();
113 }
114
DoRPCAndGetPath(TestService::Stub * stub,int deadline_seconds)115 GrpclbRouteType DoRPCAndGetPath(TestService::Stub* stub, int deadline_seconds) {
116 return DoRPCAndGetPath(stub, deadline_seconds, FailFast);
117 }
118
TcpUserTimeoutMutateFd(int fd,grpc_socket_mutator *)119 bool TcpUserTimeoutMutateFd(int fd, grpc_socket_mutator* /*mutator*/) {
120 int timeout = 20000; // 20 seconds
121 LOG(INFO) << "Setting socket option TCP_USER_TIMEOUT on fd: " << fd;
122 if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
123 sizeof(timeout))) {
124 grpc_core::Crash("Failed to set socket option TCP_USER_TIMEOUT");
125 }
126 int newval;
127 socklen_t len = sizeof(newval);
128 if (0 != getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len) ||
129 newval != timeout) {
130 grpc_core::Crash("Failed to get expected socket option TCP_USER_TIMEOUT");
131 }
132 return true;
133 }
134
TcpUserTimeoutCompare(grpc_socket_mutator *,grpc_socket_mutator *)135 int TcpUserTimeoutCompare(grpc_socket_mutator* /*a*/,
136 grpc_socket_mutator* /*b*/) {
137 return 0;
138 }
139
TcpUserTimeoutDestroy(grpc_socket_mutator * mutator)140 void TcpUserTimeoutDestroy(grpc_socket_mutator* mutator) { delete mutator; }
141
142 const grpc_socket_mutator_vtable kTcpUserTimeoutMutatorVtable =
143 grpc_socket_mutator_vtable{TcpUserTimeoutMutateFd, TcpUserTimeoutCompare,
144 TcpUserTimeoutDestroy, nullptr};
145
CreateFallbackTestStub()146 std::unique_ptr<TestService::Stub> CreateFallbackTestStub() {
147 grpc::ChannelArguments channel_args;
148 grpc_socket_mutator* tcp_user_timeout_mutator = new grpc_socket_mutator();
149 grpc_socket_mutator_init(tcp_user_timeout_mutator,
150 &kTcpUserTimeoutMutatorVtable);
151 channel_args.SetSocketMutator(tcp_user_timeout_mutator);
152 // Allow LB policy to be configured by service config
153 channel_args.SetInt(GRPC_ARG_SERVICE_CONFIG_DISABLE_RESOLUTION, 0);
154 std::shared_ptr<grpc::ChannelCredentials> channel_creds =
155 grpc::testing::GetCredentialsProvider()->GetChannelCredentials(
156 absl::GetFlag(FLAGS_custom_credentials_type), &channel_args);
157 return TestService::NewStub(grpc::CreateCustomChannel(
158 absl::GetFlag(FLAGS_server_uri), channel_creds, channel_args));
159 }
160
RunCommand(const std::string & command)161 void RunCommand(const std::string& command) {
162 LOG(INFO) << "RunCommand: |" << command << "|";
163 int out = std::system(command.c_str());
164 if (WIFEXITED(out)) {
165 int code = WEXITSTATUS(out);
166 if (code != 0) {
167 grpc_core::Crash(
168 absl::StrFormat("RunCommand failed exit code:%d command:|%s|", code,
169 command.c_str()));
170 }
171 } else {
172 grpc_core::Crash(
173 absl::StrFormat("RunCommand failed command:|%s|", command.c_str()));
174 }
175 }
176
WaitForFallbackAndDoRPCs(TestService::Stub * stub)177 void WaitForFallbackAndDoRPCs(TestService::Stub* stub) {
178 int fallback_retry_count = 0;
179 bool fallback = false;
180 absl::Time fallback_deadline =
181 absl::Now() +
182 absl::Seconds(absl::GetFlag(FLAGS_fallback_deadline_seconds));
183 while (absl::Now() < fallback_deadline) {
184 GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub, 1);
185 if (grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND) {
186 LOG(ERROR) << "Got grpclb route type backend. Backends are "
187 "supposed to be unreachable, so this test is broken";
188 CHECK(0);
189 }
190 if (grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK) {
191 LOG(INFO) << "Made one successful RPC to a fallback. Now expect the same "
192 "for the rest.";
193 fallback = true;
194 break;
195 } else {
196 LOG(ERROR) << "Retryable RPC failure on iteration: "
197 << fallback_retry_count;
198 }
199 fallback_retry_count++;
200 }
201 if (!fallback) {
202 LOG(ERROR) << "Didn't fall back within deadline";
203 CHECK(0);
204 }
205 for (int i = 0; i < 30; i++) {
206 GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub, 20);
207 CHECK(grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_FALLBACK);
208 std::this_thread::sleep_for(std::chrono::seconds(1));
209 }
210 }
211
DoFallbackBeforeStartupTest()212 void DoFallbackBeforeStartupTest() {
213 std::unique_ptr<TestService::Stub> stub = CreateFallbackTestStub();
214 RunCommand(absl::GetFlag(FLAGS_induce_fallback_cmd));
215 WaitForFallbackAndDoRPCs(stub.get());
216 }
217
DoFallbackAfterStartupTest()218 void DoFallbackAfterStartupTest() {
219 std::unique_ptr<TestService::Stub> stub = CreateFallbackTestStub();
220 GrpclbRouteType grpclb_route_type = DoRPCAndGetPath(stub.get(), 20);
221 CHECK(grpclb_route_type == GrpclbRouteType::GRPCLB_ROUTE_TYPE_BACKEND);
222 RunCommand(absl::GetFlag(FLAGS_induce_fallback_cmd));
223 WaitForFallbackAndDoRPCs(stub.get());
224 }
225
226 } // namespace
227
main(int argc,char ** argv)228 int main(int argc, char** argv) {
229 grpc::testing::InitTest(&argc, &argv, true);
230 LOG(INFO) << "Testing: " << absl::GetFlag(FLAGS_test_case);
231 if (absl::GetFlag(FLAGS_test_case) == "fallback_before_startup") {
232 DoFallbackBeforeStartupTest();
233 LOG(INFO) << "DoFallbackBeforeStartup done!";
234 } else if (absl::GetFlag(FLAGS_test_case) == "fallback_after_startup") {
235 DoFallbackAfterStartupTest();
236 LOG(INFO) << "DoFallbackBeforeStartup done!";
237 } else {
238 grpc_core::Crash(absl::StrFormat("Invalid test case: %s",
239 absl::GetFlag(FLAGS_test_case).c_str()));
240 }
241 }
242
243 #else
244
main(int argc,char ** argv)245 int main(int argc, char** argv) {
246 grpc::testing::InitTest(&argc, &argv, true);
247 grpc_core::Crash(
248 "This test requires TCP_USER_TIMEOUT, which isn't available");
249 }
250
251 #endif // SOCKET_SUPPORTS_TCP_USER_TIMEOUT
252