1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpcpp/generic/async_generic_service.h>
22 #include <grpcpp/resource_quota.h>
23 #include <grpcpp/security/server_credentials.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/config.h>
28
29 #include <algorithm>
30 #include <forward_list>
31 #include <functional>
32 #include <memory>
33 #include <mutex>
34 #include <thread>
35
36 #include "absl/log/log.h"
37 #include "src/core/lib/surface/completion_queue.h"
38 #include "src/core/util/crash.h"
39 #include "src/core/util/host_port.h"
40 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
41 #include "test/core/test_util/test_config.h"
42 #include "test/cpp/qps/qps_server_builder.h"
43 #include "test/cpp/qps/server.h"
44
45 namespace grpc {
46 namespace testing {
47
48 template <class RequestType, class ResponseType, class ServiceType,
49 class ServerContextType>
50 class AsyncQpsServerTest final : public grpc::testing::Server {
51 public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)52 AsyncQpsServerTest(
53 const ServerConfig& config,
54 std::function<void(ServerBuilder*, ServiceType*)> register_service,
55 std::function<void(ServiceType*, ServerContextType*, RequestType*,
56 ServerAsyncResponseWriter<ResponseType>*,
57 CompletionQueue*, ServerCompletionQueue*, void*)>
58 request_unary_function,
59 std::function<void(ServiceType*, ServerContextType*,
60 ServerAsyncReaderWriter<ResponseType, RequestType>*,
61 CompletionQueue*, ServerCompletionQueue*, void*)>
62 request_streaming_function,
63 std::function<void(ServiceType*, ServerContextType*,
64 ServerAsyncReader<ResponseType, RequestType>*,
65 CompletionQueue*, ServerCompletionQueue*, void*)>
66 request_streaming_from_client_function,
67 std::function<void(ServiceType*, ServerContextType*, RequestType*,
68 ServerAsyncWriter<ResponseType>*, CompletionQueue*,
69 ServerCompletionQueue*, void*)>
70 request_streaming_from_server_function,
71 std::function<void(ServiceType*, ServerContextType*,
72 ServerAsyncReaderWriter<ResponseType, RequestType>*,
73 CompletionQueue*, ServerCompletionQueue*, void*)>
74 request_streaming_both_ways_function,
75 std::function<grpc::Status(const PayloadConfig&, RequestType*,
76 ResponseType*)>
77 process_rpc)
78 : Server(config) {
79 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
80
81 auto port_num = port();
82 // Negative port number means inproc server, so no listen port needed
83 if (port_num >= 0) {
84 std::string server_address = grpc_core::JoinHostPort("::", port_num);
85 builder->AddListeningPort(
86 server_address, Server::CreateServerCredentials(config), &port_num);
87 }
88
89 register_service(builder.get(), &async_service_);
90
91 int num_threads = config.async_server_threads();
92 if (num_threads <= 0) { // dynamic sizing
93 num_threads = std::min(64, cores());
94 LOG(INFO) << "Sizing async server to " << num_threads
95 << " threads. Defaults to number of cores in machine or 64 "
96 "threads if machine has more than 64 cores to avoid OOMs.";
97 }
98
99 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
100 int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
101 for (int i = 0; i < num_cqs; i++) {
102 srv_cqs_.emplace_back(builder->AddCompletionQueue());
103 }
104 for (int i = 0; i < num_threads; i++) {
105 cq_.emplace_back(i % srv_cqs_.size());
106 }
107
108 ApplyConfigToBuilder(config, builder.get());
109
110 server_ = builder->BuildAndStart();
111 if (server_ == nullptr) {
112 LOG(ERROR) << "Server: Fail to BuildAndStart(port=" << port_num << ")";
113 } else {
114 LOG(INFO) << "Server: BuildAndStart(port=" << port_num << ")";
115 }
116
117 auto process_rpc_bound =
118 std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
119 std::placeholders::_2);
120
121 for (int i = 0; i < 5000; i++) {
122 for (int j = 0; j < num_cqs; j++) {
123 if (request_unary_function) {
124 auto request_unary = std::bind(
125 request_unary_function, &async_service_, std::placeholders::_1,
126 std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
127 srv_cqs_[j].get(), std::placeholders::_4);
128 contexts_.emplace_back(
129 new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
130 }
131 if (request_streaming_function) {
132 auto request_streaming = std::bind(
133 request_streaming_function, &async_service_,
134 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
135 srv_cqs_[j].get(), std::placeholders::_3);
136 contexts_.emplace_back(new ServerRpcContextStreamingImpl(
137 request_streaming, process_rpc_bound));
138 }
139 if (request_streaming_from_client_function) {
140 auto request_streaming_from_client = std::bind(
141 request_streaming_from_client_function, &async_service_,
142 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
143 srv_cqs_[j].get(), std::placeholders::_3);
144 contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
145 request_streaming_from_client, process_rpc_bound));
146 }
147 if (request_streaming_from_server_function) {
148 auto request_streaming_from_server =
149 std::bind(request_streaming_from_server_function, &async_service_,
150 std::placeholders::_1, std::placeholders::_2,
151 std::placeholders::_3, srv_cqs_[j].get(),
152 srv_cqs_[j].get(), std::placeholders::_4);
153 contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
154 request_streaming_from_server, process_rpc_bound));
155 }
156 if (request_streaming_both_ways_function) {
157 // TODO(vjpai): Add this code
158 }
159 }
160 }
161
162 for (int i = 0; i < num_threads; i++) {
163 shutdown_state_.emplace_back(new PerThreadShutdownState());
164 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
165 }
166 }
// Stops the server: flags every worker for shutdown, shuts down the server
// and its completion queues, joins the workers and finally drains whatever
// events are still queued.
~AsyncQpsServerTest() override {
  // Setting the flag under each worker's mutex means a worker observes it the
  // next time it takes that mutex in ThreadFunc.
  for (const auto& state : shutdown_state_) {
    std::lock_guard<std::mutex> guard(state->mutex);
    state->shutdown = true;
  }

  // TODO(vjpai): Remove the following deadline and allow full proper
  // shutdown.
  const auto shutdown_deadline =
      std::chrono::system_clock::now() + std::chrono::seconds(3);
  server_->Shutdown(shutdown_deadline);

  for (const auto& queue : srv_cqs_) {
    queue->Shutdown();
  }
  for (std::thread& worker : threads_) {
    worker.join();
  }

  // Drain each queue until Next() reports that nothing is left.
  for (const auto& queue : srv_cqs_) {
    void* drained_tag;
    bool drained_ok;
    while (queue->Next(&drained_tag, &drained_ok)) {
    }
  }
}
189
GetPollCount()190 int GetPollCount() override {
191 int count = 0;
192 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
193 count += grpc_get_cq_poll_num((*cq)->cq());
194 }
195 return count;
196 }
197
InProcessChannel(const ChannelArguments & args)198 std::shared_ptr<Channel> InProcessChannel(
199 const ChannelArguments& args) override {
200 return server_->InProcessChannel(args);
201 }
202
203 private:
ThreadFunc(int thread_idx)204 void ThreadFunc(int thread_idx) {
205 // Wait until work is available or we are shutting down
206 bool ok;
207 void* got_tag;
208 if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
209 return;
210 }
211 ServerRpcContext* ctx;
212 std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
213 do {
214 ctx = detag(got_tag);
215 // The tag is a pointer to an RPC context to invoke
216 // Proceed while holding a lock to make sure that
217 // this thread isn't supposed to shut down
218 mu_ptr->lock();
219 if (shutdown_state_[thread_idx]->shutdown) {
220 mu_ptr->unlock();
221 return;
222 }
223 } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
224 [&, ctx, ok, mu_ptr]() {
225 ctx->lock();
226 if (!ctx->RunNextState(ok)) {
227 ctx->Reset();
228 }
229 ctx->unlock();
230 mu_ptr->unlock();
231 },
232 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
233 }
234
// Abstract base of the per-RPC state machines driven by the polling threads.
// Each context carries its own mutex, exposed through lock()/unlock().
class ServerRpcContext {
 public:
  ServerRpcContext() = default;
  virtual ~ServerRpcContext() = default;

