1 //
2 // Copyright 2023 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/cpp/server/backend_metric_recorder.h"
18
19 #include <grpcpp/ext/call_metric_recorder.h>
20 #include <grpcpp/ext/server_metric_recorder.h>
21 #include <inttypes.h>
22
23 #include <functional>
24 #include <memory>
25 #include <string>
26 #include <type_traits>
27 #include <utility>
28
29 #include "absl/log/log.h"
30 #include "src/core/lib/debug/trace.h"
31 #include "src/core/load_balancing/backend_metric_data.h"
32
33 using grpc_core::BackendMetricData;
34
35 namespace {
// Soft-limited utilization only has a lower bound: any value that compares
// greater than or equal to zero is accepted. NaN fails the comparison and is
// therefore rejected.
bool IsUtilizationWithSoftLimitsValid(double util) { return 0.0 <= util; }
38
// Hard-limited utilization must lie in the closed interval [0, 1].
// NaN fails both comparisons and is rejected.
bool IsUtilizationValid(double utilization) {
  if (utilization < 0.0) return false;
  return utilization <= 1.0;
}
43
// Rates (QPS and EPS) only have a lower bound: any value that compares
// greater than or equal to zero is accepted. NaN is rejected.
bool IsRateValid(double rate) { return 0.0 <= rate; }
46
47 } // namespace
48
49 namespace grpc {
50 namespace experimental {
51
Create()52 std::unique_ptr<ServerMetricRecorder> ServerMetricRecorder::Create() {
53 return std::unique_ptr<ServerMetricRecorder>(new ServerMetricRecorder());
54 }
55
ServerMetricRecorder()56 ServerMetricRecorder::ServerMetricRecorder()
57 : metric_state_(std::make_shared<const BackendMetricDataState>()) {}
58
UpdateBackendMetricDataState(std::function<void (BackendMetricData *)> updater)59 void ServerMetricRecorder::UpdateBackendMetricDataState(
60 std::function<void(BackendMetricData*)> updater) {
61 internal::MutexLock lock(&mu_);
62 auto new_state = std::make_shared<BackendMetricDataState>(*metric_state_);
63 updater(&new_state->data);
64 ++new_state->sequence_number;
65 metric_state_ = std::move(new_state);
66 }
67
SetCpuUtilization(double value)68 void ServerMetricRecorder::SetCpuUtilization(double value) {
69 if (!IsUtilizationWithSoftLimitsValid(value)) {
70 GRPC_TRACE_LOG(backend_metric, INFO)
71 << "[" << this << "] CPU utilization rejected: " << value;
72 return;
73 }
74 UpdateBackendMetricDataState(
75 [value](BackendMetricData* data) { data->cpu_utilization = value; });
76 GRPC_TRACE_LOG(backend_metric, INFO)
77 << "[" << this << "] CPU utilization set: " << value;
78 }
79
SetMemoryUtilization(double value)80 void ServerMetricRecorder::SetMemoryUtilization(double value) {
81 if (!IsUtilizationValid(value)) {
82 GRPC_TRACE_LOG(backend_metric, INFO)
83 << "[" << this << "] Mem utilization rejected: " << value;
84 return;
85 }
86 UpdateBackendMetricDataState(
87 [value](BackendMetricData* data) { data->mem_utilization = value; });
88 GRPC_TRACE_LOG(backend_metric, INFO)
89 << "[" << this << "] Mem utilization set: " << value;
90 }
91
SetApplicationUtilization(double value)92 void ServerMetricRecorder::SetApplicationUtilization(double value) {
93 if (!IsUtilizationWithSoftLimitsValid(value)) {
94 GRPC_TRACE_LOG(backend_metric, INFO)
95 << "[" << this << "] Application utilization rejected: " << value;
96 return;
97 }
98 UpdateBackendMetricDataState([value](BackendMetricData* data) {
99 data->application_utilization = value;
100 });
101 GRPC_TRACE_LOG(backend_metric, INFO)
102 << "[" << this << "] Application utilization set: " << value;
103 }
104
SetQps(double value)105 void ServerMetricRecorder::SetQps(double value) {
106 if (!IsRateValid(value)) {
107 GRPC_TRACE_LOG(backend_metric, INFO)
108 << "[" << this << "] QPS rejected: " << value;
109 return;
110 }
111 UpdateBackendMetricDataState(
112 [value](BackendMetricData* data) { data->qps = value; });
113 GRPC_TRACE_LOG(backend_metric, INFO) << "[" << this << "] QPS set: " << value;
114 }
115
SetEps(double value)116 void ServerMetricRecorder::SetEps(double value) {
117 if (!IsRateValid(value)) {
118 GRPC_TRACE_LOG(backend_metric, INFO)
119 << "[" << this << "] EPS rejected: " << value;
120 return;
121 }
122 UpdateBackendMetricDataState(
123 [value](BackendMetricData* data) { data->eps = value; });
124 GRPC_TRACE_LOG(backend_metric, INFO) << "[" << this << "] EPS set: " << value;
125 }
126
SetNamedUtilization(string_ref name,double value)127 void ServerMetricRecorder::SetNamedUtilization(string_ref name, double value) {
128 if (!IsUtilizationValid(value)) {
129 GRPC_TRACE_LOG(backend_metric, INFO)
130 << "[" << this << "] Named utilization rejected: " << value
131 << " name: " << std::string(name.data(), name.size());
132 return;
133 }
134 GRPC_TRACE_LOG(backend_metric, INFO)
135 << "[" << this << "] Named utilization set: " << value
136 << " name: " << std::string(name.data(), name.size());
137 UpdateBackendMetricDataState([name, value](BackendMetricData* data) {
138 data->utilization[absl::string_view(name.data(), name.size())] = value;
139 });
140 }
141
SetAllNamedUtilization(std::map<string_ref,double> named_utilization)142 void ServerMetricRecorder::SetAllNamedUtilization(
143 std::map<string_ref, double> named_utilization) {
144 GRPC_TRACE_LOG(backend_metric, INFO)
145 << "[" << this
146 << "] All named utilization updated. size: " << named_utilization.size();
147 UpdateBackendMetricDataState(
148 [utilization = std::move(named_utilization)](BackendMetricData* data) {
149 data->utilization.clear();
150 for (const auto& u : utilization) {
151 data->utilization[absl::string_view(u.first.data(), u.first.size())] =
152 u.second;
153 }
154 });
155 }
156
ClearCpuUtilization()157 void ServerMetricRecorder::ClearCpuUtilization() {
158 UpdateBackendMetricDataState(
159 [](BackendMetricData* data) { data->cpu_utilization = -1; });
160 GRPC_TRACE_LOG(backend_metric, INFO)
161 << "[" << this << "] CPU utilization cleared.";
162 }
163
ClearMemoryUtilization()164 void ServerMetricRecorder::ClearMemoryUtilization() {
165 UpdateBackendMetricDataState(
166 [](BackendMetricData* data) { data->mem_utilization = -1; });
167 GRPC_TRACE_LOG(backend_metric, INFO)
168 << "[" << this << "] Mem utilization cleared.";
169 }
170
ClearApplicationUtilization()171 void ServerMetricRecorder::ClearApplicationUtilization() {
172 UpdateBackendMetricDataState(
173 [](BackendMetricData* data) { data->application_utilization = -1; });
174 GRPC_TRACE_LOG(backend_metric, INFO)
175 << "[" << this << "] Application utilization cleared.";
176 }
177
ClearQps()178 void ServerMetricRecorder::ClearQps() {
179 UpdateBackendMetricDataState([](BackendMetricData* data) { data->qps = -1; });
180 GRPC_TRACE_LOG(backend_metric, INFO)
181 << "[" << this << "] QPS utilization cleared.";
182 }
183
ClearEps()184 void ServerMetricRecorder::ClearEps() {
185 UpdateBackendMetricDataState([](BackendMetricData* data) { data->eps = -1; });
186 GRPC_TRACE_LOG(backend_metric, INFO)
187 << "[" << this << "] EPS utilization cleared.";
188 }
189
ClearNamedUtilization(string_ref name)190 void ServerMetricRecorder::ClearNamedUtilization(string_ref name) {
191 GRPC_TRACE_LOG(backend_metric, INFO)
192 << "[" << this << "] Named utilization cleared. name: "
193 << std::string(name.data(), name.size());
194 UpdateBackendMetricDataState([name](BackendMetricData* data) {
195 data->utilization.erase(absl::string_view(name.data(), name.size()));
196 });
197 }
198
GetMetrics() const199 grpc_core::BackendMetricData ServerMetricRecorder::GetMetrics() const {
200 auto result = GetMetricsIfChanged();
201 return result->data;
202 }
203
204 std::shared_ptr<const ServerMetricRecorder::BackendMetricDataState>
GetMetricsIfChanged() const205 ServerMetricRecorder::GetMetricsIfChanged() const {
206 std::shared_ptr<const BackendMetricDataState> result;
207 {
208 internal::MutexLock lock(&mu_);
209 result = metric_state_;
210 }
211 if (GRPC_TRACE_FLAG_ENABLED(backend_metric)) {
212 const auto& data = result->data;
213 LOG(INFO) << "[" << this
214 << "] GetMetrics() returned: seq:" << result->sequence_number
215 << " cpu:" << data.cpu_utilization
216 << " mem:" << data.mem_utilization
217 << " app:" << data.application_utilization << " qps:" << data.qps
218 << " eps:" << data.eps
219 << " utilization size: " << data.utilization.size();
220 }
221 return result;
222 }
223
224 } // namespace experimental
225
226 experimental::CallMetricRecorder&
RecordCpuUtilizationMetric(double value)227 BackendMetricState::RecordCpuUtilizationMetric(double value) {
228 if (!IsUtilizationWithSoftLimitsValid(value)) {
229 GRPC_TRACE_LOG(backend_metric, INFO)
230 << "[" << this << "] CPU utilization value rejected: " << value;
231 return *this;
232 }
233 cpu_utilization_.store(value, std::memory_order_relaxed);
234 GRPC_TRACE_LOG(backend_metric, INFO)
235 << "[" << this << "] CPU utilization recorded: " << value;
236 return *this;
237 }
238
239 experimental::CallMetricRecorder&
RecordMemoryUtilizationMetric(double value)240 BackendMetricState::RecordMemoryUtilizationMetric(double value) {
241 if (!IsUtilizationValid(value)) {
242 GRPC_TRACE_LOG(backend_metric, INFO)
243 << "[" << this << "] Mem utilization value rejected: " << value;
244 return *this;
245 }
246 mem_utilization_.store(value, std::memory_order_relaxed);
247 GRPC_TRACE_LOG(backend_metric, INFO)
248 << "[" << this << "] Mem utilization recorded: " << value;
249 return *this;
250 }
251
252 experimental::CallMetricRecorder&
RecordApplicationUtilizationMetric(double value)253 BackendMetricState::RecordApplicationUtilizationMetric(double value) {
254 if (!IsUtilizationWithSoftLimitsValid(value)) {
255 GRPC_TRACE_LOG(backend_metric, INFO)
256 << "[" << this << "] Application utilization value rejected: " << value;
257 return *this;
258 }
259 application_utilization_.store(value, std::memory_order_relaxed);
260 GRPC_TRACE_LOG(backend_metric, INFO)
261 << "[" << this << "] Application utilization recorded: " << value;
262 return *this;
263 }
264
RecordQpsMetric(double value)265 experimental::CallMetricRecorder& BackendMetricState::RecordQpsMetric(
266 double value) {
267 if (!IsRateValid(value)) {
268 GRPC_TRACE_LOG(backend_metric, INFO)
269 << "[" << this << "] QPS value rejected: " << value;
270 return *this;
271 }
272 qps_.store(value, std::memory_order_relaxed);
273 GRPC_TRACE_LOG(backend_metric, INFO)
274 << "[" << this << "] QPS recorded: " << value;
275 return *this;
276 }
277
RecordEpsMetric(double value)278 experimental::CallMetricRecorder& BackendMetricState::RecordEpsMetric(
279 double value) {
280 if (!IsRateValid(value)) {
281 GRPC_TRACE_LOG(backend_metric, INFO)
282 << "[" << this << "] EPS value rejected: " << value;
283 return *this;
284 }
285 eps_.store(value, std::memory_order_relaxed);
286 GRPC_TRACE_LOG(backend_metric, INFO)
287 << "[" << this << "] EPS recorded: " << value;
288 return *this;
289 }
290
RecordUtilizationMetric(string_ref name,double value)291 experimental::CallMetricRecorder& BackendMetricState::RecordUtilizationMetric(
292 string_ref name, double value) {
293 if (!IsUtilizationValid(value)) {
294 GRPC_TRACE_LOG(backend_metric, INFO)
295 << "[" << this << "] Utilization value rejected: "
296 << std::string(name.data(), name.length()) << " " << value;
297 return *this;
298 }
299 internal::MutexLock lock(&mu_);
300 absl::string_view name_sv(name.data(), name.length());
301 utilization_[name_sv] = value;
302 GRPC_TRACE_LOG(backend_metric, INFO)
303 << "[" << this << "] Utilization recorded: " << name_sv << " " << value;
304 return *this;
305 }
306
RecordRequestCostMetric(string_ref name,double value)307 experimental::CallMetricRecorder& BackendMetricState::RecordRequestCostMetric(
308 string_ref name, double value) {
309 internal::MutexLock lock(&mu_);
310 absl::string_view name_sv(name.data(), name.length());
311 request_cost_[name_sv] = value;
312 GRPC_TRACE_LOG(backend_metric, INFO)
313 << "[" << this << "] Request cost recorded: " << name_sv << " " << value;
314 return *this;
315 }
316
RecordNamedMetric(string_ref name,double value)317 experimental::CallMetricRecorder& BackendMetricState::RecordNamedMetric(
318 string_ref name, double value) {
319 internal::MutexLock lock(&mu_);
320 absl::string_view name_sv(name.data(), name.length());
321 named_metrics_[name_sv] = value;
322 GRPC_TRACE_LOG(backend_metric, INFO)
323 << "[" << this << "] Named metric recorded: " << name_sv << " " << value;
324 return *this;
325 }
326
GetBackendMetricData()327 BackendMetricData BackendMetricState::GetBackendMetricData() {
328 // Merge metrics from the ServerMetricRecorder first since metrics recorded
329 // to CallMetricRecorder takes a higher precedence.
330 BackendMetricData data;
331 if (server_metric_recorder_ != nullptr) {
332 data = server_metric_recorder_->GetMetrics();
333 }
334 // Only overwrite if the value is set i.e. in the valid range.
335 const double cpu = cpu_utilization_.load(std::memory_order_relaxed);
336 if (IsUtilizationWithSoftLimitsValid(cpu)) {
337 data.cpu_utilization = cpu;
338 }
339 const double mem = mem_utilization_.load(std::memory_order_relaxed);
340 if (IsUtilizationValid(mem)) {
341 data.mem_utilization = mem;
342 }
343 const double app_util =
344 application_utilization_.load(std::memory_order_relaxed);
345 if (IsUtilizationWithSoftLimitsValid(app_util)) {
346 data.application_utilization = app_util;
347 }
348 const double qps = qps_.load(std::memory_order_relaxed);
349 if (IsRateValid(qps)) {
350 data.qps = qps;
351 }
352 const double eps = eps_.load(std::memory_order_relaxed);
353 if (IsRateValid(eps)) {
354 data.eps = eps;
355 }
356 {
357 internal::MutexLock lock(&mu_);
358 for (const auto& u : utilization_) {
359 data.utilization[u.first] = u.second;
360 }
361 for (const auto& r : request_cost_) {
362 data.request_cost[r.first] = r.second;
363 }
364 for (const auto& r : named_metrics_) {
365 data.named_metrics[r.first] = r.second;
366 }
367 }
368 GRPC_TRACE_LOG(backend_metric, INFO)
369 << "[" << this
370 << "] Backend metric data returned: cpu:" << data.cpu_utilization
371 << " mem:" << data.mem_utilization << " qps:" << data.qps
372 << " eps:" << data.eps << " utilization size:" << data.utilization.size()
373 << " request_cost size:" << data.request_cost.size()
374 << "named_metrics size:" << data.named_metrics.size();
375 return data;
376 }
377
378 } // namespace grpc
379