• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <forward_list>
20 #include <functional>
21 #include <list>
22 #include <memory>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <thread>
27 #include <utility>
28 #include <vector>
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/cpu.h>
32 #include <grpc/support/log.h>
33 #include <grpcpp/alarm.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/client_context.h>
36 #include <grpcpp/generic/generic_stub.h>
37 
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
40 #include "test/cpp/qps/client.h"
41 #include "test/cpp/qps/usage_timer.h"
42 #include "test/cpp/util/create_test_channel.h"
43 
44 namespace grpc {
45 namespace testing {
46 
47 class ClientRpcContext {
48  public:
ClientRpcContext()49   ClientRpcContext() {}
~ClientRpcContext()50   virtual ~ClientRpcContext() {}
51   // next state, return false if done. Collect stats when appropriate
52   virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
53   virtual void StartNewClone(CompletionQueue* cq) = 0;
tag(ClientRpcContext * c)54   static void* tag(ClientRpcContext* c) { return static_cast<void*>(c); }
detag(void * t)55   static ClientRpcContext* detag(void* t) {
56     return static_cast<ClientRpcContext*>(t);
57   }
58 
59   virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
60   virtual void TryCancel() = 0;
61 };
62 
// Context for one async unary RPC. Two-step state machine: READY issues the
// call and requests Finish; RESP_DONE records latency/status and completes.
// Issuance is either immediate (closed loop, no next_issue_) or scheduled
// via an Alarm at the time next_issue_() returns.
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
 public:
  // |prepare_req| creates (but does not start) the async response reader;
  // |on_done| is invoked with the final status/response once per RPC.
  ClientRpcContextUnaryImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<
          std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
              BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
              CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::READY),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req) {}
  ~ClientRpcContextUnaryImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported.
    StartInternal(cq);
  }
  bool RunNextState(bool /*ok*/, HistogramEntry* entry) override {
    switch (next_state_) {
      case State::READY:
        // Issue the call now; Finish's completion will come back tagged
        // with this context and land us in RESP_DONE.
        // NOTE: entry may be nullptr here (closed-loop StartInternal path);
        // this branch must not dereference it.
        start_ = UsageTimer::Now();
        response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
        response_reader_->StartCall();
        next_state_ = State::RESP_DONE;
        response_reader_->Finish(&response_, &status_,
                                 ClientRpcContext::tag(this));
        return true;
      case State::RESP_DONE:
        // Latency recorded in nanoseconds, only for successful RPCs.
        if (status_.ok()) {
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
        }
        callback_(status_, &response_, entry);
        next_state_ = State::INVALID;
        return false;  // RPC complete; caller will clone-and-restart
      default:
        GPR_ASSERT(false);
        return false;
    }
  }
  // A ClientContext cannot be reused, so each new RPC is a fresh context
  // object sharing the same stub/request/callbacks.
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
                                                prepare_req_, callback_);
    clone->StartInternal(cq);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;  // not owned; set in StartInternal
  std::unique_ptr<Alarm> alarm_;  // used to pace open-loop issuance
  const RequestType& req_;
  ResponseType response_;
  enum State { INVALID, READY, RESP_DONE };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
  std::function<gpr_timespec()> next_issue_;  // empty => closed loop
  std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;  // UsageTimer::Now() timestamp at issue time
  std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
      response_reader_;

  void StartInternal(CompletionQueue* cq) {
    cq_ = cq;
    if (!next_issue_) {  // ready to issue
      RunNextState(true, nullptr);
    } else {  // wait for the issue time
      // Alarm fires on cq_ at the scheduled time with this context as tag;
      // the resulting event drives READY.
      alarm_.reset(new Alarm);
      alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
    }
  }
};
148 
// Shared driver for all async benchmark clients: owns the completion queues,
// maps worker threads onto them, launches the initial set of RPC contexts,
// and runs the per-thread event loop with shutdown handling.
template <class StubType, class RequestType>
class AsyncClient : public ClientImpl<StubType, RequestType> {
  // Specify which protected members we are using since there is no
  // member name resolution until the template types are fully resolved
 public:
  using Client::closed_loop_;
  using Client::NextIssuer;
  using Client::SetupLoadTest;
  using ClientImpl<StubType, RequestType>::cores_;
  using ClientImpl<StubType, RequestType>::channels_;
  using ClientImpl<StubType, RequestType>::request_;
  // |setup_ctx| builds one ClientRpcContext per outstanding RPC;
  // |create_stub| builds a stub from a channel (forwarded to ClientImpl).
  AsyncClient(const ClientConfig& config,
              std::function<ClientRpcContext*(
                  StubType*, std::function<gpr_timespec()> next_issue,
                  const RequestType&)>
                  setup_ctx,
              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                  create_stub)
      : ClientImpl<StubType, RequestType>(config, create_stub),
        num_async_threads_(NumThreads(config)) {
    SetupLoadTest(config, num_async_threads_);

    // One CQ per |tpc| threads, rounded up.
    int tpc = std::max(1, config.threads_per_cq());      // 1 if unspecified
    int num_cqs = (num_async_threads_ + tpc - 1) / tpc;  // ceiling operator
    for (int i = 0; i < num_cqs; i++) {
      cli_cqs_.emplace_back(new CompletionQueue);
    }

    // Per-thread state: which CQ it polls, its issue-time generator, and
    // its shutdown flag/mutex.
    for (int i = 0; i < num_async_threads_; i++) {
      cq_.emplace_back(i % cli_cqs_.size());
      next_issuers_.emplace_back(NextIssuer(i));
      shutdown_state_.emplace_back(new PerThreadShutdownState());
    }

    // Spread the configured outstanding RPCs across channels and CQs.
    int t = 0;
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        auto* cq = cli_cqs_[t].get();
        auto ctx =
            setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
        ctx->Start(cq, config);
      }
      t = (t + 1) % cli_cqs_.size();
    }
  }
  // Drain every CQ and free any contexts still in flight. Assumes the CQs
  // were already Shutdown() (see DestroyMultithreading), so Next eventually
  // returns false.
  virtual ~AsyncClient() {
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      void* got_tag;
      bool ok;
      while ((*cq)->Next(&got_tag, &ok)) {
        delete ClientRpcContext::detag(got_tag);
      }
    }
  }

  // Sum of poll() calls across all completion queues (core-level stat).
  int GetPollCount() override {
    int count = 0;
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      count += grpc_get_cq_poll_num((*cq)->cq());
    }
    return count;
  }

 protected:
  const int num_async_threads_;

 private:
  // Per-thread shutdown flag, guarded by its own mutex so the thread's
  // event loop and DestroyMultithreading can coordinate.
  struct PerThreadShutdownState {
    mutable std::mutex mutex;
    bool shutdown;
    PerThreadShutdownState() : shutdown(false) {}
  };

  int NumThreads(const ClientConfig& config) {
    int num_threads = config.async_client_threads();
    if (num_threads <= 0) {  // Use dynamic sizing
      num_threads = cores_;
      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
    }
    return num_threads;
  }
  // Flag every thread for shutdown, shut the CQs down, then join threads.
  void DestroyMultithreading() override final {
    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
      std::lock_guard<std::mutex> lock((*ss)->mutex);
      (*ss)->shutdown = true;
    }
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      (*cq)->Shutdown();
    }
    this->EndThreads();  // this needed for resolution
  }

  // Resolve a CQ tag to its context. If this thread has been told to shut
  // down, cancel and delete the context (and everything else left on the
  // CQ) and return nullptr to tell the caller to exit.
  // Caller must hold shutdown_state_[thread_idx]->mutex.
  ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {
    ClientRpcContext* ctx = ClientRpcContext::detag(tag);
    if (shutdown_state_[thread_idx]->shutdown) {
      ctx->TryCancel();
      delete ctx;
      bool ok;
      while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {
        ctx = ClientRpcContext::detag(tag);
        ctx->TryCancel();
        delete ctx;
      }
      return nullptr;
    }
    return ctx;
  }

  // Per-thread event loop: pull a tag, advance its context's state machine
  // inside DoThenAsyncNext's callback (run before the next poll), record
  // the histogram entry, repeat until CQ shutdown.
  void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
    void* got_tag;
    bool ok;

    HistogramEntry entry;
    HistogramEntry* entry_ptr = &entry;
    if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
      return;
    }
    std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
    // Lock is taken here and released inside the DoThenAsyncNext lambda,
    // so the shutdown flag cannot flip while a context is being advanced.
    shutdown_mu->lock();
    ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag);
    if (ctx == nullptr) {
      shutdown_mu->unlock();
      return;
    }
    while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
        [&, ctx, ok, entry_ptr, shutdown_mu]() {
          if (!ctx->RunNextState(ok, entry_ptr)) {
            // The RPC and callback are done, so clone the ctx
            // and kickstart the new one
            ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
            delete ctx;
          }
          shutdown_mu->unlock();
        },
        &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
      t->UpdateHistogram(entry_ptr);
      entry = HistogramEntry();  // reset for the next iteration
      shutdown_mu->lock();
      ctx = ProcessTag(thread_idx, got_tag);
      if (ctx == nullptr) {
        shutdown_mu->unlock();
        return;
      }
    }
  }

  std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
  std::vector<int> cq_;  // thread index -> index into cli_cqs_
  std::vector<std::function<gpr_timespec()>> next_issuers_;
  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
