• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef TEST_QPS_CLIENT_H
20 #define TEST_QPS_CLIENT_H
21 
22 #include <stdlib.h>
23 
24 #include <condition_variable>
25 #include <mutex>
26 #include <unordered_map>
27 #include <vector>
28 
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/support/byte_buffer.h>
33 #include <grpcpp/support/channel_arguments.h>
34 #include <grpcpp/support/slice.h>
35 
36 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
37 #include "src/proto/grpc/testing/payloads.pb.h"
38 
39 #include "src/core/lib/gpr/env.h"
40 #include "src/cpp/util/core_stats.h"
41 #include "test/cpp/qps/histogram.h"
42 #include "test/cpp/qps/interarrival.h"
43 #include "test/cpp/qps/qps_worker.h"
44 #include "test/cpp/qps/server.h"
45 #include "test/cpp/qps/usage_timer.h"
46 #include "test/cpp/util/create_test_channel.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48 
49 #define INPROC_NAME_PREFIX "qpsinproc:"
50 
51 namespace grpc {
52 namespace testing {
53 
54 template <class RequestType>
55 class ClientRequestCreator {
56  public:
ClientRequestCreator(RequestType *,const PayloadConfig &)57   ClientRequestCreator(RequestType* /*req*/, const PayloadConfig&) {
58     // this template must be specialized
59     // fail with an assertion rather than a compile-time
60     // check since these only happen at the beginning anyway
61     GPR_ASSERT(false);
62   }
63 };
64 
65 template <>
66 class ClientRequestCreator<SimpleRequest> {
67  public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)68   ClientRequestCreator(SimpleRequest* req,
69                        const PayloadConfig& payload_config) {
70     if (payload_config.has_bytebuf_params()) {
71       GPR_ASSERT(false);  // not appropriate for this specialization
72     } else if (payload_config.has_simple_params()) {
73       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
74       req->set_response_size(payload_config.simple_params().resp_size());
75       req->mutable_payload()->set_type(
76           grpc::testing::PayloadType::COMPRESSABLE);
77       int size = payload_config.simple_params().req_size();
78       std::unique_ptr<char[]> body(new char[size]);
79       req->mutable_payload()->set_body(body.get(), size);
80     } else if (payload_config.has_complex_params()) {
81       GPR_ASSERT(false);  // not appropriate for this specialization
82     } else {
83       // default should be simple proto without payloads
84       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
85       req->set_response_size(0);
86       req->mutable_payload()->set_type(
87           grpc::testing::PayloadType::COMPRESSABLE);
88     }
89   }
90 };
91 
92 template <>
93 class ClientRequestCreator<ByteBuffer> {
94  public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)95   ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
96     if (payload_config.has_bytebuf_params()) {
97       size_t req_sz =
98           static_cast<size_t>(payload_config.bytebuf_params().req_size());
99       std::unique_ptr<char[]> buf(new char[req_sz]);
100       memset(buf.get(), 0, req_sz);
101       Slice slice(buf.get(), req_sz);
102       *req = ByteBuffer(&slice, 1);
103     } else {
104       GPR_ASSERT(false);  // not appropriate for this specialization
105     }
106   }
107 };
108 
// Outcome of a single RPC as reported by a client worker: an optional latency
// value and an optional status code. Each field is meaningful only once the
// matching *_used() flag is true. Both fields default to 0, so the accessors
// never read indeterminate memory.
class HistogramEntry final {
 public:
  HistogramEntry() = default;

  bool value_used() const { return value_used_; }
  double value() const { return value_; }
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  bool status_used() const { return status_used_; }
  int status() const { return status_; }
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  bool value_used_ = false;
  double value_ = 0.0;
  bool status_used_ = false;
  int status_ = 0;
};
131 
// Status code -> number of RPCs that finished with that code.
using StatusHistogram = std::unordered_map<int, int64_t>;

