//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "test/cpp/qps/driver.h"

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>

#include <cinttypes>
#include <deque>
#include <list>
#include <thread>
#include <unordered_map>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "google/protobuf/timestamp.pb.h"
#include "src/core/util/crash.h"
#include "src/core/util/env.h"
#include "src/core/util/host_port.h"
#include "src/proto/grpc/testing/worker_service.grpc.pb.h"
#include "test/core/test_util/port.h"
#include "test/core/test_util/test_config.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/qps/stats.h"
#include "test/cpp/util/test_credentials_provider.h"

using std::deque;
using std::list;
using std::unique_ptr;
using std::vector;

namespace grpc {
namespace testing {
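// Extracts the host portion of a "host:port" worker address.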
static std::string get_host(const std::string& worker) {
  absl::string_view host;
  absl::string_view port;
  grpc_core::SplitHostPort(worker.c_str(), &host, &port);
  return std::string(host.data(), host.size());
}

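// Reads a comma-separated list of worker addresses ("host:port") from the
// given environment variable; logs an error and returns an empty deque if
// the variable is unset or empty.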
static deque<string> get_workers(const string& env_name) {
  deque<string> out;
  auto env = grpc_core::GetEnv(env_name.c_str()).value_or("");
  const char* p = env.c_str();
  if (!env.empty()) {
    for (;;) {
      const char* comma = strchr(p, ',');
      if (comma) {
        out.emplace_back(p, comma);
        p = comma + 1;
      } else {
        out.emplace_back(p);
        break;
      }
    }
  }
  if (out.empty()) {
    LOG(ERROR) << "Environment variable \"" << env_name
               << "\" does not contain a list of QPS "
                  "workers to use. Set it to a comma-separated list of "
                  "hostname:port pairs, starting with hosts that should act as "
                  "servers. E.g. export "
               << env_name
               << "=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"";
  }
  return out;
}

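// Returns the credential type to use for the given worker, honoring any
// per-worker override before falling back to the default credential type.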
std::string GetCredType(
    const std::string& worker_addr,
    const std::map<std::string, std::string>& per_worker_credential_types,
    const std::string& credential_type) {
  auto it = per_worker_credential_types.find(worker_addr);
  if (it != per_worker_credential_types.end()) {
    return it->second;
  }
  return credential_type;
}

// Helpers for postprocess_scenario_result
static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
static double SystemTime(const ClientStats& s) { return s.time_system(); }
static double UserTime(const ClientStats& s) { return s.time_user(); }
static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
static double ServerTotalCpuTime(const ServerStats& s) {
  return s.total_cpu_time();
}
static double ServerIdleCpuTime(const ServerStats& s) {
  return s.idle_cpu_time();
}
static int Cores(int n) { return n; }

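// Decides whether a stream's final status should be treated as success.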
static bool IsSuccess(const Status& s) {
  if (s.ok()) return true;
  // Since we shut down servers and clients at the same time, both can
  // observe cancellation. Thus, we consider CANCELLED a good status.
  if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
    return true;
  }
  // Since we shut down servers and clients at the same time, the server can
  // close the socket before the client attempts to do so, and vice versa.
  // Thus, receiving a "Socket closed" error is fine.
  if (s.error_message() == "Socket closed") return true;
  return false;
}

// Postprocess ScenarioResult and populate the result summary.
static void postprocess_scenario_result(ScenarioResult* result) {
  // Get latencies from the ScenarioResult latency histogram and populate the
  // result summary.
  Histogram histogram;
  histogram.MergeProto(result->latencies());
  result->mutable_summary()->set_latency_50(histogram.Percentile(50));
  result->mutable_summary()->set_latency_90(histogram.Percentile(90));
  result->mutable_summary()->set_latency_95(histogram.Percentile(95));
  result->mutable_summary()->set_latency_99(histogram.Percentile(99));
  result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));

  // Calculate QPS and CPU load for each client, then aggregate the results
  // across all clients.
  double qps = 0;
  double client_system_cpu_load = 0, client_user_cpu_load = 0;
  for (int i = 0; i < result->client_stats_size(); i++) {
    auto client_stat = result->client_stats(i);
    qps += client_stat.latencies().count() / client_stat.time_elapsed();
    client_system_cpu_load +=
        client_stat.time_system() / client_stat.time_elapsed();
    client_user_cpu_load +=
        client_stat.time_user() / client_stat.time_elapsed();
  }
  // Calculate CPU load for each server, then aggregate the results across
  // all servers.
  double server_system_cpu_load = 0, server_user_cpu_load = 0;
  for (int i = 0; i < result->server_stats_size(); i++) {
    auto server_stat = result->server_stats(i);
    server_system_cpu_load +=
        server_stat.time_system() / server_stat.time_elapsed();
    server_user_cpu_load +=
        server_stat.time_user() / server_stat.time_elapsed();
  }
  result->mutable_summary()->set_qps(qps);
  // Populate the CPU load percentages in the result summary.
  result->mutable_summary()->set_server_system_time(100 *
                                                    server_system_cpu_load);
  result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
  result->mutable_summary()->set_client_system_time(100 *
                                                    client_system_cpu_load);
  result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);

  // On non-Linux platforms, get_cpu_usage() is not implemented, so
  // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
  if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
    result->mutable_summary()->set_server_cpu_usage(0);
  } else {
    auto server_cpu_usage =
        100 - (100 * average(result->server_stats(), ServerIdleCpuTime) /
               average(result->server_stats(), ServerTotalCpuTime));
    result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
  }

  // Calculate and populate successful and failed requests per second in the
  // result summary.
  auto time_estimate = average(result->client_stats(), WallTime);
  if (result->request_results_size() > 0) {
    int64_t successes = 0;
    int64_t failures = 0;
    for (int i = 0; i < result->request_results_size(); i++) {
      const RequestResultCount& rrc = result->request_results(i);
      if (rrc.status_code() == 0) {
        successes += rrc.count();
      } else {
        failures += rrc.count();
      }
    }
    result->mutable_summary()->set_successful_requests_per_second(
        successes / time_estimate);
    result->mutable_summary()->set_failed_requests_per_second(failures /
                                                              time_estimate);
  }

  // Fill in the other metrics required in the result summary.
  auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
  result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
  result->mutable_summary()->set_client_polls_per_request(
      sum(result->client_stats(), CliPollCount) / histogram.Count());
  result->mutable_summary()->set_server_polls_per_request(
      sum(result->server_stats(), SvrPollCount) / histogram.Count());

  auto server_queries_per_cpu_sec =
      histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
                           sum(result->server_stats(), ServerUserTime));
  auto client_queries_per_cpu_sec =
      histogram.Count() / (sum(result->client_stats(), SystemTime) +
                           sum(result->client_stats(), UserTime));

  result->mutable_summary()->set_server_queries_per_cpu_sec(
      server_queries_per_cpu_sec);
  result->mutable_summary()->set_client_queries_per_cpu_sec(
      client_queries_per_cpu_sec);
}

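// Per-worker connection state: a WorkerService stub plus the bidi stream
// used to drive that worker.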
struct ClientData {
  unique_ptr<WorkerService::Stub> stub;
  unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};

struct ServerData {
  unique_ptr<WorkerService::Stub> stub;
  unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};

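// Sends a final mark to every client and half-closes the client streams.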
static void FinishClients(const std::vector<ClientData>& clients,
                          const ClientArgs& client_mark) {
  LOG(INFO) << "Finishing clients";
  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
    if (!client->stream->WritesDone()) {
      grpc_core::Crash(absl::StrFormat("Failed WritesDone for client %zu", i));
    }
  }
}

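// Collects each client's final status, merging latency histograms and
// per-status-code request counts into the scenario result.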
static void ReceiveFinalStatusFromClients(
    const std::vector<ClientData>& clients, Histogram& merged_latencies,
    std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
  LOG(INFO) << "Receiving final status from clients";
  ClientStatus client_status;
  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
    auto client = &clients[i];
    // Read the client's final status.
    if (client->stream->Read(&client_status)) {
      LOG(INFO) << "Received final status from client " << i;
      const auto& stats = client_status.stats();
      merged_latencies.MergeProto(stats.latencies());
      for (int j = 0; j < stats.request_results_size(); j++) {
        merged_statuses[stats.request_results(j).status_code()] +=
            stats.request_results(j).count();
      }
      result.add_client_stats()->CopyFrom(stats);
      // Check that the final status is the last message on the client
      // stream.
      // TODO(jtattermusch): note that waiting for Read to return can take
      // long in some scenarios (e.g. unconstrained streaming_from_server). See
      // https://github.com/grpc/grpc/blob/3bd0cd208ea549760a2daf595f79b91b247fe240/test/cpp/qps/server_async.cc#L176
      // where the shutdown delay pretty much determines the wait here.
      CHECK(!client->stream->Read(&client_status));
    } else {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get final status from client %zu", i));
    }
  }
}

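// Finishes each client stream and records whether it ended with an
// acceptable status; crashes on any real error.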
static void ShutdownClients(const std::vector<ClientData>& clients,
                            ScenarioResult& result) {
  LOG(INFO) << "Shutdown clients";
  for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
    auto client = &clients[i];
    Status s = client->stream->Finish();
    // Since we shut down servers and clients at the same time, clients can
    // observe cancellation. Thus, we consider both OK and CANCELLED good
    // statuses.
    const bool success = IsSuccess(s);
    result.add_client_success(success);
    if (!success) {
      grpc_core::Crash(absl::StrFormat("Client %zu had an error %s", i,
                                       s.error_message().c_str()));
    }
  }
}

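// Sends a final mark to every server and half-closes the server streams.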
static void FinishServers(const std::vector<ServerData>& servers,
                          const ServerArgs& server_mark) {
  LOG(INFO) << "Finishing servers";
  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
    auto server = &servers[i];
    if (!server->stream->Write(server_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
    }
    if (!server->stream->WritesDone()) {
      grpc_core::Crash(absl::StrFormat("Failed WritesDone for server %zu", i));
    }
  }
}

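// Collects each server's final stats and core count into the scenario
// result.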
static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
                                         ScenarioResult& result) {
  LOG(INFO) << "Receiving final status from servers";
  ServerStatus server_status;
  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
    auto server = &servers[i];
    // Read the server's final status.
    if (server->stream->Read(&server_status)) {
      LOG(INFO) << "Received final status from server " << i;
      result.add_server_stats()->CopyFrom(server_status.stats());
      result.add_server_cores(server_status.cores());
      // The final status should be the last message on the server stream.
      CHECK(!server->stream->Read(&server_status));
    } else {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get final status from server %zu", i));
    }
  }
}

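// Finishes each server stream and records whether it ended with an
// acceptable status; crashes on any real error.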
static void ShutdownServers(const std::vector<ServerData>& servers,
                            ScenarioResult& result) {
  LOG(INFO) << "Shutdown servers";
  for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
    auto server = &servers[i];
    Status s = server->stream->Finish();
    // Since we shut down servers and clients at the same time, servers can
    // observe cancellation. Thus, we consider both OK and CANCELLED good
    // statuses.
    const bool success = IsSuccess(s);
    result.add_server_success(success);
    if (!success) {
      grpc_core::Crash(absl::StrFormat("Server %zu had an error %s", i,
                                       s.error_message().c_str()));
    }
  }
}

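// Non-null only for in-process runs: allocated at the start of RunScenario
// and deleted once the scenario completes.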
std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;

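// Runs one benchmark scenario end to end: connects to the requested server
// and client workers (remote workers from $QPS_WORKERS, locally spawned
// workers, or in-process workers), pushes the configs out, warms up, runs
// the benchmark, then collects and aggregates the results.
//
// A sketch of a typical invocation, assuming the configs were built
// elsewhere (e.g. from a Scenario proto, as the qps_json_driver does):
//
//   std::unique_ptr<ScenarioResult> result = RunScenario(
//       client_config, /*num_clients=*/0, server_config, /*num_servers=*/1,
//       /*warmup_seconds=*/5, /*benchmark_seconds=*/30,
//       /*spawn_local_worker_count=*/-2, /*qps_server_target_override=*/"",
//       kInsecureCredentialsType, /*per_worker_credential_types=*/{},
//       /*run_inproc=*/false,
//       /*median_latency_collection_interval_millis=*/0);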
std::unique_ptr<ScenarioResult> RunScenario(
    const ClientConfig& initial_client_config, size_t num_clients,
    const ServerConfig& initial_server_config, size_t num_servers,
    int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
    const std::string& qps_server_target_override,
    const std::string& credential_type,
    const std::map<std::string, std::string>& per_worker_credential_types,
    bool run_inproc, int32_t median_latency_collection_interval_millis) {
  if (run_inproc) {
    g_inproc_servers = new std::vector<grpc::testing::Server*>;
  }

  // ClientContext allocations (all are destroyed at scope exit).
  list<ClientContext> contexts;
  auto alloc_context = [](list<ClientContext>* contexts) {
    contexts->emplace_back();
    auto context = &contexts->back();
    context->set_wait_for_ready(true);
    return context;
  };

  // To be added to the result, containing the final configuration used for
  // the clients (including hosts, etc.).
  ClientConfig result_client_config;

  // Get the client and server lists; skip this for in-process tests.
  auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
  ClientConfig client_config = initial_client_config;

  // Spawn some local workers if desired.
  vector<unique_ptr<QpsWorker>> local_workers;
  for (int i = 0; i < abs(spawn_local_worker_count); i++) {
    // Act as if we're a new test -- gets a good rng seed.
    static bool called_init = false;
    if (!called_init) {
      char args_buf[100];
      strcpy(args_buf, "some-benchmark");
      char* args[] = {args_buf};
      int argc = 1;
      grpc_test_init(&argc, args);
      called_init = true;
    }

    char addr[256];
    // We use a port of -1 to indicate an in-process worker.
    int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
    local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
    sprintf(addr, "localhost:%d", driver_port);
    if (spawn_local_worker_count < 0) {
      workers.push_front(addr);
    } else {
      workers.push_back(addr);
    }
  }
  CHECK(!workers.empty());

  // If num_clients is set to <= 0, do dynamic sizing: all workers
  // except the servers act as clients.
  if (num_clients <= 0) {
    num_clients = workers.size() - num_servers;
  }

  // TODO(ctiller): support running multiple configurations, and binpack
  // client/server pairs to available workers.
  CHECK_GE(workers.size(), num_clients + num_servers);

  // Trim to just what we need.
  workers.resize(num_clients + num_servers);

  // Start servers.
  std::vector<ServerData> servers(num_servers);
  std::unordered_map<string, std::deque<int>> hosts_cores;
  ChannelArguments channel_args;

  for (size_t i = 0; i < num_servers; i++) {
    LOG(INFO) << "Starting server on " << workers[i] << " (worker #" << i
              << ")";
    if (!run_inproc) {
      servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
          workers[i],
          GetCredType(workers[i], per_worker_credential_types, credential_type),
          nullptr /* call creds */, {} /* interceptor creators */));
    } else {
      servers[i].stub = WorkerService::NewStub(
          local_workers[i]->InProcessChannel(channel_args));
    }

    const ServerConfig& server_config = initial_server_config;
    if (server_config.core_limit() != 0) {
      grpc_core::Crash("server config core limit is set but ignored by driver");
    }

    ServerArgs args;
    *args.mutable_setup() = server_config;
    servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
    if (!servers[i].stream->Write(args)) {
      grpc_core::Crash(
          absl::StrFormat("Could not write args to server %zu", i));
    }
    ServerStatus init_status;
    if (!servers[i].stream->Read(&init_status)) {
      grpc_core::Crash(
          absl::StrFormat("Server %zu did not yield initial status", i));
    }
    if (run_inproc) {
      std::string cli_target(INPROC_NAME_PREFIX);
      cli_target += std::to_string(i);
      client_config.add_server_targets(cli_target);
    } else {
      std::string host = get_host(workers[i]);
      std::string cli_target =
          grpc_core::JoinHostPort(host.c_str(), init_status.port());
      client_config.add_server_targets(cli_target.c_str());
    }
  }
  if (!qps_server_target_override.empty()) {
    // Overriding the QPS server target only makes sense if there is at most
    // one server.
    CHECK_LE(num_servers, 1u);
    client_config.clear_server_targets();
    client_config.add_server_targets(qps_server_target_override);
  }
  client_config.set_median_latency_collection_interval_millis(
      median_latency_collection_interval_millis);

  // Targets are all set by now.
  result_client_config = client_config;
  // Start clients.
  std::vector<ClientData> clients(num_clients);
  size_t channels_allocated = 0;
  for (size_t i = 0; i < num_clients; i++) {
    const auto& worker = workers[i + num_servers];
    LOG(INFO) << "Starting client on " << worker << " (worker #"
              << i + num_servers << ")";
    if (!run_inproc) {
      clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
          worker,
          GetCredType(worker, per_worker_credential_types, credential_type),
          nullptr /* call creds */, {} /* interceptor creators */));
    } else {
      clients[i].stub = WorkerService::NewStub(
          local_workers[i + num_servers]->InProcessChannel(channel_args));
    }
    ClientConfig per_client_config = client_config;

    if (initial_client_config.core_limit() != 0) {
      grpc_core::Crash("client config core limit set but ignored");
    }

    // Divide the remaining channels evenly so that the total number of
    // channels matches the configured count regardless of how many clients
    // are available.
    size_t num_channels =
        (client_config.client_channels() - channels_allocated) /
        (num_clients - i);
    channels_allocated += num_channels;
    VLOG(2) << "Client " << i << " gets " << num_channels << " channels";
    per_client_config.set_client_channels(num_channels);

    ClientArgs args;
    *args.mutable_setup() = per_client_config;
    clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
    if (!clients[i].stream->Write(args)) {
      grpc_core::Crash(
          absl::StrFormat("Could not write args to client %zu", i));
    }
  }

  for (size_t i = 0; i < num_clients; i++) {
    ClientStatus init_status;
    if (!clients[i].stream->Read(&init_status)) {
      grpc_core::Crash(
          absl::StrFormat("Client %zu did not yield initial status", i));
    }
  }

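  // Mark semantics (an assumption based on the worker protocol): a mark with
  // reset=true asks a worker to report its current stats and reset its
  // counters, so later marks exclude everything gathered before them.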
  // Send an initial mark: clients can use this to know that everything is
  // ready to start.
  LOG(INFO) << "Initiating";
  ServerArgs server_mark;
  server_mark.mutable_mark()->set_reset(true);
  ClientArgs client_mark;
  client_mark.mutable_mark()->set_reset(true);
  ServerStatus server_status;
  ClientStatus client_status;
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Read(&client_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from client %zu", i));
    }
  }

  // Let everything warm up.
  LOG(INFO) << "Warming up";
  gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
  gpr_sleep_until(
      gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));

  // Start the run.
  LOG(INFO) << "Starting";

  auto start_time = time(nullptr);

  for (size_t i = 0; i < num_servers; i++) {
    auto server = &servers[i];
    if (!server->stream->Write(server_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to server %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Write(client_mark)) {
      grpc_core::Crash(absl::StrFormat("Couldn't write mark to client %zu", i));
    }
  }
  for (size_t i = 0; i < num_servers; i++) {
    auto server = &servers[i];
    if (!server->stream->Read(&server_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from server %zu", i));
    }
  }
  for (size_t i = 0; i < num_clients; i++) {
    auto client = &clients[i];
    if (!client->stream->Read(&client_status)) {
      grpc_core::Crash(
          absl::StrFormat("Couldn't get status from client %zu", i));
    }
  }

  // Wait for the benchmark duration.
  LOG(INFO) << "Running";
  // Use gpr_sleep_until rather than this_thread::sleep_until to support
  // compilers that don't work with this_thread.
  gpr_sleep_until(gpr_time_add(
      start,
      gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));

  // Finish the run.
  std::unique_ptr<ScenarioResult> result(new ScenarioResult);
  Histogram merged_latencies;
  std::unordered_map<int, int64_t> merged_statuses;

  // When clients lead the test (e.g. UNARY and STREAMING_FROM_CLIENT),
  // clients need to finish completely while a server is still running, to
  // prevent the clients from getting stuck waiting for the result.
  bool client_finish_first =
      (client_config.rpc_type() != STREAMING_FROM_SERVER);

  auto end_time = time(nullptr);

  FinishClients(clients, client_mark);

  if (!client_finish_first) {
    FinishServers(servers, server_mark);
  }

  ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
                                *result);
  ShutdownClients(clients, *result);

  if (client_finish_first) {
    FinishServers(servers, server_mark);
  }

  ReceiveFinalStatusFromServer(servers, *result);
  ShutdownServers(servers, *result);

  delete g_inproc_servers;

  merged_latencies.FillProto(result->mutable_latencies());
  for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
       it != merged_statuses.end(); ++it) {
    RequestResultCount* rrc = result->add_request_results();
    rrc->set_status_code(it->first);
    rrc->set_count(it->second);
  }

  // Fill in the start and end time for the test scenario.
  result->mutable_summary()->mutable_start_time()->set_seconds(start_time);
  result->mutable_summary()->mutable_end_time()->set_seconds(end_time);

  postprocess_scenario_result(result.get());
  return result;
}

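// Asks every worker listed in $QPS_WORKERS to quit. Returns false if any
// worker could not be reached or did not quit cleanly.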
bool RunQuit(
    const std::string& credential_type,
    const std::map<std::string, std::string>& per_worker_credential_types) {
  // Get the client and server lists.
  bool result = true;
  auto workers = get_workers("QPS_WORKERS");
  if (workers.empty()) {
    return false;
  }

  for (size_t i = 0; i < workers.size(); i++) {
    auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
        workers[i],
        GetCredType(workers[i], per_worker_credential_types, credential_type),
        nullptr /* call creds */, {} /* interceptor creators */));
    Void phony;
    grpc::ClientContext ctx;
    ctx.set_wait_for_ready(true);
    Status s = stub->QuitWorker(&ctx, phony, &phony);
    if (!s.ok()) {
      LOG(ERROR) << "Worker " << i << " could not be quit properly because "
                 << s.error_message();
      result = false;
    }
  }
  return result;
}

}  // namespace testing
}  // namespace grpc