• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/cpu.h>
21 #include <grpcpp/alarm.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 
25 #include <list>
26 #include <memory>
27 #include <mutex>
28 #include <sstream>
29 #include <string>
30 #include <thread>
31 #include <utility>
32 #include <vector>
33 
34 #include "absl/log/log.h"
35 #include "absl/memory/memory.h"
36 #include "src/core/util/crash.h"
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/usage_timer.h"
40 
41 namespace grpc {
42 namespace testing {
43 
44 ///
45 /// Maintains context info per RPC
46 ///
47 struct CallbackClientRpcContext {
CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext48   explicit CallbackClientRpcContext(BenchmarkService::Stub* stub)
49       : alarm_(nullptr), stub_(stub) {}
50 
~CallbackClientRpcContextgrpc::testing::CallbackClientRpcContext51   ~CallbackClientRpcContext() {}
52 
53   SimpleResponse response_;
54   ClientContext context_;
55   std::unique_ptr<Alarm> alarm_;
56   BenchmarkService::Stub* stub_;
57 };
58 
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)59 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
60     const std::shared_ptr<Channel>& ch) {
61   return BenchmarkService::NewStub(ch);
62 }
63 
64 class CallbackClient
65     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
66  public:
CallbackClient(const ClientConfig & config)67   explicit CallbackClient(const ClientConfig& config)
68       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
69             config, BenchmarkStubCreator) {
70     num_threads_ = NumThreads(config);
71     rpcs_done_ = 0;
72 
73     //  Don't divide the fixed load among threads as the user threads
74     //  only bootstrap the RPCs
75     SetupLoadTest(config, 1);
76     total_outstanding_rpcs_ =
77         config.client_channels() * config.outstanding_rpcs_per_channel();
78   }
79 
~CallbackClient()80   ~CallbackClient() override {}
81 
82   ///
83   /// The main thread of the benchmark will be waiting on DestroyMultithreading.
84   /// Increment the rpcs_done_ variable to signify that the Callback RPC
85   /// after thread completion is done. When the last outstanding rpc increments
86   /// the counter it should also signal the main thread's conditional variable.
87   ///
NotifyMainThreadOfThreadCompletion()88   void NotifyMainThreadOfThreadCompletion() {
89     std::lock_guard<std::mutex> l(shutdown_mu_);
90     rpcs_done_++;
91     if (rpcs_done_ == total_outstanding_rpcs_) {
92       shutdown_cv_.notify_one();
93     }
94   }
95 
NextRPCIssueTime()96   gpr_timespec NextRPCIssueTime() {
97     std::lock_guard<std::mutex> l(next_issue_time_mu_);
98     return Client::NextIssueTime(0);
99   }
100 
101  protected:
102   size_t num_threads_;
103   size_t total_outstanding_rpcs_;
104   // The below mutex and condition variable is used by main benchmark thread to
105   // wait on completion of all RPCs before shutdown
106   std::mutex shutdown_mu_;
107   std::condition_variable shutdown_cv_;
108   // Number of rpcs done after thread completion
109   size_t rpcs_done_;
110   // Vector of Context data pointers for running a RPC
111   std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;
112 
113   virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
114   virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;
115 
ThreadFunc(size_t thread_idx,Thread * t)116   void ThreadFunc(size_t thread_idx, Thread* t) override {
117     InitThreadFuncImpl(thread_idx);
118     ThreadFuncImpl(t, thread_idx);
119   }
120 
121  private:
122   std::mutex next_issue_time_mu_;  // Used by next issue time
123 
NumThreads(const ClientConfig & config)124   int NumThreads(const ClientConfig& config) {
125     int num_threads = config.async_client_threads();
126     if (num_threads <= 0) {  // Use dynamic sizing
127       num_threads = cores_;
128       LOG(INFO) << "Sizing callback client to " << num_threads << " threads";
129     }
130     return num_threads;
131   }
132 
133   ///
134   /// Wait until all outstanding Callback RPCs are done
135   ///
DestroyMultithreading()136   void DestroyMultithreading() final {
137     std::unique_lock<std::mutex> l(shutdown_mu_);
138     while (rpcs_done_ != total_outstanding_rpcs_) {
139       shutdown_cv_.wait(l);
140     }
141     EndThreads();
142   }
143 };
144 
145 class CallbackUnaryClient final : public CallbackClient {
146  public:
CallbackUnaryClient(const ClientConfig & config)147   explicit CallbackUnaryClient(const ClientConfig& config)
148       : CallbackClient(config) {
149     for (int ch = 0; ch < config.client_channels(); ch++) {
150       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
151         ctx_.emplace_back(
152             new CallbackClientRpcContext(channels_[ch].get_stub()));
153       }
154     }
155     StartThreads(num_threads_);
156   }
~CallbackUnaryClient()157   ~CallbackUnaryClient() override {}
158 
159  protected:
ThreadFuncImpl(Thread * t,size_t thread_idx)160   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
161     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
162          vector_idx += num_threads_) {
163       ScheduleRpc(t, vector_idx);
164     }
165     return true;
166   }
167 
InitThreadFuncImpl(size_t)168   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
169 
170  private:
ScheduleRpc(Thread * t,size_t vector_idx)171   void ScheduleRpc(Thread* t, size_t vector_idx) {
172     if (!closed_loop_) {
173       gpr_timespec next_issue_time = NextRPCIssueTime();
174       // Start an alarm callback to run the internal callback after
175       // next_issue_time
176       if (ctx_[vector_idx]->alarm_ == nullptr) {
177         ctx_[vector_idx]->alarm_ = std::make_unique<Alarm>();
178       }
179       ctx_[vector_idx]->alarm_->Set(next_issue_time,
180                                     [this, t, vector_idx](bool /*ok*/) {
181                                       IssueUnaryCallbackRpc(t, vector_idx);
182                                     });
183     } else {
184       IssueUnaryCallbackRpc(t, vector_idx);
185     }
186   }
187 
IssueUnaryCallbackRpc(Thread * t,size_t vector_idx)188   void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
189     double start = UsageTimer::Now();
190     ctx_[vector_idx]->stub_->async()->UnaryCall(
191         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
192         [this, t, start, vector_idx](grpc::Status s) {
193           // Update Histogram with data from the callback run
194           HistogramEntry entry;
195           if (s.ok()) {
196             entry.set_value((UsageTimer::Now() - start) * 1e9);
197           }
198           entry.set_status(s.error_code());
199           t->UpdateHistogram(&entry);
200 
201           if (ThreadCompleted() || !s.ok()) {
202             // Notify thread of completion
203             NotifyMainThreadOfThreadCompletion();
204           } else {
205             // Reallocate ctx for next RPC
206             ctx_[vector_idx] = std::make_unique<CallbackClientRpcContext>(
207                 ctx_[vector_idx]->stub_);
208             // Schedule a new RPC
209             ScheduleRpc(t, vector_idx);
210           }
211         });
212   }
213 };
214 
215 class CallbackStreamingClient : public CallbackClient {
216  public:
CallbackStreamingClient(const ClientConfig & config)217   explicit CallbackStreamingClient(const ClientConfig& config)
218       : CallbackClient(config),
219         messages_per_stream_(config.messages_per_stream()) {
220     for (int ch = 0; ch < config.client_channels(); ch++) {
221       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
222         ctx_.emplace_back(
223             new CallbackClientRpcContext(channels_[ch].get_stub()));
224       }
225     }
226     StartThreads(num_threads_);
227   }
~CallbackStreamingClient()228   ~CallbackStreamingClient() override {}
229 
AddHistogramEntry(double start,bool ok,Thread * thread_ptr)230   void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
231     // Update Histogram with data from the callback run
232     HistogramEntry entry;
233     if (ok) {
234       entry.set_value((UsageTimer::Now() - start) * 1e9);
235     }
236     thread_ptr->UpdateHistogram(&entry);
237   }
238 
messages_per_stream()239   int messages_per_stream() { return messages_per_stream_; }
240 
241  protected:
242   const int messages_per_stream_;
243 };
244 
245 class CallbackStreamingPingPongClient : public CallbackStreamingClient {
246  public:
CallbackStreamingPingPongClient(const ClientConfig & config)247   explicit CallbackStreamingPingPongClient(const ClientConfig& config)
248       : CallbackStreamingClient(config) {}
~CallbackStreamingPingPongClient()249   ~CallbackStreamingPingPongClient() override {}
250 };
251 
252 class CallbackStreamingPingPongReactor final
253     : public grpc::ClientBidiReactor<SimpleRequest, SimpleResponse> {
254  public:
CallbackStreamingPingPongReactor(CallbackStreamingPingPongClient * client,std::unique_ptr<CallbackClientRpcContext> ctx)255   CallbackStreamingPingPongReactor(
256       CallbackStreamingPingPongClient* client,
257       std::unique_ptr<CallbackClientRpcContext> ctx)
258       : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
259 
StartNewRpc()260   void StartNewRpc() {
261     ctx_->stub_->async()->StreamingCall(&(ctx_->context_), this);
262     write_time_ = UsageTimer::Now();
263     StartWrite(client_->request());
264     writes_done_started_.clear();
265     StartCall();
266   }
267 
OnWriteDone(bool ok)268   void OnWriteDone(bool ok) override {
269     if (!ok) {
270       LOG(ERROR) << "Error writing RPC";
271     }
272     if ((!ok || client_->ThreadCompleted()) &&
273         !writes_done_started_.test_and_set()) {
274       StartWritesDone();
275     }
276     StartRead(&ctx_->response_);
277   }
278 
OnReadDone(bool ok)279   void OnReadDone(bool ok) override {
280     client_->AddHistogramEntry(write_time_, ok, thread_ptr_);
281 
282     if (client_->ThreadCompleted() || !ok ||
283         (client_->messages_per_stream() != 0 &&
284          ++messages_issued_ >= client_->messages_per_stream())) {
285       if (!ok) {
286         LOG(ERROR) << "Error reading RPC";
287       }
288       if (!writes_done_started_.test_and_set()) {
289         StartWritesDone();
290       }
291       return;
292     }
293     if (!client_->IsClosedLoop()) {
294       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
295       // Start an alarm callback to run the internal callback after
296       // next_issue_time
297       ctx_->alarm_->Set(next_issue_time, [this](bool /*ok*/) {
298         write_time_ = UsageTimer::Now();
299         StartWrite(client_->request());
300       });
301     } else {
302       write_time_ = UsageTimer::Now();
303       StartWrite(client_->request());
304     }
305   }
306 
OnDone(const Status & s)307   void OnDone(const Status& s) override {
308     if (client_->ThreadCompleted() || !s.ok()) {
309       client_->NotifyMainThreadOfThreadCompletion();
310       return;
311     }
312     ctx_ = std::make_unique<CallbackClientRpcContext>(ctx_->stub_);
313     ScheduleRpc();
314   }
315 
ScheduleRpc()316   void ScheduleRpc() {
317     if (!client_->IsClosedLoop()) {
318       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
319       // Start an alarm callback to run the internal callback after
320       // next_issue_time
321       if (ctx_->alarm_ == nullptr) {
322         ctx_->alarm_ = std::make_unique<Alarm>();
323       }
324       ctx_->alarm_->Set(next_issue_time,
325                         [this](bool /*ok*/) { StartNewRpc(); });
326     } else {
327       StartNewRpc();
328     }
329   }
330 
set_thread_ptr(Client::Thread * ptr)331   void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
332 
333   CallbackStreamingPingPongClient* client_;
334   std::unique_ptr<CallbackClientRpcContext> ctx_;
335   std::atomic_flag writes_done_started_;
336   Client::Thread* thread_ptr_;  // Needed to update histogram entries
337   double write_time_;           // Track ping-pong round start time
338   int messages_issued_;         // Messages issued by this stream
339 };
340 
341 class CallbackStreamingPingPongClientImpl final
342     : public CallbackStreamingPingPongClient {
343  public:
CallbackStreamingPingPongClientImpl(const ClientConfig & config)344   explicit CallbackStreamingPingPongClientImpl(const ClientConfig& config)
345       : CallbackStreamingPingPongClient(config) {
346     for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
347       reactor_.emplace_back(
348           new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
349     }
350   }
~CallbackStreamingPingPongClientImpl()351   ~CallbackStreamingPingPongClientImpl() override {}
352 
ThreadFuncImpl(Client::Thread * t,size_t thread_idx)353   bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
354     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
355          vector_idx += num_threads_) {
356       reactor_[vector_idx]->set_thread_ptr(t);
357       reactor_[vector_idx]->ScheduleRpc();
358     }
359     return true;
360   }
361 
InitThreadFuncImpl(size_t)362   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
363 
364  private:
365   std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
366 };
367 
368 // TODO(mhaidry) : Implement Streaming from client, server and both ways
369 
CreateCallbackClient(const ClientConfig & config)370 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
371   switch (config.rpc_type()) {
372     case UNARY:
373       return std::unique_ptr<Client>(new CallbackUnaryClient(config));
374     case STREAMING:
375       return std::unique_ptr<Client>(
376           new CallbackStreamingPingPongClientImpl(config));
377     case STREAMING_FROM_CLIENT:
378     case STREAMING_FROM_SERVER:
379     case STREAMING_BOTH_WAYS:
380       grpc_core::Crash(
381           "STREAMING_FROM_* scenarios are not supported by the callback "
382           "API");
383     default:
384       grpc_core::Crash(absl::StrCat("Unknown RPC type: ", config.rpc_type()));
385   }
386 }
387 
388 }  // namespace testing
389 }  // namespace grpc
390