1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/time.h>
21 #include <grpcpp/security/server_credentials.h>
22 #include <grpcpp/server.h>
23 #include <grpcpp/server_builder.h>
24 #include <stdint.h>
25 #include <stdio.h>
26 #include <string.h>
27
28 #include <algorithm>
29 #include <iterator>
30 #include <limits>
31 #include <map>
32 #include <memory>
33 #include <string>
34 #include <utility>
35 #include <vector>
36
37 #include "absl/algorithm/container.h"
38 #include "absl/flags/flag.h"
39 #include "absl/flags/parse.h"
40 #include "absl/log/log.h"
41 #include "absl/strings/str_cat.h"
42 #include "absl/strings/str_join.h"
43 #include "absl/strings/str_split.h"
44 #include "absl/strings/string_view.h"
45 #include "envoy/config/cluster/v3/cluster.pb.h"
46 #include "envoy/config/core/v3/health_check.pb.h"
47 #include "google/protobuf/wrappers.pb.h"
48 #include "src/core/config/config_vars.h"
49 #include "src/core/util/env.h"
50 #include "src/core/util/subprocess.h"
51 #include "test/core/test_util/port.h"
52 #include "test/core/test_util/resolve_localhost_ip46.h"
53 #include "test/core/test_util/test_config.h"
54 #include "test/cpp/end2end/xds/xds_server.h"
55 #include "test/cpp/end2end/xds/xds_utils.h"
56
57 using grpc::testing::XdsResourceUtils;
58
59 // Default all benchmarks in order to trigger CI testing for each one
60 ABSL_FLAG(std::string, benchmark_names, "",
61 "Which benchmark to run. If empty, defaults to 'call,channel' "
62 "if --use_xds is false, or 'call,channel,channel_multi_address' "
63 "if --use_xds is true.");
64
65 ABSL_FLAG(int, size, 1000, "Number of channels/calls");
66 ABSL_FLAG(
67 std::string, scenario_config, "insecure",
68 "Possible Values: minstack (Use minimal stack), resource_quota, insecure, "
69 "secure (Use SSL credentials on server), chaotic_good");
70 ABSL_FLAG(bool, memory_profiling, false,
71 "Run memory profiling"); // TODO (chennancy) Connect this flag
72 ABSL_FLAG(bool, use_xds, false, "Use xDS");
73
74 // TODO(roth, ctiller): Add support for multiple addresses per channel.
75
76 class Subprocess {
77 public:
Subprocess(std::vector<std::string> args)78 explicit Subprocess(std::vector<std::string> args) {
79 std::vector<const char*> args_c;
80 args_c.reserve(args.size());
81 for (const auto& arg : args) {
82 args_c.push_back(arg.c_str());
83 }
84 LOG(INFO) << "START: " << absl::StrJoin(args, " ");
85 process_ = gpr_subprocess_create(args_c.size(), args_c.data());
86 }
87
GetPID()88 int GetPID() { return gpr_subprocess_get_process_id(process_); }
Join()89 int Join() { return gpr_subprocess_join(process_); }
Interrupt()90 void Interrupt() { gpr_subprocess_interrupt(process_); }
91
~Subprocess()92 ~Subprocess() { gpr_subprocess_destroy(process_); }
93
94 private:
95 gpr_subprocess* process_;
96 };
97
98 // per-call memory usage benchmark
RunCallBenchmark(int port,char * root,std::vector<std::string> server_scenario_flags,std::vector<std::string> client_scenario_flags)99 int RunCallBenchmark(int port, char* root,
100 std::vector<std::string> server_scenario_flags,
101 std::vector<std::string> client_scenario_flags) {
102 int status;
103
104 // start the server
105 LOG(INFO) << "starting server";
106 std::vector<std::string> server_flags = {
107 absl::StrCat(root, "/memory_usage_server",
108 gpr_subprocess_binary_extension()),
109 "--grpc_experiments",
110 std::string(grpc_core::ConfigVars::Get().Experiments()), "--bind",
111 grpc_core::LocalIpAndPort(port)};
112 if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
113 // Add scenario-specific server flags to the end of the server_flags
114 absl::c_move(server_scenario_flags, std::back_inserter(server_flags));
115 Subprocess svr(server_flags);
116 LOG(INFO) << "server started, pid " << svr.GetPID();
117
118 // Wait one second before starting client to give the server a chance
119 // to start up.
120 gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
121
122 // start the client
123 LOG(INFO) << "starting client";
124 std::vector<std::string> client_flags = {
125 absl::StrCat(root, "/memory_usage_client",
126 gpr_subprocess_binary_extension()),
127 "--target",
128 absl::GetFlag(FLAGS_use_xds)
129 ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
130 : grpc_core::LocalIpAndPort(port),
131 "--grpc_experiments",
132 std::string(grpc_core::ConfigVars::Get().Experiments()),
133 absl::StrCat("--warmup=", 10000),
134 absl::StrCat("--benchmark=", absl::GetFlag(FLAGS_size))};
135 // Add scenario-specific client flags to the end of the client_flags
136 absl::c_move(client_scenario_flags, std::back_inserter(client_flags));
137 Subprocess cli(client_flags);
138 LOG(INFO) << "client started, pid " << cli.GetPID();
139 // wait for completion
140 if ((status = cli.Join()) != 0) {
141 printf("client failed with: %d", status);
142 return 1;
143 }
144
145 svr.Interrupt();
146 return svr.Join() == 0 ? 0 : 2;
147 }
148
149 // Per-channel benchmark
RunChannelBenchmark(const std::vector<int> & server_ports,char * root)150 int RunChannelBenchmark(const std::vector<int>& server_ports, char* root) {
151 // TODO(chennancy) Add the scenario specific flags
152
153 // start the servers
154 std::vector<Subprocess> servers;
155 servers.reserve(server_ports.size());
156 for (int port : server_ports) {
157 LOG(INFO) << "starting server on port " << port;
158 std::vector<std::string> server_flags = {
159 absl::StrCat(root, "/memory_usage_callback_server",
160 gpr_subprocess_binary_extension()),
161 "--bind", grpc_core::LocalIpAndPort(port)};
162 if (absl::GetFlag(FLAGS_use_xds)) server_flags.emplace_back("--use_xds");
163 servers.emplace_back(server_flags);
164 LOG(INFO) << "server started, pid " << servers.back().GetPID();
165 }
166
167 // Wait one second before starting client to avoid possible race condition
168 // of client sending an RPC before the server is set up
169 gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
170
171 // start the client
172 LOG(INFO) << "starting client";
173 std::vector<std::string> client_flags = {
174 absl::StrCat(root, "/memory_usage_callback_client",
175 gpr_subprocess_binary_extension()),
176 "--target",
177 absl::GetFlag(FLAGS_use_xds)
178 ? absl::StrCat("xds:", XdsResourceUtils::kServerName)
179 : grpc_core::LocalIpAndPort(server_ports[0]),
180 "--nosecure", absl::StrCat("--size=", absl::GetFlag(FLAGS_size))};
181 if (server_ports.size() == 1) {
182 client_flags.emplace_back(
183 absl::StrCat("--server_pid=", servers[0].GetPID()));
184 }
185 Subprocess cli(client_flags);
186 LOG(INFO) << "client started, pid " << cli.GetPID();
187 // wait for completion
188 int retval = cli.Join();
189 if (retval != 0) {
190 printf("client failed with: %d", retval);
191 return 1;
192 }
193 for (auto& server : servers) {
194 server.Interrupt();
195 if (server.Join() != 0) retval = 2;
196 }
197 return retval;
198 }
199
200 struct XdsServer {
201 std::shared_ptr<grpc::testing::AdsServiceImpl> ads_service;
202 std::unique_ptr<grpc::Server> server;
203 };
204
StartXdsServerAndConfigureBootstrap(const std::vector<int> & server_ports)205 XdsServer StartXdsServerAndConfigureBootstrap(
206 const std::vector<int>& server_ports) {
207 XdsServer xds_server;
208 int xds_server_port = grpc_pick_unused_port_or_die();
209 LOG(INFO) << "xDS server port: " << xds_server_port;
210 // Generate xDS bootstrap and set the env var.
211 std::string bootstrap =
212 grpc::testing::XdsBootstrapBuilder()
213 .SetServers({absl::StrCat("localhost:", xds_server_port)})
214 .SetXdsChannelCredentials("insecure")
215 .Build();
216 grpc_core::SetEnv("GRPC_XDS_BOOTSTRAP_CONFIG", bootstrap);
217 LOG(INFO) << "xDS bootstrap: " << bootstrap;
218 // Create ADS service.
219 xds_server.ads_service = std::make_shared<grpc::testing::AdsServiceImpl>();
220 xds_server.ads_service->Start();
221 // Populate xDS resources.
222 std::vector<XdsResourceUtils::EdsResourceArgs::Endpoint> endpoints;
223 endpoints.reserve(server_ports.size());
224 for (int port : server_ports) {
225 endpoints.emplace_back(port);
226 XdsResourceUtils::SetServerListenerNameAndRouteConfiguration(
227 xds_server.ads_service.get(), XdsResourceUtils::DefaultServerListener(),
228 port, XdsResourceUtils::DefaultServerRouteConfig());
229 }
230 XdsResourceUtils::SetListenerAndRouteConfiguration(
231 xds_server.ads_service.get(), XdsResourceUtils::DefaultListener(),
232 XdsResourceUtils::DefaultRouteConfig());
233 auto cluster = XdsResourceUtils::DefaultCluster();
234 cluster.mutable_circuit_breakers()
235 ->add_thresholds()
236 ->mutable_max_requests()
237 ->set_value(std::numeric_limits<uint32_t>::max());
238 xds_server.ads_service->SetCdsResource(cluster);
239 xds_server.ads_service->SetEdsResource(
240 XdsResourceUtils::BuildEdsResource(XdsResourceUtils::EdsResourceArgs(
241 {XdsResourceUtils::EdsResourceArgs::Locality(
242 "here", std::move(endpoints))})));
243 // Create and start server.
244 LOG(INFO) << "starting xDS server...";
245 grpc::ServerBuilder builder;
246 builder.RegisterService(xds_server.ads_service.get());
247 builder.AddListeningPort(absl::StrCat("localhost:", xds_server_port),
248 grpc::InsecureServerCredentials());
249 xds_server.server = builder.BuildAndStart();
250 LOG(INFO) << "xDS server started";
251 return xds_server;
252 }
253
RunBenchmark(char * root,absl::string_view benchmark,std::vector<std::string> server_scenario_flags,std::vector<std::string> client_scenario_flags)254 int RunBenchmark(char* root, absl::string_view benchmark,
255 std::vector<std::string> server_scenario_flags,
256 std::vector<std::string> client_scenario_flags) {
257 LOG(INFO) << "running benchmark: " << benchmark;
258 const size_t num_ports = benchmark == "channel_multi_address" ? 10 : 1;
259 std::vector<int> server_ports;
260 server_ports.reserve(num_ports);
261 for (size_t i = 0; i < num_ports; ++i) {
262 server_ports.push_back(grpc_pick_unused_port_or_die());
263 }
264 LOG(INFO) << "server ports: " << absl::StrJoin(server_ports, ",");
265 XdsServer xds_server;
266 if (absl::GetFlag(FLAGS_use_xds)) {
267 xds_server = StartXdsServerAndConfigureBootstrap(server_ports);
268 }
269 int retval;
270 if (benchmark == "call") {
271 retval = RunCallBenchmark(server_ports[0], root, server_scenario_flags,
272 client_scenario_flags);
273 } else if (benchmark == "channel" || benchmark == "channel_multi_address") {
274 retval = RunChannelBenchmark(server_ports, root);
275 } else {
276 LOG(INFO) << "Not a valid benchmark name";
277 retval = 4;
278 }
279 if (xds_server.server != nullptr) xds_server.server->Shutdown();
280 LOG(INFO) << "done running benchmark";
281 return retval;
282 }
283
main(int argc,char ** argv)284 int main(int argc, char** argv) {
285 absl::ParseCommandLine(argc, argv);
286
287 char* me = argv[0];
288 char* lslash = strrchr(me, '/');
289 char root[1024];
290
291 std::vector<const char*> args;
292 // figure out where we are
293 if (lslash) {
294 memcpy(root, me, static_cast<size_t>(lslash - me));
295 root[lslash - me] = 0;
296 } else {
297 strcpy(root, ".");
298 }
299
300 // Set configurations based off scenario_config
301 struct ScenarioArgs {
302 std::vector<std::string> client;
303 std::vector<std::string> server;
304 };
305 // TODO(chennancy): add in resource quota parameter setting later
306 const std::map<std::string /*scenario*/, ScenarioArgs> scenarios = {
307 {"secure", {/*client=*/{}, /*server=*/{"--secure"}}},
308 {"resource_quota", {/*client=*/{}, /*server=*/{"--secure"}}},
309 {"minstack", {/*client=*/{"--minstack"}, /*server=*/{"--minstack"}}},
310 {"insecure", {{}, {}}},
311 {"chaotic_good", {{"--chaotic_good"}, {"--chaotic_good"}}}};
312 auto it_scenario = scenarios.find(absl::GetFlag(FLAGS_scenario_config));
313 if (it_scenario == scenarios.end()) {
314 printf("No scenario matching the name could be found\n");
315 return 3;
316 }
317
318 // Run all benchmarks listed (Multiple benchmarks usually only for default
319 // scenario)
320 std::string benchmark_names = absl::GetFlag(FLAGS_benchmark_names);
321 if (benchmark_names.empty()) {
322 benchmark_names = absl::GetFlag(FLAGS_use_xds)
323 ? "call,channel,channel_multi_address"
324 : "call,channel";
325 }
326 auto benchmarks = absl::StrSplit(benchmark_names, ',');
327 grpc_init();
328 for (const auto& benchmark : benchmarks) {
329 int r = RunBenchmark(root, benchmark, it_scenario->second.server,
330 it_scenario->second.client);
331 if (r != 0) return r;
332 }
333 grpc_shutdown();
334 return 0;
335 }
336