• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <grpc/grpc.h>
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/create_channel.h>
25 #include <grpcpp/security/credentials.h>
26 #include <grpcpp/security/server_credentials.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 
31 #include <grpcpp/ext/channelz_service_plugin.h>
32 #include "src/proto/grpc/channelz/channelz.grpc.pb.h"
33 #include "src/proto/grpc/testing/echo.grpc.pb.h"
34 #include "test/core/util/port.h"
35 #include "test/core/util/test_config.h"
36 #include "test/cpp/end2end/test_service_impl.h"
37 
38 #include <google/protobuf/text_format.h>
39 
40 #include <gtest/gtest.h>
41 
42 using grpc::channelz::v1::GetChannelRequest;
43 using grpc::channelz::v1::GetChannelResponse;
44 using grpc::channelz::v1::GetServersRequest;
45 using grpc::channelz::v1::GetServersResponse;
46 using grpc::channelz::v1::GetSubchannelRequest;
47 using grpc::channelz::v1::GetSubchannelResponse;
48 using grpc::channelz::v1::GetTopChannelsRequest;
49 using grpc::channelz::v1::GetTopChannelsResponse;
50 
51 namespace grpc {
52 namespace testing {
53 namespace {
54 
55 // Proxy service supports N backends. Sends RPC to backend dictated by
56 // request->backend_channel_idx().
57 class Proxy : public ::grpc::testing::EchoTestService::Service {
58  public:
Proxy()59   Proxy() {}
60 
AddChannelToBackend(const std::shared_ptr<Channel> & channel)61   void AddChannelToBackend(const std::shared_ptr<Channel>& channel) {
62     stubs_.push_back(grpc::testing::EchoTestService::NewStub(channel));
63   }
64 
Echo(ServerContext * server_context,const EchoRequest * request,EchoResponse * response)65   Status Echo(ServerContext* server_context, const EchoRequest* request,
66               EchoResponse* response) override {
67     std::unique_ptr<ClientContext> client_context =
68         ClientContext::FromServerContext(*server_context);
69     size_t idx = request->param().backend_channel_idx();
70     GPR_ASSERT(idx < stubs_.size());
71     return stubs_[idx]->Echo(client_context.get(), *request, response);
72   }
73 
74  private:
75   std::vector<std::unique_ptr<::grpc::testing::EchoTestService::Stub>> stubs_;
76 };
77 
78 }  // namespace
79 
80 class ChannelzServerTest : public ::testing::Test {
81  public:
ChannelzServerTest()82   ChannelzServerTest() {}
83 
SetUp()84   void SetUp() override {
85     // ensure channel server is brought up on all severs we build.
86     ::grpc::channelz::experimental::InitChannelzService();
87 
88     // We set up a proxy server with channelz enabled.
89     proxy_port_ = grpc_pick_unused_port_or_die();
90     ServerBuilder proxy_builder;
91     grpc::string proxy_server_address = "localhost:" + to_string(proxy_port_);
92     proxy_builder.AddListeningPort(proxy_server_address,
93                                    InsecureServerCredentials());
94     // forces channelz and channel tracing to be enabled.
95     proxy_builder.AddChannelArgument(GRPC_ARG_ENABLE_CHANNELZ, 1);
96     proxy_builder.AddChannelArgument(GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE,
97                                      10);
98     proxy_builder.RegisterService(&proxy_service_);
99     proxy_server_ = proxy_builder.BuildAndStart();
100   }
101 
102   // Sets the proxy up to have an arbitrary number of backends.
ConfigureProxy(size_t num_backends)103   void ConfigureProxy(size_t num_backends) {
104     backends_.resize(num_backends);
105     for (size_t i = 0; i < num_backends; ++i) {
106       // create a new backend.
107       backends_[i].port = grpc_pick_unused_port_or_die();
108       ServerBuilder backend_builder;
109       grpc::string backend_server_address =
110           "localhost:" + to_string(backends_[i].port);
111       backend_builder.AddListeningPort(backend_server_address,
112                                        InsecureServerCredentials());
113       backends_[i].service.reset(new TestServiceImpl);
114       // ensure that the backend itself has channelz disabled.
115       backend_builder.AddChannelArgument(GRPC_ARG_ENABLE_CHANNELZ, 0);
116       backend_builder.RegisterService(backends_[i].service.get());
117       backends_[i].server = backend_builder.BuildAndStart();
118       // set up a channel to the backend. We ensure that this channel has
119       // channelz enabled since these channels (proxy outbound to backends)
120       // are the ones that our test will actually be validating.
121       ChannelArguments args;
122       args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 1);
123       args.SetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE, 10);
124       std::shared_ptr<Channel> channel_to_backend = CreateCustomChannel(
125           backend_server_address, InsecureChannelCredentials(), args);
126       proxy_service_.AddChannelToBackend(channel_to_backend);
127     }
128   }
129 
ResetStubs()130   void ResetStubs() {
131     string target = "dns:localhost:" + to_string(proxy_port_);
132     ChannelArguments args;
133     // disable channelz. We only want to focus on proxy to backend outbound.
134     args.SetInt(GRPC_ARG_ENABLE_CHANNELZ, 0);
135     std::shared_ptr<Channel> channel =
136         CreateCustomChannel(target, InsecureChannelCredentials(), args);
137     channelz_stub_ = grpc::channelz::v1::Channelz::NewStub(channel);
138     echo_stub_ = grpc::testing::EchoTestService::NewStub(channel);
139   }
140 
SendSuccessfulEcho(int channel_idx)141   void SendSuccessfulEcho(int channel_idx) {
142     EchoRequest request;
143     EchoResponse response;
144     request.set_message("Hello channelz");
145     request.mutable_param()->set_backend_channel_idx(channel_idx);
146     ClientContext context;
147     Status s = echo_stub_->Echo(&context, request, &response);
148     EXPECT_EQ(response.message(), request.message());
149     EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
150   }
151 
SendFailedEcho(int channel_idx)152   void SendFailedEcho(int channel_idx) {
153     EchoRequest request;
154     EchoResponse response;
155     request.set_message("Hello channelz");
156     request.mutable_param()->set_backend_channel_idx(channel_idx);
157     auto* error = request.mutable_param()->mutable_expected_error();
158     error->set_code(13);  // INTERNAL
159     error->set_error_message("error");
160     ClientContext context;
161     Status s = echo_stub_->Echo(&context, request, &response);
162     EXPECT_FALSE(s.ok());
163   }
164 
165   // Uses GetTopChannels to return the channel_id of a particular channel,
166   // so that the unit tests may test GetChannel call.
GetChannelId(int channel_idx)167   intptr_t GetChannelId(int channel_idx) {
168     GetTopChannelsRequest request;
169     GetTopChannelsResponse response;
170     request.set_start_channel_id(0);
171     ClientContext context;
172     Status s = channelz_stub_->GetTopChannels(&context, request, &response);
173     EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
174     EXPECT_GT(response.channel_size(), channel_idx);
175     return response.channel(channel_idx).ref().channel_id();
176   }
177 
to_string(const int number)178   static string to_string(const int number) {
179     std::stringstream strs;
180     strs << number;
181     return strs.str();
182   }
183 
184  protected:
185   // package of data needed for each backend server.
186   struct BackendData {
187     std::unique_ptr<Server> server;
188     int port;
189     std::unique_ptr<TestServiceImpl> service;
190   };
191 
192   std::unique_ptr<grpc::channelz::v1::Channelz::Stub> channelz_stub_;
193   std::unique_ptr<grpc::testing::EchoTestService::Stub> echo_stub_;
194 
195   // proxy server to ping with channelz requests.
196   std::unique_ptr<Server> proxy_server_;
197   int proxy_port_;
198   Proxy proxy_service_;
199 
200   // backends. All implement the echo service.
201   std::vector<BackendData> backends_;
202 };
203 
// With a single backend configured, GetTopChannels (starting from id 0) is
// expected to report exactly one channel.
TEST_F(ChannelzServerTest, BasicTest) {
  ResetStubs();
  ConfigureProxy(1);
  GetTopChannelsRequest top_request;
  top_request.set_start_channel_id(0);
  GetTopChannelsResponse top_response;
  ClientContext context;
  Status s =
      channelz_stub_->GetTopChannels(&context, top_request, &top_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  EXPECT_EQ(top_response.channel_size(), 1);
}
215 
// Asking GetTopChannels to start at id 10000 is expected to succeed and
// return no channels.
TEST_F(ChannelzServerTest, HighStartId) {
  ResetStubs();
  ConfigureProxy(1);
  GetTopChannelsRequest top_request;
  top_request.set_start_channel_id(10000);
  GetTopChannelsResponse top_response;
  ClientContext context;
  Status s =
      channelz_stub_->GetTopChannels(&context, top_request, &top_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  EXPECT_EQ(top_response.channel_size(), 0);
}
227 
// One successful Echo through the proxy should show up on the proxy's
// outbound channel as one call started, one succeeded and none failed.
TEST_F(ChannelzServerTest, SuccessfulRequestTest) {
  ResetStubs();
  ConfigureProxy(1);
  SendSuccessfulEcho(0);
  GetChannelRequest channel_request;
  GetChannelResponse channel_response;
  channel_request.set_channel_id(GetChannelId(0));
  ClientContext context;
  Status s =
      channelz_stub_->GetChannel(&context, channel_request, &channel_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  const auto& counters = channel_response.channel().data();
  EXPECT_EQ(counters.calls_started(), 1);
  EXPECT_EQ(counters.calls_succeeded(), 1);
  EXPECT_EQ(counters.calls_failed(), 0);
}
242 
// One failed Echo through the proxy should show up on the proxy's outbound
// channel as one call started, none succeeded and one failed.
TEST_F(ChannelzServerTest, FailedRequestTest) {
  ResetStubs();
  ConfigureProxy(1);
  SendFailedEcho(0);
  GetChannelRequest channel_request;
  GetChannelResponse channel_response;
  channel_request.set_channel_id(GetChannelId(0));
  ClientContext context;
  Status s =
      channelz_stub_->GetChannel(&context, channel_request, &channel_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  const auto& counters = channel_response.channel().data();
  EXPECT_EQ(counters.calls_started(), 1);
  EXPECT_EQ(counters.calls_succeeded(), 0);
  EXPECT_EQ(counters.calls_failed(), 1);
}
257 
// Sends a batch of successful and a batch of failing RPCs over one backend
// channel and checks that the channel's counters match the batch sizes.
TEST_F(ChannelzServerTest, ManyRequestsTest) {
  ResetStubs();
  ConfigureProxy(1);
  // send some RPCs
  const int kNumSuccess = 10;
  const int kNumFailed = 11;
  for (int sent = 0; sent != kNumSuccess; ++sent) {
    SendSuccessfulEcho(0);
  }
  for (int sent = 0; sent != kNumFailed; ++sent) {
    SendFailedEcho(0);
  }
  GetChannelRequest channel_request;
  GetChannelResponse channel_response;
  channel_request.set_channel_id(GetChannelId(0));
  ClientContext context;
  Status s =
      channelz_stub_->GetChannel(&context, channel_request, &channel_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  const auto& counters = channel_response.channel().data();
  EXPECT_EQ(counters.calls_started(), kNumSuccess + kNumFailed);
  EXPECT_EQ(counters.calls_succeeded(), kNumSuccess);
  EXPECT_EQ(counters.calls_failed(), kNumFailed);
}
281 
// With several backends configured, GetTopChannels is expected to report one
// channel per backend.
TEST_F(ChannelzServerTest, ManyChannels) {
  ResetStubs();
  const int kNumChannels = 4;
  ConfigureProxy(kNumChannels);
  GetTopChannelsRequest top_request;
  top_request.set_start_channel_id(0);
  GetTopChannelsResponse top_response;
  ClientContext context;
  Status s =
      channelz_stub_->GetTopChannels(&context, top_request, &top_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  EXPECT_EQ(top_response.channel_size(), kNumChannels);
}
294 
TEST_F(ChannelzServerTest,ManyRequestsManyChannels)295 TEST_F(ChannelzServerTest, ManyRequestsManyChannels) {
296   ResetStubs();
297   const int kNumChannels = 4;
298   ConfigureProxy(kNumChannels);
299   const int kNumSuccess = 10;
300   const int kNumFailed = 11;
301   for (int i = 0; i < kNumSuccess; ++i) {
302     SendSuccessfulEcho(0);
303     SendSuccessfulEcho(2);
304   }
305   for (int i = 0; i < kNumFailed; ++i) {
306     SendFailedEcho(1);
307     SendFailedEcho(2);
308   }
309 
310   // the first channel saw only successes
311   {
312     GetChannelRequest request;
313     GetChannelResponse response;
314     request.set_channel_id(GetChannelId(0));
315     ClientContext context;
316     Status s = channelz_stub_->GetChannel(&context, request, &response);
317     EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
318     EXPECT_EQ(response.channel().data().calls_started(), kNumSuccess);
319     EXPECT_EQ(response.channel().data().calls_succeeded(), kNumSuccess);
320     EXPECT_EQ(response.channel().data().calls_failed(), 0);
321   }
322 
323   // the second channel saw only failures
324   {
325     GetChannelRequest request;
326     GetChannelResponse response;
327     request.set_channel_id(GetChannelId(1));
328     ClientContext context;
329     Status s = channelz_stub_->GetChannel(&context, request, &response);
330     EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
331     EXPECT_EQ(response.channel().data().calls_started(), kNumFailed);
332     EXPECT_EQ(response.channel().data().calls_succeeded(), 0);
333     EXPECT_EQ(response.channel().data().calls_failed(), kNumFailed);
334   }
335 
336   // the third channel saw both
337   {
338     GetChannelRequest request;
339     GetChannelResponse response;
340     request.set_channel_id(GetChannelId(2));
341     ClientContext context;
342     Status s = channelz_stub_->GetChannel(&context, request, &response);
343     EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
344     EXPECT_EQ(response.channel().data().calls_started(),
345               kNumSuccess + kNumFailed);
346     EXPECT_EQ(response.channel().data().calls_succeeded(), kNumSuccess);
347     EXPECT_EQ(response.channel().data().calls_failed(), kNumFailed);
348   }
349 
350   // the fourth channel saw nothing
351   {
352     GetChannelRequest request;
353     GetChannelResponse response;
354     request.set_channel_id(GetChannelId(3));
355     ClientContext context;
356     Status s = channelz_stub_->GetChannel(&context, request, &response);
357     EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
358     EXPECT_EQ(response.channel().data().calls_started(), 0);
359     EXPECT_EQ(response.channel().data().calls_succeeded(), 0);
360     EXPECT_EQ(response.channel().data().calls_failed(), 0);
361   }
362 }
363 
// After driving traffic over some of the backend channels, checks that every
// channel that started calls exposes a subchannel whose counters equal the
// channel's own, and that idle channels expose no subchannels.
TEST_F(ChannelzServerTest, ManySubchannels) {
  ResetStubs();
  const int kNumChannels = 4;
  ConfigureProxy(kNumChannels);
  const int kNumSuccess = 10;
  const int kNumFailed = 11;
  for (int round = 0; round != kNumSuccess; ++round) {
    SendSuccessfulEcho(0);
    SendSuccessfulEcho(2);
  }
  for (int round = 0; round != kNumFailed; ++round) {
    SendFailedEcho(1);
    SendFailedEcho(2);
  }
  GetTopChannelsRequest top_request;
  GetTopChannelsResponse top_response;
  top_request.set_start_channel_id(0);
  ClientContext top_context;
  Status top_status =
      channelz_stub_->GetTopChannels(&top_context, top_request, &top_response);
  EXPECT_TRUE(top_status.ok()) << top_status.error_message();
  EXPECT_EQ(top_response.channel_size(), kNumChannels);
  for (int i = 0; i < top_response.channel_size(); ++i) {
    const auto& channel = top_response.channel(i);
    // if the channel sent no RPCs, then expect no subchannels to have been
    // created.
    if (channel.data().calls_started() == 0) {
      EXPECT_EQ(channel.subchannel_ref_size(), 0);
      continue;
    }
    // The resolver must return at least one address.
    ASSERT_GT(channel.subchannel_ref_size(), 0);
    GetSubchannelRequest sub_request;
    GetSubchannelResponse sub_response;
    sub_request.set_subchannel_id(channel.subchannel_ref(0).subchannel_id());
    ClientContext sub_context;
    Status sub_status =
        channelz_stub_->GetSubchannel(&sub_context, sub_request, &sub_response);
    EXPECT_TRUE(sub_status.ok()) << sub_status.error_message();
    // Only the first subchannel is inspected; its counters must mirror the
    // parent channel's.
    const auto& sub_counters = sub_response.subchannel().data();
    EXPECT_EQ(channel.data().calls_started(), sub_counters.calls_started());
    EXPECT_EQ(channel.data().calls_succeeded(),
              sub_counters.calls_succeeded());
    EXPECT_EQ(channel.data().calls_failed(), sub_counters.calls_failed());
  }
}
411 
// GetServers (starting from id 0) is expected to report exactly one server.
// The backends are built with channelz disabled, which leaves the proxy.
TEST_F(ChannelzServerTest, BasicServerTest) {
  ResetStubs();
  ConfigureProxy(1);
  GetServersRequest servers_request;
  servers_request.set_start_server_id(0);
  GetServersResponse servers_response;
  ClientContext context;
  Status s =
      channelz_stub_->GetServers(&context, servers_request, &servers_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  EXPECT_EQ(servers_response.server_size(), 1);
}
423 
// Drives successful and failing RPCs through the proxy and checks the
// server-side call counters that GetServers reports for it.
TEST_F(ChannelzServerTest, ServerCallTest) {
  ResetStubs();
  ConfigureProxy(1);
  const int kNumSuccess = 10;
  const int kNumFailed = 11;
  for (int sent = 0; sent != kNumSuccess; ++sent) {
    SendSuccessfulEcho(0);
  }
  for (int sent = 0; sent != kNumFailed; ++sent) {
    SendFailedEcho(0);
  }
  GetServersRequest servers_request;
  GetServersResponse servers_response;
  servers_request.set_start_server_id(0);
  ClientContext context;
  Status s =
      channelz_stub_->GetServers(&context, servers_request, &servers_response);
  EXPECT_TRUE(s.ok()) << "s.error_message() = " << s.error_message();
  EXPECT_EQ(servers_response.server_size(), 1);
  const auto& counters = servers_response.server(0).data();
  EXPECT_EQ(counters.calls_succeeded(), kNumSuccess);
  EXPECT_EQ(counters.calls_failed(), kNumFailed);
  // This is success+failure+1 because the call that retrieved this information
  // will be counted as started. It will not track success/failure until after
  // it has returned, so that is not included in the response.
  EXPECT_EQ(counters.calls_started(), kNumSuccess + kNumFailed + 1);
}
450 
451 }  // namespace testing
452 }  // namespace grpc
453 
main(int argc,char ** argv)454 int main(int argc, char** argv) {
455   grpc_test_init(argc, argv);
456   ::testing::InitGoogleTest(&argc, argv);
457   return RUN_ALL_TESTS();
458 }
459