• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <thread>
22 
23 #include <gmock/gmock.h>
24 #include <gtest/gtest.h>
25 
26 #include <grpc++/grpc++.h>
27 #include <grpc/grpc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpcpp/ext/server_load_reporting.h>
31 #include <grpcpp/server_builder.h>
32 
33 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
34 #include "src/proto/grpc/testing/echo.grpc.pb.h"
35 #include "test/core/util/port.h"
36 
37 namespace grpc {
38 namespace testing {
39 namespace {
40 
// Value and name of the custom call metric that the Echo service attaches to
// every successful call.
constexpr double kMetricValue = 3.1415;
constexpr char kMetricName[] = "METRIC_PI";

// Different messages result in different response statuses. For simplicity in
// computing request bytes, the message sizes should be the same.
constexpr char kOkMessage[] = "hello";
constexpr char kServerErrorMessage[] = "sverr";
constexpr char kClientErrorMessage[] = "clerr";

// Enforce the equal-size requirement stated above at compile time so that an
// edit to one of the messages cannot silently break it.
static_assert(sizeof(kOkMessage) == sizeof(kServerErrorMessage) &&
                  sizeof(kOkMessage) == sizeof(kClientErrorMessage),
              "All test messages must have the same size.");
49 
50 class EchoTestServiceImpl : public EchoTestService::Service {
51  public:
~EchoTestServiceImpl()52   ~EchoTestServiceImpl() override {}
53 
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)54   Status Echo(ServerContext* context, const EchoRequest* request,
55               EchoResponse* response) override {
56     if (request->message() == kServerErrorMessage) {
57       return Status(StatusCode::UNKNOWN, "Server error requested");
58     }
59     if (request->message() == kClientErrorMessage) {
60       return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
61     }
62     response->set_message(request->message());
63     ::grpc::load_reporter::experimental::AddLoadReportingCost(
64         context, kMetricName, kMetricValue);
65     return Status::OK;
66   }
67 };
68 
69 class ServerLoadReportingEnd2endTest : public ::testing::Test {
70  protected:
SetUp()71   void SetUp() override {
72     server_address_ =
73         "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
74     server_ =
75         ServerBuilder()
76             .AddListeningPort(server_address_, InsecureServerCredentials())
77             .RegisterService(&echo_service_)
78             .SetOption(std::unique_ptr<::grpc::ServerBuilderOption>(
79                 new ::grpc::load_reporter::experimental::
80                     LoadReportingServiceServerBuilderOption()))
81             .BuildAndStart();
82     server_thread_ =
83         std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
84   }
85 
RunServerLoop()86   void RunServerLoop() { server_->Wait(); }
87 
TearDown()88   void TearDown() override {
89     server_->Shutdown();
90     server_thread_.join();
91   }
92 
ClientMakeEchoCalls(const std::string & lb_id,const std::string & lb_tag,const std::string & message,size_t num_requests)93   void ClientMakeEchoCalls(const std::string& lb_id, const std::string& lb_tag,
94                            const std::string& message, size_t num_requests) {
95     auto stub = EchoTestService::NewStub(
96         grpc::CreateChannel(server_address_, InsecureChannelCredentials()));
97     std::string lb_token = lb_id + lb_tag;
98     for (int i = 0; i < num_requests; ++i) {
99       ClientContext ctx;
100       if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
101       EchoRequest request;
102       EchoResponse response;
103       request.set_message(message);
104       Status status = stub->Echo(&ctx, request, &response);
105       if (message == kOkMessage) {
106         ASSERT_EQ(status.error_code(), StatusCode::OK);
107         ASSERT_EQ(request.message(), response.message());
108       } else if (message == kServerErrorMessage) {
109         ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
110       } else if (message == kClientErrorMessage) {
111         ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
112       }
113     }
114   }
115 
116   std::string server_address_;
117   std::unique_ptr<Server> server_;
118   std::thread server_thread_;
119   EchoTestServiceImpl echo_service_;
120 };
121 
// Makes no calls at all: the test body is intentionally empty, so only the
// fixture's SetUp() and TearDown() are exercised.
TEST_F(ServerLoadReportingEnd2endTest, NoCall) {
}
123 
TEST_F(ServerLoadReportingEnd2endTest,BasicReport)124 TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
125   auto channel =
126       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
127   auto stub = ::grpc::lb::v1::LoadReporter::NewStub(channel);
128   ClientContext ctx;
129   auto stream = stub->ReportLoad(&ctx);
130   ::grpc::lb::v1::LoadReportRequest request;
131   request.mutable_initial_request()->set_load_balanced_hostname(
132       server_address_);
133   request.mutable_initial_request()->set_load_key("LOAD_KEY");
134   request.mutable_initial_request()
135       ->mutable_load_report_interval()
136       ->set_seconds(5);
137   stream->Write(request);
138   gpr_log(GPR_INFO, "Initial request sent.");
139   ::grpc::lb::v1::LoadReportResponse response;
140   stream->Read(&response);
141   const std::string& lb_id = response.initial_response().load_balancer_id();
142   gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
143   ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
144   while (true) {
145     stream->Read(&response);
146     if (!response.load().empty()) {
147       ASSERT_EQ(response.load().size(), 3);
148       for (const auto& load : response.load()) {
149         if (load.in_progress_report_case()) {
150           // The special load record that reports the number of in-progress
151           // calls.
152           ASSERT_EQ(load.num_calls_in_progress(), 1);
153         } else if (load.orphaned_load_case()) {
154           // The call from the balancer doesn't have any valid LB token.
155           ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
156           ASSERT_EQ(load.num_calls_started(), 1);
157           ASSERT_EQ(load.num_calls_finished_without_error(), 0);
158           ASSERT_EQ(load.num_calls_finished_with_error(), 0);
159         } else {
160           // This corresponds to the calls from the client.
161           ASSERT_EQ(load.num_calls_started(), 1);
162           ASSERT_EQ(load.num_calls_finished_without_error(), 1);
163           ASSERT_EQ(load.num_calls_finished_with_error(), 0);
164           ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage));
165           ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
166           ASSERT_EQ(load.metric_data().size(), 1);
167           ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
168           ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
169                     1);
170           ASSERT_EQ(load.metric_data().Get(0).total_metric_value(),
171                     kMetricValue);
172         }
173       }
174       break;
175     }
176   }
177   stream->WritesDone();
178   ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
179 }
180 
181 // TODO(juanlishen): Add more tests.
182 
183 }  // namespace
184 }  // namespace testing
185 }  // namespace grpc
186 
main(int argc,char ** argv)187 int main(int argc, char** argv) {
188   ::testing::InitGoogleTest(&argc, argv);
189   return RUN_ALL_TESTS();
190 }
191