• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/cpp/server/orca/orca_service.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpcpp/ext/orca_service.h>
21 #include <grpcpp/ext/server_metric_recorder.h>
22 #include <grpcpp/impl/rpc_method.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/impl/server_callback_handlers.h>
25 #include <grpcpp/impl/sync.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/byte_buffer.h>
28 #include <grpcpp/support/server_callback.h>
29 #include <grpcpp/support/slice.h>
30 #include <grpcpp/support/status.h>
31 #include <stddef.h>
32 
33 #include <map>
34 #include <memory>
35 #include <utility>
36 
37 #include "absl/log/check.h"
38 #include "absl/log/log.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/time/time.h"
41 #include "absl/types/optional.h"
42 #include "google/protobuf/duration.upb.h"
43 #include "src/core/lib/event_engine/default_event_engine.h"
44 #include "src/core/lib/iomgr/exec_ctx.h"
45 #include "src/core/load_balancing/backend_metric_data.h"
46 #include "src/core/util/debug_location.h"
47 #include "src/core/util/time.h"
48 #include "src/cpp/server/backend_metric_recorder.h"
49 #include "upb/base/string_view.h"
50 #include "upb/mem/arena.hpp"
51 #include "xds/data/orca/v3/orca_load_report.upb.h"
52 #include "xds/service/orca/v3/orca.upb.h"
53 
54 namespace grpc {
55 namespace experimental {
56 
57 //
58 // OrcaService::Reactor
59 //
60 
Reactor(OrcaService * service,absl::string_view peer,const ByteBuffer * request_buffer,std::shared_ptr<ReactorHook> hook)61 OrcaService::Reactor::Reactor(OrcaService* service, absl::string_view peer,
62                               const ByteBuffer* request_buffer,
63                               std::shared_ptr<ReactorHook> hook)
64     : service_(service),
65       hook_(std::move(hook)),
66       engine_(grpc_event_engine::experimental::GetDefaultEventEngine()) {
67   // Get slice from request.
68   Slice slice;
69   grpc::Status status = request_buffer->DumpToSingleSlice(&slice);
70   if (!status.ok()) {
71     LOG_EVERY_N_SEC(WARNING, 1)
72         << "OrcaService failed to extract request from peer: " << peer
73         << " error:" << status.error_message();
74     FinishRpc(Status(StatusCode::INTERNAL, status.error_message()));
75     return;
76   }
77   // Parse request proto.
78   upb::Arena arena;
79   xds_service_orca_v3_OrcaLoadReportRequest* request =
80       xds_service_orca_v3_OrcaLoadReportRequest_parse(
81           reinterpret_cast<const char*>(slice.begin()), slice.size(),
82           arena.ptr());
83   if (request == nullptr) {
84     LOG_EVERY_N_SEC(WARNING, 1)
85         << "OrcaService failed to parse request proto from peer: " << peer;
86     FinishRpc(Status(StatusCode::INTERNAL, "could not parse request proto"));
87     return;
88   }
89   const auto* duration_proto =
90       xds_service_orca_v3_OrcaLoadReportRequest_report_interval(request);
91   grpc_core::Duration report_interval;
92   if (duration_proto != nullptr) {
93     report_interval = grpc_core::Duration::FromSecondsAndNanoseconds(
94         google_protobuf_Duration_seconds(duration_proto),
95         google_protobuf_Duration_nanos(duration_proto));
96   }
97   auto min_interval = grpc_core::Duration::Milliseconds(
98       service_->min_report_duration_ / absl::Milliseconds(1));
99   report_interval_ = std::max(report_interval, min_interval);
100   // Send initial response.
101   SendResponse();
102 }
103 
OnWriteDone(bool ok)104 void OrcaService::Reactor::OnWriteDone(bool ok) {
105   if (!ok) {
106     FinishRpc(Status(StatusCode::UNKNOWN, "write failed"));
107     return;
108   }
109   response_.Clear();
110   if (!MaybeScheduleTimer()) {
111     FinishRpc(Status(StatusCode::UNKNOWN, "call cancelled by client"));
112   }
113 }
114 
OnCancel()115 void OrcaService::Reactor::OnCancel() {
116   if (MaybeCancelTimer()) {
117     FinishRpc(Status(StatusCode::UNKNOWN, "call cancelled by client"));
118   }
119 }
120 
OnDone()121 void OrcaService::Reactor::OnDone() {
122   // Free the initial ref from instantiation.
123   Unref(DEBUG_LOCATION, "OnDone");
124 }
125 
FinishRpc(grpc::Status status)126 void OrcaService::Reactor::FinishRpc(grpc::Status status) {
127   if (hook_ != nullptr) {
128     hook_->OnFinish(status);
129   }
130   Finish(status);
131 }
132 
SendResponse()133 void OrcaService::Reactor::SendResponse() {
134   Slice response_slice = service_->GetOrCreateSerializedResponse();
135   ByteBuffer response_buffer(&response_slice, 1);
136   response_.Swap(&response_buffer);
137   if (hook_ != nullptr) {
138     hook_->OnStartWrite(&response_);
139   }
140   StartWrite(&response_);
141 }
142 
MaybeScheduleTimer()143 bool OrcaService::Reactor::MaybeScheduleTimer() {
144   grpc::internal::MutexLock lock(&timer_mu_);
145   if (cancelled_) return false;
146   timer_handle_ = engine_->RunAfter(
147       report_interval_,
148       [self = Ref(DEBUG_LOCATION, "Orca Service")] { self->OnTimer(); });
149   return true;
150 }
151 
MaybeCancelTimer()152 bool OrcaService::Reactor::MaybeCancelTimer() {
153   grpc::internal::MutexLock lock(&timer_mu_);
154   cancelled_ = true;
155   if (timer_handle_.has_value() && engine_->Cancel(*timer_handle_)) {
156     timer_handle_.reset();
157     return true;
158   }
159   return false;
160 }
161 
OnTimer()162 void OrcaService::Reactor::OnTimer() {
163   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
164   grpc_core::ExecCtx exec_ctx;
165   grpc::internal::MutexLock lock(&timer_mu_);
166   timer_handle_.reset();
167   SendResponse();
168 }
169 
170 //
171 // OrcaService
172 //
173 
OrcaService(ServerMetricRecorder * const server_metric_recorder,Options options)174 OrcaService::OrcaService(ServerMetricRecorder* const server_metric_recorder,
175                          Options options)
176     : server_metric_recorder_(server_metric_recorder),
177       min_report_duration_(options.min_report_duration) {
178   CHECK_NE(server_metric_recorder_, nullptr);
179   AddMethod(new internal::RpcServiceMethod(
180       "/xds.service.orca.v3.OpenRcaService/StreamCoreMetrics",
181       internal::RpcMethod::SERVER_STREAMING, /*handler=*/nullptr));
182   MarkMethodCallback(
183       0, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
184              [this](CallbackServerContext* ctx, const ByteBuffer* request) {
185                return new Reactor(this, ctx->peer(), request, nullptr);
186              }));
187 }
188 
GetOrCreateSerializedResponse()189 Slice OrcaService::GetOrCreateSerializedResponse() {
190   grpc::internal::MutexLock lock(&mu_);
191   std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState> result =
192       server_metric_recorder_->GetMetricsIfChanged();
193   if (!response_slice_seq_.has_value() ||
194       *response_slice_seq_ != result->sequence_number) {
195     const auto& data = result->data;
196     upb::Arena arena;
197     xds_data_orca_v3_OrcaLoadReport* response =
198         xds_data_orca_v3_OrcaLoadReport_new(arena.ptr());
199     if (data.cpu_utilization != -1) {
200       xds_data_orca_v3_OrcaLoadReport_set_cpu_utilization(response,
201                                                           data.cpu_utilization);
202     }
203     if (data.mem_utilization != -1) {
204       xds_data_orca_v3_OrcaLoadReport_set_mem_utilization(response,
205                                                           data.mem_utilization);
206     }
207     if (data.application_utilization != -1) {
208       xds_data_orca_v3_OrcaLoadReport_set_application_utilization(
209           response, data.application_utilization);
210     }
211     if (data.qps != -1) {
212       xds_data_orca_v3_OrcaLoadReport_set_rps_fractional(response, data.qps);
213     }
214     if (data.eps != -1) {
215       xds_data_orca_v3_OrcaLoadReport_set_eps(response, data.eps);
216     }
217     for (const auto& u : data.utilization) {
218       xds_data_orca_v3_OrcaLoadReport_utilization_set(
219           response,
220           upb_StringView_FromDataAndSize(u.first.data(), u.first.size()),
221           u.second, arena.ptr());
222     }
223     size_t buf_length;
224     char* buf = xds_data_orca_v3_OrcaLoadReport_serialize(response, arena.ptr(),
225                                                           &buf_length);
226     response_slice_.emplace(buf, buf_length);
227     response_slice_seq_ = result->sequence_number;
228   }
229   return Slice(*response_slice_);
230 }
231 
232 }  // namespace experimental
233 }  // namespace grpc
234