1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/server/load_reporter/load_reporter.h"
20
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 #include <stdio.h>
24
25 #include <chrono>
26 #include <cstring>
27 #include <iterator>
28 #include <set>
29 #include <tuple>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "opencensus/tags/tag_key.h"
34 #include "src/cpp/server/load_reporter/constants.h"
35 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
36
37 // IWYU pragma: no_include "google/protobuf/duration.pb.h"
38
39 namespace grpc {
40 namespace load_reporter {
41
GetCpuStats()42 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
43 return GetCpuStatsImpl();
44 }
45
CensusViewProvider()46 CensusViewProvider::CensusViewProvider()
47 : tag_key_token_(::opencensus::tags::TagKey::Register(kTagKeyToken)),
48 tag_key_host_(::opencensus::tags::TagKey::Register(kTagKeyHost)),
49 tag_key_user_id_(::opencensus::tags::TagKey::Register(kTagKeyUserId)),
50 tag_key_status_(::opencensus::tags::TagKey::Register(kTagKeyStatus)),
51 tag_key_metric_name_(
52 ::opencensus::tags::TagKey::Register(kTagKeyMetricName)) {
53 // One view related to starting a call.
54 auto vd_start_count =
55 ::opencensus::stats::ViewDescriptor()
56 .set_name(kViewStartCount)
57 .set_measure(kMeasureStartCount)
58 .set_aggregation(::opencensus::stats::Aggregation::Sum())
59 .add_column(tag_key_token_)
60 .add_column(tag_key_host_)
61 .add_column(tag_key_user_id_)
62 .set_description(
63 "Delta count of calls started broken down by <token, host, "
64 "user_id>.");
65 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
66 &vd_start_count);
67 view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
68 // Four views related to ending a call.
69 // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
70 // measure), it's infeasible to prepare fake data for testing. That's because
71 // the OpenCensus API to make up view data will add the input data as separate
72 // measurements instead of setting the data values directly.
73 auto vd_end_count =
74 ::opencensus::stats::ViewDescriptor()
75 .set_name(kViewEndCount)
76 .set_measure(kMeasureEndCount)
77 .set_aggregation(::opencensus::stats::Aggregation::Sum())
78 .add_column(tag_key_token_)
79 .add_column(tag_key_host_)
80 .add_column(tag_key_user_id_)
81 .add_column(tag_key_status_)
82 .set_description(
83 "Delta count of calls ended broken down by <token, host, "
84 "user_id, status>.");
85 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
86 &vd_end_count);
87 view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
88 auto vd_end_bytes_sent =
89 ::opencensus::stats::ViewDescriptor()
90 .set_name(kViewEndBytesSent)
91 .set_measure(kMeasureEndBytesSent)
92 .set_aggregation(::opencensus::stats::Aggregation::Sum())
93 .add_column(tag_key_token_)
94 .add_column(tag_key_host_)
95 .add_column(tag_key_user_id_)
96 .add_column(tag_key_status_)
97 .set_description(
98 "Delta sum of bytes sent broken down by <token, host, user_id, "
99 "status>.");
100 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
101 &vd_end_bytes_sent);
102 view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
103 auto vd_end_bytes_received =
104 ::opencensus::stats::ViewDescriptor()
105 .set_name(kViewEndBytesReceived)
106 .set_measure(kMeasureEndBytesReceived)
107 .set_aggregation(::opencensus::stats::Aggregation::Sum())
108 .add_column(tag_key_token_)
109 .add_column(tag_key_host_)
110 .add_column(tag_key_user_id_)
111 .add_column(tag_key_status_)
112 .set_description(
113 "Delta sum of bytes received broken down by <token, host, "
114 "user_id, status>.");
115 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
116 &vd_end_bytes_received);
117 view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
118 auto vd_end_latency_ms =
119 ::opencensus::stats::ViewDescriptor()
120 .set_name(kViewEndLatencyMs)
121 .set_measure(kMeasureEndLatencyMs)
122 .set_aggregation(::opencensus::stats::Aggregation::Sum())
123 .add_column(tag_key_token_)
124 .add_column(tag_key_host_)
125 .add_column(tag_key_user_id_)
126 .add_column(tag_key_status_)
127 .set_description(
128 "Delta sum of latency in ms broken down by <token, host, "
129 "user_id, status>.");
130 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
131 &vd_end_latency_ms);
132 view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
133 // Two views related to other call metrics.
134 auto vd_metric_call_count =
135 ::opencensus::stats::ViewDescriptor()
136 .set_name(kViewOtherCallMetricCount)
137 .set_measure(kMeasureOtherCallMetric)
138 .set_aggregation(::opencensus::stats::Aggregation::Count())
139 .add_column(tag_key_token_)
140 .add_column(tag_key_host_)
141 .add_column(tag_key_user_id_)
142 .add_column(tag_key_metric_name_)
143 .set_description(
144 "Delta count of calls broken down by <token, host, user_id, "
145 "metric_name>.");
146 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
147 &vd_metric_call_count);
148 view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
149 auto vd_metric_value =
150 ::opencensus::stats::ViewDescriptor()
151 .set_name(kViewOtherCallMetricValue)
152 .set_measure(kMeasureOtherCallMetric)
153 .set_aggregation(::opencensus::stats::Aggregation::Sum())
154 .add_column(tag_key_token_)
155 .add_column(tag_key_host_)
156 .add_column(tag_key_user_id_)
157 .add_column(tag_key_metric_name_)
158 .set_description(
159 "Delta sum of call metric value broken down "
160 "by <token, host, user_id, metric_name>.");
161 SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
162 &vd_metric_value);
163 view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
164 }
165
GetRelatedViewDataRowDouble(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<std::string> & tag_values)166 double CensusViewProvider::GetRelatedViewDataRowDouble(
167 const ViewDataMap& view_data_map, const char* view_name,
168 size_t view_name_len, const std::vector<std::string>& tag_values) {
169 auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
170 CHECK(it_vd != view_data_map.end());
171 CHECK(it_vd->second.type() == ::opencensus::stats::ViewData::Type::kDouble);
172 auto it_row = it_vd->second.double_data().find(tag_values);
173 CHECK(it_row != it_vd->second.double_data().end());
174 return it_row->second;
175 }
176
GetRelatedViewDataRowInt(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<std::string> & tag_values)177 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
178 const ViewDataMap& view_data_map, const char* view_name,
179 size_t view_name_len, const std::vector<std::string>& tag_values) {
180 auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
181 CHECK(it_vd != view_data_map.end());
182 CHECK(it_vd->second.type() == ::opencensus::stats::ViewData::Type::kInt64);
183 auto it_row = it_vd->second.int_data().find(tag_values);
184 CHECK(it_row != it_vd->second.int_data().end());
185 CHECK_GE(it_row->second, 0);
186 return it_row->second;
187 }
188
CensusViewProviderDefaultImpl()189 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
190 for (const auto& p : view_descriptor_map()) {
191 const std::string& view_name = p.first;
192 const ::opencensus::stats::ViewDescriptor& vd = p.second;
193 // We need to use pair's piecewise ctor here, otherwise the deleted copy
194 // ctor of View will be called.
195 view_map_.emplace(std::piecewise_construct,
196 std::forward_as_tuple(view_name),
197 std::forward_as_tuple(vd));
198 }
199 }
200
FetchViewData()201 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
202 VLOG(2) << "[CVP " << this << "] Starts fetching Census view data.";
203 ViewDataMap view_data_map;
204 for (auto& p : view_map_) {
205 const std::string& view_name = p.first;
206 ::opencensus::stats::View& view = p.second;
207 if (view.IsValid()) {
208 view_data_map.emplace(view_name, view.GetData());
209 VLOG(2) << "[CVP " << this << "] Fetched view data (view: " << view_name
210 << ").";
211 } else {
212 VLOG(2) << "[CVP " << this
213 << "] Can't fetch view data because view is invalid (view: "
214 << view_name << ").";
215 }
216 }
217 return view_data_map;
218 }
219
GenerateLbId()220 std::string LoadReporter::GenerateLbId() {
221 while (true) {
222 if (next_lb_id_ > UINT32_MAX) {
223 LOG(ERROR) << "[LR " << this
224 << "] The LB ID exceeds the max valid value!";
225 return "";
226 }
227 int64_t lb_id = next_lb_id_++;
228 // Overflow should never happen.
229 CHECK_GE(lb_id, 0);
230 // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
231 char buf[kLbIdLength + 1];
232 snprintf(buf, sizeof(buf), "%08" PRIx64, lb_id);
233 std::string lb_id_str(buf, kLbIdLength);
234 // The client may send requests with LB ID that has never been allocated
235 // by this load reporter. Those IDs are tracked and will be skipped when
236 // we generate a new ID.
237 if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
238 return lb_id_str;
239 }
240 }
241 }
242
243 ::grpc::lb::v1::LoadBalancingFeedback
GenerateLoadBalancingFeedback()244 LoadReporter::GenerateLoadBalancingFeedback() {
245 grpc_core::ReleasableMutexLock lock(&feedback_mu_);
246 auto now = std::chrono::system_clock::now();
247 // Discard records outside the window until there is only one record
248 // outside the window, which is used as the base for difference.
249 while (feedback_records_.size() > 1 &&
250 !IsRecordInWindow(feedback_records_[1], now)) {
251 feedback_records_.pop_front();
252 }
253 if (feedback_records_.size() < 2) {
254 return grpc::lb::v1::LoadBalancingFeedback::default_instance();
255 }
256 // Find the longest range with valid ends.
257 auto oldest = feedback_records_.begin();
258 auto newest = feedback_records_.end() - 1;
259 while (std::distance(oldest, newest) > 0 &&
260 (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
261 // A zero limit means that the system info reading was failed, so these
262 // records can't be used to calculate CPU utilization.
263 if (newest->cpu_limit == 0) --newest;
264 if (oldest->cpu_limit == 0) ++oldest;
265 }
266 if (std::distance(oldest, newest) < 1 ||
267 oldest->end_time == newest->end_time ||
268 newest->cpu_limit == oldest->cpu_limit) {
269 return grpc::lb::v1::LoadBalancingFeedback::default_instance();
270 }
271 uint64_t rpcs = 0;
272 uint64_t errors = 0;
273 for (auto p = newest; p != oldest; --p) {
274 // Because these two numbers are counters, the oldest record shouldn't be
275 // included.
276 rpcs += p->rpcs;
277 errors += p->errors;
278 }
279 double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
280 double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
281 std::chrono::duration<double> duration_seconds =
282 newest->end_time - oldest->end_time;
283 lock.Release();
284 grpc::lb::v1::LoadBalancingFeedback feedback;
285 feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
286 feedback.set_calls_per_second(
287 static_cast<float>(rpcs / duration_seconds.count()));
288 feedback.set_errors_per_second(
289 static_cast<float>(errors / duration_seconds.count()));
290 return feedback;
291 }
292
293 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load>
GenerateLoads(const std::string & hostname,const std::string & lb_id)294 LoadReporter::GenerateLoads(const std::string& hostname,
295 const std::string& lb_id) {
296 grpc_core::MutexLock lock(&store_mu_);
297 auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
298 CHECK_NE(assigned_stores, nullptr);
299 CHECK(!assigned_stores->empty());
300 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load> loads;
301 for (PerBalancerStore* per_balancer_store : *assigned_stores) {
302 CHECK(!per_balancer_store->IsSuspended());
303 if (!per_balancer_store->load_record_map().empty()) {
304 for (const auto& p : per_balancer_store->load_record_map()) {
305 const auto& key = p.first;
306 const auto& value = p.second;
307 auto load = loads.Add();
308 load->set_load_balance_tag(key.lb_tag());
309 load->set_user_id(key.user_id());
310 load->set_client_ip_address(key.GetClientIpBytes());
311 load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
312 load->set_num_calls_finished_without_error(
313 static_cast<int64_t>(value.ok_count()));
314 load->set_num_calls_finished_with_error(
315 static_cast<int64_t>(value.error_count()));
316 load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
317 load->set_total_bytes_received(
318 static_cast<int64_t>(value.bytes_recv()));
319 load->mutable_total_latency()->set_seconds(
320 static_cast<int64_t>(value.latency_ms() / 1000));
321 load->mutable_total_latency()->set_nanos(
322 (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
323 for (const auto& p : value.call_metrics()) {
324 const std::string& metric_name = p.first;
325 const CallMetricValue& metric_value = p.second;
326 auto call_metric_data = load->add_metric_data();
327 call_metric_data->set_metric_name(metric_name);
328 call_metric_data->set_num_calls_finished_with_metric(
329 metric_value.num_calls());
330 call_metric_data->set_total_metric_value(
331 metric_value.total_metric_value());
332 }
333 if (per_balancer_store->lb_id() != lb_id) {
334 // This per-balancer store is an orphan assigned to this receiving
335 // balancer.
336 AttachOrphanLoadId(load, *per_balancer_store);
337 }
338 }
339 per_balancer_store->ClearLoadRecordMap();
340 }
341 if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
342 auto load = loads.Add();
343 load->set_num_calls_in_progress(
344 per_balancer_store->GetNumCallsInProgressForReport());
345 if (per_balancer_store->lb_id() != lb_id) {
346 // This per-balancer store is an orphan assigned to this receiving
347 // balancer.
348 AttachOrphanLoadId(load, *per_balancer_store);
349 }
350 }
351 }
352 return loads;
353 }
354
AttachOrphanLoadId(grpc::lb::v1::Load * load,const PerBalancerStore & per_balancer_store)355 void LoadReporter::AttachOrphanLoadId(
356 grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
357 if (per_balancer_store.lb_id() == kInvalidLbId) {
358 load->set_load_key_unknown(true);
359 } else {
360 // We shouldn't set load_key_unknown to any value in this case because
361 // load_key_unknown and orphaned_load_identifier are under an oneof struct.
362 load->mutable_orphaned_load_identifier()->set_load_key(
363 per_balancer_store.load_key());
364 load->mutable_orphaned_load_identifier()->set_load_balancer_id(
365 per_balancer_store.lb_id());
366 }
367 }
368
AppendNewFeedbackRecord(uint64_t rpcs,uint64_t errors)369 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
370 CpuStatsProvider::CpuStatsSample cpu_stats;
371 if (cpu_stats_provider_ != nullptr) {
372 cpu_stats = cpu_stats_provider_->GetCpuStats();
373 } else {
374 // This will make the load balancing feedback generation a no-op.
375 cpu_stats = {0, 0};
376 }
377 grpc_core::MutexLock lock(&feedback_mu_);
378 feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
379 cpu_stats.first, cpu_stats.second);
380 }
381
ReportStreamCreated(const std::string & hostname,const std::string & lb_id,const std::string & load_key)382 void LoadReporter::ReportStreamCreated(const std::string& hostname,
383 const std::string& lb_id,
384 const std::string& load_key) {
385 grpc_core::MutexLock lock(&store_mu_);
386 load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
387 LOG(INFO) << "[LR " << this << "] Report stream created (host: " << hostname
388 << ", LB ID: " << lb_id << ", load key: " << load_key << ").";
389 }
390
ReportStreamClosed(const std::string & hostname,const std::string & lb_id)391 void LoadReporter::ReportStreamClosed(const std::string& hostname,
392 const std::string& lb_id) {
393 grpc_core::MutexLock lock(&store_mu_);
394 load_data_store_.ReportStreamClosed(hostname, lb_id);
395 LOG(INFO) << "[LR " << this << "] Report stream closed (host: " << hostname
396 << ", LB ID: " << lb_id << ").";
397 }
398
ProcessViewDataCallStart(const CensusViewProvider::ViewDataMap & view_data_map)399 void LoadReporter::ProcessViewDataCallStart(
400 const CensusViewProvider::ViewDataMap& view_data_map) {
401 auto it = view_data_map.find(kViewStartCount);
402 if (it != view_data_map.end()) {
403 for (const auto& p : it->second.int_data()) {
404 const std::vector<std::string>& tag_values = p.first;
405 const uint64_t start_count = static_cast<uint64_t>(p.second);
406 const std::string& client_ip_and_token = tag_values[0];
407 const std::string& host = tag_values[1];
408 const std::string& user_id = tag_values[2];
409 LoadRecordKey key(client_ip_and_token, user_id);
410 LoadRecordValue value = LoadRecordValue(start_count);
411 {
412 grpc_core::MutexLock lock(&store_mu_);
413 load_data_store_.MergeRow(host, key, value);
414 }
415 }
416 }
417 }
418
ProcessViewDataCallEnd(const CensusViewProvider::ViewDataMap & view_data_map)419 void LoadReporter::ProcessViewDataCallEnd(
420 const CensusViewProvider::ViewDataMap& view_data_map) {
421 uint64_t total_end_count = 0;
422 uint64_t total_error_count = 0;
423 auto it = view_data_map.find(kViewEndCount);
424 if (it != view_data_map.end()) {
425 for (const auto& p : it->second.int_data()) {
426 const std::vector<std::string>& tag_values = p.first;
427 const uint64_t end_count = static_cast<uint64_t>(p.second);
428 const std::string& client_ip_and_token = tag_values[0];
429 const std::string& host = tag_values[1];
430 const std::string& user_id = tag_values[2];
431 const std::string& status = tag_values[3];
432 // This is due to a bug reported internally of Java server load reporting
433 // implementation.
434 // TODO(juanlishen): Check whether this situation happens in OSS C++.
435 if (client_ip_and_token.empty()) {
436 VLOG(2) << "Skipping processing Opencensus record with empty "
437 "client_ip_and_token tag.";
438 continue;
439 }
440 LoadRecordKey key(client_ip_and_token, user_id);
441 const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
442 view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
443 tag_values);
444 const uint64_t bytes_received =
445 CensusViewProvider::GetRelatedViewDataRowInt(
446 view_data_map, kViewEndBytesReceived,
447 sizeof(kViewEndBytesReceived) - 1, tag_values);
448 const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
449 view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
450 tag_values);
451 uint64_t ok_count = 0;
452 uint64_t error_count = 0;
453 total_end_count += end_count;
454 if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
455 ok_count = end_count;
456 } else {
457 error_count = end_count;
458 total_error_count += end_count;
459 }
460 LoadRecordValue value = LoadRecordValue(
461 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
462 {
463 grpc_core::MutexLock lock(&store_mu_);
464 load_data_store_.MergeRow(host, key, value);
465 }
466 }
467 }
468 AppendNewFeedbackRecord(total_end_count, total_error_count);
469 }
470
ProcessViewDataOtherCallMetrics(const CensusViewProvider::ViewDataMap & view_data_map)471 void LoadReporter::ProcessViewDataOtherCallMetrics(
472 const CensusViewProvider::ViewDataMap& view_data_map) {
473 auto it = view_data_map.find(kViewOtherCallMetricCount);
474 if (it != view_data_map.end()) {
475 for (const auto& p : it->second.int_data()) {
476 const std::vector<std::string>& tag_values = p.first;
477 const int64_t num_calls = p.second;
478 const std::string& client_ip_and_token = tag_values[0];
479 const std::string& host = tag_values[1];
480 const std::string& user_id = tag_values[2];
481 const std::string& metric_name = tag_values[3];
482 LoadRecordKey key(client_ip_and_token, user_id);
483 const double total_metric_value =
484 CensusViewProvider::GetRelatedViewDataRowDouble(
485 view_data_map, kViewOtherCallMetricValue,
486 sizeof(kViewOtherCallMetricValue) - 1, tag_values);
487 LoadRecordValue value = LoadRecordValue(
488 metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
489 {
490 grpc_core::MutexLock lock(&store_mu_);
491 load_data_store_.MergeRow(host, key, value);
492 }
493 }
494 }
495 }
496
FetchAndSample()497 void LoadReporter::FetchAndSample() {
498 VLOG(2) << "[LR " << this
499 << "] Starts fetching Census view data and sampling LB feedback "
500 "record.";
501 CensusViewProvider::ViewDataMap view_data_map =
502 census_view_provider_->FetchViewData();
503 ProcessViewDataCallStart(view_data_map);
504 ProcessViewDataCallEnd(view_data_map);
505 ProcessViewDataOtherCallMetrics(view_data_map);
506 }
507
508 } // namespace load_reporter
509 } // namespace grpc
510