1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef TEST_QPS_CLIENT_H
20 #define TEST_QPS_CLIENT_H
21
22 #include <stdlib.h>
23
24 #include <condition_variable>
25 #include <mutex>
26 #include <unordered_map>
27 #include <vector>
28
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/support/byte_buffer.h>
33 #include <grpcpp/support/channel_arguments.h>
34 #include <grpcpp/support/slice.h>
35
36 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
37 #include "src/proto/grpc/testing/payloads.pb.h"
38
39 #include "src/core/lib/gpr/env.h"
40 #include "src/cpp/util/core_stats.h"
41 #include "test/cpp/qps/histogram.h"
42 #include "test/cpp/qps/interarrival.h"
43 #include "test/cpp/qps/qps_worker.h"
44 #include "test/cpp/qps/server.h"
45 #include "test/cpp/qps/usage_timer.h"
46 #include "test/cpp/util/create_test_channel.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48
49 #define INPROC_NAME_PREFIX "qpsinproc:"
50
51 namespace grpc {
52 namespace testing {
53
54 template <class RequestType>
55 class ClientRequestCreator {
56 public:
ClientRequestCreator(RequestType *,const PayloadConfig &)57 ClientRequestCreator(RequestType* /*req*/, const PayloadConfig&) {
58 // this template must be specialized
59 // fail with an assertion rather than a compile-time
60 // check since these only happen at the beginning anyway
61 GPR_ASSERT(false);
62 }
63 };
64
65 template <>
66 class ClientRequestCreator<SimpleRequest> {
67 public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)68 ClientRequestCreator(SimpleRequest* req,
69 const PayloadConfig& payload_config) {
70 if (payload_config.has_bytebuf_params()) {
71 GPR_ASSERT(false); // not appropriate for this specialization
72 } else if (payload_config.has_simple_params()) {
73 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
74 req->set_response_size(payload_config.simple_params().resp_size());
75 req->mutable_payload()->set_type(
76 grpc::testing::PayloadType::COMPRESSABLE);
77 int size = payload_config.simple_params().req_size();
78 std::unique_ptr<char[]> body(new char[size]);
79 req->mutable_payload()->set_body(body.get(), size);
80 } else if (payload_config.has_complex_params()) {
81 GPR_ASSERT(false); // not appropriate for this specialization
82 } else {
83 // default should be simple proto without payloads
84 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
85 req->set_response_size(0);
86 req->mutable_payload()->set_type(
87 grpc::testing::PayloadType::COMPRESSABLE);
88 }
89 }
90 };
91
92 template <>
93 class ClientRequestCreator<ByteBuffer> {
94 public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)95 ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
96 if (payload_config.has_bytebuf_params()) {
97 size_t req_sz =
98 static_cast<size_t>(payload_config.bytebuf_params().req_size());
99 std::unique_ptr<char[]> buf(new char[req_sz]);
100 memset(buf.get(), 0, req_sz);
101 Slice slice(buf.get(), req_sz);
102 *req = ByteBuffer(&slice, 1);
103 } else {
104 GPR_ASSERT(false); // not appropriate for this specialization
105 }
106 }
107 };
108
// One sample to be recorded by a client thread: an optional latency value and
// an optional status code. Each part is only meaningful once its setter has
// been called, which is what the *_used() flags report.
class HistogramEntry final {
 public:
  HistogramEntry() = default;

  // True once set_value() has been called.
  bool value_used() const { return value_used_; }
  // Last value passed to set_value(); 0 if it was never called.
  double value() const { return value_; }
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  // True once set_status() has been called.
  bool status_used() const { return status_used_; }
  // Last status passed to set_status(); 0 if it was never called.
  int status() const { return status_; }
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  // All members carry default initializers so that reading value()/status()
  // before the matching setter yields a defined result instead of an
  // indeterminate one.
  bool value_used_ = false;
  double value_ = 0.0;
  bool status_used_ = false;
  int status_ = 0;
};
131
// Maps a status code to the number of times it was observed.
using StatusHistogram = std::unordered_map<int, int64_t>;

