1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPC_TEST_CPP_QPS_CLIENT_H
20 #define GRPC_TEST_CPP_QPS_CLIENT_H
21
22 #include <grpc/support/time.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/support/byte_buffer.h>
25 #include <grpcpp/support/channel_arguments.h>
26 #include <grpcpp/support/slice.h>
27 #include <inttypes.h>
28 #include <stdint.h>
29 #include <stdlib.h>
30
31 #include <condition_variable>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_map>
35 #include <vector>
36
37 #include "absl/log/log.h"
38 #include "absl/memory/memory.h"
39 #include "absl/strings/match.h"
40 #include "absl/strings/str_format.h"
41 #include "src/core/util/crash.h"
42 #include "src/core/util/env.h"
43 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
44 #include "src/proto/grpc/testing/payloads.pb.h"
45 #include "test/cpp/qps/histogram.h"
46 #include "test/cpp/qps/interarrival.h"
47 #include "test/cpp/qps/qps_worker.h"
48 #include "test/cpp/qps/server.h"
49 #include "test/cpp/qps/usage_timer.h"
50 #include "test/cpp/util/create_test_channel.h"
51 #include "test/cpp/util/test_credentials_provider.h"
52
// Target-name prefix marking an in-process server. A target of the form
// "qpsinproc:<N>" is resolved by ClientChannelInfo to
// (*g_inproc_servers)[N]->InProcessChannel(...) instead of a network channel.
#define INPROC_NAME_PREFIX "qpsinproc:"
54
55 namespace grpc {
56 namespace testing {
57
58 template <class RequestType>
59 class ClientRequestCreator {
60 public:
ClientRequestCreator(RequestType *,const PayloadConfig &)61 ClientRequestCreator(RequestType* /*req*/, const PayloadConfig&) {
62 // this template must be specialized
63 // fail with an assertion rather than a compile-time
64 // check since these only happen at the beginning anyway
65 grpc_core::Crash("unreachable");
66 }
67 };
68
69 template <>
70 class ClientRequestCreator<SimpleRequest> {
71 public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)72 ClientRequestCreator(SimpleRequest* req,
73 const PayloadConfig& payload_config) {
74 if (payload_config.has_bytebuf_params()) {
75 grpc_core::Crash(absl::StrFormat(
76 "Invalid PayloadConfig, config cannot have bytebuf_params: %s",
77 payload_config.DebugString()
78 .c_str())); // not appropriate for this specialization
79 } else if (payload_config.has_simple_params()) {
80 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
81 req->set_response_size(payload_config.simple_params().resp_size());
82 req->mutable_payload()->set_type(
83 grpc::testing::PayloadType::COMPRESSABLE);
84 int size = payload_config.simple_params().req_size();
85 std::unique_ptr<char[]> body(new char[size]);
86 req->mutable_payload()->set_body(body.get(), size);
87 } else if (payload_config.has_complex_params()) {
88 grpc_core::Crash(absl::StrFormat(
89 "Invalid PayloadConfig, cannot have complex_params: %s",
90 payload_config.DebugString()
91 .c_str())); // not appropriate for this specialization
92 } else {
93 // default should be simple proto without payloads
94 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
95 req->set_response_size(0);
96 req->mutable_payload()->set_type(
97 grpc::testing::PayloadType::COMPRESSABLE);
98 }
99 }
100 };
101
102 template <>
103 class ClientRequestCreator<ByteBuffer> {
104 public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)105 ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
106 if (payload_config.has_bytebuf_params()) {
107 size_t req_sz =
108 static_cast<size_t>(payload_config.bytebuf_params().req_size());
109 std::unique_ptr<char[]> buf(new char[req_sz]);
110 memset(buf.get(), 0, req_sz);
111 Slice slice(buf.get(), req_sz);
112 *req = ByteBuffer(&slice, 1);
113 } else {
114 grpc_core::Crash(absl::StrFormat(
115 "Invalid PayloadConfig, missing bytebug_params: %s",
116 payload_config.DebugString()
117 .c_str())); // not appropriate for this specialization
118 }
119 }
120 };
121
// A single measurement recorded by a client worker: optionally a latency
// value and optionally a status code. The *_used() accessors report which of
// the two fields has been set.
class HistogramEntry final {
 public:
  HistogramEntry() = default;

