1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/ext/gcp_observability.h>
24 #include <grpcpp/ext/otel_plugin.h>
25
26 #include <memory>
27 #include <unordered_map>
28
29 #include "absl/flags/flag.h"
30 #include "absl/log/log.h"
31 #include "opentelemetry/exporters/prometheus/exporter_factory.h"
32 #include "opentelemetry/exporters/prometheus/exporter_options.h"
33 #include "opentelemetry/sdk/metrics/meter_provider.h"
34 #include "src/core/util/crash.h"
35 #include "src/core/util/string.h"
36 #include "test/core/test_util/test_config.h"
37 #include "test/cpp/interop/client_helper.h"
38 #include "test/cpp/interop/interop_client.h"
39 #include "test/cpp/util/test_config.h"
40
41 ABSL_FLAG(bool, use_alts, false,
42 "Whether to use alts. Enable alts will disable tls.");
43 ABSL_FLAG(bool, use_tls, false, "Whether to use tls.");
44 ABSL_FLAG(std::string, custom_credentials_type, "",
45 "User provided credentials type.");
46 ABSL_FLAG(bool, use_test_ca, false, "False to use SSL roots for google");
47 ABSL_FLAG(int32_t, server_port, 0, "Server port.");
48 ABSL_FLAG(std::string, server_host, "localhost", "Server host to connect to");
49 ABSL_FLAG(std::string, server_host_override, "",
50 "Override the server host which is sent in HTTP header");
51 ABSL_FLAG(
52 std::string, test_case, "large_unary",
53 "Configure different test cases. Valid options are:\n\n"
54 "all : all test cases;\n"
55
56 // TODO(veblush): Replace the help message with the following full message
57 // once Abseil fixes the flag-help compiler error on Windows. (b/171659833)
58 //
59 //"cancel_after_begin : cancel stream after starting it;\n"
60 //"cancel_after_first_response: cancel on first response;\n"
61 //"channel_soak: sends 'soak_iterations' rpcs, rebuilds channel each
62 // time;\n" "client_compressed_streaming : compressed request streaming with
63 //" "client_compressed_unary : single compressed request;\n"
64 //"client_streaming : request streaming with single response;\n"
65 //"compute_engine_creds: large_unary with compute engine auth;\n"
66 //"custom_metadata: server will echo custom metadata;\n"
67 //"empty_stream : bi-di stream with no request/response;\n"
68 //"empty_unary : empty (zero bytes) request and response;\n"
69 //"google_default_credentials: large unary using GDC;\n"
70 //"half_duplex : half-duplex streaming;\n"
71 //"jwt_token_creds: large_unary with JWT token auth;\n"
72 //"large_unary : single request and (large) response;\n"
73 //"long_lived_channel: sends large_unary rpcs over a long-lived channel;\n"
74 //"oauth2_auth_token: raw oauth2 access token auth;\n"
75 //"per_rpc_creds: raw oauth2 access token on a single rpc;\n"
76 //"ping_pong : full-duplex streaming;\n"
77 //"response streaming;\n"
78 //"rpc_soak: 'sends soak_iterations' large_unary rpcs;\n"
79 //"server_compressed_streaming : single request with compressed "
80 //"server_compressed_unary : single compressed response;\n"
81 //"server_streaming : single request with response streaming;\n"
82 //"slow_consumer : single request with response streaming with "
83 //"slow client consumer;\n"
84 //"special_status_message: verify Unicode and whitespace in status
85 // message;\n" "status_code_and_message: verify status code & message;\n"
86 //"timeout_on_sleeping_server: deadline exceeds on stream;\n"
87 //"unimplemented_method: client calls an unimplemented method;\n"
88 //"unimplemented_service: client calls an unimplemented service;\n"
89 //
90 );
91 ABSL_FLAG(int32_t, num_times, 1, "Number of times to run the test case");
92 ABSL_FLAG(std::string, default_service_account, "",
93 "Email of GCE default service account");
94 ABSL_FLAG(std::string, service_account_key_file, "",
95 "Path to service account json key file.");
96 ABSL_FLAG(std::string, oauth_scope, "", "Scope for OAuth tokens.");
97 ABSL_FLAG(bool, do_not_abort_on_transient_failures, false,
98 "If set to 'true', abort() is not called in case of transient "
99 "failures (i.e failures that are temporary and will likely go away "
100 "on retrying; like a temporary connection failure) and an error "
101 "message is printed instead. Note that this flag just controls "
102 "whether abort() is called or not. It does not control whether the "
103 "test is retried in case of transient failures (and currently the "
104 "interop tests are not retried even if this flag is set to true)");
105 ABSL_FLAG(int32_t, soak_iterations, 1000,
106 "The number of iterations to use for the two soak tests; rpc_soak "
107 "and channel_soak.");
108 ABSL_FLAG(int32_t, soak_max_failures, 0,
109 "The number of iterations in soak tests that are allowed to fail "
110 "(either due to non-OK status code or exceeding the "
111 "per-iteration max acceptable latency).");
112 ABSL_FLAG(int32_t, soak_per_iteration_max_acceptable_latency_ms, 0,
113 "The number of milliseconds a single iteration in the two soak "
114 "tests (rpc_soak and channel_soak) should take.");
115 ABSL_FLAG(int32_t, soak_overall_timeout_seconds, 0,
116 "The overall number of seconds after which a soak test should "
117 "stop and fail, if the desired number of iterations have not yet "
118 "completed.");
119 ABSL_FLAG(int32_t, soak_min_time_ms_between_rpcs, 0,
120 "The minimum time in milliseconds between consecutive RPCs in a "
121 "soak test (rpc_soak or channel_soak), useful for limiting QPS");
122 ABSL_FLAG(
123 int32_t, soak_request_size, 271828,
124 "The request size in a soak RPC. "
125 "The default value is set based on the interop large unary test case.");
126 ABSL_FLAG(
127 int32_t, soak_response_size, 314159,
128 "The response size in a soak RPC. "
129 "The default value is set based on the interop large unary test case.");
130 ABSL_FLAG(int32_t, iteration_interval, 10,
131 "The interval in seconds between rpcs. This is used by "
132 "long_connection test");
133 ABSL_FLAG(std::string, additional_metadata, "",
134 "Additional metadata to send in each request, as a "
135 "semicolon-separated list of key:value pairs.");
136 ABSL_FLAG(
137 bool, log_metadata_and_status, false,
138 "If set to 'true', will print received initial and trailing metadata, "
139 "grpc-status and error message to the console, in a stable format.");
140 ABSL_FLAG(bool, enable_observability, false,
141 "Whether to enable GCP Observability");
142 ABSL_FLAG(bool, enable_otel_plugin, false,
143 "Whether to enable OpenTelemetry Plugin");
144
145 using grpc::testing::CreateChannelForTestCase;
146 using grpc::testing::GetServiceAccountJsonKey;
147 using grpc::testing::UpdateActions;
148
149 namespace {
150
151 // Parse the contents of FLAGS_additional_metadata into a map. Allow
152 // alphanumeric characters and dashes in keys, and any character but semicolons
153 // in values. Convert keys to lowercase. On failure, log an error and return
154 // false.
ParseAdditionalMetadataFlag(const std::string & flag,std::multimap<std::string,std::string> * additional_metadata)155 bool ParseAdditionalMetadataFlag(
156 const std::string& flag,
157 std::multimap<std::string, std::string>* additional_metadata) {
158 size_t start_pos = 0;
159 while (start_pos < flag.length()) {
160 size_t colon_pos = flag.find(':', start_pos);
161 if (colon_pos == std::string::npos) {
162 LOG(ERROR)
163 << "Couldn't parse metadata flag: extra characters at end of flag";
164 return false;
165 }
166 size_t semicolon_pos = flag.find(';', colon_pos);
167
168 std::string key = flag.substr(start_pos, colon_pos - start_pos);
169 std::string value =
170 flag.substr(colon_pos + 1, semicolon_pos - colon_pos - 1);
171
172 constexpr char alphanum_and_hyphen[] =
173 "-0123456789"
174 "abcdefghijklmnopqrstuvwxyz"
175 "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
176 if (key.find_first_not_of(alphanum_and_hyphen) != std::string::npos) {
177 LOG(ERROR) << "Couldn't parse metadata flag: key contains characters "
178 "other than alphanumeric and hyphens: "
179 << key;
180 return false;
181 }
182
183 // Convert to lowercase.
184 for (char& c : key) {
185 if (c >= 'A' && c <= 'Z') {
186 c += ('a' - 'A');
187 }
188 }
189
190 LOG(INFO) << "Adding additional metadata with key " << key << " and value "
191 << value;
192 additional_metadata->insert({key, value});
193
194 if (semicolon_pos == std::string::npos) {
195 break;
196 } else {
197 start_pos = semicolon_pos + 1;
198 }
199 }
200
201 return true;
202 }
203
204 } // namespace
205
main(int argc,char ** argv)206 int main(int argc, char** argv) {
207 grpc::testing::TestEnvironment env(&argc, argv);
208 grpc::testing::InitTest(&argc, &argv, true);
209 LOG(INFO) << "Testing these cases: " << absl::GetFlag(FLAGS_test_case);
210 int ret = 0;
211
212 if (absl::GetFlag(FLAGS_enable_observability)) {
213 // TODO(someone): remove deprecated usage
214 // NOLINTNEXTLINE(clang-diagnostic-deprecated-declarations)
215 auto status = grpc::experimental::GcpObservabilityInit();
216 VLOG(2) << "GcpObservabilityInit() status_code: " << status.code();
217 if (!status.ok()) {
218 return 1;
219 }
220 }
221
222 // TODO(stanleycheung): switch to CsmObservabilityBuilder once xds setup is
223 // ready
224 if (absl::GetFlag(FLAGS_enable_otel_plugin)) {
225 VLOG(2) << "Registering Prometheus exporter";
226 opentelemetry::exporter::metrics::PrometheusExporterOptions opts;
227 // default was "localhost:9464" which causes connection issue across GKE
228 // pods
229 opts.url = "0.0.0.0:9464";
230 auto prometheus_exporter =
231 opentelemetry::exporter::metrics::PrometheusExporterFactory::Create(
232 opts);
233 auto meter_provider =
234 std::make_shared<opentelemetry::sdk::metrics::MeterProvider>();
235 meter_provider->AddMetricReader(std::move(prometheus_exporter));
236 grpc::OpenTelemetryPluginBuilder otel_builder;
237 otel_builder.SetMeterProvider(std::move(meter_provider));
238 assert(otel_builder.BuildAndRegisterGlobal().ok());
239 }
240
241 grpc::testing::ChannelCreationFunc channel_creation_func;
242 std::string test_case = absl::GetFlag(FLAGS_test_case);
243 if (absl::GetFlag(FLAGS_additional_metadata).empty()) {
244 channel_creation_func = [test_case](auto arguments) {
245 std::vector<std::unique_ptr<
246 grpc::experimental::ClientInterceptorFactoryInterface>>
247 factories;
248 if (absl::GetFlag(FLAGS_log_metadata_and_status)) {
249 factories.emplace_back(
250 new grpc::testing::MetadataAndStatusLoggerInterceptorFactory());
251 }
252 return CreateChannelForTestCase(test_case, std::move(factories),
253 arguments);
254 };
255 } else {
256 std::multimap<std::string, std::string> additional_metadata;
257 if (!ParseAdditionalMetadataFlag(absl::GetFlag(FLAGS_additional_metadata),
258 &additional_metadata)) {
259 return 1;
260 }
261
262 channel_creation_func = [test_case, additional_metadata](auto arguments) {
263 std::vector<std::unique_ptr<
264 grpc::experimental::ClientInterceptorFactoryInterface>>
265 factories;
266 factories.emplace_back(
267 new grpc::testing::AdditionalMetadataInterceptorFactory(
268 additional_metadata));
269 if (absl::GetFlag(FLAGS_log_metadata_and_status)) {
270 factories.emplace_back(
271 new grpc::testing::MetadataAndStatusLoggerInterceptorFactory());
272 }
273 return CreateChannelForTestCase(test_case, std::move(factories),
274 arguments);
275 };
276 }
277
278 grpc::testing::InteropClient client(
279 channel_creation_func, true,
280 absl::GetFlag(FLAGS_do_not_abort_on_transient_failures));
281
282 std::unordered_map<std::string, std::function<bool()>> actions;
283 actions["empty_unary"] =
284 std::bind(&grpc::testing::InteropClient::DoEmpty, &client);
285 actions["large_unary"] =
286 std::bind(&grpc::testing::InteropClient::DoLargeUnary, &client);
287 actions["server_compressed_unary"] = std::bind(
288 &grpc::testing::InteropClient::DoServerCompressedUnary, &client);
289 actions["client_compressed_unary"] = std::bind(
290 &grpc::testing::InteropClient::DoClientCompressedUnary, &client);
291 actions["client_streaming"] =
292 std::bind(&grpc::testing::InteropClient::DoRequestStreaming, &client);
293 actions["server_streaming"] =
294 std::bind(&grpc::testing::InteropClient::DoResponseStreaming, &client);
295 actions["server_compressed_streaming"] = std::bind(
296 &grpc::testing::InteropClient::DoServerCompressedStreaming, &client);
297 actions["client_compressed_streaming"] = std::bind(
298 &grpc::testing::InteropClient::DoClientCompressedStreaming, &client);
299 actions["slow_consumer"] = std::bind(
300 &grpc::testing::InteropClient::DoResponseStreamingWithSlowConsumer,
301 &client);
302 actions["half_duplex"] =
303 std::bind(&grpc::testing::InteropClient::DoHalfDuplex, &client);
304 actions["ping_pong"] =
305 std::bind(&grpc::testing::InteropClient::DoPingPong, &client);
306 actions["cancel_after_begin"] =
307 std::bind(&grpc::testing::InteropClient::DoCancelAfterBegin, &client);
308 actions["cancel_after_first_response"] = std::bind(
309 &grpc::testing::InteropClient::DoCancelAfterFirstResponse, &client);
310 actions["timeout_on_sleeping_server"] = std::bind(
311 &grpc::testing::InteropClient::DoTimeoutOnSleepingServer, &client);
312 actions["empty_stream"] =
313 std::bind(&grpc::testing::InteropClient::DoEmptyStream, &client);
314 actions["pick_first_unary"] =
315 std::bind(&grpc::testing::InteropClient::DoPickFirstUnary, &client);
316 if (absl::GetFlag(FLAGS_use_tls)) {
317 actions["compute_engine_creds"] =
318 std::bind(&grpc::testing::InteropClient::DoComputeEngineCreds, &client,
319 absl::GetFlag(FLAGS_default_service_account),
320 absl::GetFlag(FLAGS_oauth_scope));
321 actions["jwt_token_creds"] =
322 std::bind(&grpc::testing::InteropClient::DoJwtTokenCreds, &client,
323 GetServiceAccountJsonKey());
324 actions["oauth2_auth_token"] =
325 std::bind(&grpc::testing::InteropClient::DoOauth2AuthToken, &client,
326 absl::GetFlag(FLAGS_default_service_account),
327 absl::GetFlag(FLAGS_oauth_scope));
328 actions["per_rpc_creds"] =
329 std::bind(&grpc::testing::InteropClient::DoPerRpcCreds, &client,
330 GetServiceAccountJsonKey());
331 }
332 if (absl::GetFlag(FLAGS_custom_credentials_type) ==
333 "google_default_credentials") {
334 actions["google_default_credentials"] =
335 std::bind(&grpc::testing::InteropClient::DoGoogleDefaultCredentials,
336 &client, absl::GetFlag(FLAGS_default_service_account));
337 }
338 actions["status_code_and_message"] =
339 std::bind(&grpc::testing::InteropClient::DoStatusWithMessage, &client);
340 actions["special_status_message"] =
341 std::bind(&grpc::testing::InteropClient::DoSpecialStatusMessage, &client);
342 actions["custom_metadata"] =
343 std::bind(&grpc::testing::InteropClient::DoCustomMetadata, &client);
344 actions["unimplemented_method"] =
345 std::bind(&grpc::testing::InteropClient::DoUnimplementedMethod, &client);
346 actions["unimplemented_service"] =
347 std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
348 actions["channel_soak"] = std::bind(
349 &grpc::testing::InteropClient::DoChannelSoakTest, &client,
350 absl::GetFlag(FLAGS_server_host), absl::GetFlag(FLAGS_soak_iterations),
351 absl::GetFlag(FLAGS_soak_max_failures),
352 absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms),
353 absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs),
354 absl::GetFlag(FLAGS_soak_overall_timeout_seconds),
355 absl::GetFlag(FLAGS_soak_request_size),
356 absl::GetFlag(FLAGS_soak_response_size));
357 actions["rpc_soak"] = std::bind(
358 &grpc::testing::InteropClient::DoRpcSoakTest, &client,
359 absl::GetFlag(FLAGS_server_host), absl::GetFlag(FLAGS_soak_iterations),
360 absl::GetFlag(FLAGS_soak_max_failures),
361 absl::GetFlag(FLAGS_soak_per_iteration_max_acceptable_latency_ms),
362 absl::GetFlag(FLAGS_soak_min_time_ms_between_rpcs),
363 absl::GetFlag(FLAGS_soak_overall_timeout_seconds),
364 absl::GetFlag(FLAGS_soak_request_size),
365 absl::GetFlag(FLAGS_soak_response_size));
366 actions["long_lived_channel"] =
367 std::bind(&grpc::testing::InteropClient::DoLongLivedChannelTest, &client,
368 absl::GetFlag(FLAGS_soak_iterations),
369 absl::GetFlag(FLAGS_iteration_interval));
370
371 UpdateActions(&actions);
372
373 if (absl::GetFlag(FLAGS_test_case) == "all") {
374 for (const auto& action : actions) {
375 for (int i = 0; i < absl::GetFlag(FLAGS_num_times); i++) {
376 action.second();
377 }
378 }
379 } else if (actions.find(absl::GetFlag(FLAGS_test_case)) != actions.end()) {
380 for (int i = 0; i < absl::GetFlag(FLAGS_num_times); i++) {
381 actions.find(absl::GetFlag(FLAGS_test_case))->second();
382 }
383 } else {
384 std::string test_cases;
385 for (const auto& action : actions) {
386 if (!test_cases.empty()) test_cases += "\n";
387 test_cases += action.first;
388 }
389 LOG(ERROR) << "Unsupported test case " << absl::GetFlag(FLAGS_test_case)
390 << ". Valid options are\n"
391 << test_cases;
392 ret = 1;
393 }
394
395 if (absl::GetFlag(FLAGS_enable_observability)) {
396 // TODO(someone): remove deprecated usage
397 // NOLINTNEXTLINE(clang-diagnostic-deprecated-declarations)
398 grpc::experimental::GcpObservabilityClose();
399 }
400
401 return ret;
402 }
403