1 //
2 //
3 // Copyright 2024 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/cpp/interop/backend_metrics_lb_policy.h"
20
21 #include <gmock/gmock.h>
22 #include <grpc/grpc.h>
23 #include <grpcpp/ext/call_metric_recorder.h>
24 #include <grpcpp/ext/orca_service.h>
25 #include <grpcpp/grpcpp.h>
26 #include <grpcpp/support/status.h>
27 #include <gtest/gtest.h>
28
29 #include <memory>
30 #include <thread>
31
32 #include "src/core/config/config_vars.h"
33 #include "src/core/util/sync.h"
34 #include "src/proto/grpc/testing/messages.pb.h"
35 #include "src/proto/grpc/testing/test.grpc.pb.h"
36 #include "test/core/test_util/port.h"
37 #include "test/core/test_util/test_config.h"
38
39 namespace grpc {
40 namespace testing {
41 namespace {
42
43 class EchoServiceImpl : public grpc::testing::TestService::CallbackService {
44 public:
UnaryCall(grpc::CallbackServerContext * context,const grpc::testing::SimpleRequest *,grpc::testing::SimpleResponse *)45 grpc::ServerUnaryReactor* UnaryCall(
46 grpc::CallbackServerContext* context,
47 const grpc::testing::SimpleRequest* /* request */,
48 grpc::testing::SimpleResponse* /* response */) override {
49 auto reactor = context->DefaultReactor();
50 reactor->Finish(grpc::Status::OK);
51 return reactor;
52 }
53 };
54
55 class Server {
56 public:
Server()57 Server() : port_(grpc_pick_unused_port_or_die()) {
58 server_thread_ = std::thread(ServerLoop, this);
59 grpc_core::MutexLock lock(&mu_);
60 cond_.WaitWithTimeout(&mu_, absl::Seconds(1));
61 }
62
~Server()63 ~Server() {
64 server_->Shutdown();
65 server_thread_.join();
66 }
67
address() const68 std::string address() const { return absl::StrCat("localhost:", port_); }
69
70 private:
ServerLoop(Server * server)71 static void ServerLoop(Server* server) { server->Run(); }
72
Run()73 void Run() {
74 ServerBuilder builder;
75 EchoServiceImpl service;
76 auto server_metric_recorder =
77 grpc::experimental::ServerMetricRecorder::Create();
78 server_metric_recorder->SetCpuUtilization(.5f);
79 grpc::experimental::OrcaService orca_service(
80 server_metric_recorder.get(),
81 grpc::experimental::OrcaService::Options().set_min_report_duration(
82 absl::Seconds(1)));
83 builder.RegisterService(&orca_service);
84 builder.RegisterService(&service);
85 builder.AddListeningPort(address(), InsecureServerCredentials());
86 auto grpc_server = builder.BuildAndStart();
87 server_ = grpc_server.get();
88 {
89 grpc_core::MutexLock lock(&mu_);
90 cond_.SignalAll();
91 }
92 grpc_server->Wait();
93 }
94
95 int port_;
96 grpc_core::Mutex mu_;
97 grpc_core::CondVar cond_;
98 std::thread server_thread_;
99 grpc::Server* server_;
100 };
101
TEST(BackendMetricsLbPolicyTest,TestOobMetricsReceipt)102 TEST(BackendMetricsLbPolicyTest, TestOobMetricsReceipt) {
103 LoadReportTracker tracker;
104 grpc_core::CoreConfiguration::RegisterBuilder(RegisterBackendMetricsLbPolicy);
105 Server server;
106 ChannelArguments args = tracker.GetChannelArguments();
107 args.SetLoadBalancingPolicyName("test_backend_metrics_load_balancer");
108 auto channel = grpc::CreateCustomChannel(server.address(),
109 InsecureChannelCredentials(), args);
110 auto stub = grpc::testing::TestService::Stub(channel);
111 ClientContext ctx;
112 SimpleRequest req;
113 SimpleResponse res;
114 grpc_core::Mutex mu;
115 grpc_core::CondVar cond;
116 absl::optional<Status> status;
117
118 stub.async()->UnaryCall(&ctx, &req, &res, [&](auto s) {
119 grpc_core::MutexLock lock(&mu);
120 status = s;
121 cond.SignalAll();
122 });
123 // This report is sent on start, available immediately
124 auto report = tracker.WaitForOobLoadReport(
125 [](auto report) { return report.cpu_utilization() == 0.5; },
126 absl::Seconds(5) * grpc_test_slowdown_factor(), 3);
127 ASSERT_TRUE(report.has_value());
128 EXPECT_EQ(report->cpu_utilization(), 0.5);
129 for (size_t i = 0; i < 3; i++) {
130 // Wait for slightly more than 1 min
131 report = tracker.WaitForOobLoadReport(
132 [](auto report) { return report.cpu_utilization() == 0.5; },
133 absl::Milliseconds(1500), 3);
134 ASSERT_TRUE(report.has_value());
135 EXPECT_EQ(report->cpu_utilization(), 0.5);
136 }
137 {
138 grpc_core::MutexLock lock(&mu);
139 if (!status.has_value()) {
140 cond.Wait(&mu);
141 }
142 ASSERT_TRUE(status.has_value());
143 EXPECT_EQ(status->error_code(), grpc::StatusCode::OK);
144 }
145 }
146
147 } // namespace
148 } // namespace testing
149 } // namespace grpc
150
main(int argc,char ** argv)151 int main(int argc, char** argv) {
152 ::testing::InitGoogleTest(&argc, argv);
153 grpc::testing::TestEnvironment env(&argc, argv);
154 grpc_init();
155 auto result = RUN_ALL_TESTS();
156 grpc_shutdown();
157 return result;
158 }
159