1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <atomic>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpcpp/security/server_credentials.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_context.h>
27
28 #include "src/core/lib/gprpp/host_port.h"
29 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
30 #include "test/cpp/qps/qps_server_builder.h"
31 #include "test/cpp/qps/server.h"
32 #include "test/cpp/qps/usage_timer.h"
33
34 namespace grpc {
35 namespace testing {
36
37 class BenchmarkServiceImpl final : public BenchmarkService::Service {
38 public:
UnaryCall(ServerContext *,const SimpleRequest * request,SimpleResponse * response)39 Status UnaryCall(ServerContext* /*context*/, const SimpleRequest* request,
40 SimpleResponse* response) override {
41 auto s = SetResponse(request, response);
42 if (!s.ok()) {
43 return s;
44 }
45 return Status::OK;
46 }
StreamingCall(ServerContext *,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)47 Status StreamingCall(
48 ServerContext* /*context*/,
49 ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
50 SimpleRequest request;
51 while (stream->Read(&request)) {
52 SimpleResponse response;
53 auto s = SetResponse(&request, &response);
54 if (!s.ok()) {
55 return s;
56 }
57 if (!stream->Write(response)) {
58 return Status(StatusCode::INTERNAL, "Server couldn't respond");
59 }
60 }
61 return Status::OK;
62 }
StreamingFromClient(ServerContext * context,ServerReader<SimpleRequest> * stream,SimpleResponse * response)63 Status StreamingFromClient(ServerContext* context,
64 ServerReader<SimpleRequest>* stream,
65 SimpleResponse* response) override {
66 auto s = ClientPull(context, stream, response);
67 if (!s.ok()) {
68 return s;
69 }
70 return Status::OK;
71 }
StreamingFromServer(ServerContext * context,const SimpleRequest * request,ServerWriter<SimpleResponse> * stream)72 Status StreamingFromServer(ServerContext* context,
73 const SimpleRequest* request,
74 ServerWriter<SimpleResponse>* stream) override {
75 SimpleResponse response;
76 auto s = SetResponse(request, &response);
77 if (!s.ok()) {
78 return s;
79 }
80 return ServerPush(context, stream, response, nullptr);
81 }
StreamingBothWays(ServerContext * context,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)82 Status StreamingBothWays(
83 ServerContext* context,
84 ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
85 // Read the first client message to setup server response
86 SimpleRequest request;
87 if (!stream->Read(&request)) {
88 return Status::OK;
89 }
90 SimpleResponse response;
91 auto s = SetResponse(&request, &response);
92 if (!s.ok()) {
93 return s;
94 }
95 std::atomic_bool done;
96 Status sp;
97 std::thread t([context, stream, &response, &done, &sp]() {
98 sp = ServerPush(context, stream, response, [&done]() {
99 return done.load(std::memory_order_relaxed);
100 });
101 });
102 SimpleResponse dummy;
103 auto cp = ClientPull(context, stream, &dummy);
104 done.store(true, std::memory_order_relaxed); // can be lazy
105 t.join();
106 if (!cp.ok()) {
107 return cp;
108 }
109 if (!sp.ok()) {
110 return sp;
111 }
112 return Status::OK;
113 }
114
115 private:
116 template <class R>
ClientPull(ServerContext *,R * stream,SimpleResponse * response)117 static Status ClientPull(ServerContext* /*context*/, R* stream,
118 SimpleResponse* response) {
119 SimpleRequest request;
120 while (stream->Read(&request)) {
121 }
122 if (request.response_size() > 0) {
123 if (!Server::SetPayload(request.response_type(), request.response_size(),
124 response->mutable_payload())) {
125 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
126 }
127 }
128 return Status::OK;
129 }
130 template <class W>
ServerPush(ServerContext *,W * stream,const SimpleResponse & response,const std::function<bool ()> & done)131 static Status ServerPush(ServerContext* /*context*/, W* stream,
132 const SimpleResponse& response,
133 const std::function<bool()>& done) {
134 while ((done == nullptr) || !done()) {
135 // TODO(vjpai): Add potential for rate-pacing on this
136 if (!stream->Write(response)) {
137 return Status(StatusCode::INTERNAL, "Server couldn't push");
138 }
139 }
140 return Status::OK;
141 }
SetResponse(const SimpleRequest * request,SimpleResponse * response)142 static Status SetResponse(const SimpleRequest* request,
143 SimpleResponse* response) {
144 if (request->response_size() > 0) {
145 if (!Server::SetPayload(request->response_type(),
146 request->response_size(),
147 response->mutable_payload())) {
148 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
149 }
150 }
151 return Status::OK;
152 }
153 };
154
155 class SynchronousServer final : public grpc::testing::Server {
156 public:
SynchronousServer(const ServerConfig & config)157 explicit SynchronousServer(const ServerConfig& config) : Server(config) {
158 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
159
160 auto port_num = port();
161 // Negative port number means inproc server, so no listen port needed
162 if (port_num >= 0) {
163 std::string server_address = grpc_core::JoinHostPort("::", port_num);
164 builder->AddListeningPort(server_address.c_str(),
165 Server::CreateServerCredentials(config),
166 &port_num);
167 }
168
169 ApplyConfigToBuilder(config, builder.get());
170
171 builder->RegisterService(&service_);
172
173 impl_ = builder->BuildAndStart();
174 if (impl_ == nullptr) {
175 gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
176 } else {
177 gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
178 }
179 }
180
InProcessChannel(const ChannelArguments & args)181 std::shared_ptr<Channel> InProcessChannel(
182 const ChannelArguments& args) override {
183 return impl_->InProcessChannel(args);
184 }
185
186 private:
187 BenchmarkServiceImpl service_;
188 std::unique_ptr<grpc::Server> impl_;
189 };
190
CreateSynchronousServer(const ServerConfig & config)191 std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
192 const ServerConfig& config) {
193 return std::unique_ptr<Server>(new SynchronousServer(config));
194 }
195
196 } // namespace testing
197 } // namespace grpc
198