  // Acquire / release this context's mutex.
  void lock() { mu_.lock(); }
  void unlock() { mu_.unlock(); }

  // Advances to the next state; returns false once the RPC is done.
  virtual bool RunNextState(bool) = 0;
  // Puts the context back into a clean state.
  virtual void Reset() = 0;

 private:
  std::mutex mu_;
};
tag(ServerRpcContext * func)246 static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)247 static ServerRpcContext* detag(void* tag) {
248 return static_cast<ServerRpcContext*>(tag);
249 }
250
251 class ServerRpcContextUnaryImpl final : public ServerRpcContext {
252 public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)253 ServerRpcContextUnaryImpl(
254 std::function<void(ServerContextType*, RequestType*,
255 grpc::ServerAsyncResponseWriter<ResponseType>*,
256 void*)>
257 request_method,
258 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
259 : srv_ctx_(new ServerContextType),
260 next_state_(&ServerRpcContextUnaryImpl::invoker),
261 request_method_(request_method),
262 invoke_method_(invoke_method),
263 response_writer_(srv_ctx_.get()) {
264 request_method_(srv_ctx_.get(), &req_, &response_writer_,
265 AsyncQpsServerTest::tag(this));
266 }
~ServerRpcContextUnaryImpl()267 ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)268 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()269 void Reset() override {
270 srv_ctx_.reset(new ServerContextType);
271 req_ = RequestType();
272 response_writer_ =
273 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
274
275 // Then request the method
276 next_state_ = &ServerRpcContextUnaryImpl::invoker;
277 request_method_(srv_ctx_.get(), &req_, &response_writer_,
278 AsyncQpsServerTest::tag(this));
279 }
280
281 private:
finisher(bool)282 bool finisher(bool) { return false; }
invoker(bool ok)283 bool invoker(bool ok) {
284 if (!ok) {
285 return false;
286 }
287
288 // Call the RPC processing function
289 grpc::Status status = invoke_method_(&req_, &response_);
290
291 // Have the response writer work and invoke on_finish when done
292 next_state_ = &ServerRpcContextUnaryImpl::finisher;
293 response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
294 return true;
295 }
296 std::unique_ptr<ServerContextType> srv_ctx_;
297 RequestType req_;
298 ResponseType response_;
299 bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
300 std::function<void(ServerContextType*, RequestType*,
301 grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
302 request_method_;
303 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
304 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
305 };
306
307 class ServerRpcContextStreamingImpl final : public ServerRpcContext {
308 public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)309 ServerRpcContextStreamingImpl(
310 std::function<void(
311 ServerContextType*,
312 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
313 request_method,
314 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
315 : srv_ctx_(new ServerContextType),
316 next_state_(&ServerRpcContextStreamingImpl::request_done),
317 request_method_(request_method),
318 invoke_method_(invoke_method),
319 stream_(srv_ctx_.get()) {
320 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
321 }
~ServerRpcContextStreamingImpl()322 ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)323 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()324 void Reset() override {
325 srv_ctx_.reset(new ServerContextType);
326 req_ = RequestType();
327 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
328 srv_ctx_.get());
329
330 // Then request the method
331 next_state_ = &ServerRpcContextStreamingImpl::request_done;
332 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
333 }
334
335 private:
request_done(bool ok)336 bool request_done(bool ok) {
337 if (!ok) {
338 return false;
339 }
340 next_state_ = &ServerRpcContextStreamingImpl::read_done;
341 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
342 return true;
343 }
344
read_done(bool ok)345 bool read_done(bool ok) {
346 if (ok) {
347 // invoke the method
348 // Call the RPC processing function
349 grpc::Status status = invoke_method_(&req_, &response_);
350 // initiate the write
351 next_state_ = &ServerRpcContextStreamingImpl::write_done;
352 stream_.Write(response_, AsyncQpsServerTest::tag(this));
353 } else { // client has sent writes done
354 // finish the stream
355 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
356 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
357 }
358 return true;
359 }
write_done(bool ok)360 bool write_done(bool ok) {
361 // now go back and get another streaming read!
362 if (ok) {
363 next_state_ = &ServerRpcContextStreamingImpl::read_done;
364 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
365 } else {
366 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
367 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
368 }
369 return true;
370 }
finish_done(bool)371 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
372
373 std::unique_ptr<ServerContextType> srv_ctx_;
374 RequestType req_;
375 ResponseType response_;
376 bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
377 std::function<void(
378 ServerContextType*,
379 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
380 request_method_;
381 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
382 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
383 };
384
385 class ServerRpcContextStreamingFromClientImpl final
386 : public ServerRpcContext {
387 public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)388 ServerRpcContextStreamingFromClientImpl(
389 std::function<void(ServerContextType*,
390 grpc::ServerAsyncReader<ResponseType, RequestType>*,
391 void*)>
392 request_method,
393 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
394 : srv_ctx_(new ServerContextType),
395 next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
396 request_method_(request_method),
397 invoke_method_(invoke_method),
398 stream_(srv_ctx_.get()) {
399 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
400 }
~ServerRpcContextStreamingFromClientImpl()401 ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)402 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()403 void Reset() override {
404 srv_ctx_.reset(new ServerContextType);
405 req_ = RequestType();
406 stream_ =
407 grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
408
409 // Then request the method
410 next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
411 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
412 }
413
414 private:
request_done(bool ok)415 bool request_done(bool ok) {
416 if (!ok) {
417 return false;
418 }
419 next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
420 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
421 return true;
422 }
423
read_done(bool ok)424 bool read_done(bool ok) {
425 if (ok) {
426 // In this case, just do another read
427 // next_state_ is unchanged
428 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
429 return true;
430 } else { // client has sent writes done
431 // invoke the method
432 // Call the RPC processing function
433 grpc::Status status = invoke_method_(&req_, &response_);
434 // finish the stream
435 next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
436 stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
437 }
438 return true;
439 }
finish_done(bool)440 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
441
442 std::unique_ptr<ServerContextType> srv_ctx_;
443 RequestType req_;
444 ResponseType response_;
445 bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
446 std::function<void(ServerContextType*,
447 grpc::ServerAsyncReader<ResponseType, RequestType>*,
448 void*)>
449 request_method_;
450 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
451 grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
452 };
453
454 class ServerRpcContextStreamingFromServerImpl final
455 : public ServerRpcContext {
456 public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)457 ServerRpcContextStreamingFromServerImpl(
458 std::function<void(ServerContextType*, RequestType*,
459 grpc::ServerAsyncWriter<ResponseType>*, void*)>
460 request_method,
461 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
462 : srv_ctx_(new ServerContextType),
463 next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
464 request_method_(request_method),
465 invoke_method_(invoke_method),
466 stream_(srv_ctx_.get()) {
467 request_method_(srv_ctx_.get(), &req_, &stream_,
468 AsyncQpsServerTest::tag(this));
469 }
~ServerRpcContextStreamingFromServerImpl()470 ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)471 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()472 void Reset() override {
473 srv_ctx_.reset(new ServerContextType);
474 req_ = RequestType();
475 stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
476
477 // Then request the method
478 next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
479 request_method_(srv_ctx_.get(), &req_, &stream_,
480 AsyncQpsServerTest::tag(this));
481 }
482
483 private:
request_done(bool ok)484 bool request_done(bool ok) {
485 if (!ok) {
486 return false;
487 }
488 // invoke the method
489 // Call the RPC processing function
490 grpc::Status status = invoke_method_(&req_, &response_);
491
492 next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
493 stream_.Write(response_, AsyncQpsServerTest::tag(this));
494 return true;
495 }
496
write_done(bool ok)497 bool write_done(bool ok) {
498 if (ok) {
499 // Do another write!
500 // next_state_ is unchanged
501 stream_.Write(response_, AsyncQpsServerTest::tag(this));
502 } else { // must be done so let's finish
503 next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
504 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
505 }
506 return true;
507 }
finish_done(bool)508 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
509
510 std::unique_ptr<ServerContextType> srv_ctx_;
511 RequestType req_;
512 ResponseType response_;
513 bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
514 std::function<void(ServerContextType*, RequestType*,
515 grpc::ServerAsyncWriter<ResponseType>*, void*)>
516 request_method_;
517 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
518 grpc::ServerAsyncWriter<ResponseType> stream_;
519 };
520
521 std::vector<std::thread> threads_;
522 std::unique_ptr<grpc::Server> server_;
523 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
524 std::vector<int> cq_;
525 ServiceType async_service_;
526 std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
527
// Shutdown flag for one worker thread together with the mutex that guards
// it. The mutex is mutable so it can be locked through a const reference.
struct PerThreadShutdownState {
  mutable std::mutex mutex;
  bool shutdown = false;  // starts out "not shutting down"

