1 /*
2 *
3 * Copyright 2024 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include <grpcpp/ext/proto_server_reflection_plugin.h>
19 #include <grpcpp/grpcpp.h>
20 #include <grpcpp/health_check_service_interface.h>
21 #include <grpcpp/support/status.h>
22
23 #include <cstddef>
24 #include <iostream>
25 #include <memory>
26 #include <string>
27
28 #include "absl/flags/flag.h"
29 #include "absl/flags/parse.h"
30 #include "absl/strings/str_cat.h"
31
32 #ifdef BAZEL_BUILD
33 #include "examples/protos/helloworld.grpc.pb.h"
34 #else
35 #include "helloworld.grpc.pb.h"
36 #endif
37
38 ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
39 ABSL_FLAG(size_t, message_size, 3 * 1024 * 1024,
40 "Size of the messages to send");
41 ABSL_FLAG(uint32_t, to_send, 10,
42 "Messages to send in response to a single request");
43
44 using grpc::CallbackServerContext;
45 using grpc::Server;
46 using grpc::ServerBuilder;
47 using grpc::ServerUnaryReactor;
48 using grpc::Status;
49
50 namespace {
51
52 //
53 // Will write the replies as fast as it can, starting a new write as soon as
54 // previous one is done.
55 //
56 class HelloReactor final
57 : public grpc::ServerWriteReactor<helloworld::HelloReply> {
58 public:
HelloReactor(size_t message_size,size_t to_send)59 HelloReactor(size_t message_size, size_t to_send)
60 : messages_to_send_(to_send) {
61 res_.set_message(std::string(message_size, '#'));
62 Write();
63 }
64
Write()65 void Write() {
66 absl::MutexLock lock(&mu_);
67 StartWrite(&res_);
68 --messages_to_send_;
69 write_start_time_ = absl::Now();
70 }
71
OnWriteDone(bool ok)72 void OnWriteDone(bool ok) override {
73 bool more = false;
74 {
75 absl::MutexLock lock(&mu_);
76 std::cout << "Write #" << messages_to_send_ << " done (Ok: " << ok
77 << "): " << absl::Now() - *write_start_time_ << "\n";
78 write_start_time_ = absl::nullopt;
79 more = ok && messages_to_send_ > 0;
80 }
81 if (more) {
82 Write();
83 } else {
84 Finish(grpc::Status::OK);
85 std::cout << "Done sending messages\n";
86 }
87 }
88
OnDone()89 void OnDone() override { delete this; }
90
91 private:
92 helloworld::HelloReply res_;
93 size_t messages_to_send_;
94 absl::optional<absl::Time> write_start_time_;
95 absl::Mutex mu_;
96 };
97
98 class GreeterService final : public helloworld::Greeter::CallbackService {
99 public:
GreeterService(size_t message_size,size_t to_send)100 GreeterService(size_t message_size, size_t to_send)
101 : message_size_(message_size), to_send_(to_send) {}
102
SayHelloStreamReply(grpc::CallbackServerContext *,const helloworld::HelloRequest * request)103 grpc::ServerWriteReactor<helloworld::HelloReply>* SayHelloStreamReply(
104 grpc::CallbackServerContext* /*context*/,
105 const helloworld::HelloRequest* request) override {
106 return new HelloReactor(message_size_, to_send_);
107 }
108
109 private:
110 size_t message_size_;
111 size_t to_send_;
112 };
113
114 } // namespace
115
main(int argc,char * argv[])116 int main(int argc, char* argv[]) {
117 absl::ParseCommandLine(argc, argv);
118 std::string server_address =
119 absl::StrCat("0.0.0.0:", absl::GetFlag(FLAGS_port));
120 grpc::EnableDefaultHealthCheckService(true);
121 grpc::reflection::InitProtoReflectionServerBuilderPlugin();
122 GreeterService service(absl::GetFlag(FLAGS_message_size),
123 absl::GetFlag(FLAGS_to_send));
124 ServerBuilder builder;
125 builder.RegisterService(&service);
126 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
127 std::unique_ptr<Server> server(builder.BuildAndStart());
128 std::cout << "Server listening on " << server_address << std::endl;
129 server->Wait();
130 return 0;
131 }
132