// Accumulates every bucket of `from` into `*to`. Buckets missing from `*to`
// are created, starting at zero. `from` is not modified.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  StatusHistogram& dest = *to;
  for (const auto& bucket : from) {
    dest[bucket.first] += bucket.second;
  }
}
141 
142 class Client {
143  public:
Client()144   Client()
145       : timer_(new UsageTimer),
146         interarrival_timer_(),
147         started_requests_(false),
148         last_reset_poll_count_(0) {
149     gpr_event_init(&start_requests_);
150   }
~Client()151   virtual ~Client() {}
152 
Mark(bool reset)153   ClientStats Mark(bool reset) {
154     Histogram latencies;
155     StatusHistogram statuses;
156     UsageTimer::Result timer_result;
157 
158     MaybeStartRequests();
159 
160     int cur_poll_count = GetPollCount();
161     int poll_count = cur_poll_count - last_reset_poll_count_;
162     if (reset) {
163       std::vector<Histogram> to_merge(threads_.size());
164       std::vector<StatusHistogram> to_merge_status(threads_.size());
165 
166       for (size_t i = 0; i < threads_.size(); i++) {
167         threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
168       }
169       std::unique_ptr<UsageTimer> timer(new UsageTimer);
170       timer_.swap(timer);
171       for (size_t i = 0; i < threads_.size(); i++) {
172         latencies.Merge(to_merge[i]);
173         MergeStatusHistogram(to_merge_status[i], &statuses);
174       }
175       timer_result = timer->Mark();
176       last_reset_poll_count_ = cur_poll_count;
177     } else {
178       // merge snapshots of each thread histogram
179       for (size_t i = 0; i < threads_.size(); i++) {
180         threads_[i]->MergeStatsInto(&latencies, &statuses);
181       }
182       timer_result = timer_->Mark();
183     }
184 
185     // Print the median latency per interval for one thread.
186     // If the number of warmup seconds is x, then the first x + 1 numbers in the
187     // vector are from the warmup period and should be discarded.
188     if (median_latency_collection_interval_seconds_ > 0) {
189       std::vector<double> medians_per_interval =
190           threads_[0]->GetMedianPerIntervalList();
191       gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
192       gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
193       for (size_t j = 0; j < medians_per_interval.size(); j++) {
194         gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
195       }
196     }
197 
198     grpc_stats_data core_stats;
199     grpc_stats_collect(&core_stats);
200 
201     ClientStats stats;
202     latencies.FillProto(stats.mutable_latencies());
203     for (StatusHistogram::const_iterator it = statuses.begin();
204          it != statuses.end(); ++it) {
205       RequestResultCount* rrc = stats.add_request_results();
206       rrc->set_status_code(it->first);
207       rrc->set_count(it->second);
208     }
209     stats.set_time_elapsed(timer_result.wall);
210     stats.set_time_system(timer_result.system);
211     stats.set_time_user(timer_result.user);
212     stats.set_cq_poll_count(poll_count);
213     CoreStatsToProto(core_stats, stats.mutable_core_stats());
214     return stats;
215   }
216 
217   // Must call AwaitThreadsCompletion before destructor to avoid a race
218   // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()219   void AwaitThreadsCompletion() {
220     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
221     DestroyMultithreading();
222     std::unique_lock<std::mutex> g(thread_completion_mu_);
223     while (threads_remaining_ != 0) {
224       threads_complete_.wait(g);
225     }
226   }
227 
228   // Returns the interval (in seconds) between collecting latency medians. If 0,
229   // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()230   double GetLatencyCollectionIntervalInSeconds() {
231     return median_latency_collection_interval_seconds_;
232   }
233 
GetPollCount()234   virtual int GetPollCount() {
235     // For sync client.
236     return 0;
237   }
238 
IsClosedLoop()239   bool IsClosedLoop() { return closed_loop_; }
240 
NextIssueTime(int thread_idx)241   gpr_timespec NextIssueTime(int thread_idx) {
242     const gpr_timespec result = next_time_[thread_idx];
243     next_time_[thread_idx] =
244         gpr_time_add(next_time_[thread_idx],
245                      gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
246                                          GPR_TIMESPAN));
247     return result;
248   }
249 
ThreadCompleted()250   bool ThreadCompleted() {
251     return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
252   }
253 
254   class Thread {
255    public:
Thread(Client * client,size_t idx)256     Thread(Client* client, size_t idx)
257         : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
258 
~Thread()259     ~Thread() { impl_.join(); }
260 
BeginSwap(Histogram * n,StatusHistogram * s)261     void BeginSwap(Histogram* n, StatusHistogram* s) {
262       std::lock_guard<std::mutex> g(mu_);
263       n->Swap(&histogram_);
264       s->swap(statuses_);
265     }
266 
MergeStatsInto(Histogram * hist,StatusHistogram * s)267     void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
268       std::unique_lock<std::mutex> g(mu_);
269       hist->Merge(histogram_);
270       MergeStatusHistogram(statuses_, s);
271     }
272 
GetMedianPerIntervalList()273     std::vector<double> GetMedianPerIntervalList() {
274       return medians_each_interval_list_;
275     }
276 
UpdateHistogram(HistogramEntry * entry)277     void UpdateHistogram(HistogramEntry* entry) {
278       std::lock_guard<std::mutex> g(mu_);
279       if (entry->value_used()) {
280         histogram_.Add(entry->value());
281         if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
282           histogram_per_interval_.Add(entry->value());
283           double now = UsageTimer::Now();
284           if ((now - interval_start_time_) >=
285               client_->GetLatencyCollectionIntervalInSeconds()) {
286             // Record the median latency of requests from the last interval.
287             // Divide by 1e3 to get microseconds.
288             medians_each_interval_list_.push_back(
289                 histogram_per_interval_.Percentile(50) / 1e3);
290             histogram_per_interval_.Reset();
291             interval_start_time_ = now;
292           }
293         }
294       }
295       if (entry->status_used()) {
296         statuses_[entry->status()]++;
297       }
298     }
299 
300    private:
301     Thread(const Thread&);
302     Thread& operator=(const Thread&);
303 
ThreadFunc()304     void ThreadFunc() {
305       int wait_loop = 0;
306       while (!gpr_event_wait(
307           &client_->start_requests_,
308           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
309                        gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
310         gpr_log(GPR_INFO, "%" PRIdPTR ": Waiting for benchmark to start (%d)",
311                 idx_, wait_loop);
312         wait_loop++;
313       }
314 
315       client_->ThreadFunc(idx_, this);
316       client_->CompleteThread();
317     }
318 
319     std::mutex mu_;
320     Histogram histogram_;
321     StatusHistogram statuses_;
322     Client* client_;
323     const size_t idx_;
324     std::thread impl_;
325     // The following are used only if
326     // median_latency_collection_interval_seconds_ is greater than 0
327     Histogram histogram_per_interval_;
328     std::vector<double> medians_each_interval_list_;
329     double interval_start_time_;
330   };
331 
332  protected:
333   bool closed_loop_;
334   gpr_atm thread_pool_done_;
335   double median_latency_collection_interval_seconds_;  // In seconds
336 
StartThreads(size_t num_threads)337   void StartThreads(size_t num_threads) {
338     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
339     threads_remaining_ = num_threads;
340     for (size_t i = 0; i < num_threads; i++) {
341       threads_.emplace_back(new Thread(this, i));
342     }
343   }
344 
EndThreads()345   void EndThreads() {
346     MaybeStartRequests();
347     threads_.clear();
348   }
349 
350   virtual void DestroyMultithreading() = 0;
351 
SetupLoadTest(const ClientConfig & config,size_t num_threads)352   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
353     // Set up the load distribution based on the number of threads
354     const auto& load = config.load_params();
355 
356     std::unique_ptr<RandomDistInterface> random_dist;
357     switch (load.load_case()) {
358       case LoadParams::kClosedLoop:
359         // Closed-loop doesn't use random dist at all
360         break;
361       case LoadParams::kPoisson:
362         random_dist.reset(
363             new ExpDist(load.poisson().offered_load() / num_threads));
364         break;
365       default:
366         GPR_ASSERT(false);
367     }
368 
369     // Set closed_loop_ based on whether or not random_dist is set
370     if (!random_dist) {
371       closed_loop_ = true;
372     } else {
373       closed_loop_ = false;
374       // set up interarrival timer according to random dist
375       interarrival_timer_.init(*random_dist, num_threads);
376       const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
377       for (size_t i = 0; i < num_threads; i++) {
378         next_time_.push_back(gpr_time_add(
379             now,
380             gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
381       }
382     }
383   }
384 
NextIssuer(int thread_idx)385   std::function<gpr_timespec()> NextIssuer(int thread_idx) {
386     return closed_loop_ ? std::function<gpr_timespec()>()
387                         : std::bind(&Client::NextIssueTime, this, thread_idx);
388   }
389 
390   virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
391 
392   std::vector<std::unique_ptr<Thread>> threads_;
393   std::unique_ptr<UsageTimer> timer_;
394 
395   InterarrivalTimer interarrival_timer_;
396   std::vector<gpr_timespec> next_time_;
397 
398   std::mutex thread_completion_mu_;
399   size_t threads_remaining_;
400   std::condition_variable threads_complete_;
401 
402   gpr_event start_requests_;
403   bool started_requests_;
404 
405   int last_reset_poll_count_;
406 
MaybeStartRequests()407   void MaybeStartRequests() {
408     if (!started_requests_) {
409       started_requests_ = true;
410       gpr_event_set(&start_requests_, (void*)1);
411     }
412   }
413 
CompleteThread()414   void CompleteThread() {
415     std::lock_guard<std::mutex> g(thread_completion_mu_);
416     threads_remaining_--;
417     if (threads_remaining_ == 0) {
418       threads_complete_.notify_all();
419     }
420   }
421 };
422 
423 template <class StubType, class RequestType>
424 class ClientImpl : public Client {
425  public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)426   ClientImpl(const ClientConfig& config,
427              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
428                  create_stub)
429       : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
430     for (int i = 0; i < config.client_channels(); i++) {
431       channels_.emplace_back(
432           config.server_targets(i % config.server_targets_size()), config,
433           create_stub_, i);
434     }
435     WaitForChannelsToConnect();
436     median_latency_collection_interval_seconds_ =
437         config.median_latency_collection_interval_millis() / 1e3;
438     ClientRequestCreator<RequestType> create_req(&request_,
439                                                  config.payload_config());
440   }
~ClientImpl()441   virtual ~ClientImpl() {}
request()442   const RequestType* request() { return &request_; }
443 
WaitForChannelsToConnect()444   void WaitForChannelsToConnect() {
445     int connect_deadline_seconds = 10;
446     /* Allow optionally overriding connect_deadline in order
447      * to deal with benchmark environments in which the server
448      * can take a long time to become ready. */
449     char* channel_connect_timeout_str =
450         gpr_getenv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
451     if (channel_connect_timeout_str != nullptr &&
452         strcmp(channel_connect_timeout_str, "") != 0) {
453       connect_deadline_seconds = atoi(channel_connect_timeout_str);
454     }
455     gpr_log(GPR_INFO,
456             "Waiting for up to %d seconds for all channels to connect",
457             connect_deadline_seconds);
458     gpr_free(channel_connect_timeout_str);
459     gpr_timespec connect_deadline = gpr_time_add(
460         gpr_now(GPR_CLOCK_REALTIME),
461         gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
462     CompletionQueue cq;
463     size_t num_remaining = 0;
464     for (auto& c : channels_) {
465       if (!c.is_inproc()) {
466         Channel* channel = c.get_channel();
467         grpc_connectivity_state last_observed = channel->GetState(true);
468         if (last_observed == GRPC_CHANNEL_READY) {
469           gpr_log(GPR_INFO, "Channel %p connected!", channel);
470         } else {
471           num_remaining++;
472           channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
473                                        channel);
474         }
475       }
476     }
477     while (num_remaining > 0) {
478       bool ok = false;
479       void* tag = nullptr;
480       cq.Next(&tag, &ok);
481       Channel* channel = static_cast<Channel*>(tag);
482       if (!ok) {
483         gpr_log(GPR_ERROR, "Channel %p failed to connect within the deadline",
484                 channel);
485         abort();
486       } else {
487         grpc_connectivity_state last_observed = channel->GetState(true);
488         if (last_observed == GRPC_CHANNEL_READY) {
489           gpr_log(GPR_INFO, "Channel %p connected!", channel);
490           num_remaining--;
491         } else {
492           channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
493                                        channel);
494         }
495       }
496     }
497   }
498 
499  protected:
500   const int cores_;
501   RequestType request_;
502 
503   class ClientChannelInfo {
504    public:
ClientChannelInfo(const std::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)505     ClientChannelInfo(
506         const std::string& target, const ClientConfig& config,
507         std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
508             create_stub,
509         int shard) {
510       ChannelArguments args;
511       args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
512       set_channel_args(config, &args);
513 
514       std::string type;
515       if (config.has_security_params() &&
516           config.security_params().cred_type().empty()) {
517         type = kTlsCredentialsType;
518       } else {
519         type = config.security_params().cred_type();
520       }
521 
522       std::string inproc_pfx(INPROC_NAME_PREFIX);
523       if (target.find(inproc_pfx) != 0) {
524         channel_ = CreateTestChannel(
525             target, type, config.security_params().server_host_override(),
526             !config.security_params().use_test_ca(),
527             std::shared_ptr<CallCredentials>(), args);
528         gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
529         is_inproc_ = false;
530       } else {
531         std::string tgt = target;
532         tgt.erase(0, inproc_pfx.length());
533         int srv_num = std::stoi(tgt);
534         channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
535         is_inproc_ = true;
536       }
537       stub_ = create_stub(channel_);
538     }
get_channel()539     Channel* get_channel() { return channel_.get(); }
get_stub()540     StubType* get_stub() { return stub_.get(); }
is_inproc()541     bool is_inproc() { return is_inproc_; }
542 
543    private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)544     void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
545       for (const auto& channel_arg : config.channel_args()) {
546         if (channel_arg.value_case() == ChannelArg::kStrValue) {
547           args->SetString(channel_arg.name(), channel_arg.str_value());
548         } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
549           args->SetInt(channel_arg.name(), channel_arg.int_value());
550         } else {
551           gpr_log(GPR_ERROR, "Empty channel arg value.");
552         }
553       }
554     }
555 
556     std::shared_ptr<Channel> channel_;
557     std::unique_ptr<StubType> stub_;
558     bool is_inproc_;
559   };
560   std::vector<ClientChannelInfo> channels_;
561   std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
562       create_stub_;
563 };
564 
565 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);
566 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args);
567 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& args);
568 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
569     const ClientConfig& args);
570 
571 }  // namespace testing
572 }  // namespace grpc
573 
574 #endif
575