1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/cpp/server/backend_metric_recorder.h"
18
19 #include <inttypes.h>
20
21 #include <functional>
22 #include <memory>
23 #include <string>
24 #include <type_traits>
25 #include <utility>
26
27 #include <grpc/support/log.h>
28 #include <grpcpp/ext/call_metric_recorder.h>
29 #include <grpcpp/ext/server_metric_recorder.h>
30
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/load_balancing/backend_metric_data.h"
33
34 using grpc_core::BackendMetricData;
35
36 namespace {
// Soft-limited utilization values only need to be non-negative; no upper
// bound is enforced. NaN fails the comparison and is therefore rejected.
bool IsUtilizationWithSoftLimitsValid(double util) { return 0.0 <= util; }
39
// Hard-limited utilization values must lie in the closed interval [0, 1].
// NaN fails both comparisons and is rejected.
bool IsUtilizationValid(double utilization) {
  if (utilization < 0.0) {
    return false;
  }
  return utilization <= 1.0;
}
44
// Rate values (qps and eps) only need to be non-negative; no upper bound is
// enforced. NaN fails the comparison and is rejected.
bool IsRateValid(double rate) {
  const bool non_negative = rate >= 0.0;
  return non_negative;
}
47
48 grpc_core::TraceFlag grpc_backend_metric_trace(false, "backend_metric");
49 } // namespace
50
51 namespace grpc {
52 namespace experimental {
53
Create()54 std::unique_ptr<ServerMetricRecorder> ServerMetricRecorder::Create() {
55 return std::unique_ptr<ServerMetricRecorder>(new ServerMetricRecorder());
56 }
57
ServerMetricRecorder()58 ServerMetricRecorder::ServerMetricRecorder()
59 : metric_state_(std::make_shared<const BackendMetricDataState>()) {}
60
UpdateBackendMetricDataState(std::function<void (BackendMetricData *)> updater)61 void ServerMetricRecorder::UpdateBackendMetricDataState(
62 std::function<void(BackendMetricData*)> updater) {
63 internal::MutexLock lock(&mu_);
64 auto new_state = std::make_shared<BackendMetricDataState>(*metric_state_);
65 updater(&new_state->data);
66 ++new_state->sequence_number;
67 metric_state_ = std::move(new_state);
68 }
69
SetCpuUtilization(double value)70 void ServerMetricRecorder::SetCpuUtilization(double value) {
71 if (!IsUtilizationWithSoftLimitsValid(value)) {
72 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
73 gpr_log(GPR_INFO, "[%p] CPU utilization rejected: %f", this, value);
74 }
75 return;
76 }
77 UpdateBackendMetricDataState(
78 [value](BackendMetricData* data) { data->cpu_utilization = value; });
79 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
80 gpr_log(GPR_INFO, "[%p] CPU utilization set: %f", this, value);
81 }
82 }
83
SetMemoryUtilization(double value)84 void ServerMetricRecorder::SetMemoryUtilization(double value) {
85 if (!IsUtilizationValid(value)) {
86 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
87 gpr_log(GPR_INFO, "[%p] Mem utilization rejected: %f", this, value);
88 }
89 return;
90 }
91 UpdateBackendMetricDataState(
92 [value](BackendMetricData* data) { data->mem_utilization = value; });
93 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
94 gpr_log(GPR_INFO, "[%p] Mem utilization set: %f", this, value);
95 }
96 }
97
SetApplicationUtilization(double value)98 void ServerMetricRecorder::SetApplicationUtilization(double value) {
99 if (!IsUtilizationWithSoftLimitsValid(value)) {
100 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
101 gpr_log(GPR_INFO, "[%p] Application utilization rejected: %f", this,
102 value);
103 }
104 return;
105 }
106 UpdateBackendMetricDataState([value](BackendMetricData* data) {
107 data->application_utilization = value;
108 });
109 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
110 gpr_log(GPR_INFO, "[%p] Application utilization set: %f", this, value);
111 }
112 }
113
SetQps(double value)114 void ServerMetricRecorder::SetQps(double value) {
115 if (!IsRateValid(value)) {
116 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
117 gpr_log(GPR_INFO, "[%p] QPS rejected: %f", this, value);
118 }
119 return;
120 }
121 UpdateBackendMetricDataState(
122 [value](BackendMetricData* data) { data->qps = value; });
123 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
124 gpr_log(GPR_INFO, "[%p] QPS set: %f", this, value);
125 }
126 }
127
SetEps(double value)128 void ServerMetricRecorder::SetEps(double value) {
129 if (!IsRateValid(value)) {
130 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
131 gpr_log(GPR_INFO, "[%p] EPS rejected: %f", this, value);
132 }
133 return;
134 }
135 UpdateBackendMetricDataState(
136 [value](BackendMetricData* data) { data->eps = value; });
137 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
138 gpr_log(GPR_INFO, "[%p] EPS set: %f", this, value);
139 }
140 }
141
SetNamedUtilization(string_ref name,double value)142 void ServerMetricRecorder::SetNamedUtilization(string_ref name, double value) {
143 if (!IsUtilizationValid(value)) {
144 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
145 gpr_log(GPR_INFO, "[%p] Named utilization rejected: %f name: %s", this,
146 value, std::string(name.data(), name.size()).c_str());
147 }
148 return;
149 }
150 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
151 gpr_log(GPR_INFO, "[%p] Named utilization set: %f name: %s", this, value,
152 std::string(name.data(), name.size()).c_str());
153 }
154 UpdateBackendMetricDataState([name, value](BackendMetricData* data) {
155 data->utilization[absl::string_view(name.data(), name.size())] = value;
156 });
157 }
158
SetAllNamedUtilization(std::map<string_ref,double> named_utilization)159 void ServerMetricRecorder::SetAllNamedUtilization(
160 std::map<string_ref, double> named_utilization) {
161 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
162 gpr_log(GPR_INFO, "[%p] All named utilization updated. size: %" PRIuPTR,
163 this, named_utilization.size());
164 }
165 UpdateBackendMetricDataState(
166 [utilization = std::move(named_utilization)](BackendMetricData* data) {
167 data->utilization.clear();
168 for (const auto& u : utilization) {
169 data->utilization[absl::string_view(u.first.data(), u.first.size())] =
170 u.second;
171 }
172 });
173 }
174
ClearCpuUtilization()175 void ServerMetricRecorder::ClearCpuUtilization() {
176 UpdateBackendMetricDataState(
177 [](BackendMetricData* data) { data->cpu_utilization = -1; });
178 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
179 gpr_log(GPR_INFO, "[%p] CPU utilization cleared.", this);
180 }
181 }
182
ClearMemoryUtilization()183 void ServerMetricRecorder::ClearMemoryUtilization() {
184 UpdateBackendMetricDataState(
185 [](BackendMetricData* data) { data->mem_utilization = -1; });
186 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
187 gpr_log(GPR_INFO, "[%p] Mem utilization cleared.", this);
188 }
189 }
190
ClearApplicationUtilization()191 void ServerMetricRecorder::ClearApplicationUtilization() {
192 UpdateBackendMetricDataState(
193 [](BackendMetricData* data) { data->application_utilization = -1; });
194 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
195 gpr_log(GPR_INFO, "[%p] Application utilization cleared.", this);
196 }
197 }
198
ClearQps()199 void ServerMetricRecorder::ClearQps() {
200 UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; });
201 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
202 gpr_log(GPR_INFO, "[%p] QPS utilization cleared.", this);
203 }
204 }
205
ClearEps()206 void ServerMetricRecorder::ClearEps() {
207 UpdateBackendMetricDataState([](BackendMetricData* data) { data->eps = -1; });
208 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
209 gpr_log(GPR_INFO, "[%p] EPS utilization cleared.", this);
210 }
211 }
212
ClearNamedUtilization(string_ref name)213 void ServerMetricRecorder::ClearNamedUtilization(string_ref name) {
214 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
215 gpr_log(GPR_INFO, "[%p] Named utilization cleared. name: %s", this,
216 std::string(name.data(), name.size()).c_str());
217 }
218 UpdateBackendMetricDataState([name](BackendMetricData* data) {
219 data->utilization.erase(absl::string_view(name.data(), name.size()));
220 });
221 }
222
GetMetrics() const223 grpc_core::BackendMetricData ServerMetricRecorder::GetMetrics() const {
224 auto result = GetMetricsIfChanged();
225 return result->data;
226 }
227
228 std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState>
GetMetricsIfChanged() const229 ServerMetricRecorder::GetMetricsIfChanged() const {
230 std::shared_ptr<const BackendMetricDataState> result;
231 {
232 internal::MutexLock lock(&mu_);
233 result = metric_state_;
234 }
235 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
236 const auto& data = result->data;
237 gpr_log(GPR_INFO,
238 "[%p] GetMetrics() returned: seq:%" PRIu64
239 " cpu:%f mem:%f app:%f qps:%f eps:%f utilization size: %" PRIuPTR,
240 this, result->sequence_number, data.cpu_utilization,
241 data.mem_utilization, data.application_utilization, data.qps,
242 data.eps, data.utilization.size());
243 }
244 return result;
245 }
246
247 } // namespace experimental
248
249 experimental::CallMetricRecorder&
RecordCpuUtilizationMetric(double value)250 BackendMetricState::RecordCpuUtilizationMetric(double value) {
251 if (!IsUtilizationWithSoftLimitsValid(value)) {
252 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
253 gpr_log(GPR_INFO, "[%p] CPU utilization value rejected: %f", this, value);
254 }
255 return *this;
256 }
257 cpu_utilization_.store(value, std::memory_order_relaxed);
258 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
259 gpr_log(GPR_INFO, "[%p] CPU utilization recorded: %f", this, value);
260 }
261 return *this;
262 }
263
264 experimental::CallMetricRecorder&
RecordMemoryUtilizationMetric(double value)265 BackendMetricState::RecordMemoryUtilizationMetric(double value) {
266 if (!IsUtilizationValid(value)) {
267 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
268 gpr_log(GPR_INFO, "[%p] Mem utilization value rejected: %f", this, value);
269 }
270 return *this;
271 }
272 mem_utilization_.store(value, std::memory_order_relaxed);
273 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
274 gpr_log(GPR_INFO, "[%p] Mem utilization recorded: %f", this, value);
275 }
276 return *this;
277 }
278
279 experimental::CallMetricRecorder&
RecordApplicationUtilizationMetric(double value)280 BackendMetricState::RecordApplicationUtilizationMetric(double value) {
281 if (!IsUtilizationWithSoftLimitsValid(value)) {
282 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
283 gpr_log(GPR_INFO, "[%p] Application utilization value rejected: %f", this,
284 value);
285 }
286 return *this;
287 }
288 application_utilization_.store(value, std::memory_order_relaxed);
289 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
290 gpr_log(GPR_INFO, "[%p] Application utilization recorded: %f", this, value);
291 }
292 return *this;
293 }
294
RecordQpsMetric(double value)295 experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric(
296 double value) {
297 if (!IsRateValid(value)) {
298 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
299 gpr_log(GPR_INFO, "[%p] QPS value rejected: %f", this, value);
300 }
301 return *this;
302 }
303 qps_.store(value, std::memory_order_relaxed);
304 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
305 gpr_log(GPR_INFO, "[%p] QPS recorded: %f", this, value);
306 }
307 return *this;
308 }
309
RecordEpsMetric(double value)310 experimental::CallMetricRecorder& BackendMetricState::RecordEpsMetric(
311 double value) {
312 if (!IsRateValid(value)) {
313 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
314 gpr_log(GPR_INFO, "[%p] EPS value rejected: %f", this, value);
315 }
316 return *this;
317 }
318 eps_.store(value, std::memory_order_relaxed);
319 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
320 gpr_log(GPR_INFO, "[%p] EPS recorded: %f", this, value);
321 }
322 return *this;
323 }
324
RecordUtilizationMetric(string_ref name,double value)325 experimental::CallMetricRecorder& BackendMetricState::RecordUtilizationMetric(
326 string_ref name, double value) {
327 if (!IsUtilizationValid(value)) {
328 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
329 gpr_log(GPR_INFO, "[%p] Utilization value rejected: %s %f", this,
330 std::string(name.data(), name.length()).c_str(), value);
331 }
332 return *this;
333 }
334 internal::MutexLock lock(&mu_);
335 absl::string_view name_sv(name.data(), name.length());
336 utilization_[name_sv] = value;
337 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
338 gpr_log(GPR_INFO, "[%p] Utilization recorded: %s %f", this,
339 std::string(name_sv).c_str(), value);
340 }
341 return *this;
342 }
343
RecordRequestCostMetric(string_ref name,double value)344 experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric(
345 string_ref name, double value) {
346 internal::MutexLock lock(&mu_);
347 absl::string_view name_sv(name.data(), name.length());
348 request_cost_[name_sv] = value;
349 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
350 gpr_log(GPR_INFO, "[%p] Request cost recorded: %s %f", this,
351 std::string(name_sv).c_str(), value);
352 }
353 return *this;
354 }
355
RecordNamedMetric(string_ref name,double value)356 experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric(
357 string_ref name, double value) {
358 internal::MutexLock lock(&mu_);
359 absl::string_view name_sv(name.data(), name.length());
360 named_metrics_[name_sv] = value;
361 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
362 gpr_log(GPR_INFO, "[%p] Named metric recorded: %s %f", this,
363 std::string(name_sv).c_str(), value);
364 }
365 return *this;
366 }
367
GetBackendMetricData()368 BackendMetricData BackendMetricState::GetBackendMetricData() {
369 // Merge metrics from the ServerMetricRecorder first since metrics recorded
370 // to CallMetricRecorder takes a higher precedence.
371 BackendMetricData data;
372 if (server_metric_recorder_ != nullptr) {
373 data = server_metric_recorder_->GetMetrics();
374 }
375 // Only overwrite if the value is set i.e. in the valid range.
376 const double cpu = cpu_utilization_.load(std::memory_order_relaxed);
377 if (IsUtilizationWithSoftLimitsValid(cpu)) {
378 data.cpu_utilization = cpu;
379 }
380 const double mem = mem_utilization_.load(std::memory_order_relaxed);
381 if (IsUtilizationValid(mem)) {
382 data.mem_utilization = mem;
383 }
384 const double app_util =
385 application_utilization_.load(std::memory_order_relaxed);
386 if (IsUtilizationWithSoftLimitsValid(app_util)) {
387 data.application_utilization = app_util;
388 }
389 const double qps = qps_.load(std::memory_order_relaxed);
390 if (IsRateValid(qps)) {
391 data.qps = qps;
392 }
393 const double eps = eps_.load(std::memory_order_relaxed);
394 if (IsRateValid(eps)) {
395 data.eps = eps;
396 }
397 {
398 internal::MutexLock lock(&mu_);
399 for (const auto& u : utilization_) {
400 data.utilization[u.first] = u.second;
401 }
402 for (const auto& r : request_cost_) {
403 data.request_cost[r.first] = r.second;
404 }
405 for (const auto& r : named_metrics_) {
406 data.named_metrics[r.first] = r.second;
407 }
408 }
409 if (GRPC_TRACE_FLAG_ENABLED(grpc_backend_metric_trace)) {
410 gpr_log(GPR_INFO,
411 "[%p] Backend metric data returned: cpu:%f mem:%f qps:%f eps:%f "
412 "utilization size:%" PRIuPTR " request_cost size:%" PRIuPTR
413 "named_metrics size:%" PRIuPTR,
414 this, data.cpu_utilization, data.mem_utilization, data.qps,
415 data.eps, data.utilization.size(), data.request_cost.size(),
416 data.named_metrics.size());
417 }
418 return data;
419 }
420
421 } // namespace grpc
422