• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/cpp/server/backend_metric_recorder.h"
18 
19 #include <grpcpp/ext/call_metric_recorder.h>
20 #include <grpcpp/ext/server_metric_recorder.h>
21 #include <inttypes.h>
22 
23 #include <functional>
24 #include <memory>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28 
29 #include "absl/log/log.h"
30 #include "src/core/lib/debug/trace.h"
31 #include "src/core/load_balancing/backend_metric_data.h"
32 
33 using grpc_core::BackendMetricData;
34 
35 namespace {
// A utilization value with a soft limit is accepted when it is non-negative;
// no upper bound is enforced. NaN fails the comparison and is rejected.
bool IsUtilizationWithSoftLimitsValid(double util) { return 0.0 <= util; }
38 
// Every other utilization value must lie in the closed interval [0, 1].
// NaN is rejected because it does not compare <= 1.0.
bool IsUtilizationValid(double utilization) {
  if (utilization < 0.0) return false;
  return utilization <= 1.0;
}
43 
// Rate values (qps and eps) are accepted when they are non-negative; no upper
// bound is enforced. NaN fails the comparison and is rejected.
bool IsRateValid(double rate) { return 0.0 <= rate; }
46 
47 }  // namespace
48 
49 namespace grpc {
50 namespace experimental {
51 
Create()52 std::unique_ptr<ServerMetricRecorder> ServerMetricRecorder::Create() {
53   return std::unique_ptr<ServerMetricRecorder>(new ServerMetricRecorder());
54 }
55 
ServerMetricRecorder()56 ServerMetricRecorder::ServerMetricRecorder()
57     : metric_state_(std::make_shared<const BackendMetricDataState>()) {}
58 
UpdateBackendMetricDataState(std::function<void (BackendMetricData *)> updater)59 void ServerMetricRecorder::UpdateBackendMetricDataState(
60     std::function<void(BackendMetricData*)> updater) {
61   internal::MutexLock lock(&mu_);
62   auto new_state = std::make_shared<BackendMetricDataState>(*metric_state_);
63   updater(&new_state->data);
64   ++new_state->sequence_number;
65   metric_state_ = std::move(new_state);
66 }
67 
SetCpuUtilization(double value)68 void ServerMetricRecorder::SetCpuUtilization(double value) {
69   if (!IsUtilizationWithSoftLimitsValid(value)) {
70     GRPC_TRACE_LOG(backend_metric, INFO)
71         << "[" << this << "] CPU utilization rejected: " << value;
72     return;
73   }
74   UpdateBackendMetricDataState(
75       [value](BackendMetricData* data) { data->cpu_utilization = value; });
76   GRPC_TRACE_LOG(backend_metric, INFO)
77       << "[" << this << "] CPU utilization set: " << value;
78 }
79 
SetMemoryUtilization(double value)80 void ServerMetricRecorder::SetMemoryUtilization(double value) {
81   if (!IsUtilizationValid(value)) {
82     GRPC_TRACE_LOG(backend_metric, INFO)
83         << "[" << this << "] Mem utilization rejected: " << value;
84     return;
85   }
86   UpdateBackendMetricDataState(
87       [value](BackendMetricData* data) { data->mem_utilization = value; });
88   GRPC_TRACE_LOG(backend_metric, INFO)
89       << "[" << this << "] Mem utilization set: " << value;
90 }
91 
SetApplicationUtilization(double value)92 void ServerMetricRecorder::SetApplicationUtilization(double value) {
93   if (!IsUtilizationWithSoftLimitsValid(value)) {
94     GRPC_TRACE_LOG(backend_metric, INFO)
95         << "[" << this << "] Application utilization rejected: " << value;
96     return;
97   }
98   UpdateBackendMetricDataState([value](BackendMetricData* data) {
99     data->application_utilization = value;
100   });
101   GRPC_TRACE_LOG(backend_metric, INFO)
102       << "[" << this << "] Application utilization set: " << value;
103 }
104 
SetQps(double value)105 void ServerMetricRecorder::SetQps(double value) {
106   if (!IsRateValid(value)) {
107     GRPC_TRACE_LOG(backend_metric, INFO)
108         << "[" << this << "] QPS rejected: " << value;
109     return;
110   }
111   UpdateBackendMetricDataState(
112       [value](BackendMetricData* data) { data->qps = value; });
113   GRPC_TRACE_LOG(backend_metric, INFO) << "[" << this << "] QPS set: " << value;
114 }
115 
SetEps(double value)116 void ServerMetricRecorder::SetEps(double value) {
117   if (!IsRateValid(value)) {
118     GRPC_TRACE_LOG(backend_metric, INFO)
119         << "[" << this << "] EPS rejected: " << value;
120     return;
121   }
122   UpdateBackendMetricDataState(
123       [value](BackendMetricData* data) { data->eps = value; });
124   GRPC_TRACE_LOG(backend_metric, INFO) << "[" << this << "] EPS set: " << value;
125 }
126 
SetNamedUtilization(string_ref name,double value)127 void ServerMetricRecorder::SetNamedUtilization(string_ref name, double value) {
128   if (!IsUtilizationValid(value)) {
129     GRPC_TRACE_LOG(backend_metric, INFO)
130         << "[" << this << "] Named utilization rejected: " << value
131         << " name: " << std::string(name.data(), name.size());
132     return;
133   }
134   GRPC_TRACE_LOG(backend_metric, INFO)
135       << "[" << this << "] Named utilization set: " << value
136       << " name: " << std::string(name.data(), name.size());
137   UpdateBackendMetricDataState([name, value](BackendMetricData* data) {
138     data->utilization[absl::string_view(name.data(), name.size())] = value;
139   });
140 }
141 
SetAllNamedUtilization(std::map<string_ref,double> named_utilization)142 void ServerMetricRecorder::SetAllNamedUtilization(
143     std::map<string_ref, double> named_utilization) {
144   GRPC_TRACE_LOG(backend_metric, INFO)
145       << "[" << this
146       << "] All named utilization updated. size: " << named_utilization.size();
147   UpdateBackendMetricDataState(
148       [utilization = std::move(named_utilization)](BackendMetricData* data) {
149         data->utilization.clear();
150         for (const auto& u : utilization) {
151           data->utilization[absl::string_view(u.first.data(), u.first.size())] =
152               u.second;
153         }
154       });
155 }
156 
ClearCpuUtilization()157 void ServerMetricRecorder::ClearCpuUtilization() {
158   UpdateBackendMetricDataState(
159       [](BackendMetricData* data) { data->cpu_utilization = -1; });
160   GRPC_TRACE_LOG(backend_metric, INFO)
161       << "[" << this << "] CPU utilization cleared.";
162 }
163 
ClearMemoryUtilization()164 void ServerMetricRecorder::ClearMemoryUtilization() {
165   UpdateBackendMetricDataState(
166       [](BackendMetricData* data) { data->mem_utilization = -1; });
167   GRPC_TRACE_LOG(backend_metric, INFO)
168       << "[" << this << "] Mem utilization cleared.";
169 }
170 
ClearApplicationUtilization()171 void ServerMetricRecorder::ClearApplicationUtilization() {
172   UpdateBackendMetricDataState(
173       [](BackendMetricData* data) { data->application_utilization = -1; });
174   GRPC_TRACE_LOG(backend_metric, INFO)
175       << "[" << this << "] Application utilization cleared.";
176 }
177 
ClearQps()178 void ServerMetricRecorder::ClearQps() {
179   UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; });
180   GRPC_TRACE_LOG(backend_metric, INFO)
181       << "[" << this << "] QPS utilization cleared.";
182 }
183 
ClearEps()184 void ServerMetricRecorder::ClearEps() {
185   UpdateBackendMetricDataState([](BackendMetricData* data) { data->eps = -1; });
186   GRPC_TRACE_LOG(backend_metric, INFO)
187       << "[" << this << "] EPS utilization cleared.";
188 }
189 
ClearNamedUtilization(string_ref name)190 void ServerMetricRecorder::ClearNamedUtilization(string_ref name) {
191   GRPC_TRACE_LOG(backend_metric, INFO)
192       << "[" << this << "] Named utilization cleared. name: "
193       << std::string(name.data(), name.size());
194   UpdateBackendMetricDataState([name](BackendMetricData* data) {
195     data->utilization.erase(absl::string_view(name.data(), name.size()));
196   });
197 }
198 
GetMetrics() const199 grpc_core::BackendMetricData ServerMetricRecorder::GetMetrics() const {
200   auto result = GetMetricsIfChanged();
201   return result->data;
202 }
203 
204 std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState>
GetMetricsIfChanged() const205 ServerMetricRecorder::GetMetricsIfChanged() const {
206   std::shared_ptr<const BackendMetricDataState> result;
207   {
208     internal::MutexLock lock(&mu_);
209     result = metric_state_;
210   }
211   if (GRPC_TRACE_FLAG_ENABLED(backend_metric)) {
212     const auto& data = result->data;
213     LOG(INFO) << "[" << this
214               << "] GetMetrics() returned: seq:" << result->sequence_number
215               << " cpu:" << data.cpu_utilization
216               << " mem:" << data.mem_utilization
217               << " app:" << data.application_utilization << " qps:" << data.qps
218               << " eps:" << data.eps
219               << " utilization size: " << data.utilization.size();
220   }
221   return result;
222 }
223 
224 }  // namespace experimental
225 
226 experimental::CallMetricRecorder&
RecordCpuUtilizationMetric(double value)227 BackendMetricState::RecordCpuUtilizationMetric(double value) {
228   if (!IsUtilizationWithSoftLimitsValid(value)) {
229     GRPC_TRACE_LOG(backend_metric, INFO)
230         << "[" << this << "] CPU utilization value rejected: " << value;
231     return *this;
232   }
233   cpu_utilization_.store(value, std::memory_order_relaxed);
234   GRPC_TRACE_LOG(backend_metric, INFO)
235       << "[" << this << "] CPU utilization recorded: " << value;
236   return *this;
237 }
238 
239 experimental::CallMetricRecorder&
RecordMemoryUtilizationMetric(double value)240 BackendMetricState::RecordMemoryUtilizationMetric(double value) {
241   if (!IsUtilizationValid(value)) {
242     GRPC_TRACE_LOG(backend_metric, INFO)
243         << "[" << this << "] Mem utilization value rejected: " << value;
244     return *this;
245   }
246   mem_utilization_.store(value, std::memory_order_relaxed);
247   GRPC_TRACE_LOG(backend_metric, INFO)
248       << "[" << this << "] Mem utilization recorded: " << value;
249   return *this;
250 }
251 
252 experimental::CallMetricRecorder&
RecordApplicationUtilizationMetric(double value)253 BackendMetricState::RecordApplicationUtilizationMetric(double value) {
254   if (!IsUtilizationWithSoftLimitsValid(value)) {
255     GRPC_TRACE_LOG(backend_metric, INFO)
256         << "[" << this << "] Application utilization value rejected: " << value;
257     return *this;
258   }
259   application_utilization_.store(value, std::memory_order_relaxed);
260   GRPC_TRACE_LOG(backend_metric, INFO)
261       << "[" << this << "] Application utilization recorded: " << value;
262   return *this;
263 }
264 
RecordQpsMetric(double value)265 experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric(
266     double value) {
267   if (!IsRateValid(value)) {
268     GRPC_TRACE_LOG(backend_metric, INFO)
269         << "[" << this << "] QPS value rejected: " << value;
270     return *this;
271   }
272   qps_.store(value, std::memory_order_relaxed);
273   GRPC_TRACE_LOG(backend_metric, INFO)
274       << "[" << this << "] QPS recorded: " << value;
275   return *this;
276 }
277 
RecordEpsMetric(double value)278 experimental::CallMetricRecorder& BackendMetricState::RecordEpsMetric(
279     double value) {
280   if (!IsRateValid(value)) {
281     GRPC_TRACE_LOG(backend_metric, INFO)
282         << "[" << this << "] EPS value rejected: " << value;
283     return *this;
284   }
285   eps_.store(value, std::memory_order_relaxed);
286   GRPC_TRACE_LOG(backend_metric, INFO)
287       << "[" << this << "] EPS recorded: " << value;
288   return *this;
289 }
290 
RecordUtilizationMetric(string_ref name,double value)291 experimental::CallMetricRecorder& BackendMetricState::RecordUtilizationMetric(
292     string_ref name, double value) {
293   if (!IsUtilizationValid(value)) {
294     GRPC_TRACE_LOG(backend_metric, INFO)
295         << "[" << this << "] Utilization value rejected: "
296         << std::string(name.data(), name.length()) << " " << value;
297     return *this;
298   }
299   internal::MutexLock lock(&mu_);
300   absl::string_view name_sv(name.data(), name.length());
301   utilization_[name_sv] = value;
302   GRPC_TRACE_LOG(backend_metric, INFO)
303       << "[" << this << "] Utilization recorded: " << name_sv << " " << value;
304   return *this;
305 }
306 
RecordRequestCostMetric(string_ref name,double value)307 experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric(
308     string_ref name, double value) {
309   internal::MutexLock lock(&mu_);
310   absl::string_view name_sv(name.data(), name.length());
311   request_cost_[name_sv] = value;
312   GRPC_TRACE_LOG(backend_metric, INFO)
313       << "[" << this << "] Request cost recorded: " << name_sv << " " << value;
314   return *this;
315 }
316 
RecordNamedMetric(string_ref name,double value)317 experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric(
318     string_ref name, double value) {
319   internal::MutexLock lock(&mu_);
320   absl::string_view name_sv(name.data(), name.length());
321   named_metrics_[name_sv] = value;
322   GRPC_TRACE_LOG(backend_metric, INFO)
323       << "[" << this << "] Named metric recorded: " << name_sv << " " << value;
324   return *this;
325 }
326 
GetBackendMetricData()327 BackendMetricData BackendMetricState::GetBackendMetricData() {
328   // Merge metrics from the ServerMetricRecorder first since metrics recorded
329   // to CallMetricRecorder takes a higher precedence.
330   BackendMetricData data;
331   if (server_metric_recorder_ != nullptr) {
332     data = server_metric_recorder_->GetMetrics();
333   }
334   // Only overwrite if the value is set i.e. in the valid range.
335   const double cpu = cpu_utilization_.load(std::memory_order_relaxed);
336   if (IsUtilizationWithSoftLimitsValid(cpu)) {
337     data.cpu_utilization = cpu;
338   }
339   const double mem = mem_utilization_.load(std::memory_order_relaxed);
340   if (IsUtilizationValid(mem)) {
341     data.mem_utilization = mem;
342   }
343   const double app_util =
344       application_utilization_.load(std::memory_order_relaxed);
345   if (IsUtilizationWithSoftLimitsValid(app_util)) {
346     data.application_utilization = app_util;
347   }
348   const double qps = qps_.load(std::memory_order_relaxed);
349   if (IsRateValid(qps)) {
350     data.qps = qps;
351   }
352   const double eps = eps_.load(std::memory_order_relaxed);
353   if (IsRateValid(eps)) {
354     data.eps = eps;
355   }
356   {
357     internal::MutexLock lock(&mu_);
358     for (const auto& u : utilization_) {
359       data.utilization[u.first] = u.second;
360     }
361     for (const auto& r : request_cost_) {
362       data.request_cost[r.first] = r.second;
363     }
364     for (const auto& r : named_metrics_) {
365       data.named_metrics[r.first] = r.second;
366     }
367   }
368   GRPC_TRACE_LOG(backend_metric, INFO)
369       << "[" << this
370       << "] Backend metric data returned: cpu:" << data.cpu_utilization
371       << " mem:" << data.mem_utilization << " qps:" << data.qps
372       << " eps:" << data.eps << " utilization size:" << data.utilization.size()
373       << " request_cost size:" << data.request_cost.size()
374       << "named_metrics size:" << data.named_metrics.size();
375   return data;
376 }
377 
378 }  // namespace grpc
379