• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2024 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpcpp/ext/proto_server_reflection_plugin.h>
19 #include <grpcpp/grpcpp.h>
20 #include <grpcpp/health_check_service_interface.h>
21 #include <grpcpp/support/status.h>
22 
23 #include <cstddef>
24 #include <iostream>
25 #include <memory>
26 #include <string>
27 
28 #include "absl/flags/flag.h"
29 #include "absl/flags/parse.h"
30 #include "absl/strings/str_cat.h"
31 
32 #ifdef BAZEL_BUILD
33 #include "examples/protos/helloworld.grpc.pb.h"
34 #else
35 #include "helloworld.grpc.pb.h"
36 #endif
37 
38 ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
39 ABSL_FLAG(size_t, message_size, 3 * 1024 * 1024,
40           "Size of the messages to send");
41 ABSL_FLAG(uint32_t, to_send, 10,
42           "Messages to send in response to a single request");
43 
44 using grpc::CallbackServerContext;
45 using grpc::Server;
46 using grpc::ServerBuilder;
47 using grpc::ServerUnaryReactor;
48 using grpc::Status;
49 
50 namespace {
51 
52 //
53 // Will write the replies as fast as it can, starting a new write as soon as
54 // previous one is done.
55 //
56 class HelloReactor final
57     : public grpc::ServerWriteReactor<helloworld::HelloReply> {
58  public:
HelloReactor(size_t message_size,size_t to_send)59   HelloReactor(size_t message_size, size_t to_send)
60       : messages_to_send_(to_send) {
61     res_.set_message(std::string(message_size, '#'));
62     Write();
63   }
64 
Write()65   void Write() {
66     absl::MutexLock lock(&mu_);
67     StartWrite(&res_);
68     --messages_to_send_;
69     write_start_time_ = absl::Now();
70   }
71 
OnWriteDone(bool ok)72   void OnWriteDone(bool ok) override {
73     bool more = false;
74     {
75       absl::MutexLock lock(&mu_);
76       std::cout << "Write #" << messages_to_send_ << " done (Ok: " << ok
77                 << "): " << absl::Now() - *write_start_time_ << "\n";
78       write_start_time_ = absl::nullopt;
79       more = ok && messages_to_send_ > 0;
80     }
81     if (more) {
82       Write();
83     } else {
84       Finish(grpc::Status::OK);
85       std::cout << "Done sending messages\n";
86     }
87   }
88 
OnDone()89   void OnDone() override { delete this; }
90 
91  private:
92   helloworld::HelloReply res_;
93   size_t messages_to_send_;
94   absl::optional<absl::Time> write_start_time_;
95   absl::Mutex mu_;
96 };
97 
98 class GreeterService final : public helloworld::Greeter::CallbackService {
99  public:
GreeterService(size_t message_size,size_t to_send)100   GreeterService(size_t message_size, size_t to_send)
101       : message_size_(message_size), to_send_(to_send) {}
102 
SayHelloStreamReply(grpc::CallbackServerContext *,const helloworld::HelloRequest * request)103   grpc::ServerWriteReactor<helloworld::HelloReply>* SayHelloStreamReply(
104       grpc::CallbackServerContext* /*context*/,
105       const helloworld::HelloRequest* request) override {
106     return new HelloReactor(message_size_, to_send_);
107   }
108 
109  private:
110   size_t message_size_;
111   size_t to_send_;
112 };
113 
114 }  // namespace
115 
main(int argc,char * argv[])116 int main(int argc, char* argv[]) {
117   absl::ParseCommandLine(argc, argv);
118   std::string server_address =
119       absl::StrCat("0.0.0.0:", absl::GetFlag(FLAGS_port));
120   grpc::EnableDefaultHealthCheckService(true);
121   grpc::reflection::InitProtoReflectionServerBuilderPlugin();
122   GreeterService service(absl::GetFlag(FLAGS_message_size),
123                          absl::GetFlag(FLAGS_to_send));
124   ServerBuilder builder;
125   builder.RegisterService(&service);
126   builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
127   std::unique_ptr<Server> server(builder.BuildAndStart());
128   std::cout << "Server listening on " << server_address << std::endl;
129   server->Wait();
130   return 0;
131 }
132