1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/impl/channel_arg_names.h>
20 #include <grpcpp/grpcpp.h>
21 #include <grpcpp/security/credentials.h>
22 #include <grpcpp/support/channel_arguments.h>
23 #include <grpcpp/support/status.h>
24 #include <limits.h>
25 #include <stdio.h>
26
27 #include <chrono>
28 #include <memory>
29 #include <string>
30 #include <utility>
31 #include <vector>
32
33 #include "absl/flags/flag.h"
34 #include "absl/flags/parse.h"
35 #include "absl/log/check.h"
36 #include "absl/log/log.h"
37 #include "absl/strings/match.h"
38 #include "absl/strings/str_cat.h"
39 #include "src/core/util/notification.h"
40 #include "src/cpp/ext/chaotic_good.h"
41 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
42 #include "src/proto/grpc/testing/messages.pb.h"
43 #include "test/core/memory_usage/memstats.h"
44 #include "test/core/test_util/test_config.h"
45
46 ABSL_FLAG(std::string, target, "", "Target host:port");
47 ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
48 ABSL_FLAG(int, server_pid, 0, "Server's pid");
49 ABSL_FLAG(int, size, 50, "Number of channels");
50 ABSL_FLAG(bool, chaotic_good, false, "Use chaotic good");
51
CreateChannelForTest(int index)52 std::shared_ptr<grpc::Channel> CreateChannelForTest(int index) {
53 // Set the authentication mechanism.
54 std::shared_ptr<grpc::ChannelCredentials> creds =
55 grpc::InsecureChannelCredentials();
56 if (absl::GetFlag(FLAGS_chaotic_good)) {
57 creds = grpc::ChaoticGoodInsecureChannelCredentials();
58 } else if (absl::GetFlag(FLAGS_secure)) {
59 // TODO (chennancy) Add in secure credentials
60 LOG(INFO) << "Supposed to be secure, is not yet";
61 }
62
63 // Channel args to prevent connection from closing after RPC is done
64 grpc::ChannelArguments channel_args;
65 channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, INT_MAX);
66 channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_AGE_MS, INT_MAX);
67 // Arg to bypass mechanism that combines channels on the serverside if they
68 // have the same channel args. Allows for one channel per connection
69 channel_args.SetInt("grpc.memory_usage_counter", index);
70
71 // Create a channel to the server and a stub
72 std::shared_ptr<grpc::Channel> channel =
73 CreateCustomChannel(absl::GetFlag(FLAGS_target), creds, channel_args);
74 return channel;
75 }
76
77 struct CallParams {
78 grpc::ClientContext context;
79 grpc::testing::SimpleRequest request;
80 grpc::testing::SimpleResponse response;
81 grpc::testing::MemorySize snapshot_response;
82 grpc_core::Notification done;
83 };
84
85 // Simple Unary RPC to send to confirm connection is open
UnaryCall(std::shared_ptr<grpc::Channel> channel)86 std::shared_ptr<CallParams> UnaryCall(std::shared_ptr<grpc::Channel> channel) {
87 std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
88 grpc::testing::BenchmarkService::NewStub(channel);
89
90 // Start a call.
91 auto params = std::make_shared<CallParams>();
92 stub->async()->UnaryCall(¶ms->context, ¶ms->request,
93 ¶ms->response,
94 [params](const grpc::Status& status) {
95 if (!status.ok()) {
96 LOG(ERROR) << "UnaryCall RPC failed.";
97 }
98 params->done.Notify();
99 });
100 return params;
101 }
102
103 // Get memory usage of server's process before the server is made
GetBeforeSnapshot(std::shared_ptr<grpc::Channel> channel,long & before_server_memory)104 std::shared_ptr<CallParams> GetBeforeSnapshot(
105 std::shared_ptr<grpc::Channel> channel, long& before_server_memory) {
106 std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
107 grpc::testing::BenchmarkService::NewStub(channel);
108
109 // Start a call.
110 auto params = std::make_shared<CallParams>();
111 stub->async()->GetBeforeSnapshot(
112 ¶ms->context, ¶ms->request, ¶ms->snapshot_response,
113 [params, &before_server_memory](const grpc::Status& status) {
114 if (status.ok()) {
115 before_server_memory = params->snapshot_response.rss();
116 LOG(INFO) << "Server Before RPC: " << params->snapshot_response.rss();
117 LOG(INFO) << "GetBeforeSnapshot succeeded.";
118 } else {
119 LOG(ERROR) << "GetBeforeSnapshot failed.";
120 }
121 params->done.Notify();
122 });
123 return params;
124 }
125
main(int argc,char ** argv)126 int main(int argc, char** argv) {
127 absl::ParseCommandLine(argc, argv);
128 char* fake_argv[1];
129 CHECK_GE(argc, 1);
130 fake_argv[0] = argv[0];
131 grpc::testing::TestEnvironment env(&argc, argv);
132 if (absl::GetFlag(FLAGS_target).empty()) {
133 LOG(ERROR) << "Client: No target port entered";
134 return 1;
135 }
136 LOG(INFO) << "Client Target: " << absl::GetFlag(FLAGS_target);
137 LOG(INFO) << "Client Size: " << absl::GetFlag(FLAGS_size);
138
139 // Getting initial memory usage
140 std::shared_ptr<grpc::Channel> get_memory_channel = CreateChannelForTest(0);
141 long before_server_memory;
142 GetBeforeSnapshot(get_memory_channel, before_server_memory)
143 ->done.WaitForNotification();
144 long before_client_memory = GetMemUsage();
145
146 // Create the channels and send an RPC to confirm they're open
147 int size = absl::GetFlag(FLAGS_size);
148 std::vector<std::shared_ptr<grpc::Channel>> channels_list(size);
149 for (int i = 0; i < size; ++i) {
150 std::shared_ptr<grpc::Channel> channel = CreateChannelForTest(i);
151 channels_list[i] = channel;
152 UnaryCall(channel)->done.WaitForNotification();
153 }
154
155 // Getting peak memory usage
156 long peak_server_memory = absl::GetFlag(FLAGS_server_pid) > 0
157 ? GetMemUsage(absl::GetFlag(FLAGS_server_pid))
158 : 0;
159 long peak_client_memory = GetMemUsage();
160
161 // Checking that all channels are still open
162 for (int i = 0; i < size; ++i) {
163 CHECK(!std::exchange(channels_list[i], nullptr)
164 ->WaitForStateChange(GRPC_CHANNEL_READY,
165 std::chrono::system_clock::now() +
166 std::chrono::milliseconds(1)));
167 }
168
169 std::string prefix;
170 if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
171 if (absl::GetFlag(FLAGS_server_pid) == 0) {
172 absl::StrAppend(&prefix, "multi_address ");
173 }
174 printf("---------Client channel stats--------\n");
175 printf("%sclient channel memory usage: %f bytes per channel\n",
176 prefix.c_str(),
177 static_cast<double>(peak_client_memory - before_client_memory) / size *
178 1024);
179 if (absl::GetFlag(FLAGS_server_pid) > 0) {
180 printf("---------Server channel stats--------\n");
181 printf("%sserver channel memory usage: %f bytes per channel\n",
182 prefix.c_str(),
183 static_cast<double>(peak_server_memory - before_server_memory) /
184 size * 1024);
185 }
186 LOG(INFO) << "Client Done";
187 return 0;
188 }
189