• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/grpc.h>
19 #include <grpc/support/alloc.h>
20 #include <grpcpp/security/server_credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_context.h>
23 
24 #include "src/core/lib/gprpp/host_port.h"
25 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
26 #include "test/cpp/qps/qps_server_builder.h"
27 #include "test/cpp/qps/server.h"
28 #include "test/cpp/qps/usage_timer.h"
29 
30 namespace grpc {
31 namespace testing {
32 
33 class BenchmarkCallbackServiceImpl final
34     : public BenchmarkService::ExperimentalCallbackService {
35  public:
UnaryCall(::grpc::experimental::CallbackServerContext * context,const SimpleRequest * request,SimpleResponse * response)36   ::grpc::experimental::ServerUnaryReactor* UnaryCall(
37       ::grpc::experimental::CallbackServerContext* context,
38       const SimpleRequest* request, SimpleResponse* response) override {
39     auto* reactor = context->DefaultReactor();
40     reactor->Finish(SetResponse(request, response));
41     return reactor;
42   }
43 
44   ::grpc::experimental::ServerBidiReactor<::grpc::testing::SimpleRequest,
45                                           ::grpc::testing::SimpleResponse>*
StreamingCall(::grpc::experimental::CallbackServerContext *)46   StreamingCall(::grpc::experimental::CallbackServerContext*) override {
47     class Reactor
48         : public ::grpc::experimental::ServerBidiReactor<
49               ::grpc::testing::SimpleRequest, ::grpc::testing::SimpleResponse> {
50      public:
51       Reactor() { StartRead(&request_); }
52 
53       void OnReadDone(bool ok) override {
54         if (!ok) {
55           Finish(::grpc::Status::OK);
56           return;
57         }
58         auto s = SetResponse(&request_, &response_);
59         if (!s.ok()) {
60           Finish(s);
61           return;
62         }
63         StartWrite(&response_);
64       }
65 
66       void OnWriteDone(bool ok) override {
67         if (!ok) {
68           Finish(::grpc::Status::OK);
69           return;
70         }
71         StartRead(&request_);
72       }
73 
74       void OnDone() override { delete (this); }
75 
76      private:
77       SimpleRequest request_;
78       SimpleResponse response_;
79     };
80     return new Reactor;
81   }
82 
83  private:
SetResponse(const SimpleRequest * request,SimpleResponse * response)84   static Status SetResponse(const SimpleRequest* request,
85                             SimpleResponse* response) {
86     if (request->response_size() > 0) {
87       if (!Server::SetPayload(request->response_type(),
88                               request->response_size(),
89                               response->mutable_payload())) {
90         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
91       }
92     }
93     return Status::OK;
94   }
95 };
96 
97 class CallbackServer final : public grpc::testing::Server {
98  public:
CallbackServer(const ServerConfig & config)99   explicit CallbackServer(const ServerConfig& config) : Server(config) {
100     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
101 
102     auto port_num = port();
103     // Negative port number means inproc server, so no listen port needed
104     if (port_num >= 0) {
105       std::string server_address = grpc_core::JoinHostPort("::", port_num);
106       builder->AddListeningPort(server_address.c_str(),
107                                 Server::CreateServerCredentials(config),
108                                 &port_num);
109     }
110 
111     ApplyConfigToBuilder(config, builder.get());
112 
113     builder->RegisterService(&service_);
114 
115     impl_ = builder->BuildAndStart();
116     if (impl_ == nullptr) {
117       gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
118     } else {
119       gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
120     }
121   }
122 
InProcessChannel(const ChannelArguments & args)123   std::shared_ptr<Channel> InProcessChannel(
124       const ChannelArguments& args) override {
125     return impl_->InProcessChannel(args);
126   }
127 
128  private:
129   BenchmarkCallbackServiceImpl service_;
130   std::unique_ptr<grpc::Server> impl_;
131 };
132 
CreateCallbackServer(const ServerConfig & config)133 std::unique_ptr<grpc::testing::Server> CreateCallbackServer(
134     const ServerConfig& config) {
135   return std::unique_ptr<Server>(new CallbackServer(config));
136 }
137 
138 }  // namespace testing
139 }  // namespace grpc
140