• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/cpp/server/backend_metric_recorder.h"
18 
19 #include <inttypes.h>
20 
21 #include <functional>
22 #include <memory>
23 #include <string>
24 #include <type_traits>
25 #include <utility>
26 
27 #include <grpc/support/log.h>
28 #include <grpcpp/ext/call_metric_recorder.h>
29 #include <grpcpp/ext/server_metric_recorder.h>
30 
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/load_balancing/backend_metric_data.h"
33 
34 using grpc_core::BackendMetricData;
35 
36 namespace {
37 // Utilization values with soft limits must be in [0, infy).
IsUtilizationWithSoftLimitsValid(double util)38 bool IsUtilizationWithSoftLimitsValid(double util) { return util >= 0.0; }
39 
40 // Other utilization values must be in [0, 1].
IsUtilizationValid(double utilization)41 bool IsUtilizationValid(double utilization) {
42   return utilization >= 0.0 && utilization <= 1.0;
43 }
44 
45 // Rate values (qps and eps) must be in [0, infy).
IsRateValid(double rate)46 bool IsRateValid(double rate) { return rate >= 0.0; }
47 
48 grpc_core::TraceFlag grpc_backend_metric_trace(false, "backend_metric");
49 }  // namespace
50 
51 namespace grpc {
52 namespace experimental {
53 
Create()54 std::unique_ptr<ServerMetricRecorder> ServerMetricRecorder::Create() {
55   return std::unique_ptr<ServerMetricRecorder>(new ServerMetricRecorder());
56 }
57 
ServerMetricRecorder()58 ServerMetricRecorder::ServerMetricRecorder()
59     : metric_state_(std::make_shared<const BackendMetricDataState>()) {}
60 
UpdateBackendMetricDataState(std::function<void (BackendMetricData *)> updater)61 void ServerMetricRecorder::UpdateBackendMetricDataState(
62     std::function<void(BackendMetricData*)> updater) {
63   internal::MutexLock lock(&mu_);
64   auto new_state = std::make_shared<BackendMetricDataState>(*metric_state_);
65   updater(&new_state->data);
66   ++new_state->sequence_number;
67   metric_state_ = std::move(new_state);
68 }
69 
SetCpuUtilization(double value)70 void ServerMetricRecorder::SetCpuUtilization(double value) {
71   if (!IsUtilizationWithSoftLimitsValid(value)) {
72     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
73       gpr_log(GPR_INFO, "[%p] CPU utilization rejected: %f", this, value);
74     }
75     return;
76   }
77   UpdateBackendMetricDataState(
78       [value](BackendMetricData* data) { data->cpu_utilization = value; });
79   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
80     gpr_log(GPR_INFO, "[%p] CPU utilization set: %f", this, value);
81   }
82 }
83 
SetMemoryUtilization(double value)84 void ServerMetricRecorder::SetMemoryUtilization(double value) {
85   if (!IsUtilizationValid(value)) {
86     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
87       gpr_log(GPR_INFO, "[%p] Mem utilization rejected: %f", this, value);
88     }
89     return;
90   }
91   UpdateBackendMetricDataState(
92       [value](BackendMetricData* data) { data->mem_utilization = value; });
93   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
94     gpr_log(GPR_INFO, "[%p] Mem utilization set: %f", this, value);
95   }
96 }
97 
SetApplicationUtilization(double value)98 void ServerMetricRecorder::SetApplicationUtilization(double value) {
99   if (!IsUtilizationWithSoftLimitsValid(value)) {
100     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
101       gpr_log(GPR_INFO, "[%p] Application utilization rejected: %f", this,
102               value);
103     }
104     return;
105   }
106   UpdateBackendMetricDataState([value](BackendMetricData* data) {
107     data->application_utilization = value;
108   });
109   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
110     gpr_log(GPR_INFO, "[%p] Application utilization set: %f", this, value);
111   }
112 }
113 
SetQps(double value)114 void ServerMetricRecorder::SetQps(double value) {
115   if (!IsRateValid(value)) {
116     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
117       gpr_log(GPR_INFO, "[%p] QPS rejected: %f", this, value);
118     }
119     return;
120   }
121   UpdateBackendMetricDataState(
122       [value](BackendMetricData* data) { data->qps = value; });
123   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
124     gpr_log(GPR_INFO, "[%p] QPS set: %f", this, value);
125   }
126 }
127 
SetEps(double value)128 void ServerMetricRecorder::SetEps(double value) {
129   if (!IsRateValid(value)) {
130     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
131       gpr_log(GPR_INFO, "[%p] EPS rejected: %f", this, value);
132     }
133     return;
134   }
135   UpdateBackendMetricDataState(
136       [value](BackendMetricData* data) { data->eps = value; });
137   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
138     gpr_log(GPR_INFO, "[%p] EPS set: %f", this, value);
139   }
140 }
141 
SetNamedUtilization(string_ref name,double value)142 void ServerMetricRecorder::SetNamedUtilization(string_ref name, double value) {
143   if (!IsUtilizationValid(value)) {
144     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
145       gpr_log(GPR_INFO, "[%p] Named utilization rejected: %f name: %s", this,
146               value, std::string(name.data(), name.size()).c_str());
147     }
148     return;
149   }
150   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
151     gpr_log(GPR_INFO, "[%p] Named utilization set: %f name: %s", this, value,
152             std::string(name.data(), name.size()).c_str());
153   }
154   UpdateBackendMetricDataState([name, value](BackendMetricData* data) {
155     data->utilization[absl::string_view(name.data(), name.size())] = value;
156   });
157 }
158 
SetAllNamedUtilization(std::map<string_ref,double> named_utilization)159 void ServerMetricRecorder::SetAllNamedUtilization(
160     std::map<string_ref, double> named_utilization) {
161   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
162     gpr_log(GPR_INFO, "[%p] All named utilization updated. size: %" PRIuPTR,
163             this, named_utilization.size());
164   }
165   UpdateBackendMetricDataState(
166       [utilization = std::move(named_utilization)](BackendMetricData* data) {
167         data->utilization.clear();
168         for (const auto& u : utilization) {
169           data->utilization[absl::string_view(u.first.data(), u.first.size())] =
170               u.second;
171         }
172       });
173 }
174 
ClearCpuUtilization()175 void ServerMetricRecorder::ClearCpuUtilization() {
176   UpdateBackendMetricDataState(
177       [](BackendMetricData* data) { data->cpu_utilization = -1; });
178   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
179     gpr_log(GPR_INFO, "[%p] CPU utilization cleared.", this);
180   }
181 }
182 
ClearMemoryUtilization()183 void ServerMetricRecorder::ClearMemoryUtilization() {
184   UpdateBackendMetricDataState(
185       [](BackendMetricData* data) { data->mem_utilization = -1; });
186   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
187     gpr_log(GPR_INFO, "[%p] Mem utilization cleared.", this);
188   }
189 }
190 
ClearApplicationUtilization()191 void ServerMetricRecorder::ClearApplicationUtilization() {
192   UpdateBackendMetricDataState(
193       [](BackendMetricData* data) { data->application_utilization = -1; });
194   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
195     gpr_log(GPR_INFO, "[%p] Application utilization cleared.", this);
196   }
197 }
198 
ClearQps()199 void ServerMetricRecorder::ClearQps() {
200   UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; });
201   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
202     gpr_log(GPR_INFO, "[%p] QPS utilization cleared.", this);
203   }
204 }
205 
ClearEps()206 void ServerMetricRecorder::ClearEps() {
207   UpdateBackendMetricDataState([](BackendMetricData* data) { data->eps = -1; });
208   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
209     gpr_log(GPR_INFO, "[%p] EPS utilization cleared.", this);
210   }
211 }
212 
ClearNamedUtilization(string_ref name)213 void ServerMetricRecorder::ClearNamedUtilization(string_ref name) {
214   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
215     gpr_log(GPR_INFO, "[%p] Named utilization cleared. name: %s", this,
216             std::string(name.data(), name.size()).c_str());
217   }
218   UpdateBackendMetricDataState([name](BackendMetricData* data) {
219     data->utilization.erase(absl::string_view(name.data(), name.size()));
220   });
221 }
222 
GetMetrics() const223 grpc_core::BackendMetricData ServerMetricRecorder::GetMetrics() const {
224   auto result = GetMetricsIfChanged();
225   return result->data;
226 }
227 
228 std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState>
GetMetricsIfChanged() const229 ServerMetricRecorder::GetMetricsIfChanged() const {
230   std::shared_ptr<const BackendMetricDataState> result;
231   {
232     internal::MutexLock lock(&mu_);
233     result = metric_state_;
234   }
235   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
236     const auto& data = result->data;
237     gpr_log(GPR_INFO,
238             "[%p] GetMetrics() returned: seq:%" PRIu64
239             " cpu:%f mem:%f app:%f qps:%f eps:%f utilization size: %" PRIuPTR,
240             this, result->sequence_number, data.cpu_utilization,
241             data.mem_utilization, data.application_utilization, data.qps,
242             data.eps, data.utilization.size());
243   }
244   return result;
245 }
246 
247 }  // namespace experimental
248 
249 experimental::CallMetricRecorder&
RecordCpuUtilizationMetric(double value)250 BackendMetricState::RecordCpuUtilizationMetric(double value) {
251   if (!IsUtilizationWithSoftLimitsValid(value)) {
252     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
253       gpr_log(GPR_INFO, "[%p] CPU utilization value rejected: %f", this, value);
254     }
255     return *this;
256   }
257   cpu_utilization_.store(value, std::memory_order_relaxed);
258   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
259     gpr_log(GPR_INFO, "[%p] CPU utilization recorded: %f", this, value);
260   }
261   return *this;
262 }
263 
264 experimental::CallMetricRecorder&
RecordMemoryUtilizationMetric(double value)265 BackendMetricState::RecordMemoryUtilizationMetric(double value) {
266   if (!IsUtilizationValid(value)) {
267     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
268       gpr_log(GPR_INFO, "[%p] Mem utilization value rejected: %f", this, value);
269     }
270     return *this;
271   }
272   mem_utilization_.store(value, std::memory_order_relaxed);
273   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
274     gpr_log(GPR_INFO, "[%p] Mem utilization recorded: %f", this, value);
275   }
276   return *this;
277 }
278 
279 experimental::CallMetricRecorder&
RecordApplicationUtilizationMetric(double value)280 BackendMetricState::RecordApplicationUtilizationMetric(double value) {
281   if (!IsUtilizationWithSoftLimitsValid(value)) {
282     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
283       gpr_log(GPR_INFO, "[%p] Application utilization value rejected: %f", this,
284               value);
285     }
286     return *this;
287   }
288   application_utilization_.store(value, std::memory_order_relaxed);
289   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
290     gpr_log(GPR_INFO, "[%p] Application utilization recorded: %f", this, value);
291   }
292   return *this;
293 }
294 
RecordQpsMetric(double value)295 experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric(
296     double value) {
297   if (!IsRateValid(value)) {
298     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
299       gpr_log(GPR_INFO, "[%p] QPS value rejected: %f", this, value);
300     }
301     return *this;
302   }
303   qps_.store(value, std::memory_order_relaxed);
304   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
305     gpr_log(GPR_INFO, "[%p] QPS recorded: %f", this, value);
306   }
307   return *this;
308 }
309 
RecordEpsMetric(double value)310 experimental::CallMetricRecorder& BackendMetricState::RecordEpsMetric(
311     double value) {
312   if (!IsRateValid(value)) {
313     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
314       gpr_log(GPR_INFO, "[%p] EPS value rejected: %f", this, value);
315     }
316     return *this;
317   }
318   eps_.store(value, std::memory_order_relaxed);
319   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
320     gpr_log(GPR_INFO, "[%p] EPS recorded: %f", this, value);
321   }
322   return *this;
323 }
324 
RecordUtilizationMetric(string_ref name,double value)325 experimental::CallMetricRecorder& BackendMetricState::RecordUtilizationMetric(
326     string_ref name, double value) {
327   if (!IsUtilizationValid(value)) {
328     if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
329       gpr_log(GPR_INFO, "[%p] Utilization value rejected: %s %f", this,
330               std::string(name.data(), name.length()).c_str(), value);
331     }
332     return *this;
333   }
334   internal::MutexLock lock(&mu_);
335   absl::string_view name_sv(name.data(), name.length());
336   utilization_[name_sv] = value;
337   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
338     gpr_log(GPR_INFO, "[%p] Utilization recorded: %s %f", this,
339             std::string(name_sv).c_str(), value);
340   }
341   return *this;
342 }
343 
RecordRequestCostMetric(string_ref name,double value)344 experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric(
345     string_ref name, double value) {
346   internal::MutexLock lock(&mu_);
347   absl::string_view name_sv(name.data(), name.length());
348   request_cost_[name_sv] = value;
349   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
350     gpr_log(GPR_INFO, "[%p] Request cost recorded: %s %f", this,
351             std::string(name_sv).c_str(), value);
352   }
353   return *this;
354 }
355 
RecordNamedMetric(string_ref name,double value)356 experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric(
357     string_ref name, double value) {
358   internal::MutexLock lock(&mu_);
359   absl::string_view name_sv(name.data(), name.length());
360   named_metrics_[name_sv] = value;
361   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
362     gpr_log(GPR_INFO, "[%p] Named metric recorded: %s %f", this,
363             std::string(name_sv).c_str(), value);
364   }
365   return *this;
366 }
367 
GetBackendMetricData()368 BackendMetricData BackendMetricState::GetBackendMetricData() {
369   // Merge metrics from the ServerMetricRecorder first since metrics recorded
370   // to CallMetricRecorder takes a higher precedence.
371   BackendMetricData data;
372   if (server_metric_recorder_ != nullptr) {
373     data = server_metric_recorder_->GetMetrics();
374   }
375   // Only overwrite if the value is set i.e. in the valid range.
376   const double cpu = cpu_utilization_.load(std::memory_order_relaxed);
377   if (IsUtilizationWithSoftLimitsValid(cpu)) {
378     data.cpu_utilization = cpu;
379   }
380   const double mem = mem_utilization_.load(std::memory_order_relaxed);
381   if (IsUtilizationValid(mem)) {
382     data.mem_utilization = mem;
383   }
384   const double app_util =
385       application_utilization_.load(std::memory_order_relaxed);
386   if (IsUtilizationWithSoftLimitsValid(app_util)) {
387     data.application_utilization = app_util;
388   }
389   const double qps = qps_.load(std::memory_order_relaxed);
390   if (IsRateValid(qps)) {
391     data.qps = qps;
392   }
393   const double eps = eps_.load(std::memory_order_relaxed);
394   if (IsRateValid(eps)) {
395     data.eps = eps;
396   }
397   {
398     internal::MutexLock lock(&mu_);
399     for (const auto& u : utilization_) {
400       data.utilization[u.first] = u.second;
401     }
402     for (const auto& r : request_cost_) {
403       data.request_cost[r.first] = r.second;
404     }
405     for (const auto& r : named_metrics_) {
406       data.named_metrics[r.first] = r.second;
407     }
408   }
409   if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
410     gpr_log(GPR_INFO,
411             "[%p] Backend metric data returned: cpu:%f mem:%f qps:%f eps:%f "
412             "utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR
413             "named_metrics size:%" PRIuPTR,
414             this, data.cpu_utilization, data.mem_utilization, data.qps,
415             data.eps, data.utilization.size(), data.request_cost.size(),
416             data.named_metrics.size());
417   }
418   return data;
419 }
420 
421 }  // namespace grpc
422