  PerThreadShutdownState() = default;
};
533
534 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
535 };
536
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)537 static void RegisterBenchmarkService(ServerBuilder* builder,
538 BenchmarkService::AsyncService* service) {
539 builder->RegisterService(service);
540 }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)541 static void RegisterGenericService(ServerBuilder* builder,
542 grpc::AsyncGenericService* service) {
543 builder->RegisterAsyncGenericService(service);
544 }
545
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)546 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
547 SimpleResponse* response) {
548 if (request->response_size() > 0) {
549 if (!Server::SetPayload(request->response_type(), request->response_size(),
550 response->mutable_payload())) {
551 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
552 }
553 }
554 // We are done using the request. Clear it to reduce working memory.
555 // This proves to reduce cache misses in large message size cases.
556 request->Clear();
557 return Status::OK;
558 }
559
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)560 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
561 ByteBuffer* request, ByteBuffer* response) {
562 // We are done using the request. Clear it to reduce working memory.
563 // This proves to reduce cache misses in large message size cases.
564 request->Clear();
565 int resp_size = payload_config.bytebuf_params().resp_size();
566 std::unique_ptr<char[]> buf(new char[resp_size]);
567 memset(buf.get(), 0, static_cast<size_t>(resp_size));
568 Slice slice(buf.get(), resp_size);
569 *response = ByteBuffer(&slice, 1);
570 return Status::OK;
571 }
572
CreateAsyncServer(const ServerConfig & config)573 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
574 return std::unique_ptr<Server>(
575 new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
576 BenchmarkService::AsyncService,
577 grpc::ServerContext>(
578 config, RegisterBenchmarkService,
579 &BenchmarkService::AsyncService::RequestUnaryCall,
580 &BenchmarkService::AsyncService::RequestStreamingCall,
581 &BenchmarkService::AsyncService::RequestStreamingFromClient,
582 &BenchmarkService::AsyncService::RequestStreamingFromServer,
583 &BenchmarkService::AsyncService::RequestStreamingBothWays,
584 ProcessSimpleRPC));
585 }
CreateAsyncGenericServer(const ServerConfig & config)586 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
587 return std::unique_ptr<Server>(
588 new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
589 grpc::GenericServerContext>(
590 config, RegisterGenericService, nullptr,
591 &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
592 ProcessGenericRPC));
593 }
594
595 } // namespace testing
596 } // namespace grpc
597