1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef TEST_QPS_CLIENT_H
20 #define TEST_QPS_CLIENT_H
21
22 #include <stdlib.h>
23
24 #include <condition_variable>
25 #include <mutex>
26 #include <unordered_map>
27 #include <vector>
28
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/support/byte_buffer.h>
33 #include <grpcpp/support/channel_arguments.h>
34 #include <grpcpp/support/slice.h>
35
36 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
37 #include "src/proto/grpc/testing/payloads.pb.h"
38
39 #include "src/core/lib/gpr/env.h"
40 #include "src/cpp/util/core_stats.h"
41 #include "test/cpp/qps/histogram.h"
42 #include "test/cpp/qps/interarrival.h"
43 #include "test/cpp/qps/qps_worker.h"
44 #include "test/cpp/qps/server.h"
45 #include "test/cpp/qps/usage_timer.h"
46 #include "test/cpp/util/create_test_channel.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48
49 #define INPROC_NAME_PREFIX "qpsinproc:"
50
51 namespace grpc {
52 namespace testing {
53
54 template <class RequestType>
55 class ClientRequestCreator {
56 public:
ClientRequestCreator(RequestType * req,const PayloadConfig &)57 ClientRequestCreator(RequestType* req, const PayloadConfig&) {
58 // this template must be specialized
59 // fail with an assertion rather than a compile-time
60 // check since these only happen at the beginning anyway
61 GPR_ASSERT(false);
62 }
63 };
64
65 template <>
66 class ClientRequestCreator<SimpleRequest> {
67 public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)68 ClientRequestCreator(SimpleRequest* req,
69 const PayloadConfig& payload_config) {
70 if (payload_config.has_bytebuf_params()) {
71 GPR_ASSERT(false); // not appropriate for this specialization
72 } else if (payload_config.has_simple_params()) {
73 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
74 req->set_response_size(payload_config.simple_params().resp_size());
75 req->mutable_payload()->set_type(
76 grpc::testing::PayloadType::COMPRESSABLE);
77 int size = payload_config.simple_params().req_size();
78 std::unique_ptr<char[]> body(new char[size]);
79 req->mutable_payload()->set_body(body.get(), size);
80 } else if (payload_config.has_complex_params()) {
81 GPR_ASSERT(false); // not appropriate for this specialization
82 } else {
83 // default should be simple proto without payloads
84 req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
85 req->set_response_size(0);
86 req->mutable_payload()->set_type(
87 grpc::testing::PayloadType::COMPRESSABLE);
88 }
89 }
90 };
91
92 template <>
93 class ClientRequestCreator<ByteBuffer> {
94 public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)95 ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
96 if (payload_config.has_bytebuf_params()) {
97 std::unique_ptr<char[]> buf(
98 new char[payload_config.bytebuf_params().req_size()]);
99 Slice slice(buf.get(), payload_config.bytebuf_params().req_size());
100 *req = ByteBuffer(&slice, 1);
101 } else {
102 GPR_ASSERT(false); // not appropriate for this specialization
103 }
104 }
105 };
106
// One measurement handed to Client::Thread::UpdateHistogram(): an optional
// value (added to the latency histogram) and an optional status code (counted
// in the status histogram). Each field has a "used" flag telling whether it
// has been set.
class HistogramEntry final {
 public:
  // Creates an entry with neither field set. The payload members are
  // zero-initialized so that reading them before the matching set_*() call
  // yields a well-defined value instead of indeterminate memory.
  HistogramEntry()
      : value_used_(false), value_(0.0), status_used_(false), status_(0) {}

  // True once set_value() has been called.
  bool value_used() const { return value_used_; }
  // The last value passed to set_value(), or 0.0 if it was never called.
  double value() const { return value_; }
  // Stores `v` and marks the value as used.
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  // True once set_status() has been called.
  bool status_used() const { return status_used_; }
  // The last status passed to set_status(), or 0 if it was never called.
  int status() const { return status_; }
  // Stores `status` and marks the status as used.
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  bool value_used_;
  double value_;
  bool status_used_;
  int status_;
};
129
// Maps a status code to the number of times it was observed.
using StatusHistogram = std::unordered_map<int, int64_t>;

