1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpcpp/security/server_credentials.h>
22 #include <grpcpp/server.h>
23 #include <grpcpp/server_context.h>
24
25 #include "absl/log/log.h"
26 #include "src/core/util/host_port.h"
27 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
28 #include "test/cpp/qps/qps_server_builder.h"
29 #include "test/cpp/qps/server.h"
30 #include "test/cpp/qps/usage_timer.h"
31
32 namespace grpc {
33 namespace testing {
34
35 class BenchmarkCallbackServiceImpl final
36 : public BenchmarkService::CallbackService {
37 public:
UnaryCall(grpc::CallbackServerContext * context,const SimpleRequest * request,SimpleResponse * response)38 grpc::ServerUnaryReactor* UnaryCall(grpc::CallbackServerContext* context,
39 const SimpleRequest* request,
40 SimpleResponse* response) override {
41 auto* reactor = context->DefaultReactor();
42 reactor->Finish(SetResponse(request, response));
43 return reactor;
44 }
45
46 grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
47 grpc::testing::SimpleResponse>*
StreamingCall(grpc::CallbackServerContext *)48 StreamingCall(grpc::CallbackServerContext*) override {
49 class Reactor
50 : public grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
51 grpc::testing::SimpleResponse> {
52 public:
53 Reactor() { StartRead(&request_); }
54
55 void OnReadDone(bool ok) override {
56 if (!ok) {
57 Finish(grpc::Status::OK);
58 return;
59 }
60 auto s = SetResponse(&request_, &response_);
61 if (!s.ok()) {
62 Finish(s);
63 return;
64 }
65 StartWrite(&response_);
66 }
67
68 void OnWriteDone(bool ok) override {
69 if (!ok) {
70 Finish(grpc::Status::OK);
71 return;
72 }
73 StartRead(&request_);
74 }
75
76 void OnDone() override { delete (this); }
77
78 private:
79 SimpleRequest request_;
80 SimpleResponse response_;
81 };
82 return new Reactor;
83 }
84
85 private:
SetResponse(const SimpleRequest * request,SimpleResponse * response)86 static Status SetResponse(const SimpleRequest* request,
87 SimpleResponse* response) {
88 if (request->response_size() > 0) {
89 if (!Server::SetPayload(request->response_type(),
90 request->response_size(),
91 response->mutable_payload())) {
92 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
93 }
94 }
95 return Status::OK;
96 }
97 };
98
99 class CallbackServer final : public grpc::testing::Server {
100 public:
CallbackServer(const ServerConfig & config)101 explicit CallbackServer(const ServerConfig& config) : Server(config) {
102 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
103
104 auto port_num = port();
105 // Negative port number means inproc server, so no listen port needed
106 if (port_num >= 0) {
107 std::string server_address = grpc_core::JoinHostPort("::", port_num);
108 builder->AddListeningPort(
109 server_address, Server::CreateServerCredentials(config), &port_num);
110 }
111
112 ApplyConfigToBuilder(config, builder.get());
113
114 builder->RegisterService(&service_);
115
116 impl_ = builder->BuildAndStart();
117 if (impl_ == nullptr) {
118 LOG(ERROR) << "Server: Fail to BuildAndStart(port=" << port_num << ")";
119 } else {
120 LOG(INFO) << "Server: BuildAndStart(port=" << port_num << ")";
121 }
122 }
123
InProcessChannel(const ChannelArguments & args)124 std::shared_ptr<Channel> InProcessChannel(
125 const ChannelArguments& args) override {
126 return impl_->InProcessChannel(args);
127 }
128
129 private:
130 BenchmarkCallbackServiceImpl service_;
131 std::unique_ptr<grpc::Server> impl_;
132 };
133
CreateCallbackServer(const ServerConfig & config)134 std::unique_ptr<grpc::testing::Server> CreateCallbackServer(
135 const ServerConfig& config) {
136 return std::unique_ptr<Server>(new CallbackServer(config));
137 }
138
139 } // namespace testing
140 } // namespace grpc
141