1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <chrono>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <vector>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35
36 #include "src/core/lib/gpr/host_port.h"
37 #include "src/core/lib/profiling/timers.h"
38 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
39 #include "test/cpp/qps/client.h"
40 #include "test/cpp/qps/interarrival.h"
41 #include "test/cpp/qps/usage_timer.h"
42
43 namespace grpc {
44 namespace testing {
45
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)46 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
47 const std::shared_ptr<Channel>& ch) {
48 return BenchmarkService::NewStub(ch);
49 }
50
51 class SynchronousClient
52 : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
53 public:
SynchronousClient(const ClientConfig & config)54 SynchronousClient(const ClientConfig& config)
55 : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
56 config, BenchmarkStubCreator) {
57 num_threads_ =
58 config.outstanding_rpcs_per_channel() * config.client_channels();
59 responses_.resize(num_threads_);
60 SetupLoadTest(config, num_threads_);
61 }
62
~SynchronousClient()63 virtual ~SynchronousClient() {}
64
65 virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
66 virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
67
ThreadFunc(size_t thread_idx,Thread * t)68 void ThreadFunc(size_t thread_idx, Thread* t) override {
69 if (!InitThreadFuncImpl(thread_idx)) {
70 return;
71 }
72 for (;;) {
73 // run the loop body
74 HistogramEntry entry;
75 const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
76 t->UpdateHistogram(&entry);
77 if (!thread_still_ok || ThreadCompleted()) {
78 return;
79 }
80 }
81 }
82
83 protected:
84 // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)85 bool WaitToIssue(int thread_idx) {
86 if (!closed_loop_) {
87 const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
88 // Avoid sleeping for too long continuously because we might
89 // need to terminate before then. This is an issue since
90 // exponential distribution can occasionally produce bad outliers
91 while (true) {
92 const gpr_timespec one_sec_delay =
93 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
94 gpr_time_from_seconds(1, GPR_TIMESPAN));
95 if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
96 gpr_sleep_until(next_issue_time);
97 return true;
98 } else {
99 gpr_sleep_until(one_sec_delay);
100 if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) {
101 return false;
102 }
103 }
104 }
105 }
106 return true;
107 }
108
109 size_t num_threads_;
110 std::vector<SimpleResponse> responses_;
111 };
112
113 class SynchronousUnaryClient final : public SynchronousClient {
114 public:
SynchronousUnaryClient(const ClientConfig & config)115 SynchronousUnaryClient(const ClientConfig& config)
116 : SynchronousClient(config) {
117 StartThreads(num_threads_);
118 }
~SynchronousUnaryClient()119 ~SynchronousUnaryClient() {}
120
InitThreadFuncImpl(size_t thread_idx)121 bool InitThreadFuncImpl(size_t thread_idx) override { return true; }
122
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)123 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
124 if (!WaitToIssue(thread_idx)) {
125 return true;
126 }
127 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
128 double start = UsageTimer::Now();
129 GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0);
130 grpc::ClientContext context;
131 grpc::Status s =
132 stub->UnaryCall(&context, request_, &responses_[thread_idx]);
133 if (s.ok()) {
134 entry->set_value((UsageTimer::Now() - start) * 1e9);
135 }
136 entry->set_status(s.error_code());
137 return true;
138 }
139
140 private:
DestroyMultithreading()141 void DestroyMultithreading() override final { EndThreads(); }
142 };
143
144 template <class StreamType>
145 class SynchronousStreamingClient : public SynchronousClient {
146 public:
SynchronousStreamingClient(const ClientConfig & config)147 SynchronousStreamingClient(const ClientConfig& config)
148 : SynchronousClient(config),
149 context_(num_threads_),
150 stream_(num_threads_),
151 stream_mu_(num_threads_),
152 shutdown_(num_threads_),
153 messages_per_stream_(config.messages_per_stream()),
154 messages_issued_(num_threads_) {
155 StartThreads(num_threads_);
156 }
~SynchronousStreamingClient()157 virtual ~SynchronousStreamingClient() {
158 CleanupAllStreams([this](size_t thread_idx) {
159 // Don't log any kind of error since we may have canceled this
160 stream_[thread_idx]->Finish().IgnoreError();
161 });
162 }
163
164 protected:
165 std::vector<grpc::ClientContext> context_;
166 std::vector<std::unique_ptr<StreamType>> stream_;
167 // stream_mu_ is only needed when changing an element of stream_ or context_
168 std::vector<std::mutex> stream_mu_;
169 // use struct Bool rather than bool because vector<bool> is not concurrent
170 struct Bool {
171 bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool172 Bool() : val(false) {}
173 };
174 std::vector<Bool> shutdown_;
175 const int messages_per_stream_;
176 std::vector<int> messages_issued_;
177
FinishStream(HistogramEntry * entry,size_t thread_idx)178 void FinishStream(HistogramEntry* entry, size_t thread_idx) {
179 Status s = stream_[thread_idx]->Finish();
180 // don't set the value since the stream is failed and shouldn't be timed
181 entry->set_status(s.error_code());
182 if (!s.ok()) {
183 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
184 if (!shutdown_[thread_idx].val) {
185 gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
186 thread_idx, s.error_message().c_str());
187 }
188 }
189 // Lock the stream_mu_ now because the client context could change
190 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
191 context_[thread_idx].~ClientContext();
192 new (&context_[thread_idx]) ClientContext();
193 }
194
CleanupAllStreams(const std::function<void (size_t)> & cleaner)195 void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
196 std::vector<std::thread> cleanup_threads;
197 for (size_t i = 0; i < num_threads_; i++) {
198 cleanup_threads.emplace_back([this, i, cleaner] {
199 std::lock_guard<std::mutex> l(stream_mu_[i]);
200 shutdown_[i].val = true;
201 if (stream_[i]) {
202 cleaner(i);
203 }
204 });
205 }
206 for (auto& th : cleanup_threads) {
207 th.join();
208 }
209 }
210
211 private:
DestroyMultithreading()212 void DestroyMultithreading() override final {
213 CleanupAllStreams(
214 [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
215 EndThreads();
216 }
217 };
218
219 class SynchronousStreamingPingPongClient final
220 : public SynchronousStreamingClient<
221 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
222 public:
SynchronousStreamingPingPongClient(const ClientConfig & config)223 SynchronousStreamingPingPongClient(const ClientConfig& config)
224 : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()225 ~SynchronousStreamingPingPongClient() {
226 CleanupAllStreams(
227 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
228 }
229
230 private:
InitThreadFuncImpl(size_t thread_idx)231 bool InitThreadFuncImpl(size_t thread_idx) override {
232 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
233 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
234 if (!shutdown_[thread_idx].val) {
235 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
236 } else {
237 return false;
238 }
239 messages_issued_[thread_idx] = 0;
240 return true;
241 }
242
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)243 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
244 if (!WaitToIssue(thread_idx)) {
245 return true;
246 }
247 GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0);
248 double start = UsageTimer::Now();
249 if (stream_[thread_idx]->Write(request_) &&
250 stream_[thread_idx]->Read(&responses_[thread_idx])) {
251 entry->set_value((UsageTimer::Now() - start) * 1e9);
252 // don't set the status since there isn't one yet
253 if ((messages_per_stream_ != 0) &&
254 (++messages_issued_[thread_idx] < messages_per_stream_)) {
255 return true;
256 } else if (messages_per_stream_ == 0) {
257 return true;
258 } else {
259 // Fall through to the below resetting code after finish
260 }
261 }
262 stream_[thread_idx]->WritesDone();
263 FinishStream(entry, thread_idx);
264 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
265 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
266 if (!shutdown_[thread_idx].val) {
267 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
268 } else {
269 stream_[thread_idx].reset();
270 return false;
271 }
272 messages_issued_[thread_idx] = 0;
273 return true;
274 }
275 };
276
277 class SynchronousStreamingFromClientClient final
278 : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
279 public:
SynchronousStreamingFromClientClient(const ClientConfig & config)280 SynchronousStreamingFromClientClient(const ClientConfig& config)
281 : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()282 ~SynchronousStreamingFromClientClient() {
283 CleanupAllStreams(
284 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
285 }
286
287 private:
288 std::vector<double> last_issue_;
289
InitThreadFuncImpl(size_t thread_idx)290 bool InitThreadFuncImpl(size_t thread_idx) override {
291 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
292 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
293 if (!shutdown_[thread_idx].val) {
294 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
295 &responses_[thread_idx]);
296 } else {
297 return false;
298 }
299 last_issue_[thread_idx] = UsageTimer::Now();
300 return true;
301 }
302
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)303 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
304 // Figure out how to make histogram sensible if this is rate-paced
305 if (!WaitToIssue(thread_idx)) {
306 return true;
307 }
308 GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0);
309 if (stream_[thread_idx]->Write(request_)) {
310 double now = UsageTimer::Now();
311 entry->set_value((now - last_issue_[thread_idx]) * 1e9);
312 last_issue_[thread_idx] = now;
313 return true;
314 }
315 stream_[thread_idx]->WritesDone();
316 FinishStream(entry, thread_idx);
317 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
318 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
319 if (!shutdown_[thread_idx].val) {
320 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
321 &responses_[thread_idx]);
322 } else {
323 stream_[thread_idx].reset();
324 return false;
325 }
326 return true;
327 }
328 };
329
330 class SynchronousStreamingFromServerClient final
331 : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
332 public:
SynchronousStreamingFromServerClient(const ClientConfig & config)333 SynchronousStreamingFromServerClient(const ClientConfig& config)
334 : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()335 ~SynchronousStreamingFromServerClient() {}
336
337 private:
338 std::vector<double> last_recv_;
339
InitThreadFuncImpl(size_t thread_idx)340 bool InitThreadFuncImpl(size_t thread_idx) override {
341 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
342 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
343 if (!shutdown_[thread_idx].val) {
344 stream_[thread_idx] =
345 stub->StreamingFromServer(&context_[thread_idx], request_);
346 } else {
347 return false;
348 }
349 last_recv_[thread_idx] = UsageTimer::Now();
350 return true;
351 }
352
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)353 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
354 GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
355 if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
356 double now = UsageTimer::Now();
357 entry->set_value((now - last_recv_[thread_idx]) * 1e9);
358 last_recv_[thread_idx] = now;
359 return true;
360 }
361 FinishStream(entry, thread_idx);
362 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
363 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
364 if (!shutdown_[thread_idx].val) {
365 stream_[thread_idx] =
366 stub->StreamingFromServer(&context_[thread_idx], request_);
367 } else {
368 stream_[thread_idx].reset();
369 return false;
370 }
371 return true;
372 }
373 };
374
375 class SynchronousStreamingBothWaysClient final
376 : public SynchronousStreamingClient<
377 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
378 public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)379 SynchronousStreamingBothWaysClient(const ClientConfig& config)
380 : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()381 ~SynchronousStreamingBothWaysClient() {
382 CleanupAllStreams(
383 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
384 }
385
386 private:
InitThreadFuncImpl(size_t thread_idx)387 bool InitThreadFuncImpl(size_t thread_idx) override {
388 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
389 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
390 if (!shutdown_[thread_idx].val) {
391 stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
392 } else {
393 return false;
394 }
395 return true;
396 }
397
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)398 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
399 // TODO (vjpai): Do this
400 return true;
401 }
402 };
403
CreateSynchronousClient(const ClientConfig & config)404 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
405 GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
406 switch (config.rpc_type()) {
407 case UNARY:
408 return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
409 case STREAMING:
410 return std::unique_ptr<Client>(
411 new SynchronousStreamingPingPongClient(config));
412 case STREAMING_FROM_CLIENT:
413 return std::unique_ptr<Client>(
414 new SynchronousStreamingFromClientClient(config));
415 case STREAMING_FROM_SERVER:
416 return std::unique_ptr<Client>(
417 new SynchronousStreamingFromServerClient(config));
418 case STREAMING_BOTH_WAYS:
419 return std::unique_ptr<Client>(
420 new SynchronousStreamingBothWaysClient(config));
421 default:
422 assert(false);
423 return nullptr;
424 }
425 }
426
427 } // namespace testing
428 } // namespace grpc
429