1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/cpp/server/orca/orca_service.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpcpp/ext/orca_service.h>
21 #include <grpcpp/ext/server_metric_recorder.h>
22 #include <grpcpp/impl/rpc_method.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/impl/server_callback_handlers.h>
25 #include <grpcpp/impl/sync.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/byte_buffer.h>
28 #include <grpcpp/support/server_callback.h>
29 #include <grpcpp/support/slice.h>
30 #include <grpcpp/support/status.h>
31 #include <stddef.h>
32
33 #include <map>
34 #include <memory>
35 #include <utility>
36
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/time/time.h"
41 #include "absl/types/optional.h"
42 #include "google/protobuf/duration.upb.h"
43 #include "src/core/lib/event_engine/default_event_engine.h"
44 #include "src/core/lib/iomgr/exec_ctx.h"
45 #include "src/core/load_balancing/backend_metric_data.h"
46 #include "src/core/util/debug_location.h"
47 #include "src/core/util/time.h"
48 #include "src/cpp/server/backend_metric_recorder.h"
49 #include "upb/base/string_view.h"
50 #include "upb/mem/arena.hpp"
51 #include "xds/data/orca/v3/orca_load_report.upb.h"
52 #include "xds/service/orca/v3/orca.upb.h"
53
54 namespace grpc {
55 namespace experimental {
56
57 //
58 // OrcaService::Reactor
59 //
60
Reactor(OrcaService * service,absl::string_view peer,const ByteBuffer * request_buffer,std::shared_ptr<ReactorHook> hook)61 OrcaService::Reactor::Reactor(OrcaService* service, absl::string_view peer,
62 const ByteBuffer* request_buffer,
63 std::shared_ptr<ReactorHook> hook)
64 : service_(service),
65 hook_(std::move(hook)),
66 engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
67 // Get slice from request.
68 Slice slice;
69 grpc::Status status = request_buffer->DumpToSingleSlice(&slice);
70 if (!status.ok()) {
71 LOG_EVERY_N_SEC(WARNING, 1)
72 << "OrcaService failed to extract request from peer: " << peer
73 << " error:" << status.error_message();
74 FinishRpc(Status(StatusCode::INTERNAL, status.error_message()));
75 return;
76 }
77 // Parse request proto.
78 upb::Arena arena;
79 xds_service_orca_v3_OrcaLoadReportRequest* request =
80 xds_service_orca_v3_OrcaLoadReportRequest_parse(
81 reinterpret_cast<const char*>(slice.begin()), slice.size(),
82 arena.ptr());
83 if (request == nullptr) {
84 LOG_EVERY_N_SEC(WARNING, 1)
85 << "OrcaService failed to parse request proto from peer: " << peer;
86 FinishRpc(Status(StatusCode::INTERNAL, "could not parse request proto"));
87 return;
88 }
89 const auto* duration_proto =
90 xds_service_orca_v3_OrcaLoadReportRequest_report_interval(request);
91 grpc_core::Duration report_interval;
92 if (duration_proto != nullptr) {
93 report_interval = grpc_core::Duration::FromSecondsAndNanoseconds(
94 google_protobuf_Duration_seconds(duration_proto),
95 google_protobuf_Duration_nanos(duration_proto));
96 }
97 auto min_interval = grpc_core::Duration::Milliseconds(
98 service_->min_report_duration_ / absl::Milliseconds(1));
99 report_interval_ = std::max(report_interval, min_interval);
100 // Send initial response.
101 SendResponse();
102 }
103
OnWriteDone(bool ok)104 void OrcaService::Reactor::OnWriteDone(bool ok) {
105 if (!ok) {
106 FinishRpc(Status(StatusCode::UNKNOWN, "write failed"));
107 return;
108 }
109 response_.Clear();
110 if (!MaybeScheduleTimer()) {
111 FinishRpc(Status(StatusCode::UNKNOWN, "call cancelled by client"));
112 }
113 }
114
OnCancel()115 void OrcaService::Reactor::OnCancel() {
116 if (MaybeCancelTimer()) {
117 FinishRpc(Status(StatusCode::UNKNOWN, "call cancelled by client"));
118 }
119 }
120
OnDone()121 void OrcaService::Reactor::OnDone() {
122 // Free the initial ref from instantiation.
123 Unref(DEBUG_LOCATION, "OnDone");
124 }
125
FinishRpc(grpc::Status status)126 void OrcaService::Reactor::FinishRpc(grpc::Status status) {
127 if (hook_ != nullptr) {
128 hook_->OnFinish(status);
129 }
130 Finish(status);
131 }
132
SendResponse()133 void OrcaService::Reactor::SendResponse() {
134 Slice response_slice = service_->GetOrCreateSerializedResponse();
135 ByteBuffer response_buffer(&response_slice, 1);
136 response_.Swap(&response_buffer);
137 if (hook_ != nullptr) {
138 hook_->OnStartWrite(&response_);
139 }
140 StartWrite(&response_);
141 }
142
MaybeScheduleTimer()143 bool OrcaService::Reactor::MaybeScheduleTimer() {
144 grpc::internal::MutexLock lock(&timer_mu_);
145 if (cancelled_) return false;
146 timer_handle_ = engine_->RunAfter(
147 report_interval_,
148 [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
149 return true;
150 }
151
MaybeCancelTimer()152 bool OrcaService::Reactor::MaybeCancelTimer() {
153 grpc::internal::MutexLock lock(&timer_mu_);
154 cancelled_ = true;
155 if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) {
156 timer_handle_.reset();
157 return true;
158 }
159 return false;
160 }
161
// Timer callback, invoked from the closure scheduled in MaybeScheduleTimer().
// Clears the stored timer handle and sends the next report, both while
// holding `timer_mu_`.
void OrcaService::Reactor::OnTimer() {
  // NOTE(review): these contexts are presumably required because this runs
  // on an event-engine thread rather than inside a gRPC callback -- confirm.
  // They are declared before the lock, so they are destroyed after it is
  // released.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  grpc::internal::MutexLock lock(&timer_mu_);
  // The timer has fired, so drop the handle; MaybeCancelTimer() checks
  // has_value() and will therefore not try to cancel it.
  timer_handle_.reset();
  SendResponse();
}
169
170 //
171 // OrcaService
172 //
173
OrcaService(ServerMetricRecorder * const server_metric_recorder,Options options)174 OrcaService::OrcaService(ServerMetricRecorder* const server_metric_recorder,
175 Options options)
176 : server_metric_recorder_(server_metric_recorder),
177 min_report_duration_(options.min_report_duration) {
178 CHECK_NE(server_metric_recorder_, nullptr);
179 AddMethod(new internal::RpcServiceMethod(
180 "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
181 internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr));
182 MarkMethodCallback(
183 0, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
184 [this](CallbackServerContext* ctx, const ByteBuffer* request) {
185 return new Reactor(this, ctx->peer(), request, nullptr);
186 }));
187 }
188
GetOrCreateSerializedResponse()189 Slice OrcaService::GetOrCreateSerializedResponse() {
190 grpc::internal::MutexLock lock(&mu_);
191 std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState> result =
192 server_metric_recorder_->GetMetricsIfChanged();
193 if (!response_slice_seq_.has_value() ||
194 *response_slice_seq_ != result->sequence_number) {
195 const auto& data = result->data;
196 upb::Arena arena;
197 xds_data_orca_v3_OrcaLoadReport* response =
198 xds_data_orca_v3_OrcaLoadReport_new(arena.ptr());
199 if (data.cpu_utilization != -1) {
200 xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response,
201 data.cpu_utilization);
202 }
203 if (data.mem_utilization != -1) {
204 xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response,
205 data.mem_utilization);
206 }
207 if (data.application_utilization != -1) {
208 xds_data_orca_v3_OrcaLoadReport_set_application_utilization(
209 response, data.application_utilization);
210 }
211 if (data.qps != -1) {
212 xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps);
213 }
214 if (data.eps != -1) {
215 xds_data_orca_v3_OrcaLoadReport_set_eps(response, data.eps);
216 }
217 for (const auto& u : data.utilization) {
218 xds_data_orca_v3_OrcaLoadReport_utilization_set(
219 response,
220 upb_StringView_FromDataAndSize(u.first.data(), u.first.size()),
221 u.second, arena.ptr());
222 }
223 size_t buf_length;
224 char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(),
225 &buf_length);
226 response_slice_.emplace(buf, buf_length);
227 response_slice_seq_ = result->sequence_number;
228 }
229 return Slice(*response_slice_);
230 }
231
232 } // namespace experimental
233 } // namespace grpc
234