// Adds every count stored in `from` onto the entry with the same status code
// in `*to`; status codes missing from `*to` are created. `from` is left
// untouched.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  StatusHistogram& dest = *to;
  for (const auto& code_and_count : from) {
    dest[code_and_count.first] += code_and_count.second;
  }
}
139
140 class Client {
141 public:
Client()142 Client()
143 : timer_(new UsageTimer),
144 interarrival_timer_(),
145 started_requests_(false),
146 last_reset_poll_count_(0) {
147 gpr_event_init(&start_requests_);
148 }
~Client()149 virtual ~Client() {}
150
Mark(bool reset)151 ClientStats Mark(bool reset) {
152 Histogram latencies;
153 StatusHistogram statuses;
154 UsageTimer::Result timer_result;
155
156 MaybeStartRequests();
157
158 int cur_poll_count = GetPollCount();
159 int poll_count = cur_poll_count - last_reset_poll_count_;
160 if (reset) {
161 std::vector<Histogram> to_merge(threads_.size());
162 std::vector<StatusHistogram> to_merge_status(threads_.size());
163
164 for (size_t i = 0; i < threads_.size(); i++) {
165 threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
166 }
167 std::unique_ptr<UsageTimer> timer(new UsageTimer);
168 timer_.swap(timer);
169 for (size_t i = 0; i < threads_.size(); i++) {
170 latencies.Merge(to_merge[i]);
171 MergeStatusHistogram(to_merge_status[i], &statuses);
172 }
173 timer_result = timer->Mark();
174 last_reset_poll_count_ = cur_poll_count;
175 } else {
176 // merge snapshots of each thread histogram
177 for (size_t i = 0; i < threads_.size(); i++) {
178 threads_[i]->MergeStatsInto(&latencies, &statuses);
179 }
180 timer_result = timer_->Mark();
181 }
182
183 // Print the median latency per interval for one thread.
184 // If the number of warmup seconds is x, then the first x + 1 numbers in the
185 // vector are from the warmup period and should be discarded.
186 if (median_latency_collection_interval_seconds_ > 0) {
187 std::vector<double> medians_per_interval =
188 threads_[0]->GetMedianPerIntervalList();
189 gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
190 gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
191 for (size_t j = 0; j < medians_per_interval.size(); j++) {
192 gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
193 }
194 }
195
196 grpc_stats_data core_stats;
197 grpc_stats_collect(&core_stats);
198
199 ClientStats stats;
200 latencies.FillProto(stats.mutable_latencies());
201 for (StatusHistogram::const_iterator it = statuses.begin();
202 it != statuses.end(); ++it) {
203 RequestResultCount* rrc = stats.add_request_results();
204 rrc->set_status_code(it->first);
205 rrc->set_count(it->second);
206 }
207 stats.set_time_elapsed(timer_result.wall);
208 stats.set_time_system(timer_result.system);
209 stats.set_time_user(timer_result.user);
210 stats.set_cq_poll_count(poll_count);
211 CoreStatsToProto(core_stats, stats.mutable_core_stats());
212 return stats;
213 }
214
215 // Must call AwaitThreadsCompletion before destructor to avoid a race
216 // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()217 void AwaitThreadsCompletion() {
218 gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
219 DestroyMultithreading();
220 std::unique_lock<std::mutex> g(thread_completion_mu_);
221 while (threads_remaining_ != 0) {
222 threads_complete_.wait(g);
223 }
224 }
225
226 // Returns the interval (in seconds) between collecting latency medians. If 0,
227 // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()228 double GetLatencyCollectionIntervalInSeconds() {
229 return median_latency_collection_interval_seconds_;
230 }
231
GetPollCount()232 virtual int GetPollCount() {
233 // For sync client.
234 return 0;
235 }
236
237 protected:
238 bool closed_loop_;
239 gpr_atm thread_pool_done_;
240 double median_latency_collection_interval_seconds_; // In seconds
241
StartThreads(size_t num_threads)242 void StartThreads(size_t num_threads) {
243 gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
244 threads_remaining_ = num_threads;
245 for (size_t i = 0; i < num_threads; i++) {
246 threads_.emplace_back(new Thread(this, i));
247 }
248 }
249
EndThreads()250 void EndThreads() {
251 MaybeStartRequests();
252 threads_.clear();
253 }
254
255 virtual void DestroyMultithreading() = 0;
256
SetupLoadTest(const ClientConfig & config,size_t num_threads)257 void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
258 // Set up the load distribution based on the number of threads
259 const auto& load = config.load_params();
260
261 std::unique_ptr<RandomDistInterface> random_dist;
262 switch (load.load_case()) {
263 case LoadParams::kClosedLoop:
264 // Closed-loop doesn't use random dist at all
265 break;
266 case LoadParams::kPoisson:
267 random_dist.reset(
268 new ExpDist(load.poisson().offered_load() / num_threads));
269 break;
270 default:
271 GPR_ASSERT(false);
272 }
273
274 // Set closed_loop_ based on whether or not random_dist is set
275 if (!random_dist) {
276 closed_loop_ = true;
277 } else {
278 closed_loop_ = false;
279 // set up interarrival timer according to random dist
280 interarrival_timer_.init(*random_dist, num_threads);
281 const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
282 for (size_t i = 0; i < num_threads; i++) {
283 next_time_.push_back(gpr_time_add(
284 now,
285 gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
286 }
287 }
288 }
289
NextIssueTime(int thread_idx)290 gpr_timespec NextIssueTime(int thread_idx) {
291 const gpr_timespec result = next_time_[thread_idx];
292 next_time_[thread_idx] =
293 gpr_time_add(next_time_[thread_idx],
294 gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
295 GPR_TIMESPAN));
296 return result;
297 }
NextIssuer(int thread_idx)298 std::function<gpr_timespec()> NextIssuer(int thread_idx) {
299 return closed_loop_ ? std::function<gpr_timespec()>()
300 : std::bind(&Client::NextIssueTime, this, thread_idx);
301 }
302
303 class Thread {
304 public:
Thread(Client * client,size_t idx)305 Thread(Client* client, size_t idx)
306 : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
307
~Thread()308 ~Thread() { impl_.join(); }
309
BeginSwap(Histogram * n,StatusHistogram * s)310 void BeginSwap(Histogram* n, StatusHistogram* s) {
311 std::lock_guard<std::mutex> g(mu_);
312 n->Swap(&histogram_);
313 s->swap(statuses_);
314 }
315
MergeStatsInto(Histogram * hist,StatusHistogram * s)316 void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
317 std::unique_lock<std::mutex> g(mu_);
318 hist->Merge(histogram_);
319 MergeStatusHistogram(statuses_, s);
320 }
321
GetMedianPerIntervalList()322 std::vector<double> GetMedianPerIntervalList() {
323 return medians_each_interval_list_;
324 }
325
UpdateHistogram(HistogramEntry * entry)326 void UpdateHistogram(HistogramEntry* entry) {
327 std::lock_guard<std::mutex> g(mu_);
328 if (entry->value_used()) {
329 histogram_.Add(entry->value());
330 if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
331 histogram_per_interval_.Add(entry->value());
332 double now = UsageTimer::Now();
333 if ((now - interval_start_time_) >=
334 client_->GetLatencyCollectionIntervalInSeconds()) {
335 // Record the median latency of requests from the last interval.
336 // Divide by 1e3 to get microseconds.
337 medians_each_interval_list_.push_back(
338 histogram_per_interval_.Percentile(50) / 1e3);
339 histogram_per_interval_.Reset();
340 interval_start_time_ = now;
341 }
342 }
343 }
344 if (entry->status_used()) {
345 statuses_[entry->status()]++;
346 }
347 }
348
349 private:
350 Thread(const Thread&);
351 Thread& operator=(const Thread&);
352
ThreadFunc()353 void ThreadFunc() {
354 int wait_loop = 0;
355 while (!gpr_event_wait(
356 &client_->start_requests_,
357 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
358 gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
359 gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
360 idx_, wait_loop);
361 wait_loop++;
362 }
363
364 client_->ThreadFunc(idx_, this);
365 client_->CompleteThread();
366 }
367
368 std::mutex mu_;
369 Histogram histogram_;
370 StatusHistogram statuses_;
371 Client* client_;
372 const size_t idx_;
373 std::thread impl_;
374 // The following are used only if
375 // median_latency_collection_interval_seconds_ is greater than 0
376 Histogram histogram_per_interval_;
377 std::vector<double> medians_each_interval_list_;
378 double interval_start_time_;
379 };
380
ThreadCompleted()381 bool ThreadCompleted() {
382 return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
383 }
384
385 virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
386
387 std::vector<std::unique_ptr<Thread>> threads_;
388 std::unique_ptr<UsageTimer> timer_;
389
390 InterarrivalTimer interarrival_timer_;
391 std::vector<gpr_timespec> next_time_;
392
393 std::mutex thread_completion_mu_;
394 size_t threads_remaining_;
395 std::condition_variable threads_complete_;
396
397 gpr_event start_requests_;
398 bool started_requests_;
399
400 int last_reset_poll_count_;
401
MaybeStartRequests()402 void MaybeStartRequests() {
403 if (!started_requests_) {
404 started_requests_ = true;
405 gpr_event_set(&start_requests_, (void*)1);
406 }
407 }
408
CompleteThread()409 void CompleteThread() {
410 std::lock_guard<std::mutex> g(thread_completion_mu_);
411 threads_remaining_--;
412 if (threads_remaining_ == 0) {
413 threads_complete_.notify_all();
414 }
415 }
416 };
417
418 template <class StubType, class RequestType>
419 class ClientImpl : public Client {
420 public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)421 ClientImpl(const ClientConfig& config,
422 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
423 create_stub)
424 : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
425 for (int i = 0; i < config.client_channels(); i++) {
426 channels_.emplace_back(
427 config.server_targets(i % config.server_targets_size()), config,
428 create_stub_, i);
429 }
430 std::vector<std::unique_ptr<std::thread>> connecting_threads;
431 for (auto& c : channels_) {
432 connecting_threads.emplace_back(c.WaitForReady());
433 }
434 for (auto& t : connecting_threads) {
435 t->join();
436 }
437 median_latency_collection_interval_seconds_ =
438 config.median_latency_collection_interval_millis() / 1e3;
439 ClientRequestCreator<RequestType> create_req(&request_,
440 config.payload_config());
441 }
~ClientImpl()442 virtual ~ClientImpl() {}
443
444 protected:
445 const int cores_;
446 RequestType request_;
447
448 class ClientChannelInfo {
449 public:
ClientChannelInfo(const grpc::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)450 ClientChannelInfo(
451 const grpc::string& target, const ClientConfig& config,
452 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
453 create_stub,
454 int shard) {
455 ChannelArguments args;
456 args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
457 set_channel_args(config, &args);
458
459 grpc::string type;
460 if (config.has_security_params() &&
461 config.security_params().cred_type().empty()) {
462 type = kTlsCredentialsType;
463 } else {
464 type = config.security_params().cred_type();
465 }
466
467 grpc::string inproc_pfx(INPROC_NAME_PREFIX);
468 if (target.find(inproc_pfx) != 0) {
469 channel_ = CreateTestChannel(
470 target, type, config.security_params().server_host_override(),
471 !config.security_params().use_test_ca(),
472 std::shared_ptr<CallCredentials>(), args);
473 gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
474 is_inproc_ = false;
475 } else {
476 grpc::string tgt = target;
477 tgt.erase(0, inproc_pfx.length());
478 int srv_num = std::stoi(tgt);
479 channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
480 is_inproc_ = true;
481 }
482 stub_ = create_stub(channel_);
483 }
get_channel()484 Channel* get_channel() { return channel_.get(); }
get_stub()485 StubType* get_stub() { return stub_.get(); }
486
WaitForReady()487 std::unique_ptr<std::thread> WaitForReady() {
488 return std::unique_ptr<std::thread>(new std::thread([this]() {
489 if (!is_inproc_) {
490 int connect_deadline = 10;
491 /* Allow optionally overriding connect_deadline in order
492 * to deal with benchmark environments in which the server
493 * can take a long time to become ready. */
494 char* channel_connect_timeout_str =
495 gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
496 if (channel_connect_timeout_str != nullptr &&
497 strcmp(channel_connect_timeout_str, "") != 0) {
498 connect_deadline = atoi(channel_connect_timeout_str);
499 }
500 gpr_log(GPR_INFO,
501 "Waiting for up to %d seconds for the channel %p to connect",
502 connect_deadline, channel_.get());
503 gpr_free(channel_connect_timeout_str);
504 GPR_ASSERT(channel_->WaitForConnected(gpr_time_add(
505 gpr_now(GPR_CLOCK_REALTIME),
506 gpr_time_from_seconds(connect_deadline, GPR_TIMESPAN))));
507 gpr_log(GPR_INFO, "Channel %p connected!", channel_.get());
508 }
509 }));
510 }
511
512 private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)513 void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
514 for (const auto& channel_arg : config.channel_args()) {
515 if (channel_arg.value_case() == ChannelArg::kStrValue) {
516 args->SetString(channel_arg.name(), channel_arg.str_value());
517 } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
518 args->SetInt(channel_arg.name(), channel_arg.int_value());
519 } else {
520 gpr_log(GPR_ERROR, "Empty channel arg value.");
521 }
522 }
523 }
524
525 std::shared_ptr<Channel> channel_;
526 std::unique_ptr<StubType> stub_;
527 bool is_inproc_;
528 };
529 std::vector<ClientChannelInfo> channels_;
530 std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
531 create_stub_;
532 };
533
534 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);
535 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args);
536 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
537 const ClientConfig& args);
538
539 } // namespace testing
540 } // namespace grpc
541
542 #endif
543