• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/time.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 
27 #include <chrono>
28 #include <memory>
29 #include <mutex>
30 #include <sstream>
31 #include <string>
32 #include <thread>
33 #include <vector>
34 
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "src/core/util/crash.h"
38 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39 #include "test/cpp/qps/client.h"
40 #include "test/cpp/qps/interarrival.h"
41 #include "test/cpp/qps/usage_timer.h"
42 
43 namespace grpc {
44 namespace testing {
45 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)46 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
47     const std::shared_ptr<Channel>& ch) {
48   return BenchmarkService::NewStub(ch);
49 }
50 
51 class SynchronousClient
52     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
53  public:
SynchronousClient(const ClientConfig & config)54   explicit SynchronousClient(const ClientConfig& config)
55       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
56             config, BenchmarkStubCreator) {
57     num_threads_ =
58         config.outstanding_rpcs_per_channel() * config.client_channels();
59     responses_.resize(num_threads_);
60     SetupLoadTest(config, num_threads_);
61   }
62 
~SynchronousClient()63   ~SynchronousClient() override {}
64 
65   virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
66   virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
67 
ThreadFunc(size_t thread_idx,Thread * t)68   void ThreadFunc(size_t thread_idx, Thread* t) override {
69     if (!InitThreadFuncImpl(thread_idx)) {
70       return;
71     }
72     for (;;) {
73       // run the loop body
74       HistogramEntry entry;
75       const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
76       t->UpdateHistogram(&entry);
77       if (!thread_still_ok || ThreadCompleted()) {
78         return;
79       }
80     }
81   }
82 
83  protected:
84   // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)85   bool WaitToIssue(int thread_idx) {
86     if (!closed_loop_) {
87       const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
88       // Avoid sleeping for too long continuously because we might
89       // need to terminate before then. This is an issue since
90       // exponential distribution can occasionally produce bad outliers
91       while (true) {
92         const gpr_timespec one_sec_delay =
93             gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
94                          gpr_time_from_seconds(1, GPR_TIMESPAN));
95         if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
96           gpr_sleep_until(next_issue_time);
97           return true;
98         } else {
99           gpr_sleep_until(one_sec_delay);
100           if (gpr_atm_acq_load(&thread_pool_done_) != gpr_atm{0}) {
101             return false;
102           }
103         }
104       }
105     }
106     return true;
107   }
108 
109   size_t num_threads_;
110   std::vector<SimpleResponse> responses_;
111 };
112 
113 class SynchronousUnaryClient final : public SynchronousClient {
114  public:
SynchronousUnaryClient(const ClientConfig & config)115   explicit SynchronousUnaryClient(const ClientConfig& config)
116       : SynchronousClient(config) {
117     StartThreads(num_threads_);
118   }
~SynchronousUnaryClient()119   ~SynchronousUnaryClient() override {}
120 
InitThreadFuncImpl(size_t)121   bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
122 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)123   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
124     if (!WaitToIssue(thread_idx)) {
125       return true;
126     }
127     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
128     double start = UsageTimer::Now();
129     grpc::ClientContext context;
130     grpc::Status s =
131         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
132     if (s.ok()) {
133       entry->set_value((UsageTimer::Now() - start) * 1e9);
134     }
135     entry->set_status(s.error_code());
136     return true;
137   }
138 
139  private:
DestroyMultithreading()140   void DestroyMultithreading() final { EndThreads(); }
141 };
142 
143 template <class StreamType>
144 class SynchronousStreamingClient : public SynchronousClient {
145  public:
SynchronousStreamingClient(const ClientConfig & config)146   explicit SynchronousStreamingClient(const ClientConfig& config)
147       : SynchronousClient(config),
148         context_(num_threads_),
149         stream_(num_threads_),
150         stream_mu_(num_threads_),
151         shutdown_(num_threads_),
152         messages_per_stream_(config.messages_per_stream()),
153         messages_issued_(num_threads_) {
154     StartThreads(num_threads_);
155   }
~SynchronousStreamingClient()156   ~SynchronousStreamingClient() override {
157     CleanupAllStreams([this](size_t thread_idx) {
158       // Don't log any kind of error since we may have canceled this
159       stream_[thread_idx]->Finish().IgnoreError();
160     });
161   }
162 
163  protected:
164   std::vector<grpc::ClientContext> context_;
165   std::vector<std::unique_ptr<StreamType>> stream_;
166   // stream_mu_ is only needed when changing an element of stream_ or context_
167   std::vector<std::mutex> stream_mu_;
168   // use struct Bool rather than bool because vector<bool> is not concurrent
169   struct Bool {
170     bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool171     Bool() : val(false) {}
172   };
173   std::vector<Bool> shutdown_;
174   const int messages_per_stream_;
175   std::vector<int> messages_issued_;
176 
FinishStream(HistogramEntry * entry,size_t thread_idx)177   void FinishStream(HistogramEntry* entry, size_t thread_idx) {
178     Status s = stream_[thread_idx]->Finish();
179     // don't set the value since the stream is failed and shouldn't be timed
180     entry->set_status(s.error_code());
181     if (!s.ok()) {
182       std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
183       if (!shutdown_[thread_idx].val) {
184         LOG(ERROR) << "Stream " << thread_idx << " received an error "
185                    << s.error_message();
186       }
187     }
188     // Lock the stream_mu_ now because the client context could change
189     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
190     context_[thread_idx].~ClientContext();
191     new (&context_[thread_idx]) ClientContext();
192   }
193 
CleanupAllStreams(const std::function<void (size_t)> & cleaner)194   void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
195     std::vector<std::thread> cleanup_threads;
196     for (size_t i = 0; i < num_threads_; i++) {
197       cleanup_threads.emplace_back([this, i, cleaner] {
198         std::lock_guard<std::mutex> l(stream_mu_[i]);
199         shutdown_[i].val = true;
200         if (stream_[i]) {
201           cleaner(i);
202         }
203       });
204     }
205     for (auto& th : cleanup_threads) {
206       th.join();
207     }
208   }
209 
210  private:
DestroyMultithreading()211   void DestroyMultithreading() final {
212     CleanupAllStreams(
213         [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
214     EndThreads();
215   }
216 };
217 
218 class SynchronousStreamingPingPongClient final
219     : public SynchronousStreamingClient<
220           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
221  public:
SynchronousStreamingPingPongClient(const ClientConfig & config)222   explicit SynchronousStreamingPingPongClient(const ClientConfig& config)
223       : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()224   ~SynchronousStreamingPingPongClient() override {
225     CleanupAllStreams(
226         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
227   }
228 
229  private:
InitThreadFuncImpl(size_t thread_idx)230   bool InitThreadFuncImpl(size_t thread_idx) override {
231     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
232     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
233     if (!shutdown_[thread_idx].val) {
234       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
235     } else {
236       return false;
237     }
238     messages_issued_[thread_idx] = 0;
239     return true;
240   }
241 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)242   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
243     if (!WaitToIssue(thread_idx)) {
244       return true;
245     }
246     double start = UsageTimer::Now();
247     if (stream_[thread_idx]->Write(request_) &&
248         stream_[thread_idx]->Read(&responses_[thread_idx])) {
249       entry->set_value((UsageTimer::Now() - start) * 1e9);
250       // don't set the status since there isn't one yet
251       if ((messages_per_stream_ != 0) &&
252           (++messages_issued_[thread_idx] < messages_per_stream_)) {
253         return true;
254       } else if (messages_per_stream_ == 0) {
255         return true;
256       } else {
257         // Fall through to the below resetting code after finish
258       }
259     }
260     stream_[thread_idx]->WritesDone();
261     FinishStream(entry, thread_idx);
262     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
263     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
264     if (!shutdown_[thread_idx].val) {
265       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
266     } else {
267       stream_[thread_idx].reset();
268       return false;
269     }
270     messages_issued_[thread_idx] = 0;
271     return true;
272   }
273 };
274 
275 class SynchronousStreamingFromClientClient final
276     : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
277  public:
SynchronousStreamingFromClientClient(const ClientConfig & config)278   explicit SynchronousStreamingFromClientClient(const ClientConfig& config)
279       : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()280   ~SynchronousStreamingFromClientClient() override {
281     CleanupAllStreams(
282         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
283   }
284 
285  private:
286   std::vector<double> last_issue_;
287 
InitThreadFuncImpl(size_t thread_idx)288   bool InitThreadFuncImpl(size_t thread_idx) override {
289     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
290     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
291     if (!shutdown_[thread_idx].val) {
292       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
293                                                       &responses_[thread_idx]);
294     } else {
295       return false;
296     }
297     last_issue_[thread_idx] = UsageTimer::Now();
298     return true;
299   }
300 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)301   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
302     // Figure out how to make histogram sensible if this is rate-paced
303     if (!WaitToIssue(thread_idx)) {
304       return true;
305     }
306     if (stream_[thread_idx]->Write(request_)) {
307       double now = UsageTimer::Now();
308       entry->set_value((now - last_issue_[thread_idx]) * 1e9);
309       last_issue_[thread_idx] = now;
310       return true;
311     }
312     stream_[thread_idx]->WritesDone();
313     FinishStream(entry, thread_idx);
314     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
315     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
316     if (!shutdown_[thread_idx].val) {
317       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
318                                                       &responses_[thread_idx]);
319     } else {
320       stream_[thread_idx].reset();
321       return false;
322     }
323     return true;
324   }
325 };
326 
327 class SynchronousStreamingFromServerClient final
328     : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
329  public:
SynchronousStreamingFromServerClient(const ClientConfig & config)330   explicit SynchronousStreamingFromServerClient(const ClientConfig& config)
331       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()332   ~SynchronousStreamingFromServerClient() override {}
333 
334  private:
335   std::vector<double> last_recv_;
336 
InitThreadFuncImpl(size_t thread_idx)337   bool InitThreadFuncImpl(size_t thread_idx) override {
338     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
339     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
340     if (!shutdown_[thread_idx].val) {
341       stream_[thread_idx] =
342           stub->StreamingFromServer(&context_[thread_idx], request_);
343     } else {
344       return false;
345     }
346     last_recv_[thread_idx] = UsageTimer::Now();
347     return true;
348   }
349 
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)350   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
351     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
352       double now = UsageTimer::Now();
353       entry->set_value((now - last_recv_[thread_idx]) * 1e9);
354       last_recv_[thread_idx] = now;
355       return true;
356     }
357     FinishStream(entry, thread_idx);
358     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
359     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
360     if (!shutdown_[thread_idx].val) {
361       stream_[thread_idx] =
362           stub->StreamingFromServer(&context_[thread_idx], request_);
363     } else {
364       stream_[thread_idx].reset();
365       return false;
366     }
367     return true;
368   }
369 };
370 
371 class SynchronousStreamingBothWaysClient final
372     : public SynchronousStreamingClient<
373           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
374  public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)375   explicit SynchronousStreamingBothWaysClient(const ClientConfig& config)
376       : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()377   ~SynchronousStreamingBothWaysClient() override {
378     CleanupAllStreams(
379         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
380   }
381 
382  private:
InitThreadFuncImpl(size_t thread_idx)383   bool InitThreadFuncImpl(size_t thread_idx) override {
384     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
385     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
386     if (!shutdown_[thread_idx].val) {
387       stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
388     } else {
389       return false;
390     }
391     return true;
392   }
393 
ThreadFuncImpl(HistogramEntry *,size_t)394   bool ThreadFuncImpl(HistogramEntry* /*entry*/,
395                       size_t /*thread_idx*/) override {
396     // TODO (vjpai): Do this
397     return true;
398   }
399 };
400 
CreateSynchronousClient(const ClientConfig & config)401 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
402   CHECK(!config.use_coalesce_api());  // not supported yet.
403   switch (config.rpc_type()) {
404     case UNARY:
405       return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
406     case STREAMING:
407       return std::unique_ptr<Client>(
408           new SynchronousStreamingPingPongClient(config));
409     case STREAMING_FROM_CLIENT:
410       return std::unique_ptr<Client>(
411           new SynchronousStreamingFromClientClient(config));
412     case STREAMING_FROM_SERVER:
413       return std::unique_ptr<Client>(
414           new SynchronousStreamingFromServerClient(config));
415     case STREAMING_BOTH_WAYS:
416       return std::unique_ptr<Client>(
417           new SynchronousStreamingBothWaysClient(config));
418     default:
419       assert(false);
420       return nullptr;
421   }
422 }
423 
424 }  // namespace testing
425 }  // namespace grpc
426