• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/impl/channel_arg_names.h>
20 #include <grpcpp/grpcpp.h>
21 #include <grpcpp/security/credentials.h>
22 #include <grpcpp/support/channel_arguments.h>
23 #include <grpcpp/support/status.h>
24 #include <limits.h>
25 #include <stdio.h>
26 
27 #include <chrono>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 #include <vector>
32 
33 #include "absl/flags/flag.h"
34 #include "absl/flags/parse.h"
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "absl/strings/match.h"
38 #include "absl/strings/str_cat.h"
39 #include "src/core/util/notification.h"
40 #include "src/cpp/ext/chaotic_good.h"
41 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
42 #include "src/proto/grpc/testing/messages.pb.h"
43 #include "test/core/memory_usage/memstats.h"
44 #include "test/core/test_util/test_config.h"
45 
46 ABSL_FLAG(std::string, target, "", "Target host:port");
47 ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
48 ABSL_FLAG(int, server_pid, 0, "Server's pid");
49 ABSL_FLAG(int, size, 50, "Number of channels");
50 ABSL_FLAG(bool, chaotic_good, false, "Use chaotic good");
51 
CreateChannelForTest(int index)52 std::shared_ptr<grpc::Channel> CreateChannelForTest(int index) {
53   // Set the authentication mechanism.
54   std::shared_ptr<grpc::ChannelCredentials> creds =
55       grpc::InsecureChannelCredentials();
56   if (absl::GetFlag(FLAGS_chaotic_good)) {
57     creds = grpc::ChaoticGoodInsecureChannelCredentials();
58   } else if (absl::GetFlag(FLAGS_secure)) {
59     // TODO (chennancy) Add in secure credentials
60     LOG(INFO) << "Supposed to be secure, is not yet";
61   }
62 
63   // Channel args to prevent connection from closing after RPC is done
64   grpc::ChannelArguments channel_args;
65   channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, INT_MAX);
66   channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_AGE_MS, INT_MAX);
67   // Arg to bypass mechanism that combines channels on the serverside if they
68   // have the same channel args. Allows for one channel per connection
69   channel_args.SetInt("grpc.memory_usage_counter", index);
70 
71   // Create a channel to the server and a stub
72   std::shared_ptr<grpc::Channel> channel =
73       CreateCustomChannel(absl::GetFlag(FLAGS_target), creds, channel_args);
74   return channel;
75 }
76 
77 struct CallParams {
78   grpc::ClientContext context;
79   grpc::testing::SimpleRequest request;
80   grpc::testing::SimpleResponse response;
81   grpc::testing::MemorySize snapshot_response;
82   grpc_core::Notification done;
83 };
84 
85 // Simple Unary RPC to send to confirm connection is open
UnaryCall(std::shared_ptr<grpc::Channel> channel)86 std::shared_ptr<CallParams> UnaryCall(std::shared_ptr<grpc::Channel> channel) {
87   std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
88       grpc::testing::BenchmarkService::NewStub(channel);
89 
90   // Start a call.
91   auto params = std::make_shared<CallParams>();
92   stub->async()->UnaryCall(&params->context, &params->request,
93                            &params->response,
94                            [params](const grpc::Status& status) {
95                              if (!status.ok()) {
96                                LOG(ERROR) << "UnaryCall RPC failed.";
97                              }
98                              params->done.Notify();
99                            });
100   return params;
101 }
102 
103 // Get memory usage of server's process before the server is made
GetBeforeSnapshot(std::shared_ptr<grpc::Channel> channel,long & before_server_memory)104 std::shared_ptr<CallParams> GetBeforeSnapshot(
105     std::shared_ptr<grpc::Channel> channel, long& before_server_memory) {
106   std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
107       grpc::testing::BenchmarkService::NewStub(channel);
108 
109   // Start a call.
110   auto params = std::make_shared<CallParams>();
111   stub->async()->GetBeforeSnapshot(
112       &params->context, &params->request, &params->snapshot_response,
113       [params, &before_server_memory](const grpc::Status& status) {
114         if (status.ok()) {
115           before_server_memory = params->snapshot_response.rss();
116           LOG(INFO) << "Server Before RPC: " << params->snapshot_response.rss();
117           LOG(INFO) << "GetBeforeSnapshot succeeded.";
118         } else {
119           LOG(ERROR) << "GetBeforeSnapshot failed.";
120         }
121         params->done.Notify();
122       });
123   return params;
124 }
125 
main(int argc,char ** argv)126 int main(int argc, char** argv) {
127   absl::ParseCommandLine(argc, argv);
128   char* fake_argv[1];
129   CHECK_GE(argc, 1);
130   fake_argv[0] = argv[0];
131   grpc::testing::TestEnvironment env(&argc, argv);
132   if (absl::GetFlag(FLAGS_target).empty()) {
133     LOG(ERROR) << "Client: No target port entered";
134     return 1;
135   }
136   LOG(INFO) << "Client Target: " << absl::GetFlag(FLAGS_target);
137   LOG(INFO) << "Client Size: " << absl::GetFlag(FLAGS_size);
138 
139   // Getting initial memory usage
140   std::shared_ptr<grpc::Channel> get_memory_channel = CreateChannelForTest(0);
141   long before_server_memory;
142   GetBeforeSnapshot(get_memory_channel, before_server_memory)
143       ->done.WaitForNotification();
144   long before_client_memory = GetMemUsage();
145 
146   // Create the channels and send an RPC to confirm they're open
147   int size = absl::GetFlag(FLAGS_size);
148   std::vector<std::shared_ptr<grpc::Channel>> channels_list(size);
149   for (int i = 0; i < size; ++i) {
150     std::shared_ptr<grpc::Channel> channel = CreateChannelForTest(i);
151     channels_list[i] = channel;
152     UnaryCall(channel)->done.WaitForNotification();
153   }
154 
155   // Getting peak memory usage
156   long peak_server_memory = absl::GetFlag(FLAGS_server_pid) > 0
157                                 ? GetMemUsage(absl::GetFlag(FLAGS_server_pid))
158                                 : 0;
159   long peak_client_memory = GetMemUsage();
160 
161   // Checking that all channels are still open
162   for (int i = 0; i < size; ++i) {
163     CHECK(!std::exchange(channels_list[i], nullptr)
164                ->WaitForStateChange(GRPC_CHANNEL_READY,
165                                     std::chrono::system_clock::now() +
166                                         std::chrono::milliseconds(1)));
167   }
168 
169   std::string prefix;
170   if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
171   if (absl::GetFlag(FLAGS_server_pid) == 0) {
172     absl::StrAppend(&prefix, "multi_address ");
173   }
174   printf("---------Client channel stats--------\n");
175   printf("%sclient channel memory usage: %f bytes per channel\n",
176          prefix.c_str(),
177          static_cast<double>(peak_client_memory - before_client_memory) / size *
178              1024);
179   if (absl::GetFlag(FLAGS_server_pid) > 0) {
180     printf("---------Server channel stats--------\n");
181     printf("%sserver channel memory usage: %f bytes per channel\n",
182            prefix.c_str(),
183            static_cast<double>(peak_server_memory - before_server_memory) /
184                size * 1024);
185   }
186   LOG(INFO) << "Client Done";
187   return 0;
188 }
189