//
//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/cpp/server/load_reporter/load_reporter.h"
22
23 #include <inttypes.h>
24 #include <stdio.h>
25
26 #include <chrono>
27 #include <cstring>
28 #include <iterator>
29 #include <set>
30 #include <tuple>
31
32 #include "opencensus/tags/tag_key.h"
33
34 #include <grpc/support/log.h>
35
36 #include "src/cpp/server/load_reporter/constants.h"
37 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
38
39 // IWYU pragma: no_include "google/protobuf/duration.pb.h"
40
41 namespace grpc {
42 namespace load_reporter {
43
GetCpuStats()44 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
45 return GetCpuStatsImpl();
46 }
47
CensusViewProvider()48 CensusViewProvider::CensusViewProvider()
49 : tag_key_token_(::opencensus::tags::TagKey::Register(kTagKeyToken)),
50 tag_key_host_(::opencensus::tags::TagKey::Register(kTagKeyHost)),
51 tag_key_user_id_(::opencensus::tags::TagKey::Register(kTagKeyUserId)),
52 tag_key_status_(::opencensus::tags::TagKey::Register(kTagKeyStatus)),
53 tag_key_metric_name_(
54 ::opencensus::tags::TagKey::Register(kTagKeyMetricName)) {
55 // One view related to starting a call.
56 auto vd_start_count =
57 ::opencensus::stats::ViewDescriptor()
58 .set_name(kViewStartCount)
59 .set_measure(kMeasureStartCount)
60 .set_aggregation(::opencensus::stats::Aggregation::Sum())
61 .add_column(tag_key_token_)
62 .add_column(tag_key_host_)
63 .add_column(tag_key_user_id_)
64 .set_description(
65 "Delta count of calls started broken down by <token, host, "
66 "user_id>.");
67 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
68 &vd_start_count);
69 view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
70 // Four views related to ending a call.
71 // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
72 // measure), it's infeasible to prepare fake data for testing. That's because
73 // the OpenCensus API to make up view data will add the input data as separate
74 // measurements instead of setting the data values directly.
75 auto vd_end_count =
76 ::opencensus::stats::ViewDescriptor()
77 .set_name(kViewEndCount)
78 .set_measure(kMeasureEndCount)
79 .set_aggregation(::opencensus::stats::Aggregation::Sum())
80 .add_column(tag_key_token_)
81 .add_column(tag_key_host_)
82 .add_column(tag_key_user_id_)
83 .add_column(tag_key_status_)
84 .set_description(
85 "Delta count of calls ended broken down by <token, host, "
86 "user_id, status>.");
87 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
88 &vd_end_count);
89 view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
90 auto vd_end_bytes_sent =
91 ::opencensus::stats::ViewDescriptor()
92 .set_name(kViewEndBytesSent)
93 .set_measure(kMeasureEndBytesSent)
94 .set_aggregation(::opencensus::stats::Aggregation::Sum())
95 .add_column(tag_key_token_)
96 .add_column(tag_key_host_)
97 .add_column(tag_key_user_id_)
98 .add_column(tag_key_status_)
99 .set_description(
100 "Delta sum of bytes sent broken down by <token, host, user_id, "
101 "status>.");
102 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
103 &vd_end_bytes_sent);
104 view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
105 auto vd_end_bytes_received =
106 ::opencensus::stats::ViewDescriptor()
107 .set_name(kViewEndBytesReceived)
108 .set_measure(kMeasureEndBytesReceived)
109 .set_aggregation(::opencensus::stats::Aggregation::Sum())
110 .add_column(tag_key_token_)
111 .add_column(tag_key_host_)
112 .add_column(tag_key_user_id_)
113 .add_column(tag_key_status_)
114 .set_description(
115 "Delta sum of bytes received broken down by <token, host, "
116 "user_id, status>.");
117 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
118 &vd_end_bytes_received);
119 view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
120 auto vd_end_latency_ms =
121 ::opencensus::stats::ViewDescriptor()
122 .set_name(kViewEndLatencyMs)
123 .set_measure(kMeasureEndLatencyMs)
124 .set_aggregation(::opencensus::stats::Aggregation::Sum())
125 .add_column(tag_key_token_)
126 .add_column(tag_key_host_)
127 .add_column(tag_key_user_id_)
128 .add_column(tag_key_status_)
129 .set_description(
130 "Delta sum of latency in ms broken down by <token, host, "
131 "user_id, status>.");
132 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
133 &vd_end_latency_ms);
134 view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
135 // Two views related to other call metrics.
136 auto vd_metric_call_count =
137 ::opencensus::stats::ViewDescriptor()
138 .set_name(kViewOtherCallMetricCount)
139 .set_measure(kMeasureOtherCallMetric)
140 .set_aggregation(::opencensus::stats::Aggregation::Count())
141 .add_column(tag_key_token_)
142 .add_column(tag_key_host_)
143 .add_column(tag_key_user_id_)
144 .add_column(tag_key_metric_name_)
145 .set_description(
146 "Delta count of calls broken down by <token, host, user_id, "
147 "metric_name>.");
148 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
149 &vd_metric_call_count);
150 view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
151 auto vd_metric_value =
152 ::opencensus::stats::ViewDescriptor()
153 .set_name(kViewOtherCallMetricValue)
154 .set_measure(kMeasureOtherCallMetric)
155 .set_aggregation(::opencensus::stats::Aggregation::Sum())
156 .add_column(tag_key_token_)
157 .add_column(tag_key_host_)
158 .add_column(tag_key_user_id_)
159 .add_column(tag_key_metric_name_)
160 .set_description(
161 "Delta sum of call metric value broken down "
162 "by <token, host, user_id, metric_name>.");
163 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
164 &vd_metric_value);
165 view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
166 }
167
GetRelatedViewDataRowDouble(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<std::string> & tag_values)168 double CensusViewProvider::GetRelatedViewDataRowDouble(
169 const ViewDataMap& view_data_map, const char* view_name,
170 size_t view_name_len, const std::vector<std::string>& tag_values) {
171 auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
172 GPR_ASSERT(it_vd != view_data_map.end());
173 GPR_ASSERT(it_vd->second.type() ==
174 ::opencensus::stats::ViewData::Type::kDouble);
175 auto it_row = it_vd->second.double_data().find(tag_values);
176 GPR_ASSERT(it_row != it_vd->second.double_data().end());
177 return it_row->second;
178 }
179
GetRelatedViewDataRowInt(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<std::string> & tag_values)180 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
181 const ViewDataMap& view_data_map, const char* view_name,
182 size_t view_name_len, const std::vector<std::string>& tag_values) {
183 auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
184 GPR_ASSERT(it_vd != view_data_map.end());
185 GPR_ASSERT(it_vd->second.type() ==
186 ::opencensus::stats::ViewData::Type::kInt64);
187 auto it_row = it_vd->second.int_data().find(tag_values);
188 GPR_ASSERT(it_row != it_vd->second.int_data().end());
189 GPR_ASSERT(it_row->second >= 0);
190 return it_row->second;
191 }
192
CensusViewProviderDefaultImpl()193 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
194 for (const auto& p : view_descriptor_map()) {
195 const std::string& view_name = p.first;
196 const ::opencensus::stats::ViewDescriptor& vd = p.second;
197 // We need to use pair's piecewise ctor here, otherwise the deleted copy
198 // ctor of View will be called.
199 view_map_.emplace(std::piecewise_construct,
200 std::forward_as_tuple(view_name),
201 std::forward_as_tuple(vd));
202 }
203 }
204
FetchViewData()205 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
206 gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
207 ViewDataMap view_data_map;
208 for (auto& p : view_map_) {
209 const std::string& view_name = p.first;
210 ::opencensus::stats::View& view = p.second;
211 if (view.IsValid()) {
212 view_data_map.emplace(view_name, view.GetData());
213 gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
214 view_name.c_str());
215 } else {
216 gpr_log(
217 GPR_DEBUG,
218 "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
219 this, view_name.c_str());
220 }
221 }
222 return view_data_map;
223 }
224
GenerateLbId()225 std::string LoadReporter::GenerateLbId() {
226 while (true) {
227 if (next_lb_id_ > UINT32_MAX) {
228 gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
229 this);
230 return "";
231 }
232 int64_t lb_id = next_lb_id_++;
233 // Overflow should never happen.
234 GPR_ASSERT(lb_id >= 0);
235 // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
236 char buf[kLbIdLength + 1];
237 snprintf(buf, sizeof(buf), "%08" PRIx64, lb_id);
238 std::string lb_id_str(buf, kLbIdLength);
239 // The client may send requests with LB ID that has never been allocated
240 // by this load reporter. Those IDs are tracked and will be skipped when
241 // we generate a new ID.
242 if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
243 return lb_id_str;
244 }
245 }
246 }
247
248 ::grpc::lb::v1::LoadBalancingFeedback
GenerateLoadBalancingFeedback()249 LoadReporter::GenerateLoadBalancingFeedback() {
250 grpc_core::ReleasableMutexLock lock(&feedback_mu_);
251 auto now = std::chrono::system_clock::now();
252 // Discard records outside the window until there is only one record
253 // outside the window, which is used as the base for difference.
254 while (feedback_records_.size() > 1 &&
255 !IsRecordInWindow(feedback_records_[1], now)) {
256 feedback_records_.pop_front();
257 }
258 if (feedback_records_.size() < 2) {
259 return grpc::lb::v1::LoadBalancingFeedback::default_instance();
260 }
261 // Find the longest range with valid ends.
262 auto oldest = feedback_records_.begin();
263 auto newest = feedback_records_.end() - 1;
264 while (std::distance(oldest, newest) > 0 &&
265 (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
266 // A zero limit means that the system info reading was failed, so these
267 // records can't be used to calculate CPU utilization.
268 if (newest->cpu_limit == 0) --newest;
269 if (oldest->cpu_limit == 0) ++oldest;
270 }
271 if (std::distance(oldest, newest) < 1 ||
272 oldest->end_time == newest->end_time ||
273 newest->cpu_limit == oldest->cpu_limit) {
274 return grpc::lb::v1::LoadBalancingFeedback::default_instance();
275 }
276 uint64_t rpcs = 0;
277 uint64_t errors = 0;
278 for (auto p = newest; p != oldest; --p) {
279 // Because these two numbers are counters, the oldest record shouldn't be
280 // included.
281 rpcs += p->rpcs;
282 errors += p->errors;
283 }
284 double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
285 double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
286 std::chrono::duration<double> duration_seconds =
287 newest->end_time - oldest->end_time;
288 lock.Release();
289 grpc::lb::v1::LoadBalancingFeedback feedback;
290 feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
291 feedback.set_calls_per_second(
292 static_cast<float>(rpcs / duration_seconds.count()));
293 feedback.set_errors_per_second(
294 static_cast<float>(errors / duration_seconds.count()));
295 return feedback;
296 }
297
298 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load>
GenerateLoads(const std::string & hostname,const std::string & lb_id)299 LoadReporter::GenerateLoads(const std::string& hostname,
300 const std::string& lb_id) {
301 grpc_core::MutexLock lock(&store_mu_);
302 auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
303 GPR_ASSERT(assigned_stores != nullptr);
304 GPR_ASSERT(!assigned_stores->empty());
305 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load> loads;
306 for (PerBalancerStore* per_balancer_store : *assigned_stores) {
307 GPR_ASSERT(!per_balancer_store->IsSuspended());
308 if (!per_balancer_store->load_record_map().empty()) {
309 for (const auto& p : per_balancer_store->load_record_map()) {
310 const auto& key = p.first;
311 const auto& value = p.second;
312 auto load = loads.Add();
313 load->set_load_balance_tag(key.lb_tag());
314 load->set_user_id(key.user_id());
315 load->set_client_ip_address(key.GetClientIpBytes());
316 load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
317 load->set_num_calls_finished_without_error(
318 static_cast<int64_t>(value.ok_count()));
319 load->set_num_calls_finished_with_error(
320 static_cast<int64_t>(value.error_count()));
321 load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
322 load->set_total_bytes_received(
323 static_cast<int64_t>(value.bytes_recv()));
324 load->mutable_total_latency()->set_seconds(
325 static_cast<int64_t>(value.latency_ms() / 1000));
326 load->mutable_total_latency()->set_nanos(
327 (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
328 for (const auto& p : value.call_metrics()) {
329 const std::string& metric_name = p.first;
330 const CallMetricValue& metric_value = p.second;
331 auto call_metric_data = load->add_metric_data();
332 call_metric_data->set_metric_name(metric_name);
333 call_metric_data->set_num_calls_finished_with_metric(
334 metric_value.num_calls());
335 call_metric_data->set_total_metric_value(
336 metric_value.total_metric_value());
337 }
338 if (per_balancer_store->lb_id() != lb_id) {
339 // This per-balancer store is an orphan assigned to this receiving
340 // balancer.
341 AttachOrphanLoadId(load, *per_balancer_store);
342 }
343 }
344 per_balancer_store->ClearLoadRecordMap();
345 }
346 if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
347 auto load = loads.Add();
348 load->set_num_calls_in_progress(
349 per_balancer_store->GetNumCallsInProgressForReport());
350 if (per_balancer_store->lb_id() != lb_id) {
351 // This per-balancer store is an orphan assigned to this receiving
352 // balancer.
353 AttachOrphanLoadId(load, *per_balancer_store);
354 }
355 }
356 }
357 return loads;
358 }
359
AttachOrphanLoadId(grpc::lb::v1::Load * load,const PerBalancerStore & per_balancer_store)360 void LoadReporter::AttachOrphanLoadId(
361 grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
362 if (per_balancer_store.lb_id() == kInvalidLbId) {
363 load->set_load_key_unknown(true);
364 } else {
365 // We shouldn't set load_key_unknown to any value in this case because
366 // load_key_unknown and orphaned_load_identifier are under an oneof struct.
367 load->mutable_orphaned_load_identifier()->set_load_key(
368 per_balancer_store.load_key());
369 load->mutable_orphaned_load_identifier()->set_load_balancer_id(
370 per_balancer_store.lb_id());
371 }
372 }
373
AppendNewFeedbackRecord(uint64_t rpcs,uint64_t errors)374 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
375 CpuStatsProvider::CpuStatsSample cpu_stats;
376 if (cpu_stats_provider_ != nullptr) {
377 cpu_stats = cpu_stats_provider_->GetCpuStats();
378 } else {
379 // This will make the load balancing feedback generation a no-op.
380 cpu_stats = {0, 0};
381 }
382 grpc_core::MutexLock lock(&feedback_mu_);
383 feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
384 cpu_stats.first, cpu_stats.second);
385 }
386
ReportStreamCreated(const std::string & hostname,const std::string & lb_id,const std::string & load_key)387 void LoadReporter::ReportStreamCreated(const std::string& hostname,
388 const std::string& lb_id,
389 const std::string& load_key) {
390 grpc_core::MutexLock lock(&store_mu_);
391 load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
392 gpr_log(GPR_INFO,
393 "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
394 this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
395 }
396
ReportStreamClosed(const std::string & hostname,const std::string & lb_id)397 void LoadReporter::ReportStreamClosed(const std::string& hostname,
398 const std::string& lb_id) {
399 grpc_core::MutexLock lock(&store_mu_);
400 load_data_store_.ReportStreamClosed(hostname, lb_id);
401 gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
402 hostname.c_str(), lb_id.c_str());
403 }
404
ProcessViewDataCallStart(const CensusViewProvider::ViewDataMap & view_data_map)405 void LoadReporter::ProcessViewDataCallStart(
406 const CensusViewProvider::ViewDataMap& view_data_map) {
407 auto it = view_data_map.find(kViewStartCount);
408 if (it != view_data_map.end()) {
409 for (const auto& p : it->second.int_data()) {
410 const std::vector<std::string>& tag_values = p.first;
411 const uint64_t start_count = static_cast<uint64_t>(p.second);
412 const std::string& client_ip_and_token = tag_values[0];
413 const std::string& host = tag_values[1];
414 const std::string& user_id = tag_values[2];
415 LoadRecordKey key(client_ip_and_token, user_id);
416 LoadRecordValue value = LoadRecordValue(start_count);
417 {
418 grpc_core::MutexLock lock(&store_mu_);
419 load_data_store_.MergeRow(host, key, value);
420 }
421 }
422 }
423 }
424
ProcessViewDataCallEnd(const CensusViewProvider::ViewDataMap & view_data_map)425 void LoadReporter::ProcessViewDataCallEnd(
426 const CensusViewProvider::ViewDataMap& view_data_map) {
427 uint64_t total_end_count = 0;
428 uint64_t total_error_count = 0;
429 auto it = view_data_map.find(kViewEndCount);
430 if (it != view_data_map.end()) {
431 for (const auto& p : it->second.int_data()) {
432 const std::vector<std::string>& tag_values = p.first;
433 const uint64_t end_count = static_cast<uint64_t>(p.second);
434 const std::string& client_ip_and_token = tag_values[0];
435 const std::string& host = tag_values[1];
436 const std::string& user_id = tag_values[2];
437 const std::string& status = tag_values[3];
438 // This is due to a bug reported internally of Java server load reporting
439 // implementation.
440 // TODO(juanlishen): Check whether this situation happens in OSS C++.
441 if (client_ip_and_token.empty()) {
442 gpr_log(GPR_DEBUG,
443 "Skipping processing Opencensus record with empty "
444 "client_ip_and_token tag.");
445 continue;
446 }
447 LoadRecordKey key(client_ip_and_token, user_id);
448 const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
449 view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
450 tag_values);
451 const uint64_t bytes_received =
452 CensusViewProvider::GetRelatedViewDataRowInt(
453 view_data_map, kViewEndBytesReceived,
454 sizeof(kViewEndBytesReceived) - 1, tag_values);
455 const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
456 view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
457 tag_values);
458 uint64_t ok_count = 0;
459 uint64_t error_count = 0;
460 total_end_count += end_count;
461 if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
462 ok_count = end_count;
463 } else {
464 error_count = end_count;
465 total_error_count += end_count;
466 }
467 LoadRecordValue value = LoadRecordValue(
468 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
469 {
470 grpc_core::MutexLock lock(&store_mu_);
471 load_data_store_.MergeRow(host, key, value);
472 }
473 }
474 }
475 AppendNewFeedbackRecord(total_end_count, total_error_count);
476 }
477
ProcessViewDataOtherCallMetrics(const CensusViewProvider::ViewDataMap & view_data_map)478 void LoadReporter::ProcessViewDataOtherCallMetrics(
479 const CensusViewProvider::ViewDataMap& view_data_map) {
480 auto it = view_data_map.find(kViewOtherCallMetricCount);
481 if (it != view_data_map.end()) {
482 for (const auto& p : it->second.int_data()) {
483 const std::vector<std::string>& tag_values = p.first;
484 const int64_t num_calls = p.second;
485 const std::string& client_ip_and_token = tag_values[0];
486 const std::string& host = tag_values[1];
487 const std::string& user_id = tag_values[2];
488 const std::string& metric_name = tag_values[3];
489 LoadRecordKey key(client_ip_and_token, user_id);
490 const double total_metric_value =
491 CensusViewProvider::GetRelatedViewDataRowDouble(
492 view_data_map, kViewOtherCallMetricValue,
493 sizeof(kViewOtherCallMetricValue) - 1, tag_values);
494 LoadRecordValue value = LoadRecordValue(
495 metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
496 {
497 grpc_core::MutexLock lock(&store_mu_);
498 load_data_store_.MergeRow(host, key, value);
499 }
500 }
501 }
502 }
503
FetchAndSample()504 void LoadReporter::FetchAndSample() {
505 gpr_log(GPR_DEBUG,
506 "[LR %p] Starts fetching Census view data and sampling LB feedback "
507 "record.",
508 this);
509 CensusViewProvider::ViewDataMap view_data_map =
510 census_view_provider_->FetchViewData();
511 ProcessViewDataCallStart(view_data_map);
512 ProcessViewDataCallEnd(view_data_map);
513 ProcessViewDataOtherCallMetrics(view_data_map);
514 }
515
516 } // namespace load_reporter
517 } // namespace grpc
518