• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_TEST_CPP_QPS_CLIENT_H
20 #define GRPC_TEST_CPP_QPS_CLIENT_H
21 
22 #include <grpc/support/time.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/support/byte_buffer.h>
25 #include <grpcpp/support/channel_arguments.h>
26 #include <grpcpp/support/slice.h>
27 #include <inttypes.h>
28 #include <stdint.h>
29 #include <stdlib.h>
30 
31 #include <condition_variable>
32 #include <mutex>
33 #include <thread>
34 #include <unordered_map>
35 #include <vector>
36 
37 #include "absl/log/log.h"
38 #include "absl/memory/memory.h"
39 #include "absl/strings/match.h"
40 #include "absl/strings/str_format.h"
41 #include "src/core/util/crash.h"
42 #include "src/core/util/env.h"
43 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
44 #include "src/proto/grpc/testing/payloads.pb.h"
45 #include "test/cpp/qps/histogram.h"
46 #include "test/cpp/qps/interarrival.h"
47 #include "test/cpp/qps/qps_worker.h"
48 #include "test/cpp/qps/server.h"
49 #include "test/cpp/qps/usage_timer.h"
50 #include "test/cpp/util/create_test_channel.h"
51 #include "test/cpp/util/test_credentials_provider.h"
52 
53 #define INPROC_NAME_PREFIX "qpsinproc:"
54 
55 namespace grpc {
56 namespace testing {
57 
58 template <class RequestType>
59 class ClientRequestCreator {
60  public:
ClientRequestCreator(RequestType *,const PayloadConfig &)61   ClientRequestCreator(RequestType* /*req*/, const PayloadConfig&) {
62     // this template must be specialized
63     // fail with an assertion rather than a compile-time
64     // check since these only happen at the beginning anyway
65     grpc_core::Crash("unreachable");
66   }
67 };
68 
69 template <>
70 class ClientRequestCreator<SimpleRequest> {
71  public:
ClientRequestCreator(SimpleRequest * req,const PayloadConfig & payload_config)72   ClientRequestCreator(SimpleRequest* req,
73                        const PayloadConfig& payload_config) {
74     if (payload_config.has_bytebuf_params()) {
75       grpc_core::Crash(absl::StrFormat(
76           "Invalid PayloadConfig, config cannot have bytebuf_params: %s",
77           payload_config.DebugString()
78               .c_str()));  // not appropriate for this specialization
79     } else if (payload_config.has_simple_params()) {
80       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
81       req->set_response_size(payload_config.simple_params().resp_size());
82       req->mutable_payload()->set_type(
83           grpc::testing::PayloadType::COMPRESSABLE);
84       int size = payload_config.simple_params().req_size();
85       std::unique_ptr<char[]> body(new char[size]);
86       req->mutable_payload()->set_body(body.get(), size);
87     } else if (payload_config.has_complex_params()) {
88       grpc_core::Crash(absl::StrFormat(
89           "Invalid PayloadConfig, cannot have complex_params: %s",
90           payload_config.DebugString()
91               .c_str()));  // not appropriate for this specialization
92     } else {
93       // default should be simple proto without payloads
94       req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
95       req->set_response_size(0);
96       req->mutable_payload()->set_type(
97           grpc::testing::PayloadType::COMPRESSABLE);
98     }
99   }
100 };
101 
102 template <>
103 class ClientRequestCreator<ByteBuffer> {
104  public:
ClientRequestCreator(ByteBuffer * req,const PayloadConfig & payload_config)105   ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) {
106     if (payload_config.has_bytebuf_params()) {
107       size_t req_sz =
108           static_cast<size_t>(payload_config.bytebuf_params().req_size());
109       std::unique_ptr<char[]> buf(new char[req_sz]);
110       memset(buf.get(), 0, req_sz);
111       Slice slice(buf.get(), req_sz);
112       *req = ByteBuffer(&slice, 1);
113     } else {
114       grpc_core::Crash(absl::StrFormat(
115           "Invalid PayloadConfig, missing bytebug_params: %s",
116           payload_config.DebugString()
117               .c_str()));  // not appropriate for this specialization
118     }
119   }
120 };
121 
// One measurement reported by a worker thread: an optional latency value and
// an optional status code. The *_used() flags tell whether the corresponding
// setter was called; unset fields read as 0.
class HistogramEntry final {
 public:
  HistogramEntry() = default;

  bool value_used() const { return value_used_; }
  double value() const { return value_; }
  void set_value(double v) {
    value_used_ = true;
    value_ = v;
  }

  bool status_used() const { return status_used_; }
  int status() const { return status_; }
  void set_status(int status) {
    status_used_ = true;
    status_ = status;
  }

 private:
  // Default-initialized so the getters never read indeterminate values.
  // Ordered largest-first to avoid padding between members.
  double value_ = 0.0;
  int status_ = 0;
  bool value_used_ = false;
  bool status_used_ = false;
};
144 
// Maps a status code to the number of requests that finished with it.
using StatusHistogram = std::unordered_map<int, int64_t>;

// Accumulates every (status code, count) pair of `from` into `*to`. Status
// codes absent from `*to` start at zero before the count is added.
inline void MergeStatusHistogram(const StatusHistogram& from,
                                 StatusHistogram* to) {
  StatusHistogram& dest = *to;
  for (const auto& [status_code, count] : from) {
    dest[status_code] += count;
  }
}
154 
155 class Client {
156  public:
Client()157   Client()
158       : timer_(new UsageTimer),
159         interarrival_timer_(),
160         started_requests_(false),
161         last_reset_poll_count_(0) {
162     gpr_event_init(&start_requests_);
163   }
~Client()164   virtual ~Client() {}
165 
Mark(bool reset)166   ClientStats Mark(bool reset) {
167     Histogram latencies;
168     StatusHistogram statuses;
169     UsageTimer::Result timer_result;
170 
171     MaybeStartRequests();
172 
173     int cur_poll_count = GetPollCount();
174     int poll_count = cur_poll_count - last_reset_poll_count_;
175     if (reset) {
176       std::vector<Histogram> to_merge(threads_.size());
177       std::vector<StatusHistogram> to_merge_status(threads_.size());
178 
179       for (size_t i = 0; i < threads_.size(); i++) {
180         threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
181       }
182       std::unique_ptr<UsageTimer> timer(new UsageTimer);
183       timer_.swap(timer);
184       for (size_t i = 0; i < threads_.size(); i++) {
185         latencies.Merge(to_merge[i]);
186         MergeStatusHistogram(to_merge_status[i], &statuses);
187       }
188       timer_result = timer->Mark();
189       last_reset_poll_count_ = cur_poll_count;
190     } else {
191       // merge snapshots of each thread histogram
192       for (size_t i = 0; i < threads_.size(); i++) {
193         threads_[i]->MergeStatsInto(&latencies, &statuses);
194       }
195       timer_result = timer_->Mark();
196     }
197 
198     // Print the median latency per interval for one thread.
199     // If the number of warmup seconds is x, then the first x + 1 numbers in the
200     // vector are from the warmup period and should be discarded.
201     if (median_latency_collection_interval_seconds_ > 0) {
202       std::vector<double> medians_per_interval =
203           threads_[0]->GetMedianPerIntervalList();
204       LOG(INFO) << "Num threads: " << threads_.size();
205       LOG(INFO) << "Number of medians: " << medians_per_interval.size();
206       for (size_t j = 0; j < medians_per_interval.size(); j++) {
207         LOG(INFO) << medians_per_interval[j];
208       }
209     }
210 
211     ClientStats stats;
212     latencies.FillProto(stats.mutable_latencies());
213     for (StatusHistogram::const_iterator it = statuses.begin();
214          it != statuses.end(); ++it) {
215       RequestResultCount* rrc = stats.add_request_results();
216       rrc->set_status_code(it->first);
217       rrc->set_count(it->second);
218     }
219     stats.set_time_elapsed(timer_result.wall);
220     stats.set_time_system(timer_result.system);
221     stats.set_time_user(timer_result.user);
222     stats.set_cq_poll_count(poll_count);
223     return stats;
224   }
225 
226   // Must call AwaitThreadsCompletion before destructor to avoid a race
227   // between destructor and invocation of virtual ThreadFunc
AwaitThreadsCompletion()228   void AwaitThreadsCompletion() {
229     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(true));
230     DestroyMultithreading();
231     std::unique_lock<std::mutex> g(thread_completion_mu_);
232     while (threads_remaining_ != 0) {
233       threads_complete_.wait(g);
234     }
235   }
236 
237   // Returns the interval (in seconds) between collecting latency medians. If 0,
238   // no periodic median latencies will be collected.
GetLatencyCollectionIntervalInSeconds()239   double GetLatencyCollectionIntervalInSeconds() {
240     return median_latency_collection_interval_seconds_;
241   }
242 
GetPollCount()243   virtual int GetPollCount() {
244     // For sync client.
245     return 0;
246   }
247 
IsClosedLoop()248   bool IsClosedLoop() { return closed_loop_; }
249 
NextIssueTime(int thread_idx)250   gpr_timespec NextIssueTime(int thread_idx) {
251     const gpr_timespec result = next_time_[thread_idx];
252     next_time_[thread_idx] =
253         gpr_time_add(next_time_[thread_idx],
254                      gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
255                                          GPR_TIMESPAN));
256     return result;
257   }
258 
ThreadCompleted()259   bool ThreadCompleted() {
260     return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
261   }
262 
263   class Thread {
264    public:
Thread(Client * client,size_t idx)265     Thread(Client* client, size_t idx)
266         : client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {}
267 
~Thread()268     ~Thread() { impl_.join(); }
269 
BeginSwap(Histogram * n,StatusHistogram * s)270     void BeginSwap(Histogram* n, StatusHistogram* s) {
271       std::lock_guard<std::mutex> g(mu_);
272       n->Swap(&histogram_);
273       s->swap(statuses_);
274     }
275 
MergeStatsInto(Histogram * hist,StatusHistogram * s)276     void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
277       std::unique_lock<std::mutex> g(mu_);
278       hist->Merge(histogram_);
279       MergeStatusHistogram(statuses_, s);
280     }
281 
GetMedianPerIntervalList()282     std::vector<double> GetMedianPerIntervalList() {
283       return medians_each_interval_list_;
284     }
285 
UpdateHistogram(HistogramEntry * entry)286     void UpdateHistogram(HistogramEntry* entry) {
287       std::lock_guard<std::mutex> g(mu_);
288       if (entry->value_used()) {
289         histogram_.Add(entry->value());
290         if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
291           histogram_per_interval_.Add(entry->value());
292           double now = UsageTimer::Now();
293           if ((now - interval_start_time_) >=
294               client_->GetLatencyCollectionIntervalInSeconds()) {
295             // Record the median latency of requests from the last interval.
296             // Divide by 1e3 to get microseconds.
297             medians_each_interval_list_.push_back(
298                 histogram_per_interval_.Percentile(50) / 1e3);
299             histogram_per_interval_.Reset();
300             interval_start_time_ = now;
301           }
302         }
303       }
304       if (entry->status_used()) {
305         statuses_[entry->status()]++;
306       }
307     }
308 
309    private:
310     Thread(const Thread&);
311     Thread& operator=(const Thread&);
312 
ThreadFunc()313     void ThreadFunc() {
314       int wait_loop = 0;
315       while (!gpr_event_wait(
316           &client_->start_requests_,
317           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
318                        gpr_time_from_seconds(20, GPR_TIMESPAN)))) {
319         LOG(INFO) << idx_ << ": Waiting for benchmark to start (" << wait_loop
320                   << ")";
321         wait_loop++;
322       }
323 
324       client_->ThreadFunc(idx_, this);
325       client_->CompleteThread();
326     }
327 
328     std::mutex mu_;
329     Histogram histogram_;
330     StatusHistogram statuses_;
331     Client* client_;
332     const size_t idx_;
333     std::thread impl_;
334     // The following are used only if
335     // median_latency_collection_interval_seconds_ is greater than 0
336     Histogram histogram_per_interval_;
337     std::vector<double> medians_each_interval_list_;
338     double interval_start_time_;
339   };
340 
341  protected:
342   bool closed_loop_;
343   gpr_atm thread_pool_done_;
344   double median_latency_collection_interval_seconds_;  // In seconds
345 
StartThreads(size_t num_threads)346   void StartThreads(size_t num_threads) {
347     gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
348     threads_remaining_ = num_threads;
349     for (size_t i = 0; i < num_threads; i++) {
350       threads_.emplace_back(new Thread(this, i));
351     }
352   }
353 
EndThreads()354   void EndThreads() {
355     MaybeStartRequests();
356     threads_.clear();
357   }
358 
359   virtual void DestroyMultithreading() = 0;
360 
SetupLoadTest(const ClientConfig & config,size_t num_threads)361   void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
362     // Set up the load distribution based on the number of threads
363     const auto& load = config.load_params();
364 
365     std::unique_ptr<RandomDistInterface> random_dist;
366     switch (load.load_case()) {
367       case LoadParams::kClosedLoop:
368         // Closed-loop doesn't use random dist at all
369         break;
370       case LoadParams::kPoisson:
371         random_dist = std::make_unique<ExpDist>(load.poisson().offered_load() /
372                                                 num_threads);
373         break;
374       default:
375         grpc_core::Crash("unreachable");
376     }
377 
378     // Set closed_loop_ based on whether or not random_dist is set
379     if (!random_dist) {
380       closed_loop_ = true;
381     } else {
382       closed_loop_ = false;
383       // set up interarrival timer according to random dist
384       interarrival_timer_.init(*random_dist, num_threads);
385       const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
386       for (size_t i = 0; i < num_threads; i++) {
387         next_time_.push_back(gpr_time_add(
388             now,
389             gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
390       }
391     }
392   }
393 
NextIssuer(int thread_idx)394   std::function<gpr_timespec()> NextIssuer(int thread_idx) {
395     return closed_loop_ ? std::function<gpr_timespec()>()
396                         : std::bind(&Client::NextIssueTime, this, thread_idx);
397   }
398 
399   virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
400 
401   std::vector<std::unique_ptr<Thread>> threads_;
402   std::unique_ptr<UsageTimer> timer_;
403 
404   InterarrivalTimer interarrival_timer_;
405   std::vector<gpr_timespec> next_time_;
406 
407   std::mutex thread_completion_mu_;
408   size_t threads_remaining_;
409   std::condition_variable threads_complete_;
410 
411   gpr_event start_requests_;
412   bool started_requests_;
413 
414   int last_reset_poll_count_;
415 
MaybeStartRequests()416   void MaybeStartRequests() {
417     if (!started_requests_) {
418       started_requests_ = true;
419       gpr_event_set(&start_requests_, reinterpret_cast<void*>(1));
420     }
421   }
422 
CompleteThread()423   void CompleteThread() {
424     std::lock_guard<std::mutex> g(thread_completion_mu_);
425     threads_remaining_--;
426     if (threads_remaining_ == 0) {
427       threads_complete_.notify_all();
428     }
429   }
430 };
431 
432 template <class StubType, class RequestType>
433 class ClientImpl : public Client {
434  public:
ClientImpl(const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)435   ClientImpl(const ClientConfig& config,
436              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
437                  create_stub)
438       : cores_(gpr_cpu_num_cores()), create_stub_(create_stub) {
439     for (int i = 0; i < config.client_channels(); i++) {
440       channels_.emplace_back(
441           config.server_targets(i % config.server_targets_size()), config,
442           create_stub_, i);
443     }
444     WaitForChannelsToConnect();
445     median_latency_collection_interval_seconds_ =
446         config.median_latency_collection_interval_millis() / 1e3;
447     ClientRequestCreator<RequestType> create_req(&request_,
448                                                  config.payload_config());
449   }
~ClientImpl()450   ~ClientImpl() override {}
request()451   const RequestType* request() { return &request_; }
452 
WaitForChannelsToConnect()453   void WaitForChannelsToConnect() {
454     int connect_deadline_seconds = 10;
455     // Allow optionally overriding connect_deadline in order
456     // to deal with benchmark environments in which the server
457     // can take a long time to become ready.
458     auto channel_connect_timeout_str =
459         grpc_core::GetEnv("QPS_WORKER_CHANNEL_CONNECT_TIMEOUT");
460     if (channel_connect_timeout_str.has_value() &&
461         !channel_connect_timeout_str->empty()) {
462       connect_deadline_seconds = atoi(channel_connect_timeout_str->c_str());
463     }
464     LOG(INFO) << "Waiting for up to " << connect_deadline_seconds
465               << " seconds for all channels to connect";
466     gpr_timespec connect_deadline = gpr_time_add(
467         gpr_now(GPR_CLOCK_REALTIME),
468         gpr_time_from_seconds(connect_deadline_seconds, GPR_TIMESPAN));
469     CompletionQueue cq;
470     size_t num_remaining = 0;
471     for (auto& c : channels_) {
472       if (!c.is_inproc()) {
473         Channel* channel = c.get_channel();
474         grpc_connectivity_state last_observed = channel->GetState(true);
475         if (last_observed == GRPC_CHANNEL_READY) {
476           LOG(INFO) << "Channel " << channel << " connected!";
477         } else {
478           num_remaining++;
479           channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
480                                        channel);
481         }
482       }
483     }
484     while (num_remaining > 0) {
485       bool ok = false;
486       void* tag = nullptr;
487       cq.Next(&tag, &ok);
488       Channel* channel = static_cast<Channel*>(tag);
489       if (!ok) {
490         grpc_core::Crash(absl::StrFormat(
491             "Channel %p failed to connect within the deadline", channel));
492       } else {
493         grpc_connectivity_state last_observed = channel->GetState(true);
494         if (last_observed == GRPC_CHANNEL_READY) {
495           LOG(INFO) << "Channel " << channel << " connected!";
496           num_remaining--;
497         } else {
498           channel->NotifyOnStateChange(last_observed, connect_deadline, &cq,
499                                        channel);
500         }
501       }
502     }
503   }
504 
505  protected:
506   const int cores_;
507   RequestType request_;
508 
509   class ClientChannelInfo {
510    public:
ClientChannelInfo(const std::string & target,const ClientConfig & config,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub,int shard)511     ClientChannelInfo(
512         const std::string& target, const ClientConfig& config,
513         std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
514             create_stub,
515         int shard) {
516       ChannelArguments args;
517       args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
518       set_channel_args(config, &args);
519 
520       std::string type;
521       if (config.has_security_params() &&
522           config.security_params().cred_type().empty()) {
523         type = kTlsCredentialsType;
524       } else {
525         type = config.security_params().cred_type();
526       }
527 
528       std::string inproc_pfx(INPROC_NAME_PREFIX);
529       if (!absl::StartsWith(target, inproc_pfx)) {
530         channel_ = CreateTestChannel(
531             target, type, config.security_params().server_host_override(),
532             !config.security_params().use_test_ca(),
533             std::shared_ptr<CallCredentials>(), args);
534         LOG(INFO) << "Connecting to " << target;
535         is_inproc_ = false;
536       } else {
537         std::string tgt = target;
538         tgt.erase(0, inproc_pfx.length());
539         int srv_num = std::stoi(tgt);
540         channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
541         is_inproc_ = true;
542       }
543       stub_ = create_stub(channel_);
544     }
get_channel()545     Channel* get_channel() { return channel_.get(); }
get_stub()546     StubType* get_stub() { return stub_.get(); }
is_inproc()547     bool is_inproc() { return is_inproc_; }
548 
549    private:
set_channel_args(const ClientConfig & config,ChannelArguments * args)550     void set_channel_args(const ClientConfig& config, ChannelArguments* args) {
551       for (const auto& channel_arg : config.channel_args()) {
552         if (channel_arg.value_case() == ChannelArg::kStrValue) {
553           args->SetString(channel_arg.name(), channel_arg.str_value());
554         } else if (channel_arg.value_case() == ChannelArg::kIntValue) {
555           args->SetInt(channel_arg.name(), channel_arg.int_value());
556         } else {
557           LOG(ERROR) << "Empty channel arg value.";
558         }
559       }
560     }
561 
562     std::shared_ptr<Channel> channel_;
563     std::unique_ptr<StubType> stub_;
564     bool is_inproc_;
565   };
566   std::vector<ClientChannelInfo> channels_;
567   std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
568       create_stub_;
569 };
570 
571 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config);
572 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config);
573 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config);
574 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
575     const ClientConfig& config);
576 
577 }  // namespace testing
578 }  // namespace grpc
579 
580 #endif  // GRPC_TEST_CPP_QPS_CLIENT_H
581