1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpc/grpc.h>
19 #include <grpc/support/alloc.h>
20 #include <grpcpp/security/server_credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_context.h>
23
24 #include "src/core/lib/gprpp/host_port.h"
25 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
26 #include "test/cpp/qps/qps_server_builder.h"
27 #include "test/cpp/qps/server.h"
28 #include "test/cpp/qps/usage_timer.h"
29
30 namespace grpc {
31 namespace testing {
32
33 class BenchmarkCallbackServiceImpl final
34 : public BenchmarkService::ExperimentalCallbackService {
35 public:
UnaryCall(::grpc::experimental::CallbackServerContext * context,const SimpleRequest * request,SimpleResponse * response)36 ::grpc::experimental::ServerUnaryReactor* UnaryCall(
37 ::grpc::experimental::CallbackServerContext* context,
38 const SimpleRequest* request, SimpleResponse* response) override {
39 auto* reactor = context->DefaultReactor();
40 reactor->Finish(SetResponse(request, response));
41 return reactor;
42 }
43
44 ::grpc::experimental::ServerBidiReactor<::grpc::testing::SimpleRequest,
45 ::grpc::testing::SimpleResponse>*
StreamingCall(::grpc::experimental::CallbackServerContext *)46 StreamingCall(::grpc::experimental::CallbackServerContext*) override {
47 class Reactor
48 : public ::grpc::experimental::ServerBidiReactor<
49 ::grpc::testing::SimpleRequest, ::grpc::testing::SimpleResponse> {
50 public:
51 Reactor() { StartRead(&request_); }
52
53 void OnReadDone(bool ok) override {
54 if (!ok) {
55 Finish(::grpc::Status::OK);
56 return;
57 }
58 auto s = SetResponse(&request_, &response_);
59 if (!s.ok()) {
60 Finish(s);
61 return;
62 }
63 StartWrite(&response_);
64 }
65
66 void OnWriteDone(bool ok) override {
67 if (!ok) {
68 Finish(::grpc::Status::OK);
69 return;
70 }
71 StartRead(&request_);
72 }
73
74 void OnDone() override { delete (this); }
75
76 private:
77 SimpleRequest request_;
78 SimpleResponse response_;
79 };
80 return new Reactor;
81 }
82
83 private:
SetResponse(const SimpleRequest * request,SimpleResponse * response)84 static Status SetResponse(const SimpleRequest* request,
85 SimpleResponse* response) {
86 if (request->response_size() > 0) {
87 if (!Server::SetPayload(request->response_type(),
88 request->response_size(),
89 response->mutable_payload())) {
90 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
91 }
92 }
93 return Status::OK;
94 }
95 };
96
97 class CallbackServer final : public grpc::testing::Server {
98 public:
CallbackServer(const ServerConfig & config)99 explicit CallbackServer(const ServerConfig& config) : Server(config) {
100 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
101
102 auto port_num = port();
103 // Negative port number means inproc server, so no listen port needed
104 if (port_num >= 0) {
105 std::string server_address = grpc_core::JoinHostPort("::", port_num);
106 builder->AddListeningPort(server_address.c_str(),
107 Server::CreateServerCredentials(config),
108 &port_num);
109 }
110
111 ApplyConfigToBuilder(config, builder.get());
112
113 builder->RegisterService(&service_);
114
115 impl_ = builder->BuildAndStart();
116 if (impl_ == nullptr) {
117 gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
118 } else {
119 gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
120 }
121 }
122
InProcessChannel(const ChannelArguments & args)123 std::shared_ptr<Channel> InProcessChannel(
124 const ChannelArguments& args) override {
125 return impl_->InProcessChannel(args);
126 }
127
128 private:
129 BenchmarkCallbackServiceImpl service_;
130 std::unique_ptr<grpc::Server> impl_;
131 };
132
CreateCallbackServer(const ServerConfig & config)133 std::unique_ptr<grpc::testing::Server> CreateCallbackServer(
134 const ServerConfig& config) {
135 return std::unique_ptr<Server>(new CallbackServer(config));
136 }
137
138 } // namespace testing
139 } // namespace grpc
140