// Adds every count in `from` to the matching entry of `*to`; codes that are
// not yet present in `*to` are created starting from zero.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  for (const auto& code_and_count : from) {
    (*to)[code_and_count.first] += code_and_count.second;
  }
}
141
142 class Client {
143 public:
Client()144 Client()
145 : timer_(new UsageTimer),
146 interarrival_timer_(),
147 started_requests_(false),
148 last_reset_poll_count_(0) {
149 gpr_event_init(&start_requests_);
150 }
~Client()151 virtual ~Client() {}
152
Mark(bool reset)153 ClientStats Mark(bool reset) {
154 Histogram latencies;
155 StatusHistogram statuses;
156 UsageTimer::Result timer_result;
157
158 MaybeStartRequests();
159
160 int cur_poll_count = GetPollCount();
161 int poll_count = cur_poll_count - last_reset_poll_count_;
162 if (reset) {
163 std::vector<Histogram> to_merge(threads_.size());
164 std::vector<StatusHistogram> to_merge_status(threads_.size());
165
166 for (size_t i = 0; i < threads_.size(); i++) {
167 threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
168 }
169 std::unique_ptr<UsageTimer> timer(new UsageTimer);
170 timer_.swap(timer);
171 for (size_t i = 0; i < threads_.size(); i++) {
172 latencies.Merge(to_merge[i]);
173 MergeStatusHistogram(to_merge_status[i], &statuses);
174 }
175 timer_result = timer->Mark();
176 last_reset_poll_count_ = cur_poll_count;
177 } else {
178 // merge snapshots of each thread histogram
179 for (size_t i = 0; i < threads_.size(); i++) {
180 threads_[i]->MergeStatsInto(&latencies, &statuses);
181 }
182 timer_result = timer_->Mark();
183 }
184
185 // Print the median latency per interval for one thread.
186 // If the number of warmup seconds is x, then the first x + 1 numbers in the
187 // vector are from the warmup period and should be discarded.
188 if (median_latency_collection_interval_seconds_ > 0) {
189 std::vector<double> medians_per_interval =
190 threads_[0]->GetMedianPerIntervalList();
191 gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
192 gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
193 for (size_t j = 0; j < medians_per_interval.size(); j++) {
194 gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
195 }
196 }
197
198 grpc_stats_data core_stats;
199 grpc_stats_collect(&core_stats);
200
201 ClientStats stats;
202 latencies.FillProto(stats.mutable_latencies());
203 for (StatusHistogram::const_iterator it = statuses.begin();
204 it != statuses.end(); ++it) {
205 RequestResultCount* rrc = stats.add_request_results();
206 rrc->set_status_code(it->first);
207 rrc->set_count(it->second);
208 }
209 stats.set_time_elapsed(timer_result.wall);
210 stats.set_time_system(timer_result.system);
211 stats.set_time_user(timer_result.user);
212 stats.set_cq_poll_count(poll_count);
213 CoreStatsToProto(core_stats, stats.mutable_core_stats());
214 return stats;
215 }
216
217 // Must call AwaitThreadsCompletion before destructor to avoid a race
218 // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()219 void AwaitThreadsCompletion() {
220 gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
221 DestroyMultithreading();
222 std::unique_lock<std::mutex> g(thread_completion_mu_);
223 while (threads_remaining_ != 0) {
224 threads_complete_.wait(g);
225 }
226 }
227
228 // Returns the interval (in seconds) between collecting latency medians. If 0,
229 // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()230 double GetLatencyCollectionIntervalInSeconds() {
231 return median_latency_collection_interval_seconds_;
232 }
233
GetPollCount()234 virtual int GetPollCount() {
235 // For sync client.
236 return 0;
237 }
238
IsClosedLoop()239 bool IsClosedLoop() { return closed_loop_; }
240
NextIssueTime(int thread_idx)241 gpr_timespec NextIssueTime(int thread_idx) {
242 const gpr_timespec result = next_time_[thread_idx];
243 next_time_[thread_idx] =
244 gpr_time_add(next_time_[thread_idx],
245 gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
246 GPR_TIMESPAN));
247 return result;
248 }
249
ThreadCompleted()250 bool ThreadCompleted() {
251 return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
252 }
253
254 class Thread {
255 public:
Thread(Client * client,size_t idx)256 Thread(Client* client, size_t idx)
257 : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
258
~Thread()259 ~Thread() { impl_.join(); }
260
BeginSwap(Histogram * n,StatusHistogram * s)261 void BeginSwap(Histogram* n, StatusHistogram* s) {
262 std::lock_guard<std::mutex> g(mu_);
263 n->Swap(&histogram_);
264 s->swap(statuses_);
265 }
266
MergeStatsInto(Histogram * hist,StatusHistogram * s)267 void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
268 std::unique_lock<std::mutex> g(mu_);
269 hist->Merge(histogram_);
270 MergeStatusHistogram(statuses_, s);
271 }
272
GetMedianPerIntervalList()273 std::vector<double> GetMedianPerIntervalList() {
274 return medians_each_interval_list_;
275 }
276
UpdateHistogram(HistogramEntry * entry)277 void UpdateHistogram(HistogramEntry* entry) {
278 std::lock_guard<std::mutex> g(mu_);
279 if (entry->value_used()) {
280 histogram_.Add(entry->value());
281 if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
282 histogram_per_interval_.Add(entry->value());
283 double now = UsageTimer::Now();
284 if ((now - interval_start_time_) >=
285 client_->GetLatencyCollectionIntervalInSeconds()) {
286 // Record the median latency of requests from the last interval.
287 // Divide by 1e3 to get microseconds.
288 medians_each_interval_list_.push_back(
289 histogram_per_interval_.Percentile(50) / 1e3);
290 histogram_per_interval_.Reset();
291 interval_start_time_ = now;
292 }
293 }
294 }
295 if (entry->status_used()) {
296 statuses_[entry->status()]++;
297 }
298 }
299
300 private:
301 Thread(const Thread&);
302 Thread& operator=(const Thread&);
303
ThreadFunc()304 void ThreadFunc() {
305 int wait_loop = 0;
306 while (!gpr_event_wait(
307 &client_->start_requests_,
308 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
309 gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
310 gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
311 idx_, wait_loop);
312 wait_loop++;
313 }
314
315 client_->ThreadFunc(idx_, this);
316 client_->CompleteThread();
317 }
318
319 std::mutex mu_;
320 Histogram histogram_;
321 StatusHistogram statuses_;
322 Client* client_;
323 const size_t idx_;
324 std::thread impl_;
325 // The following are used only if
326 // median_latency_collection_interval_seconds_ is greater than 0
327 Histogram histogram_per_interval_;
328 std::vector<double> medians_each_interval_list_;
329 double interval_start_time_;
330 };
331
332 protected:
333 bool closed_loop_;
334 gpr_atm thread_pool_done_;
335 double median_latency_collection_interval_seconds_; // In seconds
336
StartThreads(size_t num_threads)337 void StartThreads(size_t num_threads) {
338 gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
339 threads_remaining_ = num_threads;
340 for (size_t i = 0; i < num_threads; i++) {
341 threads_.emplace_back(new Thread(this, i));
342 }
343 }
344
EndThreads()345 void EndThreads() {
346 MaybeStartRequests();
347 threads_.clear();
348 }
349
350 virtual void DestroyMultithreading() = 0;
351
SetupLoadTest(const ClientConfig & config,size_t num_threads)352 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
353 // Set up the load distribution based on the number of threads
354 const auto& load = config.load_params();
355
356 std::unique_ptr<RandomDistInterface> random_dist;
357 switch (load.load_case()) {
358 case LoadParams::kClosedLoop:
359 // Closed-loop doesn't use random dist at all
360 break;
361 case LoadParams::kPoisson:
362 random_dist.reset(
363 new ExpDist(load.poisson().offered_load() / num_threads));
364 break;
365 default:
366 GPR_ASSERT(false);
367 }
368
369 // Set closed_loop_ based on whether or not random_dist is set
370 if (!random_dist) {
371 closed_loop_ = true;
372 } else {
373 closed_loop_ = false;
374 // set up interarrival timer according to random dist
375 interarrival_timer_.init(*random_dist, num_threads);
376 const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
377 for (size_t i = 0; i < num_threads; i++) {
378 next_time_.push_back(gpr_time_add(
379 now,
380 gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
381 }
382 }
383 }
384
NextIssuer(int thread_idx)385 std::function<gpr_timespec()> NextIssuer(int thread_idx) {
386 return closed_loop_ ? std::function<gpr_timespec()>()
387 : std::bind(&Client::NextIssueTime, this, thread_idx);
388 }
389
390 virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
391
392 std::vector<std::unique_ptr<Thread>> threads_;
393 std::unique_ptr<UsageTimer> timer_;
394
395 InterarrivalTimer interarrival_timer_;
396 std::vector<gpr_timespec> next_time_;
397
398 std::mutex thread_completion_mu_;
399 size_t threads_remaining_;
400 std::condition_variable threads_complete_;
401
402 gpr_event start_requests_;
403 bool started_requests_;
404
405 int last_reset_poll_count_;
406
MaybeStartRequests()407 void MaybeStartRequests() {
408 if (!started_requests_) {
409 started_requests_ = true;
410 gpr_event_set(&start_requests_, (void*)1);
411 }
412 }
413
CompleteThread()414 void CompleteThread() {
415 std::lock_guard<std::mutex> g(thread_completion_mu_);
416 threads_remaining_--;
417 if (threads_remaining_ == 0) {
418 threads_complete_.notify_all();
419 }
420 }
421 };
422
423 template <class StubType, class RequestType>
424 class ClientImpl : public Client {
425 public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)426 ClientImpl(const ClientConfig& config,
427 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
428 create_stub)
429 : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
430 for (int i = 0; i < config.client_channels(); i++) {
431 channels_.emplace_back(
432 config.server_targets(i % config.server_targets_size()), config,
433 create_stub_, i);
434 }
435 WaitForChannelsToConnect();
436 median_latency_collection_interval_seconds_ =
437 config.median_latency_collection_interval_millis() / 1e3;
438 ClientRequestCreator<RequestType> create_req(&request_,
439 config.payload_config());
440 }
~ClientImpl()441 virtual ~ClientImpl() {}
request()442 const RequestType* request() { return &request_; }
443
WaitForChannelsToConnect()444 void WaitForChannelsToConnect() {
445 int connect_deadline_seconds = 10;
446 /* Allow optionally overriding connect_deadline in order
447 * to deal with benchmark environments in which the server
448 * can take a long time to become ready. */
449 char* channel_connect_timeout_str =
450 gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
451 if (channel_connect_timeout_str != nullptr &&
452 strcmp(channel_connect_timeout_str, "") != 0) {
453 connect_deadline_seconds = atoi(channel_connect_timeout_str);
454 }
455 gpr_log(GPR_INFO,
456 "Waiting for up to %d seconds for all channels to connect",
457 connect_deadline_seconds);
458 gpr_free(channel_connect_timeout_str);
459 gpr_timespec connect_deadline = gpr_time_add(
460 gpr_now(GPR_CLOCK_REALTIME),
461 gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
462 CompletionQueue cq;
463 size_t num_remaining = 0;
464 for (auto& c : channels_) {
465 if (!c.is_inproc()) {
466 Channel* channel = c.get_channel();
467 grpc_connectivity_state last_observed = channel->GetState(true);
468 if (last_observed == GRPC_CHANNEL_READY) {
469 gpr_log(GPR_INFO, "Channel %p connected!", channel);
470 } else {
471 num_remaining++;
472 channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
473 channel);
474 }
475 }
476 }
477 while (num_remaining > 0) {
478 bool ok = false;
479 void* tag = nullptr;
480 cq.Next(&tag, &ok);
481 Channel* channel = static_cast<Channel*>(tag);
482 if (!ok) {
483 gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline",
484 channel);
485 abort();
486 } else {
487 grpc_connectivity_state last_observed = channel->GetState(true);
488 if (last_observed == GRPC_CHANNEL_READY) {
489 gpr_log(GPR_INFO, "Channel %p connected!", channel);
490 num_remaining--;
491 } else {
492 channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
493 channel);
494 }
495 }
496 }
497 }
498
499 protected:
500 const int cores_;
501 RequestType request_;
502
503 class ClientChannelInfo {
504 public:
ClientChannelInfo(const std::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)505 ClientChannelInfo(
506 const std::string& target, const ClientConfig& config,
507 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
508 create_stub,
509 int shard) {
510 ChannelArguments args;
511 args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
512 set_channel_args(config, &args);
513
514 std::string type;
515 if (config.has_security_params() &&
516 config.security_params().cred_type().empty()) {
517 type = kTlsCredentialsType;
518 } else {
519 type = config.security_params().cred_type();
520 }
521
522 std::string inproc_pfx(INPROC_NAME_PREFIX);
523 if (target.find(inproc_pfx) != 0) {
524 channel_ = CreateTestChannel(
525 target, type, config.security_params().server_host_override(),
526 !config.security_params().use_test_ca(),
527 std::shared_ptr<CallCredentials>(), args);
528 gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
529 is_inproc_ = false;
530 } else {
531 std::string tgt = target;
532 tgt.erase(0, inproc_pfx.length());
533 int srv_num = std::stoi(tgt);
534 channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
535 is_inproc_ = true;
536 }
537 stub_ = create_stub(channel_);
538 }
get_channel()539 Channel* get_channel() { return channel_.get(); }
get_stub()540 StubType* get_stub() { return stub_.get(); }
is_inproc()541 bool is_inproc() { return is_inproc_; }
542
543 private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)544 void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
545 for (const auto& channel_arg : config.channel_args()) {
546 if (channel_arg.value_case() == ChannelArg::kStrValue) {
547 args->SetString(channel_arg.name(), channel_arg.str_value());
548 } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
549 args->SetInt(channel_arg.name(), channel_arg.int_value());
550 } else {
551 gpr_log(GPR_ERROR, "Empty channel arg value.");
552 }
553 }
554 }
555
556 std::shared_ptr<Channel> channel_;
557 std::unique_ptr<StubType> stub_;
558 bool is_inproc_;
559 };
560 std::vector<ClientChannelInfo> channels_;
561 std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
562 create_stub_;
563 };
564
// Factory functions returning a Client built from a ClientConfig. Only the
// declarations are visible here; presumably each one creates the client
// flavour named in the function (verify against the definitions).
565 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);
566 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args);
567 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& args);
568 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
569 const ClientConfig& args);
570
571 } // namespace testing
572 } // namespace grpc
573
574 #endif
575