1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <forward_list>
20 #include <functional>
21 #include <list>
22 #include <memory>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <thread>
27 #include <utility>
28 #include <vector>
29
30 #include <grpc/grpc.h>
31 #include <grpc/support/cpu.h>
32 #include <grpc/support/log.h>
33 #include <grpcpp/alarm.h>
34 #include <grpcpp/channel.h>
35 #include <grpcpp/client_context.h>
36 #include <grpcpp/generic/generic_stub.h>
37
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
40 #include "test/cpp/qps/client.h"
41 #include "test/cpp/qps/usage_timer.h"
42 #include "test/cpp/util/create_test_channel.h"
43
44 namespace grpc {
45 namespace testing {
46
47 class ClientRpcContext {
48 public:
ClientRpcContext()49 ClientRpcContext() {}
~ClientRpcContext()50 virtual ~ClientRpcContext() {}
51 // next state, return false if done. Collect stats when appropriate
52 virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
53 virtual void StartNewClone(CompletionQueue* cq) = 0;
tag(ClientRpcContext * c)54 static void* tag(ClientRpcContext* c) { return static_cast<void*>(c); }
detag(void * t)55 static ClientRpcContext* detag(void* t) {
56 return static_cast<ClientRpcContext*>(t);
57 }
58
59 virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
60 virtual void TryCancel() = 0;
61 };
62
63 template <class RequestType, class ResponseType>
64 class ClientRpcContextUnaryImpl : public ClientRpcContext {
65 public:
ClientRpcContextUnaryImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>> (BenchmarkService::Stub *,grpc::ClientContext *,const RequestType &,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *,HistogramEntry *)> on_done)66 ClientRpcContextUnaryImpl(
67 BenchmarkService::Stub* stub, const RequestType& req,
68 std::function<gpr_timespec()> next_issue,
69 std::function<
70 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
71 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
72 CompletionQueue*)>
73 prepare_req,
74 std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
75 : context_(),
76 stub_(stub),
77 cq_(nullptr),
78 req_(req),
79 response_(),
80 next_state_(State::READY),
81 callback_(on_done),
82 next_issue_(std::move(next_issue)),
83 prepare_req_(prepare_req) {}
~ClientRpcContextUnaryImpl()84 ~ClientRpcContextUnaryImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)85 void Start(CompletionQueue* cq, const ClientConfig& config) override {
86 GPR_ASSERT(!config.use_coalesce_api()); // not supported.
87 StartInternal(cq);
88 }
RunNextState(bool,HistogramEntry * entry)89 bool RunNextState(bool /*ok*/, HistogramEntry* entry) override {
90 switch (next_state_) {
91 case State::READY:
92 start_ = UsageTimer::Now();
93 response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
94 response_reader_->StartCall();
95 next_state_ = State::RESP_DONE;
96 response_reader_->Finish(&response_, &status_,
97 ClientRpcContext::tag(this));
98 return true;
99 case State::RESP_DONE:
100 if (status_.ok()) {
101 entry->set_value((UsageTimer::Now() - start_) * 1e9);
102 }
103 callback_(status_, &response_, entry);
104 next_state_ = State::INVALID;
105 return false;
106 default:
107 GPR_ASSERT(false);
108 return false;
109 }
110 }
StartNewClone(CompletionQueue * cq)111 void StartNewClone(CompletionQueue* cq) override {
112 auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
113 prepare_req_, callback_);
114 clone->StartInternal(cq);
115 }
TryCancel()116 void TryCancel() override { context_.TryCancel(); }
117
118 private:
119 grpc::ClientContext context_;
120 BenchmarkService::Stub* stub_;
121 CompletionQueue* cq_;
122 std::unique_ptr<Alarm> alarm_;
123 const RequestType& req_;
124 ResponseType response_;
125 enum State { INVALID, READY, RESP_DONE };
126 State next_state_;
127 std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
128 std::function<gpr_timespec()> next_issue_;
129 std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
130 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
131 CompletionQueue*)>
132 prepare_req_;
133 grpc::Status status_;
134 double start_;
135 std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
136 response_reader_;
137
StartInternal(CompletionQueue * cq)138 void StartInternal(CompletionQueue* cq) {
139 cq_ = cq;
140 if (!next_issue_) { // ready to issue
141 RunNextState(true, nullptr);
142 } else { // wait for the issue time
143 alarm_.reset(new Alarm);
144 alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
145 }
146 }
147 };
148
149 template <class StubType, class RequestType>
150 class AsyncClient : public ClientImpl<StubType, RequestType> {
151 // Specify which protected members we are using since there is no
152 // member name resolution until the template types are fully resolved
153 public:
154 using Client::closed_loop_;
155 using Client::NextIssuer;
156 using Client::SetupLoadTest;
157 using ClientImpl<StubType, RequestType>::cores_;
158 using ClientImpl<StubType, RequestType>::channels_;
159 using ClientImpl<StubType, RequestType>::request_;
AsyncClient(const ClientConfig & config,std::function<ClientRpcContext * (StubType *,std::function<gpr_timespec ()> next_issue,const RequestType &)> setup_ctx,std::function<std::unique_ptr<StubType> (std::shared_ptr<Channel>)> create_stub)160 AsyncClient(const ClientConfig& config,
161 std::function<ClientRpcContext*(
162 StubType*, std::function<gpr_timespec()> next_issue,
163 const RequestType&)>
164 setup_ctx,
165 std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
166 create_stub)
167 : ClientImpl<StubType, RequestType>(config, create_stub),
168 num_async_threads_(NumThreads(config)) {
169 SetupLoadTest(config, num_async_threads_);
170
171 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
172 int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
173 for (int i = 0; i < num_cqs; i++) {
174 cli_cqs_.emplace_back(new CompletionQueue);
175 }
176
177 for (int i = 0; i < num_async_threads_; i++) {
178 cq_.emplace_back(i % cli_cqs_.size());
179 next_issuers_.emplace_back(NextIssuer(i));
180 shutdown_state_.emplace_back(new PerThreadShutdownState());
181 }
182
183 int t = 0;
184 for (int ch = 0; ch < config.client_channels(); ch++) {
185 for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
186 auto* cq = cli_cqs_[t].get();
187 auto ctx =
188 setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
189 ctx->Start(cq, config);
190 }
191 t = (t + 1) % cli_cqs_.size();
192 }
193 }
~AsyncClient()194 virtual ~AsyncClient() {
195 for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
196 void* got_tag;
197 bool ok;
198 while ((*cq)->Next(&got_tag, &ok)) {
199 delete ClientRpcContext::detag(got_tag);
200 }
201 }
202 }
203
GetPollCount()204 int GetPollCount() override {
205 int count = 0;
206 for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
207 count += grpc_get_cq_poll_num((*cq)->cq());
208 }
209 return count;
210 }
211
212 protected:
213 const int num_async_threads_;
214
215 private:
216 struct PerThreadShutdownState {
217 mutable std::mutex mutex;
218 bool shutdown;
PerThreadShutdownStategrpc::testing::AsyncClient::PerThreadShutdownState219 PerThreadShutdownState() : shutdown(false) {}
220 };
221
NumThreads(const ClientConfig & config)222 int NumThreads(const ClientConfig& config) {
223 int num_threads = config.async_client_threads();
224 if (num_threads <= 0) { // Use dynamic sizing
225 num_threads = cores_;
226 gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
227 }
228 return num_threads;
229 }
DestroyMultithreading()230 void DestroyMultithreading() override final {
231 for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
232 std::lock_guard<std::mutex> lock((*ss)->mutex);
233 (*ss)->shutdown = true;
234 }
235 for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
236 (*cq)->Shutdown();
237 }
238 this->EndThreads(); // this needed for resolution
239 }
240
ProcessTag(size_t thread_idx,void * tag)241 ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {
242 ClientRpcContext* ctx = ClientRpcContext::detag(tag);
243 if (shutdown_state_[thread_idx]->shutdown) {
244 ctx->TryCancel();
245 delete ctx;
246 bool ok;
247 while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {
248 ctx = ClientRpcContext::detag(tag);
249 ctx->TryCancel();
250 delete ctx;
251 }
252 return nullptr;
253 }
254 return ctx;
255 }
256
ThreadFunc(size_t thread_idx,Client::Thread * t)257 void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
258 void* got_tag;
259 bool ok;
260
261 HistogramEntry entry;
262 HistogramEntry* entry_ptr = &entry;
263 if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
264 return;
265 }
266 std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
267 shutdown_mu->lock();
268 ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag);
269 if (ctx == nullptr) {
270 shutdown_mu->unlock();
271 return;
272 }
273 while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
274 [&, ctx, ok, entry_ptr, shutdown_mu]() {
275 if (!ctx->RunNextState(ok, entry_ptr)) {
276 // The RPC and callback are done, so clone the ctx
277 // and kickstart the new one
278 ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
279 delete ctx;
280 }
281 shutdown_mu->unlock();
282 },
283 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
284 t->UpdateHistogram(entry_ptr);
285 entry = HistogramEntry();
286 shutdown_mu->lock();
287 ctx = ProcessTag(thread_idx, got_tag);
288 if (ctx == nullptr) {
289 shutdown_mu->unlock();
290 return;
291 }
292 }
293 }
294
295 std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
296 std::vector<int> cq_;
297 std::vector<std::function<gpr_timespec()>> next_issuers_;
298 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
299 };
300
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)301 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
302 const std::shared_ptr<Channel>& ch) {
303 return BenchmarkService::NewStub(ch);
304 }
305
306 class AsyncUnaryClient final
307 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
308 public:
AsyncUnaryClient(const ClientConfig & config)309 explicit AsyncUnaryClient(const ClientConfig& config)
310 : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
311 config, SetupCtx, BenchmarkStubCreator) {
312 StartThreads(num_async_threads_);
313 }
~AsyncUnaryClient()314 ~AsyncUnaryClient() override {}
315
316 private:
CheckDone(const grpc::Status & s,SimpleResponse *,HistogramEntry * entry)317 static void CheckDone(const grpc::Status& s, SimpleResponse* /*response*/,
318 HistogramEntry* entry) {
319 entry->set_status(s.error_code());
320 }
321 static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,const SimpleRequest & request,CompletionQueue * cq)322 PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
323 const SimpleRequest& request, CompletionQueue* cq) {
324 return stub->PrepareAsyncUnaryCall(ctx, request, cq);
325 };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)326 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
327 std::function<gpr_timespec()> next_issue,
328 const SimpleRequest& req) {
329 return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
330 stub, req, std::move(next_issue), AsyncUnaryClient::PrepareReq,
331 AsyncUnaryClient::CheckDone);
332 }
333 };
334
335 template <class RequestType, class ResponseType>
336 class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
337 public:
ClientRpcContextStreamingPingPongImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType,ResponseType>> (BenchmarkService::Stub *,grpc::ClientContext *,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *)> on_done)338 ClientRpcContextStreamingPingPongImpl(
339 BenchmarkService::Stub* stub, const RequestType& req,
340 std::function<gpr_timespec()> next_issue,
341 std::function<std::unique_ptr<
342 grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
343 BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
344 prepare_req,
345 std::function<void(grpc::Status, ResponseType*)> on_done)
346 : context_(),
347 stub_(stub),
348 cq_(nullptr),
349 req_(req),
350 response_(),
351 next_state_(State::INVALID),
352 callback_(on_done),
353 next_issue_(std::move(next_issue)),
354 prepare_req_(prepare_req),
355 coalesce_(false) {}
~ClientRpcContextStreamingPingPongImpl()356 ~ClientRpcContextStreamingPingPongImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)357 void Start(CompletionQueue* cq, const ClientConfig& config) override {
358 StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
359 }
RunNextState(bool ok,HistogramEntry * entry)360 bool RunNextState(bool ok, HistogramEntry* entry) override {
361 while (true) {
362 switch (next_state_) {
363 case State::STREAM_IDLE:
364 if (!next_issue_) { // ready to issue
365 next_state_ = State::READY_TO_WRITE;
366 } else {
367 next_state_ = State::WAIT;
368 }
369 break; // loop around, don't return
370 case State::WAIT:
371 next_state_ = State::READY_TO_WRITE;
372 alarm_.reset(new Alarm);
373 alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
374 return true;
375 case State::READY_TO_WRITE:
376 if (!ok) {
377 return false;
378 }
379 start_ = UsageTimer::Now();
380 next_state_ = State::WRITE_DONE;
381 if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
382 stream_->WriteLast(req_, WriteOptions(),
383 ClientRpcContext::tag(this));
384 } else {
385 stream_->Write(req_, ClientRpcContext::tag(this));
386 }
387 return true;
388 case State::WRITE_DONE:
389 if (!ok) {
390 return false;
391 }
392 next_state_ = State::READ_DONE;
393 stream_->Read(&response_, ClientRpcContext::tag(this));
394 return true;
395 break;
396 case State::READ_DONE:
397 entry->set_value((UsageTimer::Now() - start_) * 1e9);
398 callback_(status_, &response_);
399 if ((messages_per_stream_ != 0) &&
400 (++messages_issued_ >= messages_per_stream_)) {
401 next_state_ = State::WRITES_DONE_DONE;
402 if (coalesce_) {
403 // WritesDone should have been called on the last Write.
404 // loop around to call Finish.
405 break;
406 }
407 stream_->WritesDone(ClientRpcContext::tag(this));
408 return true;
409 }
410 next_state_ = State::STREAM_IDLE;
411 break; // loop around
412 case State::WRITES_DONE_DONE:
413 next_state_ = State::FINISH_DONE;
414 stream_->Finish(&status_, ClientRpcContext::tag(this));
415 return true;
416 case State::FINISH_DONE:
417 next_state_ = State::INVALID;
418 return false;
419 break;
420 default:
421 GPR_ASSERT(false);
422 return false;
423 }
424 }
425 }
StartNewClone(CompletionQueue * cq)426 void StartNewClone(CompletionQueue* cq) override {
427 auto* clone = new ClientRpcContextStreamingPingPongImpl(
428 stub_, req_, next_issue_, prepare_req_, callback_);
429 clone->StartInternal(cq, messages_per_stream_, coalesce_);
430 }
TryCancel()431 void TryCancel() override { context_.TryCancel(); }
432
433 private:
434 grpc::ClientContext context_;
435 BenchmarkService::Stub* stub_;
436 CompletionQueue* cq_;
437 std::unique_ptr<Alarm> alarm_;
438 const RequestType& req_;
439 ResponseType response_;
440 enum State {
441 INVALID,
442 STREAM_IDLE,
443 WAIT,
444 READY_TO_WRITE,
445 WRITE_DONE,
446 READ_DONE,
447 WRITES_DONE_DONE,
448 FINISH_DONE
449 };
450 State next_state_;
451 std::function<void(grpc::Status, ResponseType*)> callback_;
452 std::function<gpr_timespec()> next_issue_;
453 std::function<
454 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
455 BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
456 prepare_req_;
457 grpc::Status status_;
458 double start_;
459 std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
460 stream_;
461
462 // Allow a limit on number of messages in a stream
463 int messages_per_stream_;
464 int messages_issued_;
465 // Whether to use coalescing API.
466 bool coalesce_;
467
StartInternal(CompletionQueue * cq,int messages_per_stream,bool coalesce)468 void StartInternal(CompletionQueue* cq, int messages_per_stream,
469 bool coalesce) {
470 cq_ = cq;
471 messages_per_stream_ = messages_per_stream;
472 messages_issued_ = 0;
473 coalesce_ = coalesce;
474 if (coalesce_) {
475 GPR_ASSERT(messages_per_stream_ != 0);
476 context_.set_initial_metadata_corked(true);
477 }
478 stream_ = prepare_req_(stub_, &context_, cq);
479 next_state_ = State::STREAM_IDLE;
480 stream_->StartCall(ClientRpcContext::tag(this));
481 if (coalesce_) {
482 // When the initial metadata is corked, the tag will not come back and we
483 // need to manually drive the state machine.
484 RunNextState(true, nullptr);
485 }
486 }
487 };
488
489 class AsyncStreamingPingPongClient final
490 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
491 public:
AsyncStreamingPingPongClient(const ClientConfig & config)492 explicit AsyncStreamingPingPongClient(const ClientConfig& config)
493 : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
494 config, SetupCtx, BenchmarkStubCreator) {
495 StartThreads(num_async_threads_);
496 }
497
~AsyncStreamingPingPongClient()498 ~AsyncStreamingPingPongClient() override {}
499
500 private:
CheckDone(const grpc::Status &,SimpleResponse *)501 static void CheckDone(const grpc::Status& /*s*/,
502 SimpleResponse* /*response*/) {}
503 static std::unique_ptr<
504 grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,CompletionQueue * cq)505 PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
506 CompletionQueue* cq) {
507 auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
508 return stream;
509 };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)510 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
511 std::function<gpr_timespec()> next_issue,
512 const SimpleRequest& req) {
513 return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
514 SimpleResponse>(
515 stub, req, std::move(next_issue),
516 AsyncStreamingPingPongClient::PrepareReq,
517 AsyncStreamingPingPongClient::CheckDone);
518 }
519 };
520
521 template <class RequestType, class ResponseType>
522 class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
523 public:
ClientRpcContextStreamingFromClientImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> (BenchmarkService::Stub *,grpc::ClientContext *,ResponseType *,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *)> on_done)524 ClientRpcContextStreamingFromClientImpl(
525 BenchmarkService::Stub* stub, const RequestType& req,
526 std::function<gpr_timespec()> next_issue,
527 std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
528 BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
529 CompletionQueue*)>
530 prepare_req,
531 std::function<void(grpc::Status, ResponseType*)> on_done)
532 : context_(),
533 stub_(stub),
534 cq_(nullptr),
535 req_(req),
536 response_(),
537 next_state_(State::INVALID),
538 callback_(on_done),
539 next_issue_(std::move(next_issue)),
540 prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromClientImpl()541 ~ClientRpcContextStreamingFromClientImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)542 void Start(CompletionQueue* cq, const ClientConfig& config) override {
543 GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
544 StartInternal(cq);
545 }
RunNextState(bool ok,HistogramEntry * entry)546 bool RunNextState(bool ok, HistogramEntry* entry) override {
547 while (true) {
548 switch (next_state_) {
549 case State::STREAM_IDLE:
550 if (!next_issue_) { // ready to issue
551 next_state_ = State::READY_TO_WRITE;
552 } else {
553 next_state_ = State::WAIT;
554 }
555 break; // loop around, don't return
556 case State::WAIT:
557 alarm_.reset(new Alarm);
558 alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
559 next_state_ = State::READY_TO_WRITE;
560 return true;
561 case State::READY_TO_WRITE:
562 if (!ok) {
563 return false;
564 }
565 start_ = UsageTimer::Now();
566 next_state_ = State::WRITE_DONE;
567 stream_->Write(req_, ClientRpcContext::tag(this));
568 return true;
569 case State::WRITE_DONE:
570 if (!ok) {
571 return false;
572 }
573 entry->set_value((UsageTimer::Now() - start_) * 1e9);
574 next_state_ = State::STREAM_IDLE;
575 break; // loop around
576 default:
577 GPR_ASSERT(false);
578 return false;
579 }
580 }
581 }
StartNewClone(CompletionQueue * cq)582 void StartNewClone(CompletionQueue* cq) override {
583 auto* clone = new ClientRpcContextStreamingFromClientImpl(
584 stub_, req_, next_issue_, prepare_req_, callback_);
585 clone->StartInternal(cq);
586 }
TryCancel()587 void TryCancel() override { context_.TryCancel(); }
588
589 private:
590 grpc::ClientContext context_;
591 BenchmarkService::Stub* stub_;
592 CompletionQueue* cq_;
593 std::unique_ptr<Alarm> alarm_;
594 const RequestType& req_;
595 ResponseType response_;
596 enum State {
597 INVALID,
598 STREAM_IDLE,
599 WAIT,
600 READY_TO_WRITE,
601 WRITE_DONE,
602 };
603 State next_state_;
604 std::function<void(grpc::Status, ResponseType*)> callback_;
605 std::function<gpr_timespec()> next_issue_;
606 std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
607 BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
608 CompletionQueue*)>
609 prepare_req_;
610 grpc::Status status_;
611 double start_;
612 std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
613
StartInternal(CompletionQueue * cq)614 void StartInternal(CompletionQueue* cq) {
615 cq_ = cq;
616 stream_ = prepare_req_(stub_, &context_, &response_, cq);
617 next_state_ = State::STREAM_IDLE;
618 stream_->StartCall(ClientRpcContext::tag(this));
619 }
620 };
621
622 class AsyncStreamingFromClientClient final
623 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
624 public:
AsyncStreamingFromClientClient(const ClientConfig & config)625 explicit AsyncStreamingFromClientClient(const ClientConfig& config)
626 : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
627 config, SetupCtx, BenchmarkStubCreator) {
628 StartThreads(num_async_threads_);
629 }
630
~AsyncStreamingFromClientClient()631 ~AsyncStreamingFromClientClient() override {}
632
633 private:
CheckDone(const grpc::Status &,SimpleResponse *)634 static void CheckDone(const grpc::Status& /*s*/,
635 SimpleResponse* /*response*/) {}
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,SimpleResponse * resp,CompletionQueue * cq)636 static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
637 BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
638 SimpleResponse* resp, CompletionQueue* cq) {
639 auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
640 return stream;
641 };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)642 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
643 std::function<gpr_timespec()> next_issue,
644 const SimpleRequest& req) {
645 return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
646 SimpleResponse>(
647 stub, req, std::move(next_issue),
648 AsyncStreamingFromClientClient::PrepareReq,
649 AsyncStreamingFromClientClient::CheckDone);
650 }
651 };
652
653 template <class RequestType, class ResponseType>
654 class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
655 public:
ClientRpcContextStreamingFromServerImpl(BenchmarkService::Stub * stub,const RequestType & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> (BenchmarkService::Stub *,grpc::ClientContext *,const RequestType &,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ResponseType *)> on_done)656 ClientRpcContextStreamingFromServerImpl(
657 BenchmarkService::Stub* stub, const RequestType& req,
658 std::function<gpr_timespec()> next_issue,
659 std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
660 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
661 CompletionQueue*)>
662 prepare_req,
663 std::function<void(grpc::Status, ResponseType*)> on_done)
664 : context_(),
665 stub_(stub),
666 cq_(nullptr),
667 req_(req),
668 response_(),
669 next_state_(State::INVALID),
670 callback_(on_done),
671 next_issue_(std::move(next_issue)),
672 prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromServerImpl()673 ~ClientRpcContextStreamingFromServerImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)674 void Start(CompletionQueue* cq, const ClientConfig& config) override {
675 GPR_ASSERT(!config.use_coalesce_api()); // not supported
676 StartInternal(cq);
677 }
RunNextState(bool ok,HistogramEntry * entry)678 bool RunNextState(bool ok, HistogramEntry* entry) override {
679 while (true) {
680 switch (next_state_) {
681 case State::STREAM_IDLE:
682 if (!ok) {
683 return false;
684 }
685 start_ = UsageTimer::Now();
686 next_state_ = State::READ_DONE;
687 stream_->Read(&response_, ClientRpcContext::tag(this));
688 return true;
689 case State::READ_DONE:
690 if (!ok) {
691 return false;
692 }
693 entry->set_value((UsageTimer::Now() - start_) * 1e9);
694 callback_(status_, &response_);
695 next_state_ = State::STREAM_IDLE;
696 break; // loop around
697 default:
698 GPR_ASSERT(false);
699 return false;
700 }
701 }
702 }
StartNewClone(CompletionQueue * cq)703 void StartNewClone(CompletionQueue* cq) override {
704 auto* clone = new ClientRpcContextStreamingFromServerImpl(
705 stub_, req_, next_issue_, prepare_req_, callback_);
706 clone->StartInternal(cq);
707 }
TryCancel()708 void TryCancel() override { context_.TryCancel(); }
709
710 private:
711 grpc::ClientContext context_;
712 BenchmarkService::Stub* stub_;
713 CompletionQueue* cq_;
714 std::unique_ptr<Alarm> alarm_;
715 const RequestType& req_;
716 ResponseType response_;
717 enum State { INVALID, STREAM_IDLE, READ_DONE };
718 State next_state_;
719 std::function<void(grpc::Status, ResponseType*)> callback_;
720 std::function<gpr_timespec()> next_issue_;
721 std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
722 BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
723 CompletionQueue*)>
724 prepare_req_;
725 grpc::Status status_;
726 double start_;
727 std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
728
StartInternal(CompletionQueue * cq)729 void StartInternal(CompletionQueue* cq) {
730 // TODO(vjpai): Add support to rate-pace this
731 cq_ = cq;
732 stream_ = prepare_req_(stub_, &context_, req_, cq);
733 next_state_ = State::STREAM_IDLE;
734 stream_->StartCall(ClientRpcContext::tag(this));
735 }
736 };
737
738 class AsyncStreamingFromServerClient final
739 : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
740 public:
AsyncStreamingFromServerClient(const ClientConfig & config)741 explicit AsyncStreamingFromServerClient(const ClientConfig& config)
742 : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
743 config, SetupCtx, BenchmarkStubCreator) {
744 StartThreads(num_async_threads_);
745 }
746
~AsyncStreamingFromServerClient()747 ~AsyncStreamingFromServerClient() override {}
748
749 private:
CheckDone(const grpc::Status &,SimpleResponse *)750 static void CheckDone(const grpc::Status& /*s*/,
751 SimpleResponse* /*response*/) {}
PrepareReq(BenchmarkService::Stub * stub,grpc::ClientContext * ctx,const SimpleRequest & req,CompletionQueue * cq)752 static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
753 BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
754 const SimpleRequest& req, CompletionQueue* cq) {
755 auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
756 return stream;
757 };
SetupCtx(BenchmarkService::Stub * stub,std::function<gpr_timespec ()> next_issue,const SimpleRequest & req)758 static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
759 std::function<gpr_timespec()> next_issue,
760 const SimpleRequest& req) {
761 return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
762 SimpleResponse>(
763 stub, req, std::move(next_issue),
764 AsyncStreamingFromServerClient::PrepareReq,
765 AsyncStreamingFromServerClient::CheckDone);
766 }
767 };
768
769 class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
770 public:
ClientRpcContextGenericStreamingImpl(grpc::GenericStub * stub,const ByteBuffer & req,std::function<gpr_timespec ()> next_issue,std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter> (grpc::GenericStub *,grpc::ClientContext *,const std::string & method_name,CompletionQueue *)> prepare_req,std::function<void (grpc::Status,ByteBuffer *)> on_done)771 ClientRpcContextGenericStreamingImpl(
772 grpc::GenericStub* stub, const ByteBuffer& req,
773 std::function<gpr_timespec()> next_issue,
774 std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
775 grpc::GenericStub*, grpc::ClientContext*,
776 const std::string& method_name, CompletionQueue*)>
777 prepare_req,
778 std::function<void(grpc::Status, ByteBuffer*)> on_done)
779 : context_(),
780 stub_(stub),
781 cq_(nullptr),
782 req_(req),
783 response_(),
784 next_state_(State::INVALID),
785 callback_(std::move(on_done)),
786 next_issue_(std::move(next_issue)),
787 prepare_req_(std::move(prepare_req)) {}
~ClientRpcContextGenericStreamingImpl()788 ~ClientRpcContextGenericStreamingImpl() override {}
Start(CompletionQueue * cq,const ClientConfig & config)789 void Start(CompletionQueue* cq, const ClientConfig& config) override {
790 GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
791 StartInternal(cq, config.messages_per_stream());
792 }
RunNextState(bool ok,HistogramEntry * entry)793 bool RunNextState(bool ok, HistogramEntry* entry) override {
794 while (true) {
795 switch (next_state_) {
796 case State::STREAM_IDLE:
797 if (!next_issue_) { // ready to issue
798 next_state_ = State::READY_TO_WRITE;
799 } else {
800 next_state_ = State::WAIT;
801 }
802 break; // loop around, don't return
803 case State::WAIT:
804 next_state_ = State::READY_TO_WRITE;
805 alarm_.reset(new Alarm);
806 alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
807 return true;
808 case State::READY_TO_WRITE:
809 if (!ok) {
810 return false;
811 }
812 start_ = UsageTimer::Now();
813 next_state_ = State::WRITE_DONE;
814 stream_->Write(req_, ClientRpcContext::tag(this));
815 return true;
816 case State::WRITE_DONE:
817 if (!ok) {
818 return false;
819 }
820 next_state_ = State::READ_DONE;
821 stream_->Read(&response_, ClientRpcContext::tag(this));
822 return true;
823 break;
824 case State::READ_DONE:
825 entry->set_value((UsageTimer::Now() - start_) * 1e9);
826 callback_(status_, &response_);
827 if ((messages_per_stream_ != 0) &&
828 (++messages_issued_ >= messages_per_stream_)) {
829 next_state_ = State::WRITES_DONE_DONE;
830 stream_->WritesDone(ClientRpcContext::tag(this));
831 return true;
832 }
833 next_state_ = State::STREAM_IDLE;
834 break; // loop around
835 case State::WRITES_DONE_DONE:
836 next_state_ = State::FINISH_DONE;
837 stream_->Finish(&status_, ClientRpcContext::tag(this));
838 return true;
839 case State::FINISH_DONE:
840 next_state_ = State::INVALID;
841 return false;
842 break;
843 default:
844 GPR_ASSERT(false);
845 return false;
846 }
847 }
848 }
StartNewClone(CompletionQueue * cq)849 void StartNewClone(CompletionQueue* cq) override {
850 auto* clone = new ClientRpcContextGenericStreamingImpl(
851 stub_, req_, next_issue_, prepare_req_, callback_);
852 clone->StartInternal(cq, messages_per_stream_);
853 }
TryCancel()854 void TryCancel() override { context_.TryCancel(); }
855
856 private:
857 grpc::ClientContext context_;
858 grpc::GenericStub* stub_;
859 CompletionQueue* cq_;
860 std::unique_ptr<Alarm> alarm_;
861 ByteBuffer req_;
862 ByteBuffer response_;
863 enum State {
864 INVALID,
865 STREAM_IDLE,
866 WAIT,
867 READY_TO_WRITE,
868 WRITE_DONE,
869 READ_DONE,
870 WRITES_DONE_DONE,
871 FINISH_DONE
872 };
873 State next_state_;
874 std::function<void(grpc::Status, ByteBuffer*)> callback_;
875 std::function<gpr_timespec()> next_issue_;
876 std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
877 grpc::GenericStub*, grpc::ClientContext*, const std::string&,
878 CompletionQueue*)>
879 prepare_req_;
880 grpc::Status status_;
881 double start_;
882 std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
883
884 // Allow a limit on number of messages in a stream
885 int messages_per_stream_;
886 int messages_issued_;
887
StartInternal(CompletionQueue * cq,int messages_per_stream)888 void StartInternal(CompletionQueue* cq, int messages_per_stream) {
889 cq_ = cq;
890 const std::string kMethodName(
891 "/grpc.testing.BenchmarkService/StreamingCall");
892 messages_per_stream_ = messages_per_stream;
893 messages_issued_ = 0;
894 stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
895 next_state_ = State::STREAM_IDLE;
896 stream_->StartCall(ClientRpcContext::tag(this));
897 }
898 };
899
GenericStubCreator(const std::shared_ptr<Channel> & ch)900 static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
901 const std::shared_ptr<Channel>& ch) {
902 return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));
903 }
904
905 class GenericAsyncStreamingClient final
906 : public AsyncClient<grpc::GenericStub, ByteBuffer> {
907 public:
GenericAsyncStreamingClient(const ClientConfig & config)908 explicit GenericAsyncStreamingClient(const ClientConfig& config)
909 : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,
910 GenericStubCreator) {
911 StartThreads(num_async_threads_);
912 }
913
~GenericAsyncStreamingClient()914 ~GenericAsyncStreamingClient() override {}
915
916 private:
CheckDone(const grpc::Status &,ByteBuffer *)917 static void CheckDone(const grpc::Status& /*s*/, ByteBuffer* /*response*/) {}
PrepareReq(grpc::GenericStub * stub,grpc::ClientContext * ctx,const std::string & method_name,CompletionQueue * cq)918 static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
919 grpc::GenericStub* stub, grpc::ClientContext* ctx,
920 const std::string& method_name, CompletionQueue* cq) {
921 auto stream = stub->PrepareCall(ctx, method_name, cq);
922 return stream;
923 };
SetupCtx(grpc::GenericStub * stub,std::function<gpr_timespec ()> next_issue,const ByteBuffer & req)924 static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
925 std::function<gpr_timespec()> next_issue,
926 const ByteBuffer& req) {
927 return new ClientRpcContextGenericStreamingImpl(
928 stub, req, std::move(next_issue),
929 GenericAsyncStreamingClient::PrepareReq,
930 GenericAsyncStreamingClient::CheckDone);
931 }
932 };
933
CreateAsyncClient(const ClientConfig & config)934 std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
935 switch (config.rpc_type()) {
936 case UNARY:
937 return std::unique_ptr<Client>(new AsyncUnaryClient(config));
938 case STREAMING:
939 return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
940 case STREAMING_FROM_CLIENT:
941 return std::unique_ptr<Client>(
942 new AsyncStreamingFromClientClient(config));
943 case STREAMING_FROM_SERVER:
944 return std::unique_ptr<Client>(
945 new AsyncStreamingFromServerClient(config));
946 case STREAMING_BOTH_WAYS:
947 // TODO(vjpai): Implement this
948 assert(false);
949 return nullptr;
950 default:
951 assert(false);
952 return nullptr;
953 }
954 }
CreateGenericAsyncStreamingClient(const ClientConfig & args)955 std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
956 const ClientConfig& args) {
957 return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));
958 }
959
960 } // namespace testing
961 } // namespace grpc
962