1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/time.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26
27 #include <chrono>
28 #include <memory>
29 #include <mutex>
30 #include <sstream>
31 #include <string>
32 #include <thread>
33 #include <vector>
34
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "src/core/util/crash.h"
38 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39 #include "test/cpp/qps/client.h"
40 #include "test/cpp/qps/interarrival.h"
41 #include "test/cpp/qps/usage_timer.h"
42
43 namespace grpc {
44 namespace testing {
45
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)46 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
47 const std::shared_ptr<Channel>& ch) {
48 return BenchmarkService::NewStub(ch);
49 }
50
51 class SynchronousClient
52 : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
53 public:
SynchronousClient(const ClientConfig & config)54 explicit SynchronousClient(const ClientConfig& config)
55 : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
56 config, BenchmarkStubCreator) {
57 num_threads_ =
58 config.outstanding_rpcs_per_channel() * config.client_channels();
59 responses_.resize(num_threads_);
60 SetupLoadTest(config, num_threads_);
61 }
62
~SynchronousClient()63 ~SynchronousClient() override {}
64
65 virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
66 virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
67
ThreadFunc(size_t thread_idx,Thread * t)68 void ThreadFunc(size_t thread_idx, Thread* t) override {
69 if (!InitThreadFuncImpl(thread_idx)) {
70 return;
71 }
72 for (;;) {
73 // run the loop body
74 HistogramEntry entry;
75 const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
76 t->UpdateHistogram(&entry);
77 if (!thread_still_ok || ThreadCompleted()) {
78 return;
79 }
80 }
81 }
82
83 protected:
84 // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)85 bool WaitToIssue(int thread_idx) {
86 if (!closed_loop_) {
87 const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
88 // Avoid sleeping for too long continuously because we might
89 // need to terminate before then. This is an issue since
90 // exponential distribution can occasionally produce bad outliers
91 while (true) {
92 const gpr_timespec one_sec_delay =
93 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
94 gpr_time_from_seconds(1, GPR_TIMESPAN));
95 if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
96 gpr_sleep_until(next_issue_time);
97 return true;
98 } else {
99 gpr_sleep_until(one_sec_delay);
100 if (gpr_atm_acq_load(&thread_pool_done_) != gpr_atm{0}) {
101 return false;
102 }
103 }
104 }
105 }
106 return true;
107 }
108
109 size_t num_threads_;
110 std::vector<SimpleResponse> responses_;
111 };
112
113 class SynchronousUnaryClient final : public SynchronousClient {
114 public:
SynchronousUnaryClient(const ClientConfig & config)115 explicit SynchronousUnaryClient(const ClientConfig& config)
116 : SynchronousClient(config) {
117 StartThreads(num_threads_);
118 }
~SynchronousUnaryClient()119 ~SynchronousUnaryClient() override {}
120
InitThreadFuncImpl(size_t)121 bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
122
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)123 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
124 if (!WaitToIssue(thread_idx)) {
125 return true;
126 }
127 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
128 double start = UsageTimer::Now();
129 grpc::ClientContext context;
130 grpc::Status s =
131 stub->UnaryCall(&context, request_, &responses_[thread_idx]);
132 if (s.ok()) {
133 entry->set_value((UsageTimer::Now() - start) * 1e9);
134 }
135 entry->set_status(s.error_code());
136 return true;
137 }
138
139 private:
DestroyMultithreading()140 void DestroyMultithreading() final { EndThreads(); }
141 };
142
143 template <class StreamType>
144 class SynchronousStreamingClient : public SynchronousClient {
145 public:
SynchronousStreamingClient(const ClientConfig & config)146 explicit SynchronousStreamingClient(const ClientConfig& config)
147 : SynchronousClient(config),
148 context_(num_threads_),
149 stream_(num_threads_),
150 stream_mu_(num_threads_),
151 shutdown_(num_threads_),
152 messages_per_stream_(config.messages_per_stream()),
153 messages_issued_(num_threads_) {
154 StartThreads(num_threads_);
155 }
~SynchronousStreamingClient()156 ~SynchronousStreamingClient() override {
157 CleanupAllStreams([this](size_t thread_idx) {
158 // Don't log any kind of error since we may have canceled this
159 stream_[thread_idx]->Finish().IgnoreError();
160 });
161 }
162
163 protected:
164 std::vector<grpc::ClientContext> context_;
165 std::vector<std::unique_ptr<StreamType>> stream_;
166 // stream_mu_ is only needed when changing an element of stream_ or context_
167 std::vector<std::mutex> stream_mu_;
168 // use struct Bool rather than bool because vector<bool> is not concurrent
169 struct Bool {
170 bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool171 Bool() : val(false) {}
172 };
173 std::vector<Bool> shutdown_;
174 const int messages_per_stream_;
175 std::vector<int> messages_issued_;
176
FinishStream(HistogramEntry * entry,size_t thread_idx)177 void FinishStream(HistogramEntry* entry, size_t thread_idx) {
178 Status s = stream_[thread_idx]->Finish();
179 // don't set the value since the stream is failed and shouldn't be timed
180 entry->set_status(s.error_code());
181 if (!s.ok()) {
182 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
183 if (!shutdown_[thread_idx].val) {
184 LOG(ERROR) << "Stream " << thread_idx << " received an error "
185 << s.error_message();
186 }
187 }
188 // Lock the stream_mu_ now because the client context could change
189 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
190 context_[thread_idx].~ClientContext();
191 new (&context_[thread_idx]) ClientContext();
192 }
193
CleanupAllStreams(const std::function<void (size_t)> & cleaner)194 void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
195 std::vector<std::thread> cleanup_threads;
196 for (size_t i = 0; i < num_threads_; i++) {
197 cleanup_threads.emplace_back([this, i, cleaner] {
198 std::lock_guard<std::mutex> l(stream_mu_[i]);
199 shutdown_[i].val = true;
200 if (stream_[i]) {
201 cleaner(i);
202 }
203 });
204 }
205 for (auto& th : cleanup_threads) {
206 th.join();
207 }
208 }
209
210 private:
DestroyMultithreading()211 void DestroyMultithreading() final {
212 CleanupAllStreams(
213 [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
214 EndThreads();
215 }
216 };
217
218 class SynchronousStreamingPingPongClient final
219 : public SynchronousStreamingClient<
220 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
221 public:
SynchronousStreamingPingPongClient(const ClientConfig & config)222 explicit SynchronousStreamingPingPongClient(const ClientConfig& config)
223 : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()224 ~SynchronousStreamingPingPongClient() override {
225 CleanupAllStreams(
226 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
227 }
228
229 private:
InitThreadFuncImpl(size_t thread_idx)230 bool InitThreadFuncImpl(size_t thread_idx) override {
231 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
232 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
233 if (!shutdown_[thread_idx].val) {
234 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
235 } else {
236 return false;
237 }
238 messages_issued_[thread_idx] = 0;
239 return true;
240 }
241
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)242 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
243 if (!WaitToIssue(thread_idx)) {
244 return true;
245 }
246 double start = UsageTimer::Now();
247 if (stream_[thread_idx]->Write(request_) &&
248 stream_[thread_idx]->Read(&responses_[thread_idx])) {
249 entry->set_value((UsageTimer::Now() - start) * 1e9);
250 // don't set the status since there isn't one yet
251 if ((messages_per_stream_ != 0) &&
252 (++messages_issued_[thread_idx] < messages_per_stream_)) {
253 return true;
254 } else if (messages_per_stream_ == 0) {
255 return true;
256 } else {
257 // Fall through to the below resetting code after finish
258 }
259 }
260 stream_[thread_idx]->WritesDone();
261 FinishStream(entry, thread_idx);
262 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
263 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
264 if (!shutdown_[thread_idx].val) {
265 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
266 } else {
267 stream_[thread_idx].reset();
268 return false;
269 }
270 messages_issued_[thread_idx] = 0;
271 return true;
272 }
273 };
274
275 class SynchronousStreamingFromClientClient final
276 : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
277 public:
SynchronousStreamingFromClientClient(const ClientConfig & config)278 explicit SynchronousStreamingFromClientClient(const ClientConfig& config)
279 : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()280 ~SynchronousStreamingFromClientClient() override {
281 CleanupAllStreams(
282 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
283 }
284
285 private:
286 std::vector<double> last_issue_;
287
InitThreadFuncImpl(size_t thread_idx)288 bool InitThreadFuncImpl(size_t thread_idx) override {
289 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
290 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
291 if (!shutdown_[thread_idx].val) {
292 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
293 &responses_[thread_idx]);
294 } else {
295 return false;
296 }
297 last_issue_[thread_idx] = UsageTimer::Now();
298 return true;
299 }
300
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)301 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
302 // Figure out how to make histogram sensible if this is rate-paced
303 if (!WaitToIssue(thread_idx)) {
304 return true;
305 }
306 if (stream_[thread_idx]->Write(request_)) {
307 double now = UsageTimer::Now();
308 entry->set_value((now - last_issue_[thread_idx]) * 1e9);
309 last_issue_[thread_idx] = now;
310 return true;
311 }
312 stream_[thread_idx]->WritesDone();
313 FinishStream(entry, thread_idx);
314 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
315 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
316 if (!shutdown_[thread_idx].val) {
317 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
318 &responses_[thread_idx]);
319 } else {
320 stream_[thread_idx].reset();
321 return false;
322 }
323 return true;
324 }
325 };
326
327 class SynchronousStreamingFromServerClient final
328 : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
329 public:
SynchronousStreamingFromServerClient(const ClientConfig & config)330 explicit SynchronousStreamingFromServerClient(const ClientConfig& config)
331 : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()332 ~SynchronousStreamingFromServerClient() override {}
333
334 private:
335 std::vector<double> last_recv_;
336
InitThreadFuncImpl(size_t thread_idx)337 bool InitThreadFuncImpl(size_t thread_idx) override {
338 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
339 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
340 if (!shutdown_[thread_idx].val) {
341 stream_[thread_idx] =
342 stub->StreamingFromServer(&context_[thread_idx], request_);
343 } else {
344 return false;
345 }
346 last_recv_[thread_idx] = UsageTimer::Now();
347 return true;
348 }
349
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)350 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
351 if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
352 double now = UsageTimer::Now();
353 entry->set_value((now - last_recv_[thread_idx]) * 1e9);
354 last_recv_[thread_idx] = now;
355 return true;
356 }
357 FinishStream(entry, thread_idx);
358 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
359 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
360 if (!shutdown_[thread_idx].val) {
361 stream_[thread_idx] =
362 stub->StreamingFromServer(&context_[thread_idx], request_);
363 } else {
364 stream_[thread_idx].reset();
365 return false;
366 }
367 return true;
368 }
369 };
370
371 class SynchronousStreamingBothWaysClient final
372 : public SynchronousStreamingClient<
373 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
374 public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)375 explicit SynchronousStreamingBothWaysClient(const ClientConfig& config)
376 : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()377 ~SynchronousStreamingBothWaysClient() override {
378 CleanupAllStreams(
379 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
380 }
381
382 private:
InitThreadFuncImpl(size_t thread_idx)383 bool InitThreadFuncImpl(size_t thread_idx) override {
384 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
385 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
386 if (!shutdown_[thread_idx].val) {
387 stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
388 } else {
389 return false;
390 }
391 return true;
392 }
393
ThreadFuncImpl(HistogramEntry *,size_t)394 bool ThreadFuncImpl(HistogramEntry* /*entry*/,
395 size_t /*thread_idx*/) override {
396 // TODO (vjpai): Do this
397 return true;
398 }
399 };
400
CreateSynchronousClient(const ClientConfig & config)401 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
402 CHECK(!config.use_coalesce_api()); // not supported yet.
403 switch (config.rpc_type()) {
404 case UNARY:
405 return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
406 case STREAMING:
407 return std::unique_ptr<Client>(
408 new SynchronousStreamingPingPongClient(config));
409 case STREAMING_FROM_CLIENT:
410 return std::unique_ptr<Client>(
411 new SynchronousStreamingFromClientClient(config));
412 case STREAMING_FROM_SERVER:
413 return std::unique_ptr<Client>(
414 new SynchronousStreamingFromServerClient(config));
415 case STREAMING_BOTH_WAYS:
416 return std::unique_ptr<Client>(
417 new SynchronousStreamingBothWaysClient(config));
418 default:
419 assert(false);
420 return nullptr;
421 }
422 }
423
424 } // namespace testing
425 } // namespace grpc
426