• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <limits.h>
20 #include <stdio.h>
21 
22 #include <chrono>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/flags/flag.h"
29 #include "absl/flags/parse.h"
30 #include "absl/strings/match.h"
31 #include "absl/strings/str_cat.h"
32 
33 #include <grpc/impl/channel_arg_names.h>
34 #include <grpc/support/log.h>
35 #include <grpcpp/grpcpp.h>
36 #include <grpcpp/security/credentials.h>
37 #include <grpcpp/support/channel_arguments.h>
38 #include <grpcpp/support/status.h>
39 
40 #include "src/core/lib/gprpp/notification.h"
41 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
42 #include "src/proto/grpc/testing/messages.pb.h"
43 #include "test/core/memory_usage/memstats.h"
44 #include "test/core/util/test_config.h"
45 
// Command-line flags for the memory-usage client.
//
// --target: address the test channels are created against; main() exits with
// an error when it is left empty, and a value starting with "xds:" adds an
// "xds " prefix to the printed stats.
46 ABSL_FLAG(std::string, target, "", "Target host:port");
// --secure: currently only triggers a log line in CreateChannelForTest();
// insecure credentials are used regardless.
47 ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
// --server_pid: when > 0, main() samples that process's memory and prints
// server stats; when 0, the stats are labelled "multi_address".
48 ABSL_FLAG(int, server_pid, 0, "Server's pid");
// --size: how many channels main() creates and keeps open while measuring.
49 ABSL_FLAG(int, size, 50, "Number of channels");
50 
CreateChannelForTest(int index)51 std::shared_ptr<grpc::Channel> CreateChannelForTest(int index) {
52   // Set the authentication mechanism.
53   std::shared_ptr<grpc::ChannelCredentials> creds =
54       grpc::InsecureChannelCredentials();
55   if (absl::GetFlag(FLAGS_secure)) {
56     // TODO (chennancy) Add in secure credentials
57     gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
58   }
59 
60   // Channel args to prevent connection from closing after RPC is done
61   grpc::ChannelArguments channel_args;
62   channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, INT_MAX);
63   channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_AGE_MS, INT_MAX);
64   // Arg to bypass mechanism that combines channels on the serverside if they
65   // have the same channel args. Allows for one channel per connection
66   channel_args.SetInt("grpc.memory_usage_counter", index);
67 
68   // Create a channel to the server and a stub
69   std::shared_ptr<grpc::Channel> channel =
70       CreateCustomChannel(absl::GetFlag(FLAGS_target), creds, channel_args);
71   return channel;
72 }
73 
// State for one asynchronous RPC. An instance is held through a shared_ptr by
// both the function that starts the call and the completion callback, so the
// buffers below stay alive until the callback has run.
74 struct CallParams {
75   grpc::ClientContext context;
76   grpc::testing::SimpleRequest request;
  // Response buffer passed to the UnaryCall RPC.
77   grpc::testing::SimpleResponse response;
  // Response buffer passed to the GetBeforeSnapshot RPC.
78   grpc::testing::MemorySize snapshot_response;
  // Notified by the completion callback, on success and on failure alike.
79   grpc_core::Notification done;
80 };
81 
82 // Simple Unary RPC to send to confirm connection is open
UnaryCall(std::shared_ptr<grpc::Channel> channel)83 std::shared_ptr<CallParams> UnaryCall(std::shared_ptr<grpc::Channel> channel) {
84   std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
85       grpc::testing::BenchmarkService::NewStub(channel);
86 
87   // Start a call.
88   auto params = std::make_shared<CallParams>();
89   stub->async()->UnaryCall(&params->context, &params->request,
90                            &params->response,
91                            [params](const grpc::Status& status) {
92                              if (!status.ok()) {
93                                gpr_log(GPR_ERROR, "UnaryCall RPC failed.");
94                              }
95                              params->done.Notify();
96                            });
97   return params;
98 }
99 
100 // Get memory usage of server's process before the server is made
GetBeforeSnapshot(std::shared_ptr<grpc::Channel> channel,long & before_server_memory)101 std::shared_ptr<CallParams> GetBeforeSnapshot(
102     std::shared_ptr<grpc::Channel> channel, long& before_server_memory) {
103   std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
104       grpc::testing::BenchmarkService::NewStub(channel);
105 
106   // Start a call.
107   auto params = std::make_shared<CallParams>();
108   stub->async()->GetBeforeSnapshot(
109       &params->context, &params->request, &params->snapshot_response,
110       [params, &before_server_memory](const grpc::Status& status) {
111         if (status.ok()) {
112           before_server_memory = params->snapshot_response.rss();
113           gpr_log(GPR_INFO, "Server Before RPC: %ld",
114                   params->snapshot_response.rss());
115           gpr_log(GPR_INFO, "GetBeforeSnapshot succeeded.");
116         } else {
117           gpr_log(GPR_ERROR, "GetBeforeSnapshot failed.");
118         }
119         params->done.Notify();
120       });
121   return params;
122 }
123 
main(int argc,char ** argv)124 int main(int argc, char** argv) {
125   absl::ParseCommandLine(argc, argv);
126   char* fake_argv[1];
127   GPR_ASSERT(argc >= 1);
128   fake_argv[0] = argv[0];
129   grpc::testing::TestEnvironment env(&argc, argv);
130   if (absl::GetFlag(FLAGS_target).empty()) {
131     gpr_log(GPR_ERROR, "Client: No target port entered");
132     return 1;
133   }
134   gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str());
135   gpr_log(GPR_INFO, "Client Size: %d", absl::GetFlag(FLAGS_size));
136 
137   // Getting initial memory usage
138   std::shared_ptr<grpc::Channel> get_memory_channel = CreateChannelForTest(0);
139   long before_server_memory;
140   GetBeforeSnapshot(get_memory_channel, before_server_memory)
141       ->done.WaitForNotification();
142   long before_client_memory = GetMemUsage();
143 
144   // Create the channels and send an RPC to confirm they're open
145   int size = absl::GetFlag(FLAGS_size);
146   std::vector<std::shared_ptr<grpc::Channel>> channels_list(size);
147   for (int i = 0; i < size; ++i) {
148     std::shared_ptr<grpc::Channel> channel = CreateChannelForTest(i);
149     channels_list[i] = channel;
150     UnaryCall(channel)->done.WaitForNotification();
151   }
152 
153   // Getting peak memory usage
154   long peak_server_memory = absl::GetFlag(FLAGS_server_pid) > 0
155                                 ? GetMemUsage(absl::GetFlag(FLAGS_server_pid))
156                                 : 0;
157   long peak_client_memory = GetMemUsage();
158 
159   // Checking that all channels are still open
160   for (int i = 0; i < size; ++i) {
161     GPR_ASSERT(!std::exchange(channels_list[i], nullptr)
162                     ->WaitForStateChange(GRPC_CHANNEL_READY,
163                                          std::chrono::system_clock::now() +
164                                              std::chrono::milliseconds(1)));
165   }
166 
167   std::string prefix;
168   if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
169   if (absl::GetFlag(FLAGS_server_pid) == 0) {
170     absl::StrAppend(&prefix, "multi_address ");
171   }
172   printf("---------Client channel stats--------\n");
173   printf("%sclient channel memory usage: %f bytes per channel\n",
174          prefix.c_str(),
175          static_cast<double>(peak_client_memory - before_client_memory) / size *
176              1024);
177   if (absl::GetFlag(FLAGS_server_pid) > 0) {
178     printf("---------Server channel stats--------\n");
179     printf("%sserver channel memory usage: %f bytes per channel\n",
180            prefix.c_str(),
181            static_cast<double>(peak_server_memory - before_server_memory) /
182                size * 1024);
183   }
184   gpr_log(GPR_INFO, "Client Done");
185   return 0;
186 }
187