• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <gmock/gmock.h>
18 #include <grpc/grpc.h>
19 #include <grpcpp/channel.h>
20 #include <grpcpp/client_context.h>
21 #include <grpcpp/create_channel.h>
22 #include <grpcpp/ext/call_metric_recorder.h>
23 #include <grpcpp/ext/orca_service.h>
24 #include <grpcpp/ext/server_metric_recorder.h>
25 #include <grpcpp/generic/generic_stub.h>
26 #include <grpcpp/server.h>
27 #include <grpcpp/server_builder.h>
28 #include <grpcpp/server_context.h>
29 #include <grpcpp/support/byte_buffer.h>
30 #include <grpcpp/support/status.h>
31 #include <gtest/gtest.h>
32 
33 #include <memory>
34 
35 #include "absl/log/log.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/time/time.h"
38 #include "absl/types/optional.h"
39 #include "src/core/util/notification.h"
40 #include "src/core/util/time.h"
41 #include "src/proto/grpc/testing/xds/v3/orca_service.grpc.pb.h"
42 #include "src/proto/grpc/testing/xds/v3/orca_service.pb.h"
43 #include "test/core/test_util/port.h"
44 #include "test/core/test_util/test_config.h"
45 
46 using xds::data::orca::v3::OrcaLoadReport;
47 using xds::service::orca::v3::OpenRcaService;
48 using xds::service::orca::v3::OrcaLoadReportRequest;
49 
50 namespace grpc {
51 namespace testing {
52 namespace {
53 
54 using experimental::OrcaService;
55 using experimental::ServerMetricRecorder;
56 
57 class OrcaServiceEnd2endTest : public ::testing::Test {
58  protected:
59   // A wrapper for the client stream that ensures that responses come
60   // back at the requested interval.
61   class Stream {
62    public:
Stream(OpenRcaService::Stub * stub,grpc_core::Duration requested_interval)63     Stream(OpenRcaService::Stub* stub, grpc_core::Duration requested_interval)
64         : requested_interval_(requested_interval) {
65       OrcaLoadReportRequest request;
66       gpr_timespec timespec = requested_interval.as_timespec();
67       auto* interval_proto = request.mutable_report_interval();
68       interval_proto->set_seconds(timespec.tv_sec);
69       interval_proto->set_nanos(timespec.tv_nsec);
70       stream_ = stub->StreamCoreMetrics(&context_, request);
71     }
72 
~Stream()73     ~Stream() { context_.TryCancel(); }
74 
ReadResponse()75     OrcaLoadReport ReadResponse() {
76       OrcaLoadReport response;
77       EXPECT_TRUE(stream_->Read(&response));
78       auto now = grpc_core::Timestamp::FromTimespecRoundDown(
79           gpr_now(GPR_CLOCK_MONOTONIC));
80       if (last_response_time_.has_value()) {
81         // Allow a small fudge factor to avoid test flakiness.
82         const grpc_core::Duration fudge_factor =
83             grpc_core::Duration::Milliseconds(750) *
84             grpc_test_slowdown_factor();
85         auto elapsed = now - *last_response_time_;
86         LOG(INFO) << "received ORCA response after " << elapsed;
87         EXPECT_GE(elapsed, requested_interval_ - fudge_factor)
88             << elapsed.ToString();
89         EXPECT_LE(elapsed, requested_interval_ + fudge_factor)
90             << elapsed.ToString();
91       }
92       last_response_time_ = now;
93       return response;
94     }
95 
96    private:
97     const grpc_core::Duration requested_interval_;
98     ClientContext context_;
99     std::unique_ptr<grpc::ClientReaderInterface<OrcaLoadReport>> stream_;
100     absl::optional<grpc_core::Timestamp> last_response_time_;
101   };
102 
103   class GenericOrcaClientReactor
104       : public grpc::ClientBidiReactor<grpc::ByteBuffer, grpc::ByteBuffer> {
105    public:
GenericOrcaClientReactor(GenericStub * stub)106     explicit GenericOrcaClientReactor(GenericStub* stub) : stub_(stub) {}
107 
Prepare()108     void Prepare() {
109       stub_->PrepareBidiStreamingCall(
110           &cli_ctx_, "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
111           StubOptions(), this);
112     }
113 
Await()114     grpc::Status Await() {
115       notification_.WaitForNotification();
116       return status_;
117     }
118 
OnDone(const grpc::Status & s)119     void OnDone(const grpc::Status& s) override {
120       status_ = s;
121       notification_.Notify();
122     }
123 
124    private:
125     GenericStub* stub_;
126     grpc::ClientContext cli_ctx_;
127     grpc_core::Notification notification_;
128     grpc::Status status_;
129   };
130 
OrcaServiceEnd2endTest()131   OrcaServiceEnd2endTest()
132       : server_metric_recorder_(ServerMetricRecorder::Create()),
133         orca_service_(server_metric_recorder_.get(),
134                       OrcaService::Options().set_min_report_duration(
135                           absl::ZeroDuration())) {
136     std::string server_address =
137         absl::StrCat("localhost:", grpc_pick_unused_port_or_die());
138     ServerBuilder builder;
139     builder.AddListeningPort(server_address, InsecureServerCredentials());
140     builder.RegisterService(&orca_service_);
141     server_ = builder.BuildAndStart();
142     LOG(INFO) << "server started on " << server_address;
143     channel_ = CreateChannel(server_address, InsecureChannelCredentials());
144   }
145 
~OrcaServiceEnd2endTest()146   ~OrcaServiceEnd2endTest() override { server_->Shutdown(); }
147 
148   std::unique_ptr<ServerMetricRecorder> server_metric_recorder_;
149   OrcaService orca_service_;
150   std::unique_ptr<Server> server_;
151   std::shared_ptr<Channel> channel_;
152 };
153 
TEST_F(OrcaServiceEnd2endTest,Basic)154 TEST_F(OrcaServiceEnd2endTest, Basic) {
155   constexpr char kMetricName1[] = "foo";
156   constexpr char kMetricName2[] = "bar";
157   constexpr char kMetricName3[] = "baz";
158   constexpr char kMetricName4[] = "quux";
159   auto stub = OpenRcaService::NewStub(channel_);
160   // Start stream1 with 5s interval and stream2 with 2.5s interval.
161   // Throughout the test, we should get two responses on stream2 for
162   // every one response on stream1.
163   Stream stream1(stub.get(), grpc_core::Duration::Milliseconds(5000));
164   Stream stream2(stub.get(), grpc_core::Duration::Milliseconds(2500));
165   auto ReadResponses = [&](std::function<void(const OrcaLoadReport&)> checker) {
166     LOG(INFO) << "reading response from stream1";
167     OrcaLoadReport response = stream1.ReadResponse();
168     checker(response);
169     LOG(INFO) << "reading response from stream2";
170     response = stream2.ReadResponse();
171     checker(response);
172     LOG(INFO) << "reading response from stream2";
173     response = stream2.ReadResponse();
174     checker(response);
175   };
176   // Initial response should not have any values populated.
177   ReadResponses([](const OrcaLoadReport& response) {
178     EXPECT_EQ(response.application_utilization(), 0);
179     EXPECT_EQ(response.cpu_utilization(), 0);
180     EXPECT_EQ(response.mem_utilization(), 0);
181     EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
182   });
183   // Now set app utilization on the server.
184   server_metric_recorder_->SetApplicationUtilization(0.5);
185   ReadResponses([](const OrcaLoadReport& response) {
186     EXPECT_EQ(response.application_utilization(), 0.5);
187     EXPECT_EQ(response.cpu_utilization(), 0);
188     EXPECT_EQ(response.mem_utilization(), 0);
189     EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
190   });
191   // Update app utilization and set CPU and memory utilization.
192   server_metric_recorder_->SetApplicationUtilization(1.8);
193   server_metric_recorder_->SetCpuUtilization(0.3);
194   server_metric_recorder_->SetMemoryUtilization(0.4);
195   ReadResponses([](const OrcaLoadReport& response) {
196     EXPECT_EQ(response.application_utilization(), 1.8);
197     EXPECT_EQ(response.cpu_utilization(), 0.3);
198     EXPECT_EQ(response.mem_utilization(), 0.4);
199     EXPECT_THAT(response.utilization(), ::testing::UnorderedElementsAre());
200   });
201   // Unset app, CPU, and memory utilization and set a named utilization.
202   server_metric_recorder_->ClearApplicationUtilization();
203   server_metric_recorder_->ClearCpuUtilization();
204   server_metric_recorder_->ClearMemoryUtilization();
205   server_metric_recorder_->SetNamedUtilization(kMetricName1, 0.3);
206   ReadResponses([&](const OrcaLoadReport& response) {
207     EXPECT_EQ(response.application_utilization(), 0);
208     EXPECT_EQ(response.cpu_utilization(), 0);
209     EXPECT_EQ(response.mem_utilization(), 0);
210     EXPECT_THAT(
211         response.utilization(),
212         ::testing::UnorderedElementsAre(::testing::Pair(kMetricName1, 0.3)));
213   });
214   // Unset the previous named utilization and set two new ones.
215   server_metric_recorder_->ClearNamedUtilization(kMetricName1);
216   server_metric_recorder_->SetNamedUtilization(kMetricName2, 0.2);
217   server_metric_recorder_->SetNamedUtilization(kMetricName3, 0.1);
218   ReadResponses([&](const OrcaLoadReport& response) {
219     EXPECT_EQ(response.application_utilization(), 0);
220     EXPECT_EQ(response.cpu_utilization(), 0);
221     EXPECT_EQ(response.mem_utilization(), 0);
222     EXPECT_THAT(
223         response.utilization(),
224         ::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.2),
225                                         ::testing::Pair(kMetricName3, 0.1)));
226   });
227   // Replace the entire named metric map at once.
228   server_metric_recorder_->SetAllNamedUtilization(
229       {{kMetricName2, 0.5}, {kMetricName4, 0.9}});
230   ReadResponses([&](const OrcaLoadReport& response) {
231     EXPECT_EQ(response.application_utilization(), 0);
232     EXPECT_EQ(response.cpu_utilization(), 0);
233     EXPECT_EQ(response.mem_utilization(), 0);
234     EXPECT_THAT(
235         response.utilization(),
236         ::testing::UnorderedElementsAre(::testing::Pair(kMetricName2, 0.5),
237                                         ::testing::Pair(kMetricName4, 0.9)));
238   });
239 }
240 
// Uses the generic (ByteBuffer) API to open a StreamCoreMetrics call and
// half-close it without ever writing a request message, then expects the
// call to finish with status INTERNAL.
TEST_F(OrcaServiceEnd2endTest, ClientClosesBeforeSendingMessage) {
  GenericStub generic_stub(channel_);
  GenericOrcaClientReactor reactor(&generic_stub);
  // Bind the reactor to the call before queuing any operations on it.
  reactor.Prepare();
  // Queue the half-close first, so that no request is sent once the call
  // is started.
  reactor.StartWritesDone();
  reactor.StartCall();
  // Await() blocks until the reactor's OnDone() has delivered the status.
  const grpc::Status final_status = reactor.Await();
  EXPECT_EQ(final_status.error_code(), grpc::StatusCode::INTERNAL);
}
249 
250 }  // namespace
251 }  // namespace testing
252 }  // namespace grpc
253 
main(int argc,char ** argv)254 int main(int argc, char** argv) {
255   grpc::testing::TestEnvironment env(&argc, argv);
256   ::testing::InitGoogleTest(&argc, argv);
257   return RUN_ALL_TESTS();
258 }
259