//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpcpp/alarm.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>

#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "absl/log/log.h"
#include "absl/memory/memory.h"
#include "absl/strings/str_cat.h"
#include "src/core/util/crash.h"
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/usage_timer.h"

namespace grpc {
namespace testing {

///
/// Maintains context info per RPC.
///
struct CallbackClientRpcContext {
  explicit CallbackClientRpcContext(BenchmarkService::Stub* stub)
      : alarm_(nullptr), stub_(stub) {}

  ~CallbackClientRpcContext() {}

  SimpleResponse response_;
  ClientContext context_;
  std::unique_ptr<Alarm> alarm_;
  BenchmarkService::Stub* stub_;
};

static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
    const std::shared_ptr<Channel>& ch) {
  return BenchmarkService::NewStub(ch);
}

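// Common base class for clients that drive the benchmark through the
// callback (reactor) API. It owns the per-RPC contexts and coordinates
// shutdown: the main benchmark thread blocks in DestroyMultithreading()
// until every outstanding callback RPC has reported completion.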
class CallbackClient
    : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit CallbackClient(const ClientConfig& config)
      : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
            config, BenchmarkStubCreator) {
    num_threads_ = NumThreads(config);
    rpcs_done_ = 0;

    // Don't divide the fixed load among threads, as the user threads
    // only bootstrap the RPCs.
    SetupLoadTest(config, 1);
    total_outstanding_rpcs_ =
        config.client_channels() * config.outstanding_rpcs_per_channel();
  }

  ~CallbackClient() override {}

  ///
  /// The main thread of the benchmark waits in DestroyMultithreading.
  /// Increment rpcs_done_ to signify that one more callback RPC has finished
  /// after its thread completed. When the last outstanding RPC increments the
  /// counter, it also signals the main thread's condition variable.
  ///
  void NotifyMainThreadOfThreadCompletion() {
    std::lock_guard<std::mutex> l(shutdown_mu_);
    rpcs_done_++;
    if (rpcs_done_ == total_outstanding_rpcs_) {
      shutdown_cv_.notify_one();
    }
  }

  gpr_timespec NextRPCIssueTime() {
    std::lock_guard<std::mutex> l(next_issue_time_mu_);
    return Client::NextIssueTime(0);
  }

 protected:
  size_t num_threads_;
  size_t total_outstanding_rpcs_;
  // The mutex and condition variable below are used by the main benchmark
  // thread to wait for completion of all RPCs before shutdown.
  std::mutex shutdown_mu_;
  std::condition_variable shutdown_cv_;
  // Number of RPCs that have finished after their threads completed.
  size_t rpcs_done_;
  // Vector of context data pointers, one per outstanding RPC.
  std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;

  virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
  virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;

  void ThreadFunc(size_t thread_idx, Thread* t) override {
    InitThreadFuncImpl(thread_idx);
    ThreadFuncImpl(t, thread_idx);
  }

 private:
  std::mutex next_issue_time_mu_;  // Guards computation of the next issue time

  int NumThreads(const ClientConfig& config) {
    int num_threads = config.async_client_threads();
    if (num_threads <= 0) {  // Use dynamic sizing
      num_threads = cores_;
      LOG(INFO) << "Sizing callback client to " << num_threads << " threads";
    }
    return num_threads;
  }

  ///
  /// Wait until all outstanding callback RPCs are done.
  ///
  void DestroyMultithreading() final {
    std::unique_lock<std::mutex> l(shutdown_mu_);
    while (rpcs_done_ != total_outstanding_rpcs_) {
      shutdown_cv_.wait(l);
    }
    EndThreads();
  }
};

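// Client that drives unary RPCs through the callback API. Each benchmark
// thread bootstraps its share of the outstanding RPCs; every completion
// callback then reschedules the next RPC on the same context slot until the
// run ends.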
class CallbackUnaryClient final : public CallbackClient {
 public:
  explicit CallbackUnaryClient(const ClientConfig& config)
      : CallbackClient(config) {
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        ctx_.emplace_back(
            new CallbackClientRpcContext(channels_[ch].get_stub()));
      }
    }
    StartThreads(num_threads_);
  }
  ~CallbackUnaryClient() override {}

 protected:
  bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
    for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
         vector_idx += num_threads_) {
      ScheduleRpc(t, vector_idx);
    }
    return true;
  }

  void InitThreadFuncImpl(size_t /*thread_idx*/) override {}

 private:
  void ScheduleRpc(Thread* t, size_t vector_idx) {
    if (!closed_loop_) {
      gpr_timespec next_issue_time = NextRPCIssueTime();
      // Start an alarm callback to run the internal callback after
      // next_issue_time.
      if (ctx_[vector_idx]->alarm_ == nullptr) {
        ctx_[vector_idx]->alarm_ = std::make_unique<Alarm>();
      }
      ctx_[vector_idx]->alarm_->Set(next_issue_time,
                                    [this, t, vector_idx](bool /*ok*/) {
                                      IssueUnaryCallbackRpc(t, vector_idx);
                                    });
    } else {
      IssueUnaryCallbackRpc(t, vector_idx);
    }
  }

  void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
    double start = UsageTimer::Now();
    ctx_[vector_idx]->stub_->async()->UnaryCall(
        (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
        [this, t, start, vector_idx](grpc::Status s) {
          // Update the histogram with data from the callback run.
          HistogramEntry entry;
          if (s.ok()) {
            entry.set_value((UsageTimer::Now() - start) * 1e9);
          }
          entry.set_status(s.error_code());
          t->UpdateHistogram(&entry);

          if (ThreadCompleted() || !s.ok()) {
            // Notify the main thread that this RPC slot is done.
            NotifyMainThreadOfThreadCompletion();
          } else {
            // Reallocate the context for the next RPC.
            ctx_[vector_idx] = std::make_unique<CallbackClientRpcContext>(
                ctx_[vector_idx]->stub_);
            // Schedule a new RPC.
            ScheduleRpc(t, vector_idx);
          }
        });
  }
};

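// Common base for streaming callback clients; tracks how many messages each
// stream should exchange before it is closed and provides histogram
// bookkeeping for individual ping-pong round trips.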
class CallbackStreamingClient : public CallbackClient {
 public:
  explicit CallbackStreamingClient(const ClientConfig& config)
      : CallbackClient(config),
        messages_per_stream_(config.messages_per_stream()) {
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        ctx_.emplace_back(
            new CallbackClientRpcContext(channels_[ch].get_stub()));
      }
    }
    StartThreads(num_threads_);
  }
  ~CallbackStreamingClient() override {}

  void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
    // Update the histogram with data from the callback run.
    HistogramEntry entry;
    if (ok) {
      entry.set_value((UsageTimer::Now() - start) * 1e9);
    }
    thread_ptr->UpdateHistogram(&entry);
  }

  int messages_per_stream() { return messages_per_stream_; }

 protected:
  const int messages_per_stream_;
};

class CallbackStreamingPingPongClient : public CallbackStreamingClient {
 public:
  explicit CallbackStreamingPingPongClient(const ClientConfig& config)
      : CallbackStreamingClient(config) {}
  ~CallbackStreamingPingPongClient() override {}
};

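// Bidi reactor that drives a single streaming ping-pong RPC: it writes a
// request, reads the matching response, records the round-trip latency, and
// half-closes the stream once the thread has completed, an error occurs, or
// the per-stream message limit is reached.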
class CallbackStreamingPingPongReactor final
    : public grpc::ClientBidiReactor<SimpleRequest, SimpleResponse> {
 public:
  CallbackStreamingPingPongReactor(
      CallbackStreamingPingPongClient* client,
      std::unique_ptr<CallbackClientRpcContext> ctx)
      : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}

  void StartNewRpc() {
    ctx_->stub_->async()->StreamingCall(&(ctx_->context_), this);
    write_time_ = UsageTimer::Now();
    StartWrite(client_->request());
    writes_done_started_.clear();
    StartCall();
  }

  void OnWriteDone(bool ok) override {
    if (!ok) {
      LOG(ERROR) << "Error writing RPC";
    }
    if ((!ok || client_->ThreadCompleted()) &&
        !writes_done_started_.test_and_set()) {
      StartWritesDone();
    }
    StartRead(&ctx_->response_);
  }

  void OnReadDone(bool ok) override {
    client_->AddHistogramEntry(write_time_, ok, thread_ptr_);

    if (client_->ThreadCompleted() || !ok ||
        (client_->messages_per_stream() != 0 &&
         ++messages_issued_ >= client_->messages_per_stream())) {
      if (!ok) {
        LOG(ERROR) << "Error reading RPC";
      }
      if (!writes_done_started_.test_and_set()) {
        StartWritesDone();
      }
      return;
    }
    if (!client_->IsClosedLoop()) {
      gpr_timespec next_issue_time = client_->NextRPCIssueTime();
      // Start an alarm callback to run the internal callback after
      // next_issue_time.
      ctx_->alarm_->Set(next_issue_time, [this](bool /*ok*/) {
        write_time_ = UsageTimer::Now();
        StartWrite(client_->request());
      });
    } else {
      write_time_ = UsageTimer::Now();
      StartWrite(client_->request());
    }
  }

  void OnDone(const Status& s) override {
    if (client_->ThreadCompleted() || !s.ok()) {
      client_->NotifyMainThreadOfThreadCompletion();
      return;
    }
    ctx_ = std::make_unique<CallbackClientRpcContext>(ctx_->stub_);
    ScheduleRpc();
  }

  void ScheduleRpc() {
    if (!client_->IsClosedLoop()) {
      gpr_timespec next_issue_time = client_->NextRPCIssueTime();
      // Start an alarm callback to run the internal callback after
      // next_issue_time.
      if (ctx_->alarm_ == nullptr) {
        ctx_->alarm_ = std::make_unique<Alarm>();
      }
      ctx_->alarm_->Set(next_issue_time,
                        [this](bool /*ok*/) { StartNewRpc(); });
    } else {
      StartNewRpc();
    }
  }

  void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }

  CallbackStreamingPingPongClient* client_;
  std::unique_ptr<CallbackClientRpcContext> ctx_;
  std::atomic_flag writes_done_started_;
  Client::Thread* thread_ptr_;  // Needed to update histogram entries
  double write_time_;           // Track ping-pong round start time
  int messages_issued_;         // Messages issued by this stream
};

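// Streaming ping-pong client: wraps each pre-allocated RPC context in a
// reactor and lets the benchmark threads schedule the first RPC on every
// reactor; after that, the reactors keep themselves running.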
class CallbackStreamingPingPongClientImpl final
    : public CallbackStreamingPingPongClient {
 public:
  explicit CallbackStreamingPingPongClientImpl(const ClientConfig& config)
      : CallbackStreamingPingPongClient(config) {
    for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
      reactor_.emplace_back(
          new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
    }
  }
  ~CallbackStreamingPingPongClientImpl() override {}

  bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
    for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
         vector_idx += num_threads_) {
      reactor_[vector_idx]->set_thread_ptr(t);
      reactor_[vector_idx]->ScheduleRpc();
    }
    return true;
  }

  void InitThreadFuncImpl(size_t /*thread_idx*/) override {}

 private:
  std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
};

// TODO(mhaidry) : Implement Streaming from client, server and both ways

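// Factory that picks the callback client implementation matching the
// configured RPC type; scenarios the callback API does not support crash
// with a descriptive message.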
std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
  switch (config.rpc_type()) {
    case UNARY:
      return std::unique_ptr<Client>(new CallbackUnaryClient(config));
    case STREAMING:
      return std::unique_ptr<Client>(
          new CallbackStreamingPingPongClientImpl(config));
    case STREAMING_FROM_CLIENT:
    case STREAMING_FROM_SERVER:
    case STREAMING_BOTH_WAYS:
      grpc_core::Crash(
          "STREAMING_FROM_* scenarios are not supported by the callback "
          "API");
    default:
      grpc_core::Crash(absl::StrCat("Unknown RPC type: ", config.rpc_type()));
  }
}

}  // namespace testing
}  // namespace grpc