1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <algorithm>
20 #include <forward_list>
21 #include <functional>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/generic/async_generic_service.h>
30 #include <grpcpp/resource_quota.h>
31 #include <grpcpp/security/server_credentials.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/config.h>
36
37 #include "src/core/lib/gpr/host_port.h"
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
40 #include "test/core/util/test_config.h"
41 #include "test/cpp/qps/qps_server_builder.h"
42 #include "test/cpp/qps/server.h"
43
44 namespace grpc {
45 namespace testing {
46
47 template <class RequestType, class ResponseType, class ServiceType,
48 class ServerContextType>
49 class AsyncQpsServerTest final : public grpc::testing::Server {
50 public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)51 AsyncQpsServerTest(
52 const ServerConfig& config,
53 std::function<void(ServerBuilder*, ServiceType*)> register_service,
54 std::function<void(ServiceType*, ServerContextType*, RequestType*,
55 ServerAsyncResponseWriter<ResponseType>*,
56 CompletionQueue*, ServerCompletionQueue*, void*)>
57 request_unary_function,
58 std::function<void(ServiceType*, ServerContextType*,
59 ServerAsyncReaderWriter<ResponseType, RequestType>*,
60 CompletionQueue*, ServerCompletionQueue*, void*)>
61 request_streaming_function,
62 std::function<void(ServiceType*, ServerContextType*,
63 ServerAsyncReader<ResponseType, RequestType>*,
64 CompletionQueue*, ServerCompletionQueue*, void*)>
65 request_streaming_from_client_function,
66 std::function<void(ServiceType*, ServerContextType*, RequestType*,
67 ServerAsyncWriter<ResponseType>*, CompletionQueue*,
68 ServerCompletionQueue*, void*)>
69 request_streaming_from_server_function,
70 std::function<void(ServiceType*, ServerContextType*,
71 ServerAsyncReaderWriter<ResponseType, RequestType>*,
72 CompletionQueue*, ServerCompletionQueue*, void*)>
73 request_streaming_both_ways_function,
74 std::function<grpc::Status(const PayloadConfig&, RequestType*,
75 ResponseType*)>
76 process_rpc)
77 : Server(config) {
78 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
79
80 auto port_num = port();
81 // Negative port number means inproc server, so no listen port needed
82 if (port_num >= 0) {
83 char* server_address = nullptr;
84 gpr_join_host_port(&server_address, "::", port_num);
85 builder->AddListeningPort(server_address,
86 Server::CreateServerCredentials(config));
87 gpr_free(server_address);
88 }
89
90 register_service(builder.get(), &async_service_);
91
92 int num_threads = config.async_server_threads();
93 if (num_threads <= 0) { // dynamic sizing
94 num_threads = cores();
95 gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
96 }
97
98 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
99 int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
100 for (int i = 0; i < num_cqs; i++) {
101 srv_cqs_.emplace_back(builder->AddCompletionQueue());
102 }
103 for (int i = 0; i < num_threads; i++) {
104 cq_.emplace_back(i % srv_cqs_.size());
105 }
106
107 ApplyConfigToBuilder(config, builder.get());
108
109 server_ = builder->BuildAndStart();
110
111 auto process_rpc_bound =
112 std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
113 std::placeholders::_2);
114
115 for (int i = 0; i < 5000; i++) {
116 for (int j = 0; j < num_cqs; j++) {
117 if (request_unary_function) {
118 auto request_unary = std::bind(
119 request_unary_function, &async_service_, std::placeholders::_1,
120 std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
121 srv_cqs_[j].get(), std::placeholders::_4);
122 contexts_.emplace_back(
123 new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
124 }
125 if (request_streaming_function) {
126 auto request_streaming = std::bind(
127 request_streaming_function, &async_service_,
128 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
129 srv_cqs_[j].get(), std::placeholders::_3);
130 contexts_.emplace_back(new ServerRpcContextStreamingImpl(
131 request_streaming, process_rpc_bound));
132 }
133 if (request_streaming_from_client_function) {
134 auto request_streaming_from_client = std::bind(
135 request_streaming_from_client_function, &async_service_,
136 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
137 srv_cqs_[j].get(), std::placeholders::_3);
138 contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
139 request_streaming_from_client, process_rpc_bound));
140 }
141 if (request_streaming_from_server_function) {
142 auto request_streaming_from_server =
143 std::bind(request_streaming_from_server_function, &async_service_,
144 std::placeholders::_1, std::placeholders::_2,
145 std::placeholders::_3, srv_cqs_[j].get(),
146 srv_cqs_[j].get(), std::placeholders::_4);
147 contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
148 request_streaming_from_server, process_rpc_bound));
149 }
150 if (request_streaming_both_ways_function) {
151 // TODO(vjpai): Add this code
152 }
153 }
154 }
155
156 for (int i = 0; i < num_threads; i++) {
157 shutdown_state_.emplace_back(new PerThreadShutdownState());
158 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
159 }
160 }
~AsyncQpsServerTest()161 ~AsyncQpsServerTest() {
162 for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
163 std::lock_guard<std::mutex> lock((*ss)->mutex);
164 (*ss)->shutdown = true;
165 }
166 std::thread shutdown_thread(&AsyncQpsServerTest::ShutdownThreadFunc, this);
167 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
168 (*cq)->Shutdown();
169 }
170 for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
171 thr->join();
172 }
173 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
174 bool ok;
175 void* got_tag;
176 while ((*cq)->Next(&got_tag, &ok))
177 ;
178 }
179 shutdown_thread.join();
180 }
181
GetPollCount()182 int GetPollCount() override {
183 int count = 0;
184 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
185 count += grpc_get_cq_poll_num((*cq)->cq());
186 }
187 return count;
188 }
189
InProcessChannel(const ChannelArguments & args)190 std::shared_ptr<Channel> InProcessChannel(
191 const ChannelArguments& args) override {
192 return server_->InProcessChannel(args);
193 }
194
195 private:
ShutdownThreadFunc()196 void ShutdownThreadFunc() {
197 // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
198 auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
199 server_->Shutdown(deadline);
200 }
201
ThreadFunc(int thread_idx)202 void ThreadFunc(int thread_idx) {
203 // Wait until work is available or we are shutting down
204 bool ok;
205 void* got_tag;
206 if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
207 return;
208 }
209 ServerRpcContext* ctx;
210 std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
211 do {
212 ctx = detag(got_tag);
213 // The tag is a pointer to an RPC context to invoke
214 // Proceed while holding a lock to make sure that
215 // this thread isn't supposed to shut down
216 mu_ptr->lock();
217 if (shutdown_state_[thread_idx]->shutdown) {
218 mu_ptr->unlock();
219 return;
220 }
221 } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
222 [&, ctx, ok, mu_ptr]() {
223 ctx->lock();
224 if (!ctx->RunNextState(ok)) {
225 ctx->Reset();
226 }
227 ctx->unlock();
228 mu_ptr->unlock();
229 },
230 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
231 }
232
233 class ServerRpcContext {
234 public:
ServerRpcContext()235 ServerRpcContext() {}
lock()236 void lock() { mu_.lock(); }
unlock()237 void unlock() { mu_.unlock(); }
~ServerRpcContext()238 virtual ~ServerRpcContext(){};
239 virtual bool RunNextState(bool) = 0; // next state, return false if done
240 virtual void Reset() = 0; // start this back at a clean state
241 private:
242 std::mutex mu_;
243 };
tag(ServerRpcContext * func)244 static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)245 static ServerRpcContext* detag(void* tag) {
246 return static_cast<ServerRpcContext*>(tag);
247 }
248
249 class ServerRpcContextUnaryImpl final : public ServerRpcContext {
250 public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)251 ServerRpcContextUnaryImpl(
252 std::function<void(ServerContextType*, RequestType*,
253 grpc::ServerAsyncResponseWriter<ResponseType>*,
254 void*)>
255 request_method,
256 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
257 : srv_ctx_(new ServerContextType),
258 next_state_(&ServerRpcContextUnaryImpl::invoker),
259 request_method_(request_method),
260 invoke_method_(invoke_method),
261 response_writer_(srv_ctx_.get()) {
262 request_method_(srv_ctx_.get(), &req_, &response_writer_,
263 AsyncQpsServerTest::tag(this));
264 }
~ServerRpcContextUnaryImpl()265 ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)266 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()267 void Reset() override {
268 srv_ctx_.reset(new ServerContextType);
269 req_ = RequestType();
270 response_writer_ =
271 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
272
273 // Then request the method
274 next_state_ = &ServerRpcContextUnaryImpl::invoker;
275 request_method_(srv_ctx_.get(), &req_, &response_writer_,
276 AsyncQpsServerTest::tag(this));
277 }
278
279 private:
finisher(bool)280 bool finisher(bool) { return false; }
invoker(bool ok)281 bool invoker(bool ok) {
282 if (!ok) {
283 return false;
284 }
285
286 // Call the RPC processing function
287 grpc::Status status = invoke_method_(&req_, &response_);
288
289 // Have the response writer work and invoke on_finish when done
290 next_state_ = &ServerRpcContextUnaryImpl::finisher;
291 response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
292 return true;
293 }
294 std::unique_ptr<ServerContextType> srv_ctx_;
295 RequestType req_;
296 ResponseType response_;
297 bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
298 std::function<void(ServerContextType*, RequestType*,
299 grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
300 request_method_;
301 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
302 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
303 };
304
305 class ServerRpcContextStreamingImpl final : public ServerRpcContext {
306 public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)307 ServerRpcContextStreamingImpl(
308 std::function<void(
309 ServerContextType*,
310 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
311 request_method,
312 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
313 : srv_ctx_(new ServerContextType),
314 next_state_(&ServerRpcContextStreamingImpl::request_done),
315 request_method_(request_method),
316 invoke_method_(invoke_method),
317 stream_(srv_ctx_.get()) {
318 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
319 }
~ServerRpcContextStreamingImpl()320 ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)321 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()322 void Reset() override {
323 srv_ctx_.reset(new ServerContextType);
324 req_ = RequestType();
325 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
326 srv_ctx_.get());
327
328 // Then request the method
329 next_state_ = &ServerRpcContextStreamingImpl::request_done;
330 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
331 }
332
333 private:
request_done(bool ok)334 bool request_done(bool ok) {
335 if (!ok) {
336 return false;
337 }
338 next_state_ = &ServerRpcContextStreamingImpl::read_done;
339 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
340 return true;
341 }
342
read_done(bool ok)343 bool read_done(bool ok) {
344 if (ok) {
345 // invoke the method
346 // Call the RPC processing function
347 grpc::Status status = invoke_method_(&req_, &response_);
348 // initiate the write
349 next_state_ = &ServerRpcContextStreamingImpl::write_done;
350 stream_.Write(response_, AsyncQpsServerTest::tag(this));
351 } else { // client has sent writes done
352 // finish the stream
353 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
354 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
355 }
356 return true;
357 }
write_done(bool ok)358 bool write_done(bool ok) {
359 // now go back and get another streaming read!
360 if (ok) {
361 next_state_ = &ServerRpcContextStreamingImpl::read_done;
362 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
363 } else {
364 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
365 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
366 }
367 return true;
368 }
finish_done(bool ok)369 bool finish_done(bool ok) { return false; /* reset the context */ }
370
371 std::unique_ptr<ServerContextType> srv_ctx_;
372 RequestType req_;
373 ResponseType response_;
374 bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
375 std::function<void(
376 ServerContextType*,
377 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
378 request_method_;
379 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
380 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
381 };
382
383 class ServerRpcContextStreamingFromClientImpl final
384 : public ServerRpcContext {
385 public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)386 ServerRpcContextStreamingFromClientImpl(
387 std::function<void(ServerContextType*,
388 grpc::ServerAsyncReader<ResponseType, RequestType>*,
389 void*)>
390 request_method,
391 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
392 : srv_ctx_(new ServerContextType),
393 next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
394 request_method_(request_method),
395 invoke_method_(invoke_method),
396 stream_(srv_ctx_.get()) {
397 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
398 }
~ServerRpcContextStreamingFromClientImpl()399 ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)400 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()401 void Reset() override {
402 srv_ctx_.reset(new ServerContextType);
403 req_ = RequestType();
404 stream_ =
405 grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
406
407 // Then request the method
408 next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
409 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
410 }
411
412 private:
request_done(bool ok)413 bool request_done(bool ok) {
414 if (!ok) {
415 return false;
416 }
417 next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
418 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
419 return true;
420 }
421
read_done(bool ok)422 bool read_done(bool ok) {
423 if (ok) {
424 // In this case, just do another read
425 // next_state_ is unchanged
426 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
427 return true;
428 } else { // client has sent writes done
429 // invoke the method
430 // Call the RPC processing function
431 grpc::Status status = invoke_method_(&req_, &response_);
432 // finish the stream
433 next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
434 stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
435 }
436 return true;
437 }
finish_done(bool ok)438 bool finish_done(bool ok) { return false; /* reset the context */ }
439
440 std::unique_ptr<ServerContextType> srv_ctx_;
441 RequestType req_;
442 ResponseType response_;
443 bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
444 std::function<void(ServerContextType*,
445 grpc::ServerAsyncReader<ResponseType, RequestType>*,
446 void*)>
447 request_method_;
448 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
449 grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
450 };
451
452 class ServerRpcContextStreamingFromServerImpl final
453 : public ServerRpcContext {
454 public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)455 ServerRpcContextStreamingFromServerImpl(
456 std::function<void(ServerContextType*, RequestType*,
457 grpc::ServerAsyncWriter<ResponseType>*, void*)>
458 request_method,
459 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
460 : srv_ctx_(new ServerContextType),
461 next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
462 request_method_(request_method),
463 invoke_method_(invoke_method),
464 stream_(srv_ctx_.get()) {
465 request_method_(srv_ctx_.get(), &req_, &stream_,
466 AsyncQpsServerTest::tag(this));
467 }
~ServerRpcContextStreamingFromServerImpl()468 ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)469 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()470 void Reset() override {
471 srv_ctx_.reset(new ServerContextType);
472 req_ = RequestType();
473 stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
474
475 // Then request the method
476 next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
477 request_method_(srv_ctx_.get(), &req_, &stream_,
478 AsyncQpsServerTest::tag(this));
479 }
480
481 private:
request_done(bool ok)482 bool request_done(bool ok) {
483 if (!ok) {
484 return false;
485 }
486 // invoke the method
487 // Call the RPC processing function
488 grpc::Status status = invoke_method_(&req_, &response_);
489
490 next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
491 stream_.Write(response_, AsyncQpsServerTest::tag(this));
492 return true;
493 }
494
write_done(bool ok)495 bool write_done(bool ok) {
496 if (ok) {
497 // Do another write!
498 // next_state_ is unchanged
499 stream_.Write(response_, AsyncQpsServerTest::tag(this));
500 } else { // must be done so let's finish
501 next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
502 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
503 }
504 return true;
505 }
finish_done(bool ok)506 bool finish_done(bool ok) { return false; /* reset the context */ }
507
508 std::unique_ptr<ServerContextType> srv_ctx_;
509 RequestType req_;
510 ResponseType response_;
511 bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
512 std::function<void(ServerContextType*, RequestType*,
513 grpc::ServerAsyncWriter<ResponseType>*, void*)>
514 request_method_;
515 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
516 grpc::ServerAsyncWriter<ResponseType> stream_;
517 };
518
519 std::vector<std::thread> threads_;
520 std::unique_ptr<grpc::Server> server_;
521 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
522 std::vector<int> cq_;
523 ServiceType async_service_;
524 std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
525
526 struct PerThreadShutdownState {
527 mutable std::mutex mutex;
528 bool shutdown;
PerThreadShutdownStategrpc::testing::AsyncQpsServerTest::PerThreadShutdownState529 PerThreadShutdownState() : shutdown(false) {}
530 };
531
532 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
533 };
534
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)535 static void RegisterBenchmarkService(ServerBuilder* builder,
536 BenchmarkService::AsyncService* service) {
537 builder->RegisterService(service);
538 }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)539 static void RegisterGenericService(ServerBuilder* builder,
540 grpc::AsyncGenericService* service) {
541 builder->RegisterAsyncGenericService(service);
542 }
543
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)544 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
545 SimpleResponse* response) {
546 if (request->response_size() > 0) {
547 if (!Server::SetPayload(request->response_type(), request->response_size(),
548 response->mutable_payload())) {
549 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
550 }
551 }
552 // We are done using the request. Clear it to reduce working memory.
553 // This proves to reduce cache misses in large message size cases.
554 request->Clear();
555 return Status::OK;
556 }
557
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)558 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
559 ByteBuffer* request, ByteBuffer* response) {
560 // We are done using the request. Clear it to reduce working memory.
561 // This proves to reduce cache misses in large message size cases.
562 request->Clear();
563 int resp_size = payload_config.bytebuf_params().resp_size();
564 std::unique_ptr<char[]> buf(new char[resp_size]);
565 Slice slice(buf.get(), resp_size);
566 *response = ByteBuffer(&slice, 1);
567 return Status::OK;
568 }
569
CreateAsyncServer(const ServerConfig & config)570 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
571 return std::unique_ptr<Server>(
572 new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
573 BenchmarkService::AsyncService,
574 grpc::ServerContext>(
575 config, RegisterBenchmarkService,
576 &BenchmarkService::AsyncService::RequestUnaryCall,
577 &BenchmarkService::AsyncService::RequestStreamingCall,
578 &BenchmarkService::AsyncService::RequestStreamingFromClient,
579 &BenchmarkService::AsyncService::RequestStreamingFromServer,
580 &BenchmarkService::AsyncService::RequestStreamingBothWays,
581 ProcessSimpleRPC));
582 }
CreateAsyncGenericServer(const ServerConfig & config)583 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
584 return std::unique_ptr<Server>(
585 new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
586 grpc::GenericServerContext>(
587 config, RegisterGenericService, nullptr,
588 &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
589 ProcessGenericRPC));
590 }
591
592 } // namespace testing
593 } // namespace grpc
594