1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <limits.h>
20 #include <stdio.h>
21
22 #include <chrono>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/flags/flag.h"
29 #include "absl/flags/parse.h"
30 #include "absl/strings/match.h"
31 #include "absl/strings/str_cat.h"
32
33 #include <grpc/impl/channel_arg_names.h>
34 #include <grpc/support/log.h>
35 #include <grpcpp/grpcpp.h>
36 #include <grpcpp/security/credentials.h>
37 #include <grpcpp/support/channel_arguments.h>
38 #include <grpcpp/support/status.h>
39
40 #include "src/core/lib/gprpp/notification.h"
41 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
42 #include "src/proto/grpc/testing/messages.pb.h"
43 #include "test/core/memory_usage/memstats.h"
44 #include "test/core/util/test_config.h"
45
46 ABSL_FLAG(std::string, target, "", "Target host:port");
47 ABSL_FLAG(bool, secure, false, "Use SSL Credentials");
48 ABSL_FLAG(int, server_pid, 0, "Server's pid");
49 ABSL_FLAG(int, size, 50, "Number of channels");
50
CreateChannelForTest(int index)51 std::shared_ptr<grpc::Channel> CreateChannelForTest(int index) {
52 // Set the authentication mechanism.
53 std::shared_ptr<grpc::ChannelCredentials> creds =
54 grpc::InsecureChannelCredentials();
55 if (absl::GetFlag(FLAGS_secure)) {
56 // TODO (chennancy) Add in secure credentials
57 gpr_log(GPR_INFO, "Supposed to be secure, is not yet");
58 }
59
60 // Channel args to prevent connection from closing after RPC is done
61 grpc::ChannelArguments channel_args;
62 channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_IDLE_MS, INT_MAX);
63 channel_args.SetInt(GRPC_ARG_MAX_CONNECTION_AGE_MS, INT_MAX);
64 // Arg to bypass mechanism that combines channels on the serverside if they
65 // have the same channel args. Allows for one channel per connection
66 channel_args.SetInt("grpc.memory_usage_counter", index);
67
68 // Create a channel to the server and a stub
69 std::shared_ptr<grpc::Channel> channel =
70 CreateCustomChannel(absl::GetFlag(FLAGS_target), creds, channel_args);
71 return channel;
72 }
73
74 struct CallParams {
75 grpc::ClientContext context;
76 grpc::testing::SimpleRequest request;
77 grpc::testing::SimpleResponse response;
78 grpc::testing::MemorySize snapshot_response;
79 grpc_core::Notification done;
80 };
81
82 // Simple Unary RPC to send to confirm connection is open
UnaryCall(std::shared_ptr<grpc::Channel> channel)83 std::shared_ptr<CallParams> UnaryCall(std::shared_ptr<grpc::Channel> channel) {
84 std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
85 grpc::testing::BenchmarkService::NewStub(channel);
86
87 // Start a call.
88 auto params = std::make_shared<CallParams>();
89 stub->async()->UnaryCall(¶ms->context, ¶ms->request,
90 ¶ms->response,
91 [params](const grpc::Status& status) {
92 if (!status.ok()) {
93 gpr_log(GPR_ERROR, "UnaryCall RPC failed.");
94 }
95 params->done.Notify();
96 });
97 return params;
98 }
99
100 // Get memory usage of server's process before the server is made
GetBeforeSnapshot(std::shared_ptr<grpc::Channel> channel,long & before_server_memory)101 std::shared_ptr<CallParams> GetBeforeSnapshot(
102 std::shared_ptr<grpc::Channel> channel, long& before_server_memory) {
103 std::unique_ptr<grpc::testing::BenchmarkService::Stub> stub =
104 grpc::testing::BenchmarkService::NewStub(channel);
105
106 // Start a call.
107 auto params = std::make_shared<CallParams>();
108 stub->async()->GetBeforeSnapshot(
109 ¶ms->context, ¶ms->request, ¶ms->snapshot_response,
110 [params, &before_server_memory](const grpc::Status& status) {
111 if (status.ok()) {
112 before_server_memory = params->snapshot_response.rss();
113 gpr_log(GPR_INFO, "Server Before RPC: %ld",
114 params->snapshot_response.rss());
115 gpr_log(GPR_INFO, "GetBeforeSnapshot succeeded.");
116 } else {
117 gpr_log(GPR_ERROR, "GetBeforeSnapshot failed.");
118 }
119 params->done.Notify();
120 });
121 return params;
122 }
123
main(int argc,char ** argv)124 int main(int argc, char** argv) {
125 absl::ParseCommandLine(argc, argv);
126 char* fake_argv[1];
127 GPR_ASSERT(argc >= 1);
128 fake_argv[0] = argv[0];
129 grpc::testing::TestEnvironment env(&argc, argv);
130 if (absl::GetFlag(FLAGS_target).empty()) {
131 gpr_log(GPR_ERROR, "Client: No target port entered");
132 return 1;
133 }
134 gpr_log(GPR_INFO, "Client Target: %s", absl::GetFlag(FLAGS_target).c_str());
135 gpr_log(GPR_INFO, "Client Size: %d", absl::GetFlag(FLAGS_size));
136
137 // Getting initial memory usage
138 std::shared_ptr<grpc::Channel> get_memory_channel = CreateChannelForTest(0);
139 long before_server_memory;
140 GetBeforeSnapshot(get_memory_channel, before_server_memory)
141 ->done.WaitForNotification();
142 long before_client_memory = GetMemUsage();
143
144 // Create the channels and send an RPC to confirm they're open
145 int size = absl::GetFlag(FLAGS_size);
146 std::vector<std::shared_ptr<grpc::Channel>> channels_list(size);
147 for (int i = 0; i < size; ++i) {
148 std::shared_ptr<grpc::Channel> channel = CreateChannelForTest(i);
149 channels_list[i] = channel;
150 UnaryCall(channel)->done.WaitForNotification();
151 }
152
153 // Getting peak memory usage
154 long peak_server_memory = absl::GetFlag(FLAGS_server_pid) > 0
155 ? GetMemUsage(absl::GetFlag(FLAGS_server_pid))
156 : 0;
157 long peak_client_memory = GetMemUsage();
158
159 // Checking that all channels are still open
160 for (int i = 0; i < size; ++i) {
161 GPR_ASSERT(!std::exchange(channels_list[i], nullptr)
162 ->WaitForStateChange(GRPC_CHANNEL_READY,
163 std::chrono::system_clock::now() +
164 std::chrono::milliseconds(1)));
165 }
166
167 std::string prefix;
168 if (absl::StartsWith(absl::GetFlag(FLAGS_target), "xds:")) prefix = "xds ";
169 if (absl::GetFlag(FLAGS_server_pid) == 0) {
170 absl::StrAppend(&prefix, "multi_address ");
171 }
172 printf("---------Client channel stats--------\n");
173 printf("%sclient channel memory usage: %f bytes per channel\n",
174 prefix.c_str(),
175 static_cast<double>(peak_client_memory - before_client_memory) / size *
176 1024);
177 if (absl::GetFlag(FLAGS_server_pid) > 0) {
178 printf("---------Server channel stats--------\n");
179 printf("%sserver channel memory usage: %f bytes per channel\n",
180 prefix.c_str(),
181 static_cast<double>(peak_server_memory - before_server_memory) /
182 size * 1024);
183 }
184 gpr_log(GPR_INFO, "Client Done");
185 return 0;
186 }
187