• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpcpp/security/server_credentials.h>
22 #include <grpcpp/server.h>
23 #include <grpcpp/server_context.h>
24 
25 #include "absl/log/log.h"
26 #include "src/core/util/host_port.h"
27 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
28 #include "test/cpp/qps/qps_server_builder.h"
29 #include "test/cpp/qps/server.h"
30 #include "test/cpp/qps/usage_timer.h"
31 
32 namespace grpc {
33 namespace testing {
34 
35 class BenchmarkCallbackServiceImpl final
36     : public BenchmarkService::CallbackService {
37  public:
UnaryCall(grpc::CallbackServerContext * context,const SimpleRequest * request,SimpleResponse * response)38   grpc::ServerUnaryReactor* UnaryCall(grpc::CallbackServerContext* context,
39                                       const SimpleRequest* request,
40                                       SimpleResponse* response) override {
41     auto* reactor = context->DefaultReactor();
42     reactor->Finish(SetResponse(request, response));
43     return reactor;
44   }
45 
46   grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
47                           grpc::testing::SimpleResponse>*
StreamingCall(grpc::CallbackServerContext *)48   StreamingCall(grpc::CallbackServerContext*) override {
49     class Reactor
50         : public grpc::ServerBidiReactor<grpc::testing::SimpleRequest,
51                                          grpc::testing::SimpleResponse> {
52      public:
53       Reactor() { StartRead(&request_); }
54 
55       void OnReadDone(bool ok) override {
56         if (!ok) {
57           Finish(grpc::Status::OK);
58           return;
59         }
60         auto s = SetResponse(&request_, &response_);
61         if (!s.ok()) {
62           Finish(s);
63           return;
64         }
65         StartWrite(&response_);
66       }
67 
68       void OnWriteDone(bool ok) override {
69         if (!ok) {
70           Finish(grpc::Status::OK);
71           return;
72         }
73         StartRead(&request_);
74       }
75 
76       void OnDone() override { delete (this); }
77 
78      private:
79       SimpleRequest request_;
80       SimpleResponse response_;
81     };
82     return new Reactor;
83   }
84 
85  private:
SetResponse(const SimpleRequest * request,SimpleResponse * response)86   static Status SetResponse(const SimpleRequest* request,
87                             SimpleResponse* response) {
88     if (request->response_size() > 0) {
89       if (!Server::SetPayload(request->response_type(),
90                               request->response_size(),
91                               response->mutable_payload())) {
92         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
93       }
94     }
95     return Status::OK;
96   }
97 };
98 
99 class CallbackServer final : public grpc::testing::Server {
100  public:
CallbackServer(const ServerConfig & config)101   explicit CallbackServer(const ServerConfig& config) : Server(config) {
102     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
103 
104     auto port_num = port();
105     // Negative port number means inproc server, so no listen port needed
106     if (port_num >= 0) {
107       std::string server_address = grpc_core::JoinHostPort("::", port_num);
108       builder->AddListeningPort(
109           server_address, Server::CreateServerCredentials(config), &port_num);
110     }
111 
112     ApplyConfigToBuilder(config, builder.get());
113 
114     builder->RegisterService(&service_);
115 
116     impl_ = builder->BuildAndStart();
117     if (impl_ == nullptr) {
118       LOG(ERROR) << "Server: Fail to BuildAndStart(port=" << port_num << ")";
119     } else {
120       LOG(INFO) << "Server: BuildAndStart(port=" << port_num << ")";
121     }
122   }
123 
InProcessChannel(const ChannelArguments & args)124   std::shared_ptr<Channel> InProcessChannel(
125       const ChannelArguments& args) override {
126     return impl_->InProcessChannel(args);
127   }
128 
129  private:
130   BenchmarkCallbackServiceImpl service_;
131   std::unique_ptr<grpc::Server> impl_;
132 };
133 
CreateCallbackServer(const ServerConfig & config)134 std::unique_ptr<grpc::testing::Server> CreateCallbackServer(
135     const ServerConfig& config) {
136   return std::unique_ptr<Server>(new CallbackServer(config));
137 }
138 
139 }  // namespace testing
140 }  // namespace grpc
141