1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <gmock/gmock.h>
18 #include <grpc/grpc.h>
19 #include <grpcpp/channel.h>
20 #include <grpcpp/client_context.h>
21 #include <grpcpp/create_channel.h>
22 #include <grpcpp/ext/call_metric_recorder.h>
23 #include <grpcpp/ext/orca_service.h>
24 #include <grpcpp/ext/server_metric_recorder.h>
25 #include <grpcpp/generic/generic_stub.h>
26 #include <grpcpp/server.h>
27 #include <grpcpp/server_builder.h>
28 #include <grpcpp/server_context.h>
29 #include <grpcpp/support/byte_buffer.h>
30 #include <grpcpp/support/status.h>
31 #include <gtest/gtest.h>
32
33 #include <memory>
34
35 #include "absl/log/log.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "src/core/util/notification.h"
40 #include "src/core/util/time.h"
41 #include "src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.h"
42 #include "src/proto/grpc/testing/xds/v3/orca_service.pb.h"
43 #include "test/core/test_util/port.h"
44 #include "test/core/test_util/test_config.h"
45
46 using xds::data::orca::v3::OrcaLoadReport;
47 using xds::service::orca::v3::OpenRcaService;
48 using xds::service::orca::v3::OrcaLoadReportRequest;
49
50 namespace grpc {
51 namespace testing {
52 namespace {
53
54 using experimental::OrcaService;
55 using experimental::ServerMetricRecorder;
56
57 class OrcaServiceEnd2endTest : public ::testing::Test {
58 protected:
59 // A wrapper for the client stream that ensures that responses come
60 // back at the requested interval.
61 class Stream {
62 public:
Stream(OpenRcaService::Stub * stub,grpc_core::Duration requested_interval)63 Stream(OpenRcaService::Stub* stub, grpc_core::Duration requested_interval)
64 : requested_interval_(requested_interval) {
65 OrcaLoadReportRequest request;
66 gpr_timespec timespec = requested_interval.as_timespec();
67 auto* interval_proto = request.mutable_report_interval();
68 interval_proto->set_seconds(timespec.tv_sec);
69 interval_proto->set_nanos(timespec.tv_nsec);
70 stream_ = stub->StreamCoreMetrics(&context_, request);
71 }
72
~Stream()73 ~Stream() { context_.TryCancel(); }
74
ReadResponse()75 OrcaLoadReport ReadResponse() {
76 OrcaLoadReport response;
77 EXPECT_TRUE(stream_->Read(&response));
78 auto now = grpc_core::Timestamp::FromTimespecRoundDown(
79 gpr_now(GPR_CLOCK_MONOTONIC));
80 if (last_response_time_.has_value()) {
81 // Allow a small fudge factor to avoid test flakiness.
82 const grpc_core::Duration fudge_factor =
83 grpc_core::Duration::Milliseconds(750) *
84 grpc_test_slowdown_factor();
85 auto elapsed = now - *last_response_time_;
86 LOG(INFO) << "received ORCA response after " << elapsed;
87 EXPECT_GE(elapsed, requested_interval_ - fudge_factor)
88 << elapsed.ToString();
89 EXPECT_LE(elapsed, requested_interval_ + fudge_factor)
90 << elapsed.ToString();
91 }
92 last_response_time_ = now;
93 return response;
94 }
95
96 private:
97 const grpc_core::Duration requested_interval_;
98 ClientContext context_;
99 std::unique_ptr<grpc::ClientReaderInterface<OrcaLoadReport>> stream_;
100 absl::optional<grpc_core::Timestamp> last_response_time_;
101 };
102
103 class GenericOrcaClientReactor
104 : public grpc::ClientBidiReactor<grpc::ByteBuffer, grpc::ByteBuffer> {
105 public:
GenericOrcaClientReactor(GenericStub * stub)106 explicit GenericOrcaClientReactor(GenericStub* stub) : stub_(stub) {}
107
Prepare()108 void Prepare() {
109 stub_->PrepareBidiStreamingCall(
110 &cli_ctx_, "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
111 StubOptions(), this);
112 }
113
Await()114 grpc::Status Await() {
115 notification_.WaitForNotification();
116 return status_;
117 }
118
OnDone(const grpc::Status & s)119 void OnDone(const grpc::Status& s) override {
120 status_ = s;
121 notification_.Notify();
122 }
123
124 private:
125 GenericStub* stub_;
126 grpc::ClientContext cli_ctx_;
127 grpc_core::Notification notification_;
128 grpc::Status status_;
129 };
130
OrcaServiceEnd2endTest()131 OrcaServiceEnd2endTest()
132 : server_metric_recorder_(ServerMetricRecorder::Create()),
133 orca_service_(server_metric_recorder_.get(),
134 OrcaService::Options().set_min_report_duration(
135 absl::ZeroDuration())) {
136 std::string server_address =
137 absl::StrCat("localhost:", grpc_pick_unused_port_or_die());
138 ServerBuilder builder;
139 builder.AddListeningPort(server_address, InsecureServerCredentials());
140 builder.RegisterService(&orca_service_);
141 server_ = builder.BuildAndStart();
142 LOG(INFO) << "server started on " << server_address;
143 channel_ = CreateChannel(server_address, InsecureChannelCredentials());
144 }
145
~OrcaServiceEnd2endTest()146 ~OrcaServiceEnd2endTest() override { server_->Shutdown(); }
147
148 std::unique_ptr<ServerMetricRecorder> server_metric_recorder_;
149 OrcaService orca_service_;
150 std::unique_ptr<Server> server_;
151 std::shared_ptr<Channel> channel_;
152 };
153
// Opens two ORCA streams with different report intervals and checks that
// each change made through the server metric recorder shows up in the
// reports subsequently read from both streams.
TEST_F(OrcaServiceEnd2endTest, Basic) {
  constexpr char kMetricName1[] = "foo";
  constexpr char kMetricName2[] = "bar";
  constexpr char kMetricName3[] = "baz";
  constexpr char kMetricName4[] = "quux";
  auto stub = OpenRcaService::NewStub(channel_);
  // Start stream1 with 5s interval and stream2 with 2.5s interval.
  // Throughout the test, we should get two responses on stream2 for
  // every one response on stream1.
  Stream stream1(stub.get(), grpc_core::Duration::Milliseconds(5000));
  Stream stream2(stub.get(), grpc_core::Duration::Milliseconds(2500));
  // Reads one response from stream1 followed by two from stream2, passing
  // each one to `checker`.  The parameter is generic so that the checker is
  // called directly, without wrapping it in a std::function.
  auto ReadResponses = [&](const auto& checker) {
    LOG(INFO) << "reading response from stream1";
    checker(stream1.ReadResponse());
    for (int i = 0; i < 2; ++i) {
      LOG(INFO) << "reading response from stream2";
      checker(stream2.ReadResponse());
    }
  };
  // Initial response should not have any values populated.
  ReadResponses([](const OrcaLoadReport& response) {
    EXPECT_EQ(response.application_utilization(), 0);
    EXPECT_EQ(response.cpu_utilization(), 0);
    EXPECT_EQ(response.mem_utilization(), 0);
    EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
  });
  // Now set app utilization on the server.
  server_metric_recorder_->SetApplicationUtilization(0.5);
  ReadResponses([](const OrcaLoadReport& response) {
    EXPECT_EQ(response.application_utilization(), 0.5);
    EXPECT_EQ(response.cpu_utilization(), 0);
    EXPECT_EQ(response.mem_utilization(), 0);
    EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
  });
  // Update app utilization and set CPU and memory utilization.
  server_metric_recorder_->SetApplicationUtilization(1.8);
  server_metric_recorder_->SetCpuUtilization(0.3);
  server_metric_recorder_->SetMemoryUtilization(0.4);
  ReadResponses([](const OrcaLoadReport& response) {
    EXPECT_EQ(response.application_utilization(), 1.8);
    EXPECT_EQ(response.cpu_utilization(), 0.3);
    EXPECT_EQ(response.mem_utilization(), 0.4);
    EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
  });
  // Unset app, CPU, and memory utilization and set a named utilization.
  server_metric_recorder_->ClearApplicationUtilization();
  server_metric_recorder_->ClearCpuUtilization();
  server_metric_recorder_->ClearMemoryUtilization();
  server_metric_recorder_->SetNamedUtilization(kMetricName1, 0.3);
  ReadResponses([&](const OrcaLoadReport& response) {
    EXPECT_EQ(response.application_utilization(), 0);
    EXPECT_EQ(response.cpu_utilization(), 0);
    EXPECT_EQ(response.mem_utilization(), 0);
    EXPECT_THAT(
        response.utilization(),
        ::testing::UnorderedElementsAre(::testing::Pair(kMetricName1, 0.3)));
  });
  // Unset the previous named utilization and set two new ones.
  server_metric_recorder_->ClearNamedUtilization(kMetricName1);
  server_metric_recorder_->SetNamedUtilization(kMetricName2, 0.2);
  server_metric_recorder_->SetNamedUtilization(kMetricName3, 0.1);
  ReadResponses([&](const OrcaLoadReport& response) {
    EXPECT_EQ(response.application_utilization(), 0);
    EXPECT_EQ(response.cpu_utilization(), 0);
    EXPECT_EQ(response.mem_utilization(), 0);
    EXPECT_THAT(
        response.utilization(),
        ::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.2),
                                        ::testing::Pair(kMetricName3, 0.1)));
  });
  // Replace the entire named metric map at once.
  server_metric_recorder_->SetAllNamedUtilization(
      {{kMetricName2, 0.5}, {kMetricName4, 0.9}});
  ReadResponses([&](const OrcaLoadReport& response) {
    EXPECT_EQ(response.application_utilization(), 0);
    EXPECT_EQ(response.cpu_utilization(), 0);
    EXPECT_EQ(response.mem_utilization(), 0);
    EXPECT_THAT(
        response.utilization(),
        ::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.5),
                                        ::testing::Pair(kMetricName4, 0.9)));
  });
}
240
// Starts a generic StreamCoreMetrics call on which the client signals
// writes-done without ever writing a request message, and expects the call
// to finish with status INTERNAL.
TEST_F(OrcaServiceEnd2endTest, ClientClosesBeforeSendingMessage) {
  GenericStub generic_stub(channel_);
  GenericOrcaClientReactor reactor(&generic_stub);
  reactor.Prepare();
  // The writes-done op is issued before the call itself is started.
  reactor.StartWritesDone();
  reactor.StartCall();
  // Wait for the reactor's OnDone() and inspect the status it recorded.
  const grpc::Status final_status = reactor.Await();
  EXPECT_EQ(final_status.error_code(), grpc::StatusCode::INTERNAL);
}
249
250 } // namespace
251 } // namespace testing
252 } // namespace grpc
253
main(int argc,char ** argv)254 int main(int argc, char** argv) {
255 grpc::testing::TestEnvironment env(&argc, argv);
256 ::testing::InitGoogleTest(&argc, argv);
257 return RUN_ALL_TESTS();
258 }
259