1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <algorithm>
20 #include <forward_list>
21 #include <functional>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/generic/async_generic_service.h>
30 #include <grpcpp/resource_quota.h>
31 #include <grpcpp/security/server_credentials.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/config.h>
36
37 #include "src/core/lib/gprpp/host_port.h"
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
40 #include "test/core/util/test_config.h"
41 #include "test/cpp/qps/qps_server_builder.h"
42 #include "test/cpp/qps/server.h"
43
44 namespace grpc {
45 namespace testing {
46
47 template <class RequestType, class ResponseType, class ServiceType,
48 class ServerContextType>
49 class AsyncQpsServerTest final : public grpc::testing::Server {
50 public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)51 AsyncQpsServerTest(
52 const ServerConfig& config,
53 std::function<void(ServerBuilder*, ServiceType*)> register_service,
54 std::function<void(ServiceType*, ServerContextType*, RequestType*,
55 ServerAsyncResponseWriter<ResponseType>*,
56 CompletionQueue*, ServerCompletionQueue*, void*)>
57 request_unary_function,
58 std::function<void(ServiceType*, ServerContextType*,
59 ServerAsyncReaderWriter<ResponseType, RequestType>*,
60 CompletionQueue*, ServerCompletionQueue*, void*)>
61 request_streaming_function,
62 std::function<void(ServiceType*, ServerContextType*,
63 ServerAsyncReader<ResponseType, RequestType>*,
64 CompletionQueue*, ServerCompletionQueue*, void*)>
65 request_streaming_from_client_function,
66 std::function<void(ServiceType*, ServerContextType*, RequestType*,
67 ServerAsyncWriter<ResponseType>*, CompletionQueue*,
68 ServerCompletionQueue*, void*)>
69 request_streaming_from_server_function,
70 std::function<void(ServiceType*, ServerContextType*,
71 ServerAsyncReaderWriter<ResponseType, RequestType>*,
72 CompletionQueue*, ServerCompletionQueue*, void*)>
73 request_streaming_both_ways_function,
74 std::function<grpc::Status(const PayloadConfig&, RequestType*,
75 ResponseType*)>
76 process_rpc)
77 : Server(config) {
78 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
79
80 auto port_num = port();
81 // Negative port number means inproc server, so no listen port needed
82 if (port_num >= 0) {
83 std::string server_address = grpc_core::JoinHostPort("::", port_num);
84 builder->AddListeningPort(server_address.c_str(),
85 Server::CreateServerCredentials(config),
86 &port_num);
87 }
88
89 register_service(builder.get(), &async_service_);
90
91 int num_threads = config.async_server_threads();
92 if (num_threads <= 0) { // dynamic sizing
93 num_threads = cores();
94 gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
95 }
96
97 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
98 int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
99 for (int i = 0; i < num_cqs; i++) {
100 srv_cqs_.emplace_back(builder->AddCompletionQueue());
101 }
102 for (int i = 0; i < num_threads; i++) {
103 cq_.emplace_back(i % srv_cqs_.size());
104 }
105
106 ApplyConfigToBuilder(config, builder.get());
107
108 server_ = builder->BuildAndStart();
109 if (server_ == nullptr) {
110 gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
111 } else {
112 gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
113 }
114
115 auto process_rpc_bound =
116 std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
117 std::placeholders::_2);
118
119 for (int i = 0; i < 5000; i++) {
120 for (int j = 0; j < num_cqs; j++) {
121 if (request_unary_function) {
122 auto request_unary = std::bind(
123 request_unary_function, &async_service_, std::placeholders::_1,
124 std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
125 srv_cqs_[j].get(), std::placeholders::_4);
126 contexts_.emplace_back(
127 new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
128 }
129 if (request_streaming_function) {
130 auto request_streaming = std::bind(
131 request_streaming_function, &async_service_,
132 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
133 srv_cqs_[j].get(), std::placeholders::_3);
134 contexts_.emplace_back(new ServerRpcContextStreamingImpl(
135 request_streaming, process_rpc_bound));
136 }
137 if (request_streaming_from_client_function) {
138 auto request_streaming_from_client = std::bind(
139 request_streaming_from_client_function, &async_service_,
140 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
141 srv_cqs_[j].get(), std::placeholders::_3);
142 contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
143 request_streaming_from_client, process_rpc_bound));
144 }
145 if (request_streaming_from_server_function) {
146 auto request_streaming_from_server =
147 std::bind(request_streaming_from_server_function, &async_service_,
148 std::placeholders::_1, std::placeholders::_2,
149 std::placeholders::_3, srv_cqs_[j].get(),
150 srv_cqs_[j].get(), std::placeholders::_4);
151 contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
152 request_streaming_from_server, process_rpc_bound));
153 }
154 if (request_streaming_both_ways_function) {
155 // TODO(vjpai): Add this code
156 }
157 }
158 }
159
160 for (int i = 0; i < num_threads; i++) {
161 shutdown_state_.emplace_back(new PerThreadShutdownState());
162 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
163 }
164 }
~AsyncQpsServerTest()165 ~AsyncQpsServerTest() {
166 for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
167 std::lock_guard<std::mutex> lock((*ss)->mutex);
168 (*ss)->shutdown = true;
169 }
170 // TODO(vjpai): Remove the following deadline and allow full proper
171 // shutdown.
172 server_->Shutdown(std::chrono::system_clock::now() +
173 std::chrono::seconds(3));
174 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
175 (*cq)->Shutdown();
176 }
177 for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
178 thr->join();
179 }
180 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
181 bool ok;
182 void* got_tag;
183 while ((*cq)->Next(&got_tag, &ok))
184 ;
185 }
186 }
187
GetPollCount()188 int GetPollCount() override {
189 int count = 0;
190 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
191 count += grpc_get_cq_poll_num((*cq)->cq());
192 }
193 return count;
194 }
195
InProcessChannel(const ChannelArguments & args)196 std::shared_ptr<Channel> InProcessChannel(
197 const ChannelArguments& args) override {
198 return server_->InProcessChannel(args);
199 }
200
201 private:
ThreadFunc(int thread_idx)202 void ThreadFunc(int thread_idx) {
203 // Wait until work is available or we are shutting down
204 bool ok;
205 void* got_tag;
206 if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
207 return;
208 }
209 ServerRpcContext* ctx;
210 std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
211 do {
212 ctx = detag(got_tag);
213 // The tag is a pointer to an RPC context to invoke
214 // Proceed while holding a lock to make sure that
215 // this thread isn't supposed to shut down
216 mu_ptr->lock();
217 if (shutdown_state_[thread_idx]->shutdown) {
218 mu_ptr->unlock();
219 return;
220 }
221 } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
222 [&, ctx, ok, mu_ptr]() {
223 ctx->lock();
224 if (!ctx->RunNextState(ok)) {
225 ctx->Reset();
226 }
227 ctx->unlock();
228 mu_ptr->unlock();
229 },
230 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
231 }
232
233 class ServerRpcContext {
234 public:
ServerRpcContext()235 ServerRpcContext() {}
lock()236 void lock() { mu_.lock(); }
unlock()237 void unlock() { mu_.unlock(); }
~ServerRpcContext()238 virtual ~ServerRpcContext(){};
239 virtual bool RunNextState(bool) = 0; // next state, return false if done
240 virtual void Reset() = 0; // start this back at a clean state
241 private:
242 std::mutex mu_;
243 };
tag(ServerRpcContext * func)244 static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)245 static ServerRpcContext* detag(void* tag) {
246 return static_cast<ServerRpcContext*>(tag);
247 }
248
249 class ServerRpcContextUnaryImpl final : public ServerRpcContext {
250 public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)251 ServerRpcContextUnaryImpl(
252 std::function<void(ServerContextType*, RequestType*,
253 grpc::ServerAsyncResponseWriter<ResponseType>*,
254 void*)>
255 request_method,
256 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
257 : srv_ctx_(new ServerContextType),
258 next_state_(&ServerRpcContextUnaryImpl::invoker),
259 request_method_(request_method),
260 invoke_method_(invoke_method),
261 response_writer_(srv_ctx_.get()) {
262 request_method_(srv_ctx_.get(), &req_, &response_writer_,
263 AsyncQpsServerTest::tag(this));
264 }
~ServerRpcContextUnaryImpl()265 ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)266 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()267 void Reset() override {
268 srv_ctx_.reset(new ServerContextType);
269 req_ = RequestType();
270 response_writer_ =
271 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
272
273 // Then request the method
274 next_state_ = &ServerRpcContextUnaryImpl::invoker;
275 request_method_(srv_ctx_.get(), &req_, &response_writer_,
276 AsyncQpsServerTest::tag(this));
277 }
278
279 private:
finisher(bool)280 bool finisher(bool) { return false; }
invoker(bool ok)281 bool invoker(bool ok) {
282 if (!ok) {
283 return false;
284 }
285
286 // Call the RPC processing function
287 grpc::Status status = invoke_method_(&req_, &response_);
288
289 // Have the response writer work and invoke on_finish when done
290 next_state_ = &ServerRpcContextUnaryImpl::finisher;
291 response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
292 return true;
293 }
294 std::unique_ptr<ServerContextType> srv_ctx_;
295 RequestType req_;
296 ResponseType response_;
297 bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
298 std::function<void(ServerContextType*, RequestType*,
299 grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
300 request_method_;
301 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
302 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
303 };
304
305 class ServerRpcContextStreamingImpl final : public ServerRpcContext {
306 public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)307 ServerRpcContextStreamingImpl(
308 std::function<void(
309 ServerContextType*,
310 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
311 request_method,
312 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
313 : srv_ctx_(new ServerContextType),
314 next_state_(&ServerRpcContextStreamingImpl::request_done),
315 request_method_(request_method),
316 invoke_method_(invoke_method),
317 stream_(srv_ctx_.get()) {
318 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
319 }
~ServerRpcContextStreamingImpl()320 ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)321 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()322 void Reset() override {
323 srv_ctx_.reset(new ServerContextType);
324 req_ = RequestType();
325 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
326 srv_ctx_.get());
327
328 // Then request the method
329 next_state_ = &ServerRpcContextStreamingImpl::request_done;
330 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
331 }
332
333 private:
request_done(bool ok)334 bool request_done(bool ok) {
335 if (!ok) {
336 return false;
337 }
338 next_state_ = &ServerRpcContextStreamingImpl::read_done;
339 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
340 return true;
341 }
342
read_done(bool ok)343 bool read_done(bool ok) {
344 if (ok) {
345 // invoke the method
346 // Call the RPC processing function
347 grpc::Status status = invoke_method_(&req_, &response_);
348 // initiate the write
349 next_state_ = &ServerRpcContextStreamingImpl::write_done;
350 stream_.Write(response_, AsyncQpsServerTest::tag(this));
351 } else { // client has sent writes done
352 // finish the stream
353 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
354 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
355 }
356 return true;
357 }
write_done(bool ok)358 bool write_done(bool ok) {
359 // now go back and get another streaming read!
360 if (ok) {
361 next_state_ = &ServerRpcContextStreamingImpl::read_done;
362 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
363 } else {
364 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
365 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
366 }
367 return true;
368 }
finish_done(bool)369 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
370
371 std::unique_ptr<ServerContextType> srv_ctx_;
372 RequestType req_;
373 ResponseType response_;
374 bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
375 std::function<void(
376 ServerContextType*,
377 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
378 request_method_;
379 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
380 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
381 };
382
383 class ServerRpcContextStreamingFromClientImpl final
384 : public ServerRpcContext {
385 public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)386 ServerRpcContextStreamingFromClientImpl(
387 std::function<void(ServerContextType*,
388 grpc::ServerAsyncReader<ResponseType, RequestType>*,
389 void*)>
390 request_method,
391 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
392 : srv_ctx_(new ServerContextType),
393 next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
394 request_method_(request_method),
395 invoke_method_(invoke_method),
396 stream_(srv_ctx_.get()) {
397 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
398 }
~ServerRpcContextStreamingFromClientImpl()399 ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)400 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()401 void Reset() override {
402 srv_ctx_.reset(new ServerContextType);
403 req_ = RequestType();
404 stream_ =
405 grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
406
407 // Then request the method
408 next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
409 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
410 }
411
412 private:
request_done(bool ok)413 bool request_done(bool ok) {
414 if (!ok) {
415 return false;
416 }
417 next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
418 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
419 return true;
420 }
421
read_done(bool ok)422 bool read_done(bool ok) {
423 if (ok) {
424 // In this case, just do another read
425 // next_state_ is unchanged
426 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
427 return true;
428 } else { // client has sent writes done
429 // invoke the method
430 // Call the RPC processing function
431 grpc::Status status = invoke_method_(&req_, &response_);
432 // finish the stream
433 next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
434 stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
435 }
436 return true;
437 }
finish_done(bool)438 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
439
440 std::unique_ptr<ServerContextType> srv_ctx_;
441 RequestType req_;
442 ResponseType response_;
443 bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
444 std::function<void(ServerContextType*,
445 grpc::ServerAsyncReader<ResponseType, RequestType>*,
446 void*)>
447 request_method_;
448 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
449 grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
450 };
451
452 class ServerRpcContextStreamingFromServerImpl final
453 : public ServerRpcContext {
454 public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)455 ServerRpcContextStreamingFromServerImpl(
456 std::function<void(ServerContextType*, RequestType*,
457 grpc::ServerAsyncWriter<ResponseType>*, void*)>
458 request_method,
459 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
460 : srv_ctx_(new ServerContextType),
461 next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
462 request_method_(request_method),
463 invoke_method_(invoke_method),
464 stream_(srv_ctx_.get()) {
465 request_method_(srv_ctx_.get(), &req_, &stream_,
466 AsyncQpsServerTest::tag(this));
467 }
~ServerRpcContextStreamingFromServerImpl()468 ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)469 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()470 void Reset() override {
471 srv_ctx_.reset(new ServerContextType);
472 req_ = RequestType();
473 stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
474
475 // Then request the method
476 next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
477 request_method_(srv_ctx_.get(), &req_, &stream_,
478 AsyncQpsServerTest::tag(this));
479 }
480
481 private:
request_done(bool ok)482 bool request_done(bool ok) {
483 if (!ok) {
484 return false;
485 }
486 // invoke the method
487 // Call the RPC processing function
488 grpc::Status status = invoke_method_(&req_, &response_);
489
490 next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
491 stream_.Write(response_, AsyncQpsServerTest::tag(this));
492 return true;
493 }
494
write_done(bool ok)495 bool write_done(bool ok) {
496 if (ok) {
497 // Do another write!
498 // next_state_ is unchanged
499 stream_.Write(response_, AsyncQpsServerTest::tag(this));
500 } else { // must be done so let's finish
501 next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
502 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
503 }
504 return true;
505 }
finish_done(bool)506 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
507
508 std::unique_ptr<ServerContextType> srv_ctx_;
509 RequestType req_;
510 ResponseType response_;
511 bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
512 std::function<void(ServerContextType*, RequestType*,
513 grpc::ServerAsyncWriter<ResponseType>*, void*)>
514 request_method_;
515 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
516 grpc::ServerAsyncWriter<ResponseType> stream_;
517 };
518
519 std::vector<std::thread> threads_;
520 std::unique_ptr<grpc::Server> server_;
521 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
522 std::vector<int> cq_;
523 ServiceType async_service_;
524 std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
525
526 struct PerThreadShutdownState {
527 mutable std::mutex mutex;
528 bool shutdown;
PerThreadShutdownStategrpc::testing::AsyncQpsServerTest::PerThreadShutdownState529 PerThreadShutdownState() : shutdown(false) {}
530 };
531
532 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
533 };
534
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)535 static void RegisterBenchmarkService(ServerBuilder* builder,
536 BenchmarkService::AsyncService* service) {
537 builder->RegisterService(service);
538 }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)539 static void RegisterGenericService(ServerBuilder* builder,
540 grpc::AsyncGenericService* service) {
541 builder->RegisterAsyncGenericService(service);
542 }
543
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)544 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
545 SimpleResponse* response) {
546 if (request->response_size() > 0) {
547 if (!Server::SetPayload(request->response_type(), request->response_size(),
548 response->mutable_payload())) {
549 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
550 }
551 }
552 // We are done using the request. Clear it to reduce working memory.
553 // This proves to reduce cache misses in large message size cases.
554 request->Clear();
555 return Status::OK;
556 }
557
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)558 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
559 ByteBuffer* request, ByteBuffer* response) {
560 // We are done using the request. Clear it to reduce working memory.
561 // This proves to reduce cache misses in large message size cases.
562 request->Clear();
563 int resp_size = payload_config.bytebuf_params().resp_size();
564 std::unique_ptr<char[]> buf(new char[resp_size]);
565 memset(buf.get(), 0, static_cast<size_t>(resp_size));
566 Slice slice(buf.get(), resp_size);
567 *response = ByteBuffer(&slice, 1);
568 return Status::OK;
569 }
570
CreateAsyncServer(const ServerConfig & config)571 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
572 return std::unique_ptr<Server>(
573 new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
574 BenchmarkService::AsyncService,
575 grpc::ServerContext>(
576 config, RegisterBenchmarkService,
577 &BenchmarkService::AsyncService::RequestUnaryCall,
578 &BenchmarkService::AsyncService::RequestStreamingCall,
579 &BenchmarkService::AsyncService::RequestStreamingFromClient,
580 &BenchmarkService::AsyncService::RequestStreamingFromServer,
581 &BenchmarkService::AsyncService::RequestStreamingBothWays,
582 ProcessSimpleRPC));
583 }
CreateAsyncGenericServer(const ServerConfig & config)584 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
585 return std::unique_ptr<Server>(
586 new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
587 grpc::GenericServerContext>(
588 config, RegisterGenericService, nullptr,
589 &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
590 ProcessGenericRPC));
591 }
592
593 } // namespace testing
594 } // namespace grpc
595