300 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)301 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
302     const std::shared_ptr<Channel>& ch) {
303   return BenchmarkService::NewStub(ch);
304 }
305 
306 class AsyncUnaryClient final
307     : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
308  public:
AsyncUnaryClient(const ClientConfig & config)309   explicit AsyncUnaryClient(const ClientConfig& config)
310       : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
311             config, SetupCtx, BenchmarkStubCreator) {
312     StartThreads(num_async_threads_);
313   }
~AsyncUnaryClient()314   ~AsyncUnaryClient() override {}
315 
316  private:
CheckDone(const grpc::Status & s,SimpleResponse *,HistogramEntry * entry)317   static void CheckDone(const grpc::Status& s, SimpleResponse* /*response*/,
318                         HistogramEntry* entry) {
319     entry->set_status(s.error_code());
320   }
321   static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,const SimpleRequest & request,CompletionQueue * cq)322   PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
323              const SimpleRequest& request, CompletionQueue* cq) {
324     return stub->PrepareAsyncUnaryCall(ctx, request, cq);
325   };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)326   static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
327                                     std::function<gpr_timespec()> next_issue,
328                                     const SimpleRequest& req) {
329     return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
330         stub, req, std::move(next_issue), AsyncUnaryClient::PrepareReq,
331         AsyncUnaryClient::CheckDone);
332   }
333 };
334 
// Context for a bidi ping-pong streaming RPC: repeatedly Write one request,
// Read one response, and (optionally, after messages_per_stream_ rounds)
// close the stream via WritesDone/Finish. Supports the coalescing API,
// where initial metadata is corked and the final Write uses WriteLast.
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
 public:
  // |prepare_req| creates (but does not start) the reader-writer stream;
  // |on_done| is invoked after every Read completes.
  ClientRpcContextStreamingPingPongImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<
          grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req),
        coalesce_(false) {}
  ~ClientRpcContextStreamingPingPongImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
  }
  // State machine driver. Cases that "break" loop around synchronously;
  // cases that "return true" have posted an async op and wait for its tag;
  // "return false" ends this context (stream failed or finished).
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
        case State::WAIT:
          // Open-loop pacing: alarm fires on cq_ at the next issue time.
          next_state_ = State::READY_TO_WRITE;
          alarm_.reset(new Alarm);
          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          // Under coalescing, the last message is sent with WriteLast so
          // WritesDone is bundled with the final Write.
          if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
            stream_->WriteLast(req_, WriteOptions(),
                               ClientRpcContext::tag(this));
          } else {
            stream_->Write(req_, ClientRpcContext::tag(this));
          }
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
          break;
        case State::READ_DONE:
          // One full ping-pong round complete: record latency (ns).
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          // messages_per_stream_ == 0 means unlimited rounds.
          if ((messages_per_stream_ != 0) &&
              (++messages_issued_ >= messages_per_stream_)) {
            next_state_ = State::WRITES_DONE_DONE;
            if (coalesce_) {
              // WritesDone should have been called on the last Write.
              // loop around to call Finish.
              break;
            }
            stream_->WritesDone(ClientRpcContext::tag(this));
            return true;
          }
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        case State::WRITES_DONE_DONE:
          next_state_ = State::FINISH_DONE;
          stream_->Finish(&status_, ClientRpcContext::tag(this));
          return true;
        case State::FINISH_DONE:
          next_state_ = State::INVALID;
          return false;
          break;
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  // ClientContext is single-use, so start a brand-new context with the same
  // configuration when this stream completes.
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextStreamingPingPongImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq, messages_per_stream_, coalesce_);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;  // not owned
  std::unique_ptr<Alarm> alarm_;  // paces open-loop issuance
  const RequestType& req_;
  ResponseType response_;
  enum State {
    INVALID,
    STREAM_IDLE,
    WAIT,
    READY_TO_WRITE,
    WRITE_DONE,
    READ_DONE,
    WRITES_DONE_DONE,
    FINISH_DONE
  };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;  // empty => closed loop
  std::function<
      std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;  // timestamp of the current round's Write
  std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
      stream_;

  // Allow a limit on number of messages in a stream
  int messages_per_stream_;
  int messages_issued_;
  // Whether to use coalescing API.
  bool coalesce_;

  void StartInternal(CompletionQueue* cq, int messages_per_stream,
                     bool coalesce) {
    cq_ = cq;
    messages_per_stream_ = messages_per_stream;
    messages_issued_ = 0;
    coalesce_ = coalesce;
    if (coalesce_) {
      // Corked metadata requires a known final message to uncork with
      // WriteLast, so an unlimited stream is not allowed here.
      GPR_ASSERT(messages_per_stream_ != 0);
      context_.set_initial_metadata_corked(true);
    }
    stream_ = prepare_req_(stub_, &context_, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
    if (coalesce_) {
      // When the initial metadata is corked, the tag will not come back and we
      // need to manually drive the state machine.
      RunNextState(true, nullptr);
    }
  }
};
488 
489 class AsyncStreamingPingPongClient final
490     : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
491  public:
AsyncStreamingPingPongClient(const ClientConfig & config)492   explicit AsyncStreamingPingPongClient(const ClientConfig& config)
493       : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
494             config, SetupCtx, BenchmarkStubCreator) {
495     StartThreads(num_async_threads_);
496   }
497 
~AsyncStreamingPingPongClient()498   ~AsyncStreamingPingPongClient() override {}
499 
500  private:
CheckDone(const grpc::Status &,SimpleResponse *)501   static void CheckDone(const grpc::Status& /*s*/,
502                         SimpleResponse* /*response*/) {}
503   static std::unique_ptr<
504       grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,CompletionQueue * cq)505   PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
506              CompletionQueue* cq) {
507     auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
508     return stream;
509   };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)510   static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
511                                     std::function<gpr_timespec()> next_issue,
512                                     const SimpleRequest& req) {
513     return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
514                                                      SimpleResponse>(
515         stub, req, std::move(next_issue),
516         AsyncStreamingPingPongClient::PrepareReq,
517         AsyncStreamingPingPongClient::CheckDone);
518   }
519 };
520 
// Context for a client-streaming RPC: writes requests on a single stream,
// recording per-Write latency. The stream is left open for the lifetime of
// the benchmark (no WritesDone/Finish path in this state machine); the
// stored callback_ and status_ are unused until/unless that changes.
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
 public:
  // |prepare_req| creates (but does not start) the async writer;
  // |on_done| is retained for interface symmetry with the other contexts.
  ClientRpcContextStreamingFromClientImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
          CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req) {}
  ~ClientRpcContextStreamingFromClientImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
    StartInternal(cq);
  }
  // State machine driver: "break" loops around synchronously, "return true"
  // has posted an async op, "return false" ends the context (stream broken).
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
        case State::WAIT:
          // Open-loop pacing via alarm on cq_.
          alarm_.reset(new Alarm);
          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
          next_state_ = State::READY_TO_WRITE;
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          stream_->Write(req_, ClientRpcContext::tag(this));
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          // Latency of the completed Write, in nanoseconds.
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  // ClientContext is single-use, so restart via a fresh context object.
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextStreamingFromClientImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;  // not owned
  std::unique_ptr<Alarm> alarm_;  // paces open-loop issuance
  const RequestType& req_;
  ResponseType response_;  // server's (eventual) single response target
  enum State {
    INVALID,
    STREAM_IDLE,
    WAIT,
    READY_TO_WRITE,
    WRITE_DONE,
  };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;  // empty => closed loop
  std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;  // timestamp of the current Write
  std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;

  void StartInternal(CompletionQueue* cq) {
    cq_ = cq;
    stream_ = prepare_req_(stub_, &context_, &response_, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
  }
};
621 
622 class AsyncStreamingFromClientClient final
623     : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
624  public:
AsyncStreamingFromClientClient(const ClientConfig & config)625   explicit AsyncStreamingFromClientClient(const ClientConfig& config)
626       : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
627             config, SetupCtx, BenchmarkStubCreator) {
628     StartThreads(num_async_threads_);
629   }
630 
~AsyncStreamingFromClientClient()631   ~AsyncStreamingFromClientClient() override {}
632 
633  private:
CheckDone(const grpc::Status &,SimpleResponse *)634   static void CheckDone(const grpc::Status& /*s*/,
635                         SimpleResponse* /*response*/) {}
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,SimpleResponse * resp,CompletionQueue * cq)636   static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
637       BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
638       SimpleResponse* resp, CompletionQueue* cq) {
639     auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
640     return stream;
641   };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)642   static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
643                                     std::function<gpr_timespec()> next_issue,
644                                     const SimpleRequest& req) {
645     return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
646                                                        SimpleResponse>(
647         stub, req, std::move(next_issue),
648         AsyncStreamingFromClientClient::PrepareReq,
649         AsyncStreamingFromClientClient::CheckDone);
650   }
651 };
652 
// Context for a server-streaming RPC: issues one request, then repeatedly
// Reads responses, recording the latency of each Read. The stream is read
// indefinitely (no Finish path in this state machine).
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
 public:
  // |prepare_req| creates (but does not start) the async reader;
  // |on_done| is invoked after every Read completes.
  ClientRpcContextStreamingFromServerImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
          CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req) {}
  ~ClientRpcContextStreamingFromServerImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported
    StartInternal(cq);
  }
  // State machine driver: "break" loops around synchronously, "return true"
  // has posted an async Read, "return false" ends the context.
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
        case State::READ_DONE:
          if (!ok) {
            return false;
          }
          // Latency of the completed Read, in nanoseconds.
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  // ClientContext is single-use, so restart via a fresh context object.
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextStreamingFromServerImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;  // not owned
  std::unique_ptr<Alarm> alarm_;  // declared for symmetry; pacing not wired up
  const RequestType& req_;
  ResponseType response_;
  enum State { INVALID, STREAM_IDLE, READ_DONE };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;  // unused here (see TODO below)
  std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;  // timestamp of the current Read
  std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;

  void StartInternal(CompletionQueue* cq) {
    // TODO(vjpai): Add support to rate-pace this
    cq_ = cq;
    stream_ = prepare_req_(stub_, &context_, req_, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
  }
};
737 
738 class AsyncStreamingFromServerClient final
739     : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
740  public:
AsyncStreamingFromServerClient(const ClientConfig & config)741   explicit AsyncStreamingFromServerClient(const ClientConfig& config)
742       : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
743             config, SetupCtx, BenchmarkStubCreator) {
744     StartThreads(num_async_threads_);
745   }
746 
~AsyncStreamingFromServerClient()747   ~AsyncStreamingFromServerClient() override {}
748 
749  private:
CheckDone(const grpc::Status &,SimpleResponse *)750   static void CheckDone(const grpc::Status& /*s*/,
751                         SimpleResponse* /*response*/) {}
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,const SimpleRequest & req,CompletionQueue * cq)752   static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
753       BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
754       const SimpleRequest& req, CompletionQueue* cq) {
755     auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
756     return stream;
757   };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)758   static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
759                                     std::function<gpr_timespec()> next_issue,
760                                     const SimpleRequest& req) {
761     return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
762                                                        SimpleResponse>(
763         stub, req, std::move(next_issue),
764         AsyncStreamingFromServerClient::PrepareReq,
765         AsyncStreamingFromServerClient::CheckDone);
766   }
767 };
768 
769 class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
770  public:
ClientRpcContextGenericStreamingImpl(grpc::GenericStub * stub,const ByteBuffer & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter> (grpc::GenericStub *,grpc::ClientContext *,const std::string & method_name,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ByteBuffer *)> on_done)771   ClientRpcContextGenericStreamingImpl(
772       grpc::GenericStub* stub, const ByteBuffer& req,
773       std::function<gpr_timespec()> next_issue,
774       std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
775           grpc::GenericStub*, grpc::ClientContext*,
776           const std::string& method_name, CompletionQueue*)>
777           prepare_req,
778       std::function<void(grpc::Status, ByteBuffer*)> on_done)
779       : context_(),
780         stub_(stub),
781         cq_(nullptr),
782         req_(req),
783         response_(),
784         next_state_(State::INVALID),
785         callback_(std::move(on_done)),
786         next_issue_(std::move(next_issue)),
787         prepare_req_(std::move(prepare_req)) {}
~ClientRpcContextGenericStreamingImpl()788   ~ClientRpcContextGenericStreamingImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)789   void Start(CompletionQueue* cq, const ClientConfig& config) override {
790     GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
791     StartInternal(cq, config.messages_per_stream());
792   }
RunNextState(bool ok,HistogramEntry * entry)793   bool RunNextState(bool ok, HistogramEntry* entry) override {
794     while (true) {
795       switch (next_state_) {
796         case State::STREAM_IDLE:
797           if (!next_issue_) {  // ready to issue
798             next_state_ = State::READY_TO_WRITE;
799           } else {
800             next_state_ = State::WAIT;
801           }
802           break;  // loop around, don't return
803         case State::WAIT:
804           next_state_ = State::READY_TO_WRITE;
805           alarm_.reset(new Alarm);
806           alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
807           return true;
808         case State::READY_TO_WRITE:
809           if (!ok) {
810             return false;
811           }
812           start_ = UsageTimer::Now();
813           next_state_ = State::WRITE_DONE;
814           stream_->Write(req_, ClientRpcContext::tag(this));
815           return true;
816         case State::WRITE_DONE:
817           if (!ok) {
818             return false;
819           }
820           next_state_ = State::READ_DONE;
821           stream_->Read(&response_, ClientRpcContext::tag(this));
822           return true;
823           break;
824         case State::READ_DONE:
825           entry->set_value((UsageTimer::Now() - start_) * 1e9);
826           callback_(status_, &response_);
827           if ((messages_per_stream_ != 0) &&
828               (++messages_issued_ >= messages_per_stream_)) {
829             next_state_ = State::WRITES_DONE_DONE;
830             stream_->WritesDone(ClientRpcContext::tag(this));
831             return true;
832           }
833           next_state_ = State::STREAM_IDLE;
834           break;  // loop around
835         case State::WRITES_DONE_DONE:
836           next_state_ = State::FINISH_DONE;
837           stream_->Finish(&status_, ClientRpcContext::tag(this));
838           return true;
839         case State::FINISH_DONE:
840           next_state_ = State::INVALID;
841           return false;
842           break;
843         default:
844           GPR_ASSERT(false);
845           return false;
846       }
847     }
848   }
StartNewClone(CompletionQueue * cq)849   void StartNewClone(CompletionQueue* cq) override {
850     auto* clone = new ClientRpcContextGenericStreamingImpl(
851         stub_, req_, next_issue_, prepare_req_, callback_);
852     clone->StartInternal(cq, messages_per_stream_);
853   }
TryCancel()854   void TryCancel() override { context_.TryCancel(); }
855 
856  private:
857   grpc::ClientContext context_;
858   grpc::GenericStub* stub_;
859   CompletionQueue* cq_;
860   std::unique_ptr<Alarm> alarm_;
861   ByteBuffer req_;
862   ByteBuffer response_;
863   enum State {
864     INVALID,
865     STREAM_IDLE,
866     WAIT,
867     READY_TO_WRITE,
868     WRITE_DONE,
869     READ_DONE,
870     WRITES_DONE_DONE,
871     FINISH_DONE
872   };
873   State next_state_;
874   std::function<void(grpc::Status, ByteBuffer*)> callback_;
875   std::function<gpr_timespec()> next_issue_;
876   std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
877       grpc::GenericStub*, grpc::ClientContext*, const std::string&,
878       CompletionQueue*)>
879       prepare_req_;
880   grpc::Status status_;
881   double start_;
882   std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
883 
884   // Allow a limit on number of messages in a stream
885   int messages_per_stream_;
886   int messages_issued_;
887 
StartInternal(CompletionQueue * cq,int messages_per_stream)888   void StartInternal(CompletionQueue* cq, int messages_per_stream) {
889     cq_ = cq;
890     const std::string kMethodName(
891         "/grpc.testing.BenchmarkService/StreamingCall");
892     messages_per_stream_ = messages_per_stream;
893     messages_issued_ = 0;
894     stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
895     next_state_ = State::STREAM_IDLE;
896     stream_->StartCall(ClientRpcContext::tag(this));
897   }
898 };
899 
GenericStubCreator(const std::shared_ptr<Channel> & ch)900 static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
901     const std::shared_ptr<Channel>& ch) {
902   return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));
903 }
904 
905 class GenericAsyncStreamingClient final
906     : public AsyncClient<grpc::GenericStub, ByteBuffer> {
907  public:
GenericAsyncStreamingClient(const ClientConfig & config)908   explicit GenericAsyncStreamingClient(const ClientConfig& config)
909       : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,
910                                                    GenericStubCreator) {
911     StartThreads(num_async_threads_);
912   }
913 
~GenericAsyncStreamingClient()914   ~GenericAsyncStreamingClient() override {}
915 
916  private:
CheckDone(const grpc::Status &,ByteBuffer *)917   static void CheckDone(const grpc::Status& /*s*/, ByteBuffer* /*response*/) {}
PrepareReq(grpc::GenericStub * stub,grpc::ClientContext * ctx,const std::string & method_name,CompletionQueue * cq)918   static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
919       grpc::GenericStub* stub, grpc::ClientContext* ctx,
920       const std::string& method_name, CompletionQueue* cq) {
921     auto stream = stub->PrepareCall(ctx, method_name, cq);
922     return stream;
923   };
SetupCtx(grpc::GenericStub * stub,std::function<gpr_timespec ()> next_issue,const ByteBuffer & req)924   static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
925                                     std::function<gpr_timespec()> next_issue,
926                                     const ByteBuffer& req) {
927     return new ClientRpcContextGenericStreamingImpl(
928         stub, req, std::move(next_issue),
929         GenericAsyncStreamingClient::PrepareReq,
930         GenericAsyncStreamingClient::CheckDone);
931   }
932 };
933 
CreateAsyncClient(const ClientConfig & config)934 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
935   switch (config.rpc_type()) {
936     case UNARY:
937       return std::unique_ptr<Client>(new AsyncUnaryClient(config));
938     case STREAMING:
939       return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
940     case STREAMING_FROM_CLIENT:
941       return std::unique_ptr<Client>(
942           new AsyncStreamingFromClientClient(config));
943     case STREAMING_FROM_SERVER:
944       return std::unique_ptr<Client>(
945           new AsyncStreamingFromServerClient(config));
946     case STREAMING_BOTH_WAYS:
947       // TODO(vjpai): Implement this
948       assert(false);
949       return nullptr;
950     default:
951       assert(false);
952       return nullptr;
953   }
954 }
CreateGenericAsyncStreamingClient(const ClientConfig & args)955 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
956     const ClientConfig& args) {
957   return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));
958 }
959 
960 }  // namespace testing
961 }  // namespace grpc
962