1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpcpp/security/server_credentials.h>
22 #include <grpcpp/server.h>
23 #include <grpcpp/server_context.h>
24
25 #include <atomic>
26 #include <thread>
27
28 #include "absl/log/log.h"
29 #include "src/core/util/host_port.h"
30 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
31 #include "test/cpp/qps/qps_server_builder.h"
32 #include "test/cpp/qps/server.h"
33 #include "test/cpp/qps/usage_timer.h"
34
35 namespace grpc {
36 namespace testing {
37
38 class BenchmarkServiceImpl final : public BenchmarkService::Service {
39 public:
UnaryCall(ServerContext *,const SimpleRequest * request,SimpleResponse * response)40 Status UnaryCall(ServerContext* /*context*/, const SimpleRequest* request,
41 SimpleResponse* response) override {
42 auto s = SetResponse(request, response);
43 if (!s.ok()) {
44 return s;
45 }
46 return Status::OK;
47 }
StreamingCall(ServerContext *,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)48 Status StreamingCall(
49 ServerContext* /*context*/,
50 ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
51 SimpleRequest request;
52 while (stream->Read(&request)) {
53 SimpleResponse response;
54 auto s = SetResponse(&request, &response);
55 if (!s.ok()) {
56 return s;
57 }
58 if (!stream->Write(response)) {
59 return Status(StatusCode::INTERNAL, "Server couldn't respond");
60 }
61 }
62 return Status::OK;
63 }
StreamingFromClient(ServerContext * context,ServerReader<SimpleRequest> * stream,SimpleResponse * response)64 Status StreamingFromClient(ServerContext* context,
65 ServerReader<SimpleRequest>* stream,
66 SimpleResponse* response) override {
67 auto s = ClientPull(context, stream, response);
68 if (!s.ok()) {
69 return s;
70 }
71 return Status::OK;
72 }
StreamingFromServer(ServerContext * context,const SimpleRequest * request,ServerWriter<SimpleResponse> * stream)73 Status StreamingFromServer(ServerContext* context,
74 const SimpleRequest* request,
75 ServerWriter<SimpleResponse>* stream) override {
76 SimpleResponse response;
77 auto s = SetResponse(request, &response);
78 if (!s.ok()) {
79 return s;
80 }
81 return ServerPush(context, stream, response, nullptr);
82 }
StreamingBothWays(ServerContext * context,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)83 Status StreamingBothWays(
84 ServerContext* context,
85 ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
86 // Read the first client message to setup server response
87 SimpleRequest request;
88 if (!stream->Read(&request)) {
89 return Status::OK;
90 }
91 SimpleResponse response;
92 auto s = SetResponse(&request, &response);
93 if (!s.ok()) {
94 return s;
95 }
96 std::atomic_bool done;
97 Status sp;
98 std::thread t([context, stream, &response, &done, &sp]() {
99 sp = ServerPush(context, stream, response, [&done]() {
100 return done.load(std::memory_order_relaxed);
101 });
102 });
103 SimpleResponse phony;
104 auto cp = ClientPull(context, stream, &phony);
105 done.store(true, std::memory_order_relaxed); // can be lazy
106 t.join();
107 if (!cp.ok()) {
108 return cp;
109 }
110 if (!sp.ok()) {
111 return sp;
112 }
113 return Status::OK;
114 }
115
116 private:
117 template <class R>
ClientPull(ServerContext *,R * stream,SimpleResponse * response)118 static Status ClientPull(ServerContext* /*context*/, R* stream,
119 SimpleResponse* response) {
120 SimpleRequest request;
121 while (stream->Read(&request)) {
122 }
123 if (request.response_size() > 0) {
124 if (!Server::SetPayload(request.response_type(), request.response_size(),
125 response->mutable_payload())) {
126 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
127 }
128 }
129 return Status::OK;
130 }
131 template <class W>
ServerPush(ServerContext *,W * stream,const SimpleResponse & response,const std::function<bool ()> & done)132 static Status ServerPush(ServerContext* /*context*/, W* stream,
133 const SimpleResponse& response,
134 const std::function<bool()>& done) {
135 while ((done == nullptr) || !done()) {
136 // TODO(vjpai): Add potential for rate-pacing on this
137 if (!stream->Write(response)) {
138 return Status(StatusCode::INTERNAL, "Server couldn't push");
139 }
140 }
141 return Status::OK;
142 }
SetResponse(const SimpleRequest * request,SimpleResponse * response)143 static Status SetResponse(const SimpleRequest* request,
144 SimpleResponse* response) {
145 if (request->response_size() > 0) {
146 if (!Server::SetPayload(request->response_type(),
147 request->response_size(),
148 response->mutable_payload())) {
149 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
150 }
151 }
152 return Status::OK;
153 }
154 };
155
156 class SynchronousServer final : public grpc::testing::Server {
157 public:
SynchronousServer(const ServerConfig & config)158 explicit SynchronousServer(const ServerConfig& config) : Server(config) {
159 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
160
161 auto port_num = port();
162 // Negative port number means inproc server, so no listen port needed
163 if (port_num >= 0) {
164 std::string server_address = grpc_core::JoinHostPort("::", port_num);
165 builder->AddListeningPort(
166 server_address, Server::CreateServerCredentials(config), &port_num);
167 }
168
169 ApplyConfigToBuilder(config, builder.get());
170
171 builder->RegisterService(&service_);
172
173 impl_ = builder->BuildAndStart();
174 if (impl_ == nullptr) {
175 LOG(ERROR) << "Server: Fail to BuildAndStart(port=" << port_num << ")";
176 } else {
177 LOG(INFO) << "Server: BuildAndStart(port=" << port_num << ")";
178 }
179 }
180
InProcessChannel(const ChannelArguments & args)181 std::shared_ptr<Channel> InProcessChannel(
182 const ChannelArguments& args) override {
183 return impl_->InProcessChannel(args);
184 }
185
186 private:
187 BenchmarkServiceImpl service_;
188 std::unique_ptr<grpc::Server> impl_;
189 };
190
CreateSynchronousServer(const ServerConfig & config)191 std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
192 const ServerConfig& config) {
193 return std::unique_ptr<Server>(new SynchronousServer(config));
194 }
195
196 } // namespace testing
197 } // namespace grpc
198