1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <thread>
22
23 #include <gmock/gmock.h>
24 #include <gtest/gtest.h>
25
26 #include <grpc++/grpc++.h>
27 #include <grpc/grpc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpcpp/ext/server_load_reporting.h>
31 #include <grpcpp/server_builder.h>
32
33 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
34 #include "src/proto/grpc/testing/echo.grpc.pb.h"
35 #include "test/core/util/port.h"
36
37 namespace grpc {
38 namespace testing {
39 namespace {
40
// Cost metric that the echo service attaches to every successful call.
constexpr double kMetricValue = 3.1415;
constexpr char kMetricName[] = "METRIC_PI";

// Different messages result in different response statuses. For simplicity in
// computing request bytes, the message sizes should be the same.
constexpr char kOkMessage[] = "hello";
constexpr char kServerErrorMessage[] = "sverr";
constexpr char kClientErrorMessage[] = "clerr";

// Enforce the equal-size requirement stated above at compile time so that a
// future edit to one of the messages cannot silently break it.
static_assert(sizeof(kOkMessage) == sizeof(kServerErrorMessage) &&
                  sizeof(kOkMessage) == sizeof(kClientErrorMessage),
              "Echo test messages must all have the same length.");
49
50 class EchoTestServiceImpl : public EchoTestService::Service {
51 public:
~EchoTestServiceImpl()52 ~EchoTestServiceImpl() override {}
53
Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)54 Status Echo(ServerContext* context, const EchoRequest* request,
55 EchoResponse* response) override {
56 if (request->message() == kServerErrorMessage) {
57 return Status(StatusCode::UNKNOWN, "Server error requested");
58 }
59 if (request->message() == kClientErrorMessage) {
60 return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
61 }
62 response->set_message(request->message());
63 ::grpc::load_reporter::experimental::AddLoadReportingCost(
64 context, kMetricName, kMetricValue);
65 return Status::OK;
66 }
67 };
68
69 class ServerLoadReportingEnd2endTest : public ::testing::Test {
70 protected:
SetUp()71 void SetUp() override {
72 server_address_ =
73 "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
74 server_ =
75 ServerBuilder()
76 .AddListeningPort(server_address_, InsecureServerCredentials())
77 .RegisterService(&echo_service_)
78 .SetOption(std::unique_ptr<::grpc::ServerBuilderOption>(
79 new ::grpc::load_reporter::experimental::
80 LoadReportingServiceServerBuilderOption()))
81 .BuildAndStart();
82 server_thread_ =
83 std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
84 }
85
RunServerLoop()86 void RunServerLoop() { server_->Wait(); }
87
TearDown()88 void TearDown() override {
89 server_->Shutdown();
90 server_thread_.join();
91 }
92
ClientMakeEchoCalls(const grpc::string & lb_id,const grpc::string & lb_tag,const grpc::string & message,size_t num_requests)93 void ClientMakeEchoCalls(const grpc::string& lb_id,
94 const grpc::string& lb_tag,
95 const grpc::string& message, size_t num_requests) {
96 auto stub = EchoTestService::NewStub(
97 CreateChannel(server_address_, InsecureChannelCredentials()));
98 grpc::string lb_token = lb_id + lb_tag;
99 for (int i = 0; i < num_requests; ++i) {
100 ClientContext ctx;
101 if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
102 EchoRequest request;
103 EchoResponse response;
104 request.set_message(message);
105 Status status = stub->Echo(&ctx, request, &response);
106 if (message == kOkMessage) {
107 ASSERT_EQ(status.error_code(), StatusCode::OK);
108 ASSERT_EQ(request.message(), response.message());
109 } else if (message == kServerErrorMessage) {
110 ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
111 } else if (message == kClientErrorMessage) {
112 ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
113 }
114 }
115 }
116
117 grpc::string server_address_;
118 std::unique_ptr<Server> server_;
119 std::thread server_thread_;
120 EchoTestServiceImpl echo_service_;
121 };
122
// Smoke test: the body is intentionally empty, so the only code exercised is
// the fixture's SetUp() and TearDown(), i.e. starting and shutting down the
// server without making any call.
TEST_F(ServerLoadReportingEnd2endTest, NoCall) {
}
124
// Opens a ReportLoad stream to the server, sends the initial request, makes
// one successful echo call tagged with the LB ID from the initial response,
// and then validates the first non-empty load report read from the stream.
TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
  auto channel =
      grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  auto stub = ::grpc::lb::v1::LoadReporter::NewStub(channel);
  ClientContext ctx;
  auto stream = stub->ReportLoad(&ctx);
  ::grpc::lb::v1::LoadReportRequest request;
  request.mutable_initial_request()->set_load_balanced_hostname(
      server_address_);
  request.mutable_initial_request()->set_load_key("LOAD_KEY");
  request.mutable_initial_request()
      ->mutable_load_report_interval()
      ->set_seconds(5);
  ASSERT_TRUE(stream->Write(request)) << "Failed to send initial request.";
  gpr_log(GPR_INFO, "Initial request sent.");
  ::grpc::lb::v1::LoadReportResponse response;
  ASSERT_TRUE(stream->Read(&response)) << "Failed to read initial response.";
  // Copy the ID: `response` is reused for the reads below, so a reference into
  // it would not stay valid.
  const grpc::string lb_id = response.initial_response().load_balancer_id();
  gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
  // Propagate fatal failures from the helper instead of continuing to wait for
  // a report that can no longer match.
  ASSERT_NO_FATAL_FAILURE(ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1));
  while (true) {
    // A failed read means the stream is broken; without this check the loop
    // would spin forever on an unchanged, empty response.
    ASSERT_TRUE(stream->Read(&response))
        << "Stream closed before a load report was received.";
    if (!response.load().empty()) {
      ASSERT_EQ(response.load().size(), 3);
      for (const auto& load : response.load()) {
        if (load.in_progress_report_case()) {
          // The special load record that reports the number of in-progress
          // calls.
          ASSERT_EQ(load.num_calls_in_progress(), 1);
        } else if (load.orphaned_load_case()) {
          // The call from the balancer doesn't have any valid LB token.
          ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
          ASSERT_EQ(load.num_calls_started(), 1);
          ASSERT_EQ(load.num_calls_finished_without_error(), 0);
          ASSERT_EQ(load.num_calls_finished_with_error(), 0);
        } else {
          // This corresponds to the calls from the client.
          ASSERT_EQ(load.num_calls_started(), 1);
          ASSERT_EQ(load.num_calls_finished_without_error(), 1);
          ASSERT_EQ(load.num_calls_finished_with_error(), 0);
          ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage));
          ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
          ASSERT_EQ(load.metric_data().size(), 1);
          ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
          ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
                    1);
          ASSERT_EQ(load.metric_data().Get(0).total_metric_value(),
                    kMetricValue);
        }
      }
      break;
    }
  }
  stream->WritesDone();
  ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
}
181
182 // TODO(juanlishen): Add more tests.
183
184 } // namespace
185 } // namespace testing
186 } // namespace grpc
187
main(int argc,char ** argv)188 int main(int argc, char** argv) {
189 ::testing::InitGoogleTest(&argc, argv);
190 return RUN_ALL_TESTS();
191 }
192