1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <gmock/gmock.h>
20 #include <grpc++/grpc++.h>
21 #include <grpc/grpc.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpc/support/string_util.h>
24 #include <grpcpp/ext/server_load_reporting.h>
25 #include <grpcpp/server_builder.h>
26 #include <gtest/gtest.h>
27
28 #include <thread>
29
30 #include "absl/log/log.h"
31 #include "src/core/client_channel/backup_poller.h"
32 #include "src/core/config/config_vars.h"
33 #include "src/core/util/crash.h"
34 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/test_util/port.h"
37 #include "test/core/test_util/test_config.h"
38
39 namespace grpc {
40 namespace testing {
41 namespace {
42
43 constexpr double kMetricValue = 3.1415;
44 constexpr char kMetricName[] = "METRIC_PI";
45
46 // Different messages result in different response statuses. For simplicity in
47 // computing request bytes, the message sizes should be the same.
48 const char kOkMessage[] = "hello";
49 const char kServerErrorMessage[] = "sverr";
50 const char kClientErrorMessage[] = "clerr";
51
52 class EchoTestServiceImpl : public EchoTestService::Service {
53 public:
~EchoTestServiceImpl()54 ~EchoTestServiceImpl() override {}
55
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)56 Status Echo(ServerContext* context, const EchoRequest* request,
57 EchoResponse* response) override {
58 if (request->message() == kServerErrorMessage) {
59 return Status(StatusCode::UNKNOWN, "Server error requested");
60 }
61 if (request->message() == kClientErrorMessage) {
62 return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
63 }
64 response->set_message(request->message());
65 grpc::load_reporter::experimental::AddLoadReportingCost(
66 context, kMetricName, kMetricValue);
67 return Status::OK;
68 }
69 };
70
71 class ServerLoadReportingEnd2endTest : public ::testing::Test {
72 protected:
SetUp()73 void SetUp() override {
74 server_address_ =
75 "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
76 server_ =
77 ServerBuilder()
78 .AddListeningPort(server_address_, InsecureServerCredentials())
79 .RegisterService(&echo_service_)
80 .SetOption(std::unique_ptr<grpc::ServerBuilderOption>(
81 new grpc::load_reporter::experimental::
82 LoadReportingServiceServerBuilderOption()))
83 .BuildAndStart();
84 server_thread_ =
85 std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
86 }
87
RunServerLoop()88 void RunServerLoop() { server_->Wait(); }
89
TearDown()90 void TearDown() override {
91 server_->Shutdown();
92 server_thread_.join();
93 }
94
ClientMakeEchoCalls(const std::string & lb_id,const std::string & lb_tag,const std::string & message,size_t num_requests)95 void ClientMakeEchoCalls(const std::string& lb_id, const std::string& lb_tag,
96 const std::string& message, size_t num_requests) {
97 auto stub = EchoTestService::NewStub(
98 grpc::CreateChannel(server_address_, InsecureChannelCredentials()));
99 std::string lb_token = lb_id + lb_tag;
100 for (size_t i = 0; i < num_requests; ++i) {
101 ClientContext ctx;
102 if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
103 EchoRequest request;
104 EchoResponse response;
105 request.set_message(message);
106 Status status = stub->Echo(&ctx, request, &response);
107 if (message == kOkMessage) {
108 ASSERT_EQ(status.error_code(), StatusCode::OK);
109 ASSERT_EQ(request.message(), response.message());
110 } else if (message == kServerErrorMessage) {
111 ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
112 } else if (message == kClientErrorMessage) {
113 ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
114 }
115 }
116 }
117
118 std::string server_address_;
119 std::unique_ptr<Server> server_;
120 std::thread server_thread_;
121 EchoTestServiceImpl echo_service_;
122 };
123
TEST_F(ServerLoadReportingEnd2endTest,NoCall)124 TEST_F(ServerLoadReportingEnd2endTest, NoCall) {}
125
TEST_F(ServerLoadReportingEnd2endTest,BasicReport)126 TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
127 auto channel =
128 grpc::CreateChannel(server_address_, InsecureChannelCredentials());
129 auto stub = grpc::lb::v1::LoadReporter::NewStub(channel);
130 ClientContext ctx;
131 auto stream = stub->ReportLoad(&ctx);
132 grpc::lb::v1::LoadReportRequest request;
133 request.mutable_initial_request()->set_load_balanced_hostname(
134 server_address_);
135 request.mutable_initial_request()->set_load_key("LOAD_KEY");
136 request.mutable_initial_request()
137 ->mutable_load_report_interval()
138 ->set_seconds(5);
139 stream->Write(request);
140 LOG(INFO) << "Initial request sent.";
141 grpc::lb::v1::LoadReportResponse response;
142 stream->Read(&response);
143 const std::string& lb_id = response.initial_response().load_balancer_id();
144 LOG(INFO) << "Initial response received (lb_id: " << lb_id << ").";
145 ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
146
147 unsigned load_count = 0;
148 bool got_in_progress = false;
149 bool got_orphaned = false;
150 bool got_calls = false;
151 while (load_count < 3) {
152 stream->Read(&response);
153 for (const auto& load : response.load()) {
154 load_count++;
155 if (load.in_progress_report_case()) {
156 // The special load record that reports the number of in-progress
157 // calls.
158 EXPECT_EQ(load.num_calls_in_progress(), 1);
159 EXPECT_FALSE(got_in_progress);
160 got_in_progress = true;
161 } else if (load.orphaned_load_case()) {
162 // The call from the balancer doesn't have any valid LB token.
163 EXPECT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
164 EXPECT_EQ(load.num_calls_started(), 1);
165 EXPECT_EQ(load.num_calls_finished_without_error(), 0);
166 EXPECT_EQ(load.num_calls_finished_with_error(), 0);
167 EXPECT_FALSE(got_orphaned);
168 got_orphaned = true;
169 } else {
170 // This corresponds to the calls from the client.
171 EXPECT_EQ(load.num_calls_started(), 1);
172 EXPECT_EQ(load.num_calls_finished_without_error(), 1);
173 EXPECT_EQ(load.num_calls_finished_with_error(), 0);
174 EXPECT_GE(load.total_bytes_received(), sizeof(kOkMessage));
175 EXPECT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
176 EXPECT_EQ(load.metric_data().size(), 1);
177 EXPECT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
178 EXPECT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
179 1);
180 EXPECT_EQ(load.metric_data().Get(0).total_metric_value(), kMetricValue);
181 EXPECT_FALSE(got_calls);
182 got_calls = true;
183 }
184 }
185 }
186 EXPECT_EQ(load_count, 3);
187 EXPECT_TRUE(got_in_progress);
188 EXPECT_TRUE(got_orphaned);
189 EXPECT_TRUE(got_calls);
190 stream->WritesDone();
191 EXPECT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
192 }
193
194 // TODO(juanlishen): Add more tests.
195
196 } // namespace
197 } // namespace testing
198 } // namespace grpc
199
main(int argc,char ** argv)200 int main(int argc, char** argv) {
201 grpc::testing::TestEnvironment env(&argc, argv);
202 ::testing::InitGoogleTest(&argc, argv);
203 // Make the backup poller poll very frequently in order to pick up
204 // updates from all the subchannels's FDs.
205 grpc_core::ConfigVars::Overrides config_overrides;
206 config_overrides.client_channel_backup_poll_interval_ms = 1;
207 grpc_core::ConfigVars::SetOverrides(config_overrides);
208 return RUN_ALL_TESTS();
209 }
210