• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2024 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/interop/backend_metrics_lb_policy.h"
20 
21 #include <gmock/gmock.h>
22 #include <grpc/grpc.h>
23 #include <grpcpp/ext/call_metric_recorder.h>
24 #include <grpcpp/ext/orca_service.h>
25 #include <grpcpp/grpcpp.h>
26 #include <grpcpp/support/status.h>
27 #include <gtest/gtest.h>
28 
29 #include <memory>
30 #include <thread>
31 
32 #include "src/core/config/config_vars.h"
33 #include "src/core/util/sync.h"
34 #include "src/proto/grpc/testing/messages.pb.h"
35 #include "src/proto/grpc/testing/test.grpc.pb.h"
36 #include "test/core/test_util/port.h"
37 #include "test/core/test_util/test_config.h"
38 
39 namespace grpc {
40 namespace testing {
41 namespace {
42 
43 class EchoServiceImpl : public grpc::testing::TestService::CallbackService {
44  public:
UnaryCall(grpc::CallbackServerContext * context,const grpc::testing::SimpleRequest *,grpc::testing::SimpleResponse *)45   grpc::ServerUnaryReactor* UnaryCall(
46       grpc::CallbackServerContext* context,
47       const grpc::testing::SimpleRequest* /* request */,
48       grpc::testing::SimpleResponse* /* response */) override {
49     auto reactor = context->DefaultReactor();
50     reactor->Finish(grpc::Status::OK);
51     return reactor;
52   }
53 };
54 
55 class Server {
56  public:
Server()57   Server() : port_(grpc_pick_unused_port_or_die()) {
58     server_thread_ = std::thread(ServerLoop, this);
59     grpc_core::MutexLock lock(&mu_);
60     cond_.WaitWithTimeout(&mu_, absl::Seconds(1));
61   }
62 
~Server()63   ~Server() {
64     server_->Shutdown();
65     server_thread_.join();
66   }
67 
address() const68   std::string address() const { return absl::StrCat("localhost:", port_); }
69 
70  private:
ServerLoop(Server * server)71   static void ServerLoop(Server* server) { server->Run(); }
72 
Run()73   void Run() {
74     ServerBuilder builder;
75     EchoServiceImpl service;
76     auto server_metric_recorder =
77         grpc::experimental::ServerMetricRecorder::Create();
78     server_metric_recorder->SetCpuUtilization(.5f);
79     grpc::experimental::OrcaService orca_service(
80         server_metric_recorder.get(),
81         grpc::experimental::OrcaService::Options().set_min_report_duration(
82             absl::Seconds(1)));
83     builder.RegisterService(&orca_service);
84     builder.RegisterService(&service);
85     builder.AddListeningPort(address(), InsecureServerCredentials());
86     auto grpc_server = builder.BuildAndStart();
87     server_ = grpc_server.get();
88     {
89       grpc_core::MutexLock lock(&mu_);
90       cond_.SignalAll();
91     }
92     grpc_server->Wait();
93   }
94 
95   int port_;
96   grpc_core::Mutex mu_;
97   grpc_core::CondVar cond_;
98   std::thread server_thread_;
99   grpc::Server* server_;
100 };
101 
// Verifies that a channel using the test backend-metrics LB policy receives
// out-of-band load reports carrying the CPU utilization configured on the
// server (0.5): one shortly after the channel connects and then further
// periodic ones.
TEST(BackendMetricsLbPolicyTest, TestOobMetricsReceipt) {
  LoadReportTracker tracker;
  grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
  Server server;
  ChannelArguments args = tracker.GetChannelArguments();
  args.SetLoadBalancingPolicyName("test_backend_metrics_load_balancer");
  auto channel = grpc::CreateCustomChannel(server.address(),
                                           InsecureChannelCredentials(), args);
  auto stub = grpc::testing::TestService::Stub(channel);
  ClientContext ctx;
  SimpleRequest req;
  SimpleResponse res;
  grpc_core::Mutex mu;
  grpc_core::CondVar cond;
  absl::optional<Status> status;

  // Issue one RPC; its completion status is handed back through `status`.
  stub.async()->UnaryCall(&ctx, &req, &res, [&](auto s) {
    grpc_core::MutexLock lock(&mu);
    status = s;
    cond.SignalAll();
  });

  const auto has_expected_cpu = [](const auto& load_report) {
    return load_report.cpu_utilization() == 0.5;
  };
  // The report checks live in their own void lambda so that a failing
  // ASSERT_* only leaves the lambda. The test body then still waits for the
  // RPC callback below instead of returning while that callback holds
  // references to the locals above.
  const auto verify_oob_reports = [&]() -> void {
    // This report is sent on start, available immediately
    auto report = tracker.WaitForOobLoadReport(
        has_expected_cpu, absl::Seconds(5) * grpc_test_slowdown_factor(), 3);
    ASSERT_TRUE(report.has_value());
    EXPECT_EQ(report->cpu_utilization(), 0.5);
    for (size_t i = 0; i < 3; i++) {
      // Wait for slightly more than the server's 1 second report interval,
      // scaled like the first wait so slow test configurations do not flake.
      report = tracker.WaitForOobLoadReport(
          has_expected_cpu,
          absl::Milliseconds(1500) * grpc_test_slowdown_factor(), 3);
      ASSERT_TRUE(report.has_value());
      EXPECT_EQ(report->cpu_utilization(), 0.5);
    }
  };
  verify_oob_reports();

  {
    grpc_core::MutexLock lock(&mu);
    // Loop on the predicate: a condition variable may wake spuriously.
    while (!status.has_value()) {
      cond.Wait(&mu);
    }
    ASSERT_TRUE(status.has_value());
    EXPECT_EQ(status->error_code(), grpc::StatusCode::OK);
  }
}
146 
147 }  // namespace
148 }  // namespace testing
149 }  // namespace grpc
150 
main(int argc,char ** argv)151 int main(int argc, char** argv) {
152   ::testing::InitGoogleTest(&argc, argv);
153   grpc::testing::TestEnvironment env(&argc, argv);
154   grpc_init();
155   auto result = RUN_ALL_TESTS();
156   grpc_shutdown();
157   return result;
158 }
159