• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpcpp/security/server_credentials.h>
22 #include <grpcpp/server.h>
23 #include <grpcpp/server_context.h>
24 
25 #include <atomic>
26 #include <thread>
27 
28 #include "absl/log/log.h"
29 #include "src/core/util/host_port.h"
30 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
31 #include "test/cpp/qps/qps_server_builder.h"
32 #include "test/cpp/qps/server.h"
33 #include "test/cpp/qps/usage_timer.h"
34 
35 namespace grpc {
36 namespace testing {
37 
38 class BenchmarkServiceImpl final : public BenchmarkService::Service {
39  public:
UnaryCall(ServerContext *,const SimpleRequest * request,SimpleResponse * response)40   Status UnaryCall(ServerContext* /*context*/, const SimpleRequest* request,
41                    SimpleResponse* response) override {
42     auto s = SetResponse(request, response);
43     if (!s.ok()) {
44       return s;
45     }
46     return Status::OK;
47   }
StreamingCall(ServerContext *,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)48   Status StreamingCall(
49       ServerContext* /*context*/,
50       ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
51     SimpleRequest request;
52     while (stream->Read(&request)) {
53       SimpleResponse response;
54       auto s = SetResponse(&request, &response);
55       if (!s.ok()) {
56         return s;
57       }
58       if (!stream->Write(response)) {
59         return Status(StatusCode::INTERNAL, "Server couldn't respond");
60       }
61     }
62     return Status::OK;
63   }
StreamingFromClient(ServerContext * context,ServerReader<SimpleRequest> * stream,SimpleResponse * response)64   Status StreamingFromClient(ServerContext* context,
65                              ServerReader<SimpleRequest>* stream,
66                              SimpleResponse* response) override {
67     auto s = ClientPull(context, stream, response);
68     if (!s.ok()) {
69       return s;
70     }
71     return Status::OK;
72   }
StreamingFromServer(ServerContext * context,const SimpleRequest * request,ServerWriter<SimpleResponse> * stream)73   Status StreamingFromServer(ServerContext* context,
74                              const SimpleRequest* request,
75                              ServerWriter<SimpleResponse>* stream) override {
76     SimpleResponse response;
77     auto s = SetResponse(request, &response);
78     if (!s.ok()) {
79       return s;
80     }
81     return ServerPush(context, stream, response, nullptr);
82   }
StreamingBothWays(ServerContext * context,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)83   Status StreamingBothWays(
84       ServerContext* context,
85       ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
86     // Read the first client message to setup server response
87     SimpleRequest request;
88     if (!stream->Read(&request)) {
89       return Status::OK;
90     }
91     SimpleResponse response;
92     auto s = SetResponse(&request, &response);
93     if (!s.ok()) {
94       return s;
95     }
96     std::atomic_bool done;
97     Status sp;
98     std::thread t([context, stream, &response, &done, &sp]() {
99       sp = ServerPush(context, stream, response, [&done]() {
100         return done.load(std::memory_order_relaxed);
101       });
102     });
103     SimpleResponse phony;
104     auto cp = ClientPull(context, stream, &phony);
105     done.store(true, std::memory_order_relaxed);  // can be lazy
106     t.join();
107     if (!cp.ok()) {
108       return cp;
109     }
110     if (!sp.ok()) {
111       return sp;
112     }
113     return Status::OK;
114   }
115 
116  private:
117   template <class R>
ClientPull(ServerContext *,R * stream,SimpleResponse * response)118   static Status ClientPull(ServerContext* /*context*/, R* stream,
119                            SimpleResponse* response) {
120     SimpleRequest request;
121     while (stream->Read(&request)) {
122     }
123     if (request.response_size() > 0) {
124       if (!Server::SetPayload(request.response_type(), request.response_size(),
125                               response->mutable_payload())) {
126         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
127       }
128     }
129     return Status::OK;
130   }
131   template <class W>
ServerPush(ServerContext *,W * stream,const SimpleResponse & response,const std::function<bool ()> & done)132   static Status ServerPush(ServerContext* /*context*/, W* stream,
133                            const SimpleResponse& response,
134                            const std::function<bool()>& done) {
135     while ((done == nullptr) || !done()) {
136       // TODO(vjpai): Add potential for rate-pacing on this
137       if (!stream->Write(response)) {
138         return Status(StatusCode::INTERNAL, "Server couldn't push");
139       }
140     }
141     return Status::OK;
142   }
SetResponse(const SimpleRequest * request,SimpleResponse * response)143   static Status SetResponse(const SimpleRequest* request,
144                             SimpleResponse* response) {
145     if (request->response_size() > 0) {
146       if (!Server::SetPayload(request->response_type(),
147                               request->response_size(),
148                               response->mutable_payload())) {
149         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
150       }
151     }
152     return Status::OK;
153   }
154 };
155 
156 class SynchronousServer final : public grpc::testing::Server {
157  public:
SynchronousServer(const ServerConfig & config)158   explicit SynchronousServer(const ServerConfig& config) : Server(config) {
159     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
160 
161     auto port_num = port();
162     // Negative port number means inproc server, so no listen port needed
163     if (port_num >= 0) {
164       std::string server_address = grpc_core::JoinHostPort("::", port_num);
165       builder->AddListeningPort(
166           server_address, Server::CreateServerCredentials(config), &port_num);
167     }
168 
169     ApplyConfigToBuilder(config, builder.get());
170 
171     builder->RegisterService(&service_);
172 
173     impl_ = builder->BuildAndStart();
174     if (impl_ == nullptr) {
175       LOG(ERROR) << "Server: Fail to BuildAndStart(port=" << port_num << ")";
176     } else {
177       LOG(INFO) << "Server: BuildAndStart(port=" << port_num << ")";
178     }
179   }
180 
InProcessChannel(const ChannelArguments & args)181   std::shared_ptr<Channel> InProcessChannel(
182       const ChannelArguments& args) override {
183     return impl_->InProcessChannel(args);
184   }
185 
186  private:
187   BenchmarkServiceImpl service_;
188   std::unique_ptr<grpc::Server> impl_;
189 };
190 
CreateSynchronousServer(const ServerConfig & config)191 std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
192     const ServerConfig& config) {
193   return std::unique_ptr<Server>(new SynchronousServer(config));
194 }
195 
196 }  // namespace testing
197 }  // namespace grpc
198