  bool value_used() const { return value_used_; }
  // Returns 0 if set_value() has not been called.
  double value() const { return value_; }
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  bool status_used() const { return status_used_; }
  // Returns 0 if set_status() has not been called.
  int status() const { return status_; }
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  // Every member has a default so that reading an unset field is defined
  // behavior. Ordered widest-first to avoid padding.
  double value_ = 0;
  int status_ = 0;
  bool value_used_ = false;
  bool status_used_ = false;
};
144
// Maps a status code to the number of requests that finished with it.
using StatusHistogram = std::unordered_map<int, int64_t>;

// Accumulates every count in `from` into the matching bucket of `*to`,
// creating buckets in `*to` that are not present yet. `from` is unchanged.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  StatusHistogram& dest = *to;
  for (const auto& bucket : from) {
    dest[bucket.first] += bucket.second;
  }
}
154
155 class Client {
156 public:
Client()157 Client()
158 : timer_(new UsageTimer),
159 interarrival_timer_(),
160 started_requests_(false),
161 last_reset_poll_count_(0) {
162 gpr_event_init(&start_requests_);
163 }
~Client()164 virtual ~Client() {}
165
Mark(bool reset)166 ClientStats Mark(bool reset) {
167 Histogram latencies;
168 StatusHistogram statuses;
169 UsageTimer::Result timer_result;
170
171 MaybeStartRequests();
172
173 int cur_poll_count = GetPollCount();
174 int poll_count = cur_poll_count - last_reset_poll_count_;
175 if (reset) {
176 std::vector<Histogram> to_merge(threads_.size());
177 std::vector<StatusHistogram> to_merge_status(threads_.size());
178
179 for (size_t i = 0; i < threads_.size(); i++) {
180 threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
181 }
182 std::unique_ptr<UsageTimer> timer(new UsageTimer);
183 timer_.swap(timer);
184 for (size_t i = 0; i < threads_.size(); i++) {
185 latencies.Merge(to_merge[i]);
186 MergeStatusHistogram(to_merge_status[i], &statuses);
187 }
188 timer_result = timer->Mark();
189 last_reset_poll_count_ = cur_poll_count;
190 } else {
191 // merge snapshots of each thread histogram
192 for (size_t i = 0; i < threads_.size(); i++) {
193 threads_[i]->MergeStatsInto(&latencies, &statuses);
194 }
195 timer_result = timer_->Mark();
196 }
197
198 // Print the median latency per interval for one thread.
199 // If the number of warmup seconds is x, then the first x + 1 numbers in the
200 // vector are from the warmup period and should be discarded.
201 if (median_latency_collection_interval_seconds_ > 0) {
202 std::vector<double> medians_per_interval =
203 threads_[0]->GetMedianPerIntervalList();
204 LOG(INFO) << "Num threads: " << threads_.size();
205 LOG(INFO) << "Number of medians: " << medians_per_interval.size();
206 for (size_t j = 0; j < medians_per_interval.size(); j++) {
207 LOG(INFO) << medians_per_interval[j];
208 }
209 }
210
211 ClientStats stats;
212 latencies.FillProto(stats.mutable_latencies());
213 for (StatusHistogram::const_iterator it = statuses.begin();
214 it != statuses.end(); ++it) {
215 RequestResultCount* rrc = stats.add_request_results();
216 rrc->set_status_code(it->first);
217 rrc->set_count(it->second);
218 }
219 stats.set_time_elapsed(timer_result.wall);
220 stats.set_time_system(timer_result.system);
221 stats.set_time_user(timer_result.user);
222 stats.set_cq_poll_count(poll_count);
223 return stats;
224 }
225
226 // Must call AwaitThreadsCompletion before destructor to avoid a race
227 // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()228 void AwaitThreadsCompletion() {
229 gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
230 DestroyMultithreading();
231 std::unique_lock<std::mutex> g(thread_completion_mu_);
232 while (threads_remaining_ != 0) {
233 threads_complete_.wait(g);
234 }
235 }
236
237 // Returns the interval (in seconds) between collecting latency medians. If 0,
238 // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()239 double GetLatencyCollectionIntervalInSeconds() {
240 return median_latency_collection_interval_seconds_;
241 }
242
GetPollCount()243 virtual int GetPollCount() {
244 // For sync client.
245 return 0;
246 }
247
IsClosedLoop()248 bool IsClosedLoop() { return closed_loop_; }
249
NextIssueTime(int thread_idx)250 gpr_timespec NextIssueTime(int thread_idx) {
251 const gpr_timespec result = next_time_[thread_idx];
252 next_time_[thread_idx] =
253 gpr_time_add(next_time_[thread_idx],
254 gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
255 GPR_TIMESPAN));
256 return result;
257 }
258
ThreadCompleted()259 bool ThreadCompleted() {
260 return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
261 }
262
263 class Thread {
264 public:
Thread(Client * client,size_t idx)265 Thread(Client* client, size_t idx)
266 : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
267
~Thread()268 ~Thread() { impl_.join(); }
269
BeginSwap(Histogram * n,StatusHistogram * s)270 void BeginSwap(Histogram* n, StatusHistogram* s) {
271 std::lock_guard<std::mutex> g(mu_);
272 n->Swap(&histogram_);
273 s->swap(statuses_);
274 }
275
MergeStatsInto(Histogram * hist,StatusHistogram * s)276 void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
277 std::unique_lock<std::mutex> g(mu_);
278 hist->Merge(histogram_);
279 MergeStatusHistogram(statuses_, s);
280 }
281
GetMedianPerIntervalList()282 std::vector<double> GetMedianPerIntervalList() {
283 return medians_each_interval_list_;
284 }
285
UpdateHistogram(HistogramEntry * entry)286 void UpdateHistogram(HistogramEntry* entry) {
287 std::lock_guard<std::mutex> g(mu_);
288 if (entry->value_used()) {
289 histogram_.Add(entry->value());
290 if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
291 histogram_per_interval_.Add(entry->value());
292 double now = UsageTimer::Now();
293 if ((now - interval_start_time_) >=
294 client_->GetLatencyCollectionIntervalInSeconds()) {
295 // Record the median latency of requests from the last interval.
296 // Divide by 1e3 to get microseconds.
297 medians_each_interval_list_.push_back(
298 histogram_per_interval_.Percentile(50) / 1e3);
299 histogram_per_interval_.Reset();
300 interval_start_time_ = now;
301 }
302 }
303 }
304 if (entry->status_used()) {
305 statuses_[entry->status()]++;
306 }
307 }
308
309 private:
310 Thread(const Thread&);
311 Thread& operator=(const Thread&);
312
ThreadFunc()313 void ThreadFunc() {
314 int wait_loop = 0;
315 while (!gpr_event_wait(
316 &client_->start_requests_,
317 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
318 gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
319 LOG(INFO) << idx_ << ": Waiting for benchmark to start (" << wait_loop
320 << ")";
321 wait_loop++;
322 }
323
324 client_->ThreadFunc(idx_, this);
325 client_->CompleteThread();
326 }
327
328 std::mutex mu_;
329 Histogram histogram_;
330 StatusHistogram statuses_;
331 Client* client_;
332 const size_t idx_;
333 std::thread impl_;
334 // The following are used only if
335 // median_latency_collection_interval_seconds_ is greater than 0
336 Histogram histogram_per_interval_;
337 std::vector<double> medians_each_interval_list_;
338 double interval_start_time_;
339 };
340
341 protected:
342 bool closed_loop_;
343 gpr_atm thread_pool_done_;
344 double median_latency_collection_interval_seconds_; // In seconds
345
StartThreads(size_t num_threads)346 void StartThreads(size_t num_threads) {
347 gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
348 threads_remaining_ = num_threads;
349 for (size_t i = 0; i < num_threads; i++) {
350 threads_.emplace_back(new Thread(this, i));
351 }
352 }
353
EndThreads()354 void EndThreads() {
355 MaybeStartRequests();
356 threads_.clear();
357 }
358
359 virtual void DestroyMultithreading() = 0;
360
SetupLoadTest(const ClientConfig & config,size_t num_threads)361 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
362 // Set up the load distribution based on the number of threads
363 const auto& load = config.load_params();
364
365 std::unique_ptr<RandomDistInterface> random_dist;
366 switch (load.load_case()) {
367 case LoadParams::kClosedLoop:
368 // Closed-loop doesn't use random dist at all
369 break;
370 case LoadParams::kPoisson:
371 random_dist = std::make_unique<ExpDist>(load.poisson().offered_load() /
372 num_threads);
373 break;
374 default:
375 grpc_core::Crash("unreachable");
376 }
377
378 // Set closed_loop_ based on whether or not random_dist is set
379 if (!random_dist) {
380 closed_loop_ = true;
381 } else {
382 closed_loop_ = false;
383 // set up interarrival timer according to random dist
384 interarrival_timer_.init(*random_dist, num_threads);
385 const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
386 for (size_t i = 0; i < num_threads; i++) {
387 next_time_.push_back(gpr_time_add(
388 now,
389 gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
390 }
391 }
392 }
393
NextIssuer(int thread_idx)394 std::function<gpr_timespec()> NextIssuer(int thread_idx) {
395 return closed_loop_ ? std::function<gpr_timespec()>()
396 : std::bind(&Client::NextIssueTime, this, thread_idx);
397 }
398
399 virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
400
401 std::vector<std::unique_ptr<Thread>> threads_;
402 std::unique_ptr<UsageTimer> timer_;
403
404 InterarrivalTimer interarrival_timer_;
405 std::vector<gpr_timespec> next_time_;
406
407 std::mutex thread_completion_mu_;
408 size_t threads_remaining_;
409 std::condition_variable threads_complete_;
410
411 gpr_event start_requests_;
412 bool started_requests_;
413
414 int last_reset_poll_count_;
415
MaybeStartRequests()416 void MaybeStartRequests() {
417 if (!started_requests_) {
418 started_requests_ = true;
419 gpr_event_set(&start_requests_, reinterpret_cast<void*>(1));
420 }
421 }
422
CompleteThread()423 void CompleteThread() {
424 std::lock_guard<std::mutex> g(thread_completion_mu_);
425 threads_remaining_--;
426 if (threads_remaining_ == 0) {
427 threads_complete_.notify_all();
428 }
429 }
430 };
431
432 template <class StubType, class RequestType>
433 class ClientImpl : public Client {
434 public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)435 ClientImpl(const ClientConfig& config,
436 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
437 create_stub)
438 : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
439 for (int i = 0; i < config.client_channels(); i++) {
440 channels_.emplace_back(
441 config.server_targets(i % config.server_targets_size()), config,
442 create_stub_, i);
443 }
444 WaitForChannelsToConnect();
445 median_latency_collection_interval_seconds_ =
446 config.median_latency_collection_interval_millis() / 1e3;
447 ClientRequestCreator<RequestType> create_req(&request_,
448 config.payload_config());
449 }
~ClientImpl()450 ~ClientImpl() override {}
request()451 const RequestType* request() { return &request_; }
452
WaitForChannelsToConnect()453 void WaitForChannelsToConnect() {
454 int connect_deadline_seconds = 10;
455 // Allow optionally overriding connect_deadline in order
456 // to deal with benchmark environments in which the server
457 // can take a long time to become ready.
458 auto channel_connect_timeout_str =
459 grpc_core::GetEnv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
460 if (channel_connect_timeout_str.has_value() &&
461 !channel_connect_timeout_str->empty()) {
462 connect_deadline_seconds = atoi(channel_connect_timeout_str->c_str());
463 }
464 LOG(INFO) << "Waiting for up to " << connect_deadline_seconds
465 << " seconds for all channels to connect";
466 gpr_timespec connect_deadline = gpr_time_add(
467 gpr_now(GPR_CLOCK_REALTIME),
468 gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
469 CompletionQueue cq;
470 size_t num_remaining = 0;
471 for (auto& c : channels_) {
472 if (!c.is_inproc()) {
473 Channel* channel = c.get_channel();
474 grpc_connectivity_state last_observed = channel->GetState(true);
475 if (last_observed == GRPC_CHANNEL_READY) {
476 LOG(INFO) << "Channel " << channel << " connected!";
477 } else {
478 num_remaining++;
479 channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
480 channel);
481 }
482 }
483 }
484 while (num_remaining > 0) {
485 bool ok = false;
486 void* tag = nullptr;
487 cq.Next(&tag, &ok);
488 Channel* channel = static_cast<Channel*>(tag);
489 if (!ok) {
490 grpc_core::Crash(absl::StrFormat(
491 "Channel %p failed to connect within the deadline", channel));
492 } else {
493 grpc_connectivity_state last_observed = channel->GetState(true);
494 if (last_observed == GRPC_CHANNEL_READY) {
495 LOG(INFO) << "Channel " << channel << " connected!";
496 num_remaining--;
497 } else {
498 channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
499 channel);
500 }
501 }
502 }
503 }
504
505 protected:
506 const int cores_;
507 RequestType request_;
508
509 class ClientChannelInfo {
510 public:
ClientChannelInfo(const std::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)511 ClientChannelInfo(
512 const std::string& target, const ClientConfig& config,
513 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
514 create_stub,
515 int shard) {
516 ChannelArguments args;
517 args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
518 set_channel_args(config, &args);
519
520 std::string type;
521 if (config.has_security_params() &&
522 config.security_params().cred_type().empty()) {
523 type = kTlsCredentialsType;
524 } else {
525 type = config.security_params().cred_type();
526 }
527
528 std::string inproc_pfx(INPROC_NAME_PREFIX);
529 if (!absl::StartsWith(target, inproc_pfx)) {
530 channel_ = CreateTestChannel(
531 target, type, config.security_params().server_host_override(),
532 !config.security_params().use_test_ca(),
533 std::shared_ptr<CallCredentials>(), args);
534 LOG(INFO) << "Connecting to " << target;
535 is_inproc_ = false;
536 } else {
537 std::string tgt = target;
538 tgt.erase(0, inproc_pfx.length());
539 int srv_num = std::stoi(tgt);
540 channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
541 is_inproc_ = true;
542 }
543 stub_ = create_stub(channel_);
544 }
get_channel()545 Channel* get_channel() { return channel_.get(); }
get_stub()546 StubType* get_stub() { return stub_.get(); }
is_inproc()547 bool is_inproc() { return is_inproc_; }
548
549 private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)550 void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
551 for (const auto& channel_arg : config.channel_args()) {
552 if (channel_arg.value_case() == ChannelArg::kStrValue) {
553 args->SetString(channel_arg.name(), channel_arg.str_value());
554 } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
555 args->SetInt(channel_arg.name(), channel_arg.int_value());
556 } else {
557 LOG(ERROR) << "Empty channel arg value.";
558 }
559 }
560 }
561
562 std::shared_ptr<Channel> channel_;
563 std::unique_ptr<StubType> stub_;
564 bool is_inproc_;
565 };
566 std::vector<ClientChannelInfo> channels_;
567 std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
568 create_stub_;
569 };
570
571 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config);
572 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config);
573 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config);
574 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
575 const ClientConfig& config);
576
577 } // namespace testing
578 } // namespace grpc
579
580 #endif // GRPC_TEST_CPP_QPS_CLIENT_H
581