1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/qps/driver.h"
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/string_util.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26
27 #include <cinttypes>
28 #include <deque>
29 #include <list>
30 #include <thread>
31 #include <unordered_map>
32 #include <vector>
33
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "google/protobuf/timestamp.pb.h"
37 #include "src/core/util/crash.h"
38 #include "src/core/util/env.h"
39 #include "src/core/util/host_port.h"
40 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
41 #include "test/core/test_util/port.h"
42 #include "test/core/test_util/test_config.h"
43 #include "test/cpp/qps/client.h"
44 #include "test/cpp/qps/histogram.h"
45 #include "test/cpp/qps/qps_worker.h"
46 #include "test/cpp/qps/stats.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48
49 using std::deque;
50 using std::list;
51 using std::unique_ptr;
52 using std::vector;
53
54 namespace grpc {
55 namespace testing {
get_host(const std::string & worker)56 static std::string get_host(const std::string& worker) {
57 absl::string_view host;
58 absl::string_view port;
59 grpc_core::SplitHostPort(worker.c_str(), &host, &port);
60 return std::string(host.data(), host.size());
61 }
62
get_workers(const string & env_name)63 static deque<string> get_workers(const string& env_name) {
64 deque<string> out;
65 auto env = grpc_core::GetEnv(env_name.c_str()).value_or("");
66 const char* p = env.c_str();
67 if (!env.empty()) {
68 for (;;) {
69 const char* comma = strchr(p, ',');
70 if (comma) {
71 out.emplace_back(p, comma);
72 p = comma + 1;
73 } else {
74 out.emplace_back(p);
75 break;
76 }
77 }
78 }
79 if (out.empty()) {
80 LOG(ERROR) << "Environment variable \"" << env_name
81 << "\" does not contain a list of QPS "
82 "workers to use. Set it to a comma-separated list of "
83 "hostname:port pairs, starting with hosts that should act as "
84 "servers. E.g. export "
85 << env_name
86 << "=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"";
87 }
88 return out;
89 }
90
GetCredType(const std::string & worker_addr,const std::map<std::string,std::string> & per_worker_credential_types,const std::string & credential_type)91 std::string GetCredType(
92 const std::string& worker_addr,
93 const std::map<std::string, std::string>& per_worker_credential_types,
94 const std::string& credential_type) {
95 auto it = per_worker_credential_types.find(worker_addr);
96 if (it != per_worker_credential_types.end()) {
97 return it->second;
98 }
99 return credential_type;
100 }
101
102 // helpers for postprocess_scenario_result
WallTime(const ClientStats & s)103 static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
SystemTime(const ClientStats & s)104 static double SystemTime(const ClientStats& s) { return s.time_system(); }
UserTime(const ClientStats & s)105 static double UserTime(const ClientStats& s) { return s.time_user(); }
CliPollCount(const ClientStats & s)106 static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
SvrPollCount(const ServerStats & s)107 static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
ServerSystemTime(const ServerStats & s)108 static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
ServerUserTime(const ServerStats & s)109 static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
ServerTotalCpuTime(const ServerStats & s)110 static double ServerTotalCpuTime(const ServerStats& s) {
111 return s.total_cpu_time();
112 }
ServerIdleCpuTime(const ServerStats & s)113 static double ServerIdleCpuTime(const ServerStats& s) {
114 return s.idle_cpu_time();
115 }
Cores(int n)116 static int Cores(int n) { return n; }
117
IsSuccess(const Status & s)118 static bool IsSuccess(const Status& s) {
119 if (s.ok()) return true;
120 // Since we shutdown servers and clients at the same time, they both can
121 // observe cancellation. Thus, we consider CANCELLED as good status.
122 if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
123 return true;
124 }
125 // Since we shutdown servers and clients at the same time, server can close
126 // the socket before the client attempts to do that, and vice versa. Thus
127 // receiving a "Socket closed" error is fine.
128 if (s.error_message() == "Socket closed") return true;
129 return false;
130 }
131
132 // Postprocess ScenarioResult and populate result summary.
postprocess_scenario_result(ScenarioResult * result)133 static void postprocess_scenario_result(ScenarioResult* result) {
134 // Get latencies from ScenarioResult latencies histogram and populate to
135 // result summary.
136 Histogram histogram;
137 histogram.MergeProto(result->latencies());
138 result->mutable_summary()->set_latency_50(histogram.Percentile(50));
139 result->mutable_summary()->set_latency_90(histogram.Percentile(90));
140 result->mutable_summary()->set_latency_95(histogram.Percentile(95));
141 result->mutable_summary()->set_latency_99(histogram.Percentile(99));
142 result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));
143
144 // Calculate qps and cpu load for each client and then aggregate results for
145 // all clients
146 double qps = 0;
147 double client_system_cpu_load = 0, client_user_cpu_load = 0;
148 for (int i = 0; i < result->client_stats_size(); i++) {
149 auto client_stat = result->client_stats(i);
150 qps += client_stat.latencies().count() / client_stat.time_elapsed();
151 client_system_cpu_load +=
152 client_stat.time_system() / client_stat.time_elapsed();
153 client_user_cpu_load +=
154 client_stat.time_user() / client_stat.time_elapsed();
155 }
156 // Calculate cpu load for each server and then aggregate results for all
157 // servers
158 double server_system_cpu_load = 0, server_user_cpu_load = 0;
159 for (int i = 0; i < result->server_stats_size(); i++) {
160 auto server_stat = result->server_stats(i);
161 server_system_cpu_load +=
162 server_stat.time_system() / server_stat.time_elapsed();
163 server_user_cpu_load +=
164 server_stat.time_user() / server_stat.time_elapsed();
165 }
166 result->mutable_summary()->set_qps(qps);
167 // Populate the percentage of cpu load to result summary.
168 result->mutable_summary()->set_server_system_time(100 *
169 server_system_cpu_load);
170 result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
171 result->mutable_summary()->set_client_system_time(100 *
172 client_system_cpu_load);
173 result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
174
175 // For Non-linux platform, get_cpu_usage() is not implemented. Thus,
176 // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
177 if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
178 result->mutable_summary()->set_server_cpu_usage(0);
179 } else {
180 auto server_cpu_usage =
181 100 - (100 * average(result->server_stats(), ServerIdleCpuTime) /
182 average(result->server_stats(), ServerTotalCpuTime));
183 result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
184 }
185
186 // Calculate and populate successful request per second and failed requests
187 // per seconds to result summary.
188 auto time_estimate = average(result->client_stats(), WallTime);
189 if (result->request_results_size() > 0) {
190 int64_t successes = 0;
191 int64_t failures = 0;
192 for (int i = 0; i < result->request_results_size(); i++) {
193 const RequestResultCount& rrc = result->request_results(i);
194 if (rrc.status_code() == 0) {
195 successes += rrc.count();
196 } else {
197 failures += rrc.count();
198 }
199 }
200 result->mutable_summary()->set_successful_requests_per_second(
201 successes / time_estimate);
202 result->mutable_summary()->set_failed_requests_per_second(failures /
203 time_estimate);
204 }
205
206 // Fill in data for other metrics required in result summary
207 auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
208 result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
209 result->mutable_summary()->set_client_polls_per_request(
210 sum(result->client_stats(), CliPollCount) / histogram.Count());
211 result->mutable_summary()->set_server_polls_per_request(
212 sum(result->server_stats(), SvrPollCount) / histogram.Count());
213
214 auto server_queries_per_cpu_sec =
215 histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
216 sum(result->server_stats(), ServerUserTime));
217 auto client_queries_per_cpu_sec =
218 histogram.Count() / (sum(result->client_stats(), SystemTime) +
219 sum(result->client_stats(), UserTime));
220
221 result->mutable_summary()->set_server_queries_per_cpu_sec(
222 server_queries_per_cpu_sec);
223 result->mutable_summary()->set_client_queries_per_cpu_sec(
224 client_queries_per_cpu_sec);
225 }
226
227 struct ClientData {
228 unique_ptr<WorkerService::Stub> stub;
229 unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
230 };
231
232 struct ServerData {
233 unique_ptr<WorkerService::Stub> stub;
234 unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
235 };
236
FinishClients(const std::vector<ClientData> & clients,const ClientArgs & client_mark)237 static void FinishClients(const std::vector<ClientData>& clients,
238 const ClientArgs& client_mark) {
239 LOG(INFO) << "Finishing clients";
240 for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
241 auto client = &clients[i];
242 if (!client->stream->Write(client_mark)) {
243 grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
244 }
245 if (!client->stream->WritesDone()) {
246 grpc_core::Crash(absl::StrFormat("Failed WritesDone for client %zu", i));
247 }
248 }
249 }
250
ReceiveFinalStatusFromClients(const std::vector<ClientData> & clients,Histogram & merged_latencies,std::unordered_map<int,int64_t> & merged_statuses,ScenarioResult & result)251 static void ReceiveFinalStatusFromClients(
252 const std::vector<ClientData>& clients, Histogram& merged_latencies,
253 std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
254 LOG(INFO) << "Receiving final status from clients";
255 ClientStatus client_status;
256 for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
257 auto client = &clients[i];
258 // Read the client final status
259 if (client->stream->Read(&client_status)) {
260 LOG(INFO) << "Received final status from client " << i;
261 const auto& stats = client_status.stats();
262 merged_latencies.MergeProto(stats.latencies());
263 for (int i = 0; i < stats.request_results_size(); i++) {
264 merged_statuses[stats.request_results(i).status_code()] +=
265 stats.request_results(i).count();
266 }
267 result.add_client_stats()->CopyFrom(stats);
268 // Check that final status was should be the last message on the client
269 // stream.
270 // TODO(jtattermusch): note that that waiting for Read to return can take
271 // long on some scenarios (e.g. unconstrained streaming_from_server). See
272 // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
273 // where the shutdown delay pretty much determines the wait here.
274 CHECK(!client->stream->Read(&client_status));
275 } else {
276 grpc_core::Crash(
277 absl::StrFormat("Couldn't get final status from client %zu", i));
278 }
279 }
280 }
281
ShutdownClients(const std::vector<ClientData> & clients,ScenarioResult & result)282 static void ShutdownClients(const std::vector<ClientData>& clients,
283 ScenarioResult& result) {
284 LOG(INFO) << "Shutdown clients";
285 for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
286 auto client = &clients[i];
287 Status s = client->stream->Finish();
288 // Since we shutdown servers and clients at the same time, clients can
289 // observe cancellation. Thus, we consider both OK and CANCELLED as good
290 // status.
291 const bool success = IsSuccess(s);
292 result.add_client_success(success);
293 if (!success) {
294 grpc_core::Crash(absl::StrFormat("Client %zu had an error %s", i,
295 s.error_message().c_str()));
296 }
297 }
298 }
299
FinishServers(const std::vector<ServerData> & servers,const ServerArgs & server_mark)300 static void FinishServers(const std::vector<ServerData>& servers,
301 const ServerArgs& server_mark) {
302 LOG(INFO) << "Finishing servers";
303 for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
304 auto server = &servers[i];
305 if (!server->stream->Write(server_mark)) {
306 grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
307 }
308 if (!server->stream->WritesDone()) {
309 grpc_core::Crash(absl::StrFormat("Failed WritesDone for server %zu", i));
310 }
311 }
312 }
313
ReceiveFinalStatusFromServer(const std::vector<ServerData> & servers,ScenarioResult & result)314 static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
315 ScenarioResult& result) {
316 LOG(INFO) << "Receiving final status from servers";
317 ServerStatus server_status;
318 for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
319 auto server = &servers[i];
320 // Read the server final status
321 if (server->stream->Read(&server_status)) {
322 LOG(INFO) << "Received final status from server " << i;
323 result.add_server_stats()->CopyFrom(server_status.stats());
324 result.add_server_cores(server_status.cores());
325 // That final status should be the last message on the server stream
326 CHECK(!server->stream->Read(&server_status));
327 } else {
328 grpc_core::Crash(
329 absl::StrFormat("Couldn't get final status from server %zu", i));
330 }
331 }
332 }
333
ShutdownServers(const std::vector<ServerData> & servers,ScenarioResult & result)334 static void ShutdownServers(const std::vector<ServerData>& servers,
335 ScenarioResult& result) {
336 LOG(INFO) << "Shutdown servers";
337 for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
338 auto server = &servers[i];
339 Status s = server->stream->Finish();
340 // Since we shutdown servers and clients at the same time, servers can
341 // observe cancellation. Thus, we consider both OK and CANCELLED as good
342 // status.
343 const bool success = IsSuccess(s);
344 result.add_server_success(success);
345 if (!success) {
346 grpc_core::Crash(absl::StrFormat("Server %zu had an error %s", i,
347 s.error_message().c_str()));
348 }
349 }
350 }
351
352 std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
353
RunScenario(const ClientConfig & initial_client_config,size_t num_clients,const ServerConfig & initial_server_config,size_t num_servers,int warmup_seconds,int benchmark_seconds,int spawn_local_worker_count,const std::string & qps_server_target_override,const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types,bool run_inproc,int32_t median_latency_collection_interval_millis)354 std::unique_ptr<ScenarioResult> RunScenario(
355 const ClientConfig& initial_client_config, size_t num_clients,
356 const ServerConfig& initial_server_config, size_t num_servers,
357 int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
358 const std::string& qps_server_target_override,
359 const std::string& credential_type,
360 const std::map<std::string, std::string>& per_worker_credential_types,
361 bool run_inproc, int32_t median_latency_collection_interval_millis) {
362 if (run_inproc) {
363 g_inproc_servers = new std::vector<grpc::testing::Server*>;
364 }
365
366 // ClientContext allocations (all are destroyed at scope exit)
367 list<ClientContext> contexts;
368 auto alloc_context = [](list<ClientContext>* contexts) {
369 contexts->emplace_back();
370 auto context = &contexts->back();
371 context->set_wait_for_ready(true);
372 return context;
373 };
374
375 // To be added to the result, containing the final configuration used for
376 // client and config (including host, etc.)
377 ClientConfig result_client_config;
378
379 // Get client, server lists; ignore if inproc test
380 auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
381 ClientConfig client_config = initial_client_config;
382
383 // Spawn some local workers if desired
384 vector<unique_ptr<QpsWorker>> local_workers;
385 for (int i = 0; i < abs(spawn_local_worker_count); i++) {
386 // act as if we're a new test -- gets a good rng seed
387 static bool called_init = false;
388 if (!called_init) {
389 char args_buf[100];
390 strcpy(args_buf, "some-benchmark");
391 char* args[] = {args_buf};
392 int argc = 1;
393 grpc_test_init(&argc, args);
394 called_init = true;
395 }
396
397 char addr[256];
398 // we use port # of -1 to indicate inproc
399 int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
400 local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
401 sprintf(addr, "localhost:%d", driver_port);
402 if (spawn_local_worker_count < 0) {
403 workers.push_front(addr);
404 } else {
405 workers.push_back(addr);
406 }
407 }
408 CHECK(!workers.empty());
409
410 // if num_clients is set to <=0, do dynamic sizing: all workers
411 // except for servers are clients
412 if (num_clients <= 0) {
413 num_clients = workers.size() - num_servers;
414 }
415
416 // TODO(ctiller): support running multiple configurations, and binpack
417 // client/server pairs
418 // to available workers
419 CHECK_GE(workers.size(), num_clients + num_servers);
420
421 // Trim to just what we need
422 workers.resize(num_clients + num_servers);
423
424 // Start servers
425 std::vector<ServerData> servers(num_servers);
426 std::unordered_map<string, std::deque<int>> hosts_cores;
427 ChannelArguments channel_args;
428
429 for (size_t i = 0; i < num_servers; i++) {
430 LOG(INFO) << "Starting server on " << workers[i] << " (worker #" << i
431 << ")";
432 if (!run_inproc) {
433 servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
434 workers[i],
435 GetCredType(workers[i], per_worker_credential_types, credential_type),
436 nullptr /* call creds */, {} /* interceptor creators */));
437 } else {
438 servers[i].stub = WorkerService::NewStub(
439 local_workers[i]->InProcessChannel(channel_args));
440 }
441
442 const ServerConfig& server_config = initial_server_config;
443 if (server_config.core_limit() != 0) {
444 grpc_core::Crash("server config core limit is set but ignored by driver");
445 }
446
447 ServerArgs args;
448 *args.mutable_setup() = server_config;
449 servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
450 if (!servers[i].stream->Write(args)) {
451 grpc_core::Crash(
452 absl::StrFormat("Could not write args to server %zu", i));
453 }
454 ServerStatus init_status;
455 if (!servers[i].stream->Read(&init_status)) {
456 grpc_core::Crash(
457 absl::StrFormat("Server %zu did not yield initial status", i));
458 }
459 if (run_inproc) {
460 std::string cli_target(INPROC_NAME_PREFIX);
461 cli_target += std::to_string(i);
462 client_config.add_server_targets(cli_target);
463 } else {
464 std::string host = get_host(workers[i]);
465 std::string cli_target =
466 grpc_core::JoinHostPort(host.c_str(), init_status.port());
467 client_config.add_server_targets(cli_target.c_str());
468 }
469 }
470 if (!qps_server_target_override.empty()) {
471 // overriding the qps server target only makes since if there is <= 1
472 // servers
473 CHECK_LE(num_servers, 1u);
474 client_config.clear_server_targets();
475 client_config.add_server_targets(qps_server_target_override);
476 }
477 client_config.set_median_latency_collection_interval_millis(
478 median_latency_collection_interval_millis);
479
480 // Targets are all set by now
481 result_client_config = client_config;
482 // Start clients
483 std::vector<ClientData> clients(num_clients);
484 size_t channels_allocated = 0;
485 for (size_t i = 0; i < num_clients; i++) {
486 const auto& worker = workers[i + num_servers];
487 LOG(INFO) << "Starting client on " << worker << " (worker #"
488 << i + num_servers << ")";
489 if (!run_inproc) {
490 clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
491 worker,
492 GetCredType(worker, per_worker_credential_types, credential_type),
493 nullptr /* call creds */, {} /* interceptor creators */));
494 } else {
495 clients[i].stub = WorkerService::NewStub(
496 local_workers[i + num_servers]->InProcessChannel(channel_args));
497 }
498 ClientConfig per_client_config = client_config;
499
500 if (initial_client_config.core_limit() != 0) {
501 grpc_core::Crash("client config core limit set but ignored");
502 }
503
504 // Reduce channel count so that total channels specified is held regardless
505 // of the number of clients available
506 size_t num_channels =
507 (client_config.client_channels() - channels_allocated) /
508 (num_clients - i);
509 channels_allocated += num_channels;
510 VLOG(2) << "Client " << i << " gets " << num_channels << " channels";
511 per_client_config.set_client_channels(num_channels);
512
513 ClientArgs args;
514 *args.mutable_setup() = per_client_config;
515 clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
516 if (!clients[i].stream->Write(args)) {
517 grpc_core::Crash(
518 absl::StrFormat("Could not write args to client %zu", i));
519 }
520 }
521
522 for (size_t i = 0; i < num_clients; i++) {
523 ClientStatus init_status;
524 if (!clients[i].stream->Read(&init_status)) {
525 grpc_core::Crash(
526 absl::StrFormat("Client %zu did not yield initial status", i));
527 }
528 }
529
530 // Send an initial mark: clients can use this to know that everything is ready
531 // to start
532 LOG(INFO) << "Initiating";
533 ServerArgs server_mark;
534 server_mark.mutable_mark()->set_reset(true);
535 ClientArgs client_mark;
536 client_mark.mutable_mark()->set_reset(true);
537 ServerStatus server_status;
538 ClientStatus client_status;
539 for (size_t i = 0; i < num_clients; i++) {
540 auto client = &clients[i];
541 if (!client->stream->Write(client_mark)) {
542 grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
543 }
544 }
545 for (size_t i = 0; i < num_clients; i++) {
546 auto client = &clients[i];
547 if (!client->stream->Read(&client_status)) {
548 grpc_core::Crash(
549 absl::StrFormat("Couldn't get status from client %zu", i));
550 }
551 }
552
553 // Let everything warmup
554 LOG(INFO) << "Warming up";
555 gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
556 gpr_sleep_until(
557 gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
558
559 // Start a run
560 LOG(INFO) << "Starting";
561
562 auto start_time = time(nullptr);
563
564 for (size_t i = 0; i < num_servers; i++) {
565 auto server = &servers[i];
566 if (!server->stream->Write(server_mark)) {
567 grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
568 }
569 }
570 for (size_t i = 0; i < num_clients; i++) {
571 auto client = &clients[i];
572 if (!client->stream->Write(client_mark)) {
573 grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
574 }
575 }
576 for (size_t i = 0; i < num_servers; i++) {
577 auto server = &servers[i];
578 if (!server->stream->Read(&server_status)) {
579 grpc_core::Crash(
580 absl::StrFormat("Couldn't get status from server %zu", i));
581 }
582 }
583 for (size_t i = 0; i < num_clients; i++) {
584 auto client = &clients[i];
585 if (!client->stream->Read(&client_status)) {
586 grpc_core::Crash(
587 absl::StrFormat("Couldn't get status from client %zu", i));
588 }
589 }
590
591 // Wait some time
592 LOG(INFO) << "Running";
593 // Use gpr_sleep_until rather than this_thread::sleep_until to support
594 // compilers that don't work with this_thread
595 gpr_sleep_until(gpr_time_add(
596 start,
597 gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
598
599 // Finish a run
600 std::unique_ptr<ScenarioResult> result(new ScenarioResult);
601 Histogram merged_latencies;
602 std::unordered_map<int, int64_t> merged_statuses;
603
604 // For the case where clients lead the test such as UNARY and
605 // STREAMING_FROM_CLIENT, clients need to finish completely while a server
606 // is running to prevent the clients from being stuck while waiting for
607 // the result.
608 bool client_finish_first =
609 (client_config.rpc_type() != STREAMING_FROM_SERVER);
610
611 auto end_time = time(nullptr);
612
613 FinishClients(clients, client_mark);
614
615 if (!client_finish_first) {
616 FinishServers(servers, server_mark);
617 }
618
619 ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
620 *result);
621 ShutdownClients(clients, *result);
622
623 if (client_finish_first) {
624 FinishServers(servers, server_mark);
625 }
626
627 ReceiveFinalStatusFromServer(servers, *result);
628 ShutdownServers(servers, *result);
629
630 delete g_inproc_servers;
631
632 merged_latencies.FillProto(result->mutable_latencies());
633 for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
634 it != merged_statuses.end(); ++it) {
635 RequestResultCount* rrc = result->add_request_results();
636 rrc->set_status_code(it->first);
637 rrc->set_count(it->second);
638 }
639
640 // Fill in start and end time for the test scenario
641 result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
642 result->mutable_summary()->mutable_end_time()->set_seconds(end_time);
643
644 postprocess_scenario_result(result.get());
645 return result;
646 }
647
RunQuit(const std::string & credential_type,const std::map<std::string,std::string> & per_worker_credential_types)648 bool RunQuit(
649 const std::string& credential_type,
650 const std::map<std::string, std::string>& per_worker_credential_types) {
651 // Get client, server lists
652 bool result = true;
653 auto workers = get_workers("QPS_WORKERS");
654 if (workers.empty()) {
655 return false;
656 }
657
658 for (size_t i = 0; i < workers.size(); i++) {
659 auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
660 workers[i],
661 GetCredType(workers[i], per_worker_credential_types, credential_type),
662 nullptr /* call creds */, {} /* interceptor creators */));
663 Void phony;
664 grpc::ClientContext ctx;
665 ctx.set_wait_for_ready(true);
666 Status s = stub->QuitWorker(&ctx, phony, &phony);
667 if (!s.ok()) {
668 LOG(ERROR) << "Worker " << i << " could not be properly quit because "
669 << s.error_message();
670 result = false;
671 }
672 }
673 return result;
674 }
675
676 } // namespace testing
677 } // namespace grpc
678