• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/server/load_reporter/load_reporter.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 #include <stdio.h>
24 
25 #include <chrono>
26 #include <cstring>
27 #include <iterator>
28 #include <set>
29 #include <tuple>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "opencensus/tags/tag_key.h"
34 #include "src/cpp/server/load_reporter/constants.h"
35 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
36 
37 // IWYU pragma: no_include "google/protobuf/duration.pb.h"
38 
39 namespace grpc {
40 namespace load_reporter {
41 
GetCpuStats()42 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
43   return GetCpuStatsImpl();
44 }
45 
CensusViewProvider()46 CensusViewProvider::CensusViewProvider()
47     : tag_key_token_(::opencensus::tags::TagKey::Register(kTagKeyToken)),
48       tag_key_host_(::opencensus::tags::TagKey::Register(kTagKeyHost)),
49       tag_key_user_id_(::opencensus::tags::TagKey::Register(kTagKeyUserId)),
50       tag_key_status_(::opencensus::tags::TagKey::Register(kTagKeyStatus)),
51       tag_key_metric_name_(
52           ::opencensus::tags::TagKey::Register(kTagKeyMetricName)) {
53   // One view related to starting a call.
54   auto vd_start_count =
55       ::opencensus::stats::ViewDescriptor()
56           .set_name(kViewStartCount)
57           .set_measure(kMeasureStartCount)
58           .set_aggregation(::opencensus::stats::Aggregation::Sum())
59           .add_column(tag_key_token_)
60           .add_column(tag_key_host_)
61           .add_column(tag_key_user_id_)
62           .set_description(
63               "Delta count of calls started broken down by <token, host, "
64               "user_id>.");
65   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
66                        &vd_start_count);
67   view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
68   // Four views related to ending a call.
69   // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
70   // measure), it's infeasible to prepare fake data for testing. That's because
71   // the OpenCensus API to make up view data will add the input data as separate
72   // measurements instead of setting the data values directly.
73   auto vd_end_count =
74       ::opencensus::stats::ViewDescriptor()
75           .set_name(kViewEndCount)
76           .set_measure(kMeasureEndCount)
77           .set_aggregation(::opencensus::stats::Aggregation::Sum())
78           .add_column(tag_key_token_)
79           .add_column(tag_key_host_)
80           .add_column(tag_key_user_id_)
81           .add_column(tag_key_status_)
82           .set_description(
83               "Delta count of calls ended broken down by <token, host, "
84               "user_id, status>.");
85   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
86                        &vd_end_count);
87   view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
88   auto vd_end_bytes_sent =
89       ::opencensus::stats::ViewDescriptor()
90           .set_name(kViewEndBytesSent)
91           .set_measure(kMeasureEndBytesSent)
92           .set_aggregation(::opencensus::stats::Aggregation::Sum())
93           .add_column(tag_key_token_)
94           .add_column(tag_key_host_)
95           .add_column(tag_key_user_id_)
96           .add_column(tag_key_status_)
97           .set_description(
98               "Delta sum of bytes sent broken down by <token, host, user_id, "
99               "status>.");
100   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
101                        &vd_end_bytes_sent);
102   view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
103   auto vd_end_bytes_received =
104       ::opencensus::stats::ViewDescriptor()
105           .set_name(kViewEndBytesReceived)
106           .set_measure(kMeasureEndBytesReceived)
107           .set_aggregation(::opencensus::stats::Aggregation::Sum())
108           .add_column(tag_key_token_)
109           .add_column(tag_key_host_)
110           .add_column(tag_key_user_id_)
111           .add_column(tag_key_status_)
112           .set_description(
113               "Delta sum of bytes received broken down by <token, host, "
114               "user_id, status>.");
115   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
116                        &vd_end_bytes_received);
117   view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
118   auto vd_end_latency_ms =
119       ::opencensus::stats::ViewDescriptor()
120           .set_name(kViewEndLatencyMs)
121           .set_measure(kMeasureEndLatencyMs)
122           .set_aggregation(::opencensus::stats::Aggregation::Sum())
123           .add_column(tag_key_token_)
124           .add_column(tag_key_host_)
125           .add_column(tag_key_user_id_)
126           .add_column(tag_key_status_)
127           .set_description(
128               "Delta sum of latency in ms broken down by <token, host, "
129               "user_id, status>.");
130   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
131                        &vd_end_latency_ms);
132   view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
133   // Two views related to other call metrics.
134   auto vd_metric_call_count =
135       ::opencensus::stats::ViewDescriptor()
136           .set_name(kViewOtherCallMetricCount)
137           .set_measure(kMeasureOtherCallMetric)
138           .set_aggregation(::opencensus::stats::Aggregation::Count())
139           .add_column(tag_key_token_)
140           .add_column(tag_key_host_)
141           .add_column(tag_key_user_id_)
142           .add_column(tag_key_metric_name_)
143           .set_description(
144               "Delta count of calls broken down by <token, host, user_id, "
145               "metric_name>.");
146   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
147                        &vd_metric_call_count);
148   view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
149   auto vd_metric_value =
150       ::opencensus::stats::ViewDescriptor()
151           .set_name(kViewOtherCallMetricValue)
152           .set_measure(kMeasureOtherCallMetric)
153           .set_aggregation(::opencensus::stats::Aggregation::Sum())
154           .add_column(tag_key_token_)
155           .add_column(tag_key_host_)
156           .add_column(tag_key_user_id_)
157           .add_column(tag_key_metric_name_)
158           .set_description(
159               "Delta sum of call metric value broken down "
160               "by <token, host, user_id, metric_name>.");
161   SetAggregationWindow(::opencensus::stats::AggregationWindow::Delta(),
162                        &vd_metric_value);
163   view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
164 }
165 
GetRelatedViewDataRowDouble(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<std::string> & tag_values)166 double CensusViewProvider::GetRelatedViewDataRowDouble(
167     const ViewDataMap& view_data_map, const char* view_name,
168     size_t view_name_len, const std::vector<std::string>& tag_values) {
169   auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
170   CHECK(it_vd != view_data_map.end());
171   CHECK(it_vd->second.type() == ::opencensus::stats::ViewData::Type::kDouble);
172   auto it_row = it_vd->second.double_data().find(tag_values);
173   CHECK(it_row != it_vd->second.double_data().end());
174   return it_row->second;
175 }
176 
GetRelatedViewDataRowInt(const ViewDataMap & view_data_map,const char * view_name,size_t view_name_len,const std::vector<std::string> & tag_values)177 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
178     const ViewDataMap& view_data_map, const char* view_name,
179     size_t view_name_len, const std::vector<std::string>& tag_values) {
180   auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
181   CHECK(it_vd != view_data_map.end());
182   CHECK(it_vd->second.type() == ::opencensus::stats::ViewData::Type::kInt64);
183   auto it_row = it_vd->second.int_data().find(tag_values);
184   CHECK(it_row != it_vd->second.int_data().end());
185   CHECK_GE(it_row->second, 0);
186   return it_row->second;
187 }
188 
CensusViewProviderDefaultImpl()189 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
190   for (const auto& p : view_descriptor_map()) {
191     const std::string& view_name = p.first;
192     const ::opencensus::stats::ViewDescriptor& vd = p.second;
193     // We need to use pair's piecewise ctor here, otherwise the deleted copy
194     // ctor of View will be called.
195     view_map_.emplace(std::piecewise_construct,
196                       std::forward_as_tuple(view_name),
197                       std::forward_as_tuple(vd));
198   }
199 }
200 
FetchViewData()201 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
202   VLOG(2) << "[CVP " << this << "] Starts fetching Census view data.";
203   ViewDataMap view_data_map;
204   for (auto& p : view_map_) {
205     const std::string& view_name = p.first;
206     ::opencensus::stats::View& view = p.second;
207     if (view.IsValid()) {
208       view_data_map.emplace(view_name, view.GetData());
209       VLOG(2) << "[CVP " << this << "] Fetched view data (view: " << view_name
210               << ").";
211     } else {
212       VLOG(2) << "[CVP " << this
213               << "] Can't fetch view data because view is invalid (view: "
214               << view_name << ").";
215     }
216   }
217   return view_data_map;
218 }
219 
GenerateLbId()220 std::string LoadReporter::GenerateLbId() {
221   while (true) {
222     if (next_lb_id_ > UINT32_MAX) {
223       LOG(ERROR) << "[LR " << this
224                  << "] The LB ID exceeds the max valid value!";
225       return "";
226     }
227     int64_t lb_id = next_lb_id_++;
228     // Overflow should never happen.
229     CHECK_GE(lb_id, 0);
230     // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
231     char buf[kLbIdLength + 1];
232     snprintf(buf, sizeof(buf), "%08" PRIx64, lb_id);
233     std::string lb_id_str(buf, kLbIdLength);
234     // The client may send requests with LB ID that has never been allocated
235     // by this load reporter. Those IDs are tracked and will be skipped when
236     // we generate a new ID.
237     if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
238       return lb_id_str;
239     }
240   }
241 }
242 
243 ::grpc::lb::v1::LoadBalancingFeedback
GenerateLoadBalancingFeedback()244 LoadReporter::GenerateLoadBalancingFeedback() {
245   grpc_core::ReleasableMutexLock lock(&feedback_mu_);
246   auto now = std::chrono::system_clock::now();
247   // Discard records outside the window until there is only one record
248   // outside the window, which is used as the base for difference.
249   while (feedback_records_.size() > 1 &&
250          !IsRecordInWindow(feedback_records_[1], now)) {
251     feedback_records_.pop_front();
252   }
253   if (feedback_records_.size() < 2) {
254     return grpc::lb::v1::LoadBalancingFeedback::default_instance();
255   }
256   // Find the longest range with valid ends.
257   auto oldest = feedback_records_.begin();
258   auto newest = feedback_records_.end() - 1;
259   while (std::distance(oldest, newest) > 0 &&
260          (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
261     // A zero limit means that the system info reading was failed, so these
262     // records can't be used to calculate CPU utilization.
263     if (newest->cpu_limit == 0) --newest;
264     if (oldest->cpu_limit == 0) ++oldest;
265   }
266   if (std::distance(oldest, newest) < 1 ||
267       oldest->end_time == newest->end_time ||
268       newest->cpu_limit == oldest->cpu_limit) {
269     return grpc::lb::v1::LoadBalancingFeedback::default_instance();
270   }
271   uint64_t rpcs = 0;
272   uint64_t errors = 0;
273   for (auto p = newest; p != oldest; --p) {
274     // Because these two numbers are counters, the oldest record shouldn't be
275     // included.
276     rpcs += p->rpcs;
277     errors += p->errors;
278   }
279   double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
280   double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
281   std::chrono::duration<double> duration_seconds =
282       newest->end_time - oldest->end_time;
283   lock.Release();
284   grpc::lb::v1::LoadBalancingFeedback feedback;
285   feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
286   feedback.set_calls_per_second(
287       static_cast<float>(rpcs / duration_seconds.count()));
288   feedback.set_errors_per_second(
289       static_cast<float>(errors / duration_seconds.count()));
290   return feedback;
291 }
292 
293 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load>
GenerateLoads(const std::string & hostname,const std::string & lb_id)294 LoadReporter::GenerateLoads(const std::string& hostname,
295                             const std::string& lb_id) {
296   grpc_core::MutexLock lock(&store_mu_);
297   auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
298   CHECK_NE(assigned_stores, nullptr);
299   CHECK(!assigned_stores->empty());
300   ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load> loads;
301   for (PerBalancerStore* per_balancer_store : *assigned_stores) {
302     CHECK(!per_balancer_store->IsSuspended());
303     if (!per_balancer_store->load_record_map().empty()) {
304       for (const auto& p : per_balancer_store->load_record_map()) {
305         const auto& key = p.first;
306         const auto& value = p.second;
307         auto load = loads.Add();
308         load->set_load_balance_tag(key.lb_tag());
309         load->set_user_id(key.user_id());
310         load->set_client_ip_address(key.GetClientIpBytes());
311         load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
312         load->set_num_calls_finished_without_error(
313             static_cast<int64_t>(value.ok_count()));
314         load->set_num_calls_finished_with_error(
315             static_cast<int64_t>(value.error_count()));
316         load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
317         load->set_total_bytes_received(
318             static_cast<int64_t>(value.bytes_recv()));
319         load->mutable_total_latency()->set_seconds(
320             static_cast<int64_t>(value.latency_ms() / 1000));
321         load->mutable_total_latency()->set_nanos(
322             (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
323         for (const auto& p : value.call_metrics()) {
324           const std::string& metric_name = p.first;
325           const CallMetricValue& metric_value = p.second;
326           auto call_metric_data = load->add_metric_data();
327           call_metric_data->set_metric_name(metric_name);
328           call_metric_data->set_num_calls_finished_with_metric(
329               metric_value.num_calls());
330           call_metric_data->set_total_metric_value(
331               metric_value.total_metric_value());
332         }
333         if (per_balancer_store->lb_id() != lb_id) {
334           // This per-balancer store is an orphan assigned to this receiving
335           // balancer.
336           AttachOrphanLoadId(load, *per_balancer_store);
337         }
338       }
339       per_balancer_store->ClearLoadRecordMap();
340     }
341     if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
342       auto load = loads.Add();
343       load->set_num_calls_in_progress(
344           per_balancer_store->GetNumCallsInProgressForReport());
345       if (per_balancer_store->lb_id() != lb_id) {
346         // This per-balancer store is an orphan assigned to this receiving
347         // balancer.
348         AttachOrphanLoadId(load, *per_balancer_store);
349       }
350     }
351   }
352   return loads;
353 }
354 
AttachOrphanLoadId(grpc::lb::v1::Load * load,const PerBalancerStore & per_balancer_store)355 void LoadReporter::AttachOrphanLoadId(
356     grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
357   if (per_balancer_store.lb_id() == kInvalidLbId) {
358     load->set_load_key_unknown(true);
359   } else {
360     // We shouldn't set load_key_unknown to any value in this case because
361     // load_key_unknown and orphaned_load_identifier are under an oneof struct.
362     load->mutable_orphaned_load_identifier()->set_load_key(
363         per_balancer_store.load_key());
364     load->mutable_orphaned_load_identifier()->set_load_balancer_id(
365         per_balancer_store.lb_id());
366   }
367 }
368 
AppendNewFeedbackRecord(uint64_t rpcs,uint64_t errors)369 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
370   CpuStatsProvider::CpuStatsSample cpu_stats;
371   if (cpu_stats_provider_ != nullptr) {
372     cpu_stats = cpu_stats_provider_->GetCpuStats();
373   } else {
374     // This will make the load balancing feedback generation a no-op.
375     cpu_stats = {0, 0};
376   }
377   grpc_core::MutexLock lock(&feedback_mu_);
378   feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
379                                  cpu_stats.first, cpu_stats.second);
380 }
381 
ReportStreamCreated(const std::string & hostname,const std::string & lb_id,const std::string & load_key)382 void LoadReporter::ReportStreamCreated(const std::string& hostname,
383                                        const std::string& lb_id,
384                                        const std::string& load_key) {
385   grpc_core::MutexLock lock(&store_mu_);
386   load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
387   LOG(INFO) << "[LR " << this << "] Report stream created (host: " << hostname
388             << ", LB ID: " << lb_id << ", load key: " << load_key << ").";
389 }
390 
ReportStreamClosed(const std::string & hostname,const std::string & lb_id)391 void LoadReporter::ReportStreamClosed(const std::string& hostname,
392                                       const std::string& lb_id) {
393   grpc_core::MutexLock lock(&store_mu_);
394   load_data_store_.ReportStreamClosed(hostname, lb_id);
395   LOG(INFO) << "[LR " << this << "] Report stream closed (host: " << hostname
396             << ", LB ID: " << lb_id << ").";
397 }
398 
ProcessViewDataCallStart(const CensusViewProvider::ViewDataMap & view_data_map)399 void LoadReporter::ProcessViewDataCallStart(
400     const CensusViewProvider::ViewDataMap& view_data_map) {
401   auto it = view_data_map.find(kViewStartCount);
402   if (it != view_data_map.end()) {
403     for (const auto& p : it->second.int_data()) {
404       const std::vector<std::string>& tag_values = p.first;
405       const uint64_t start_count = static_cast<uint64_t>(p.second);
406       const std::string& client_ip_and_token = tag_values[0];
407       const std::string& host = tag_values[1];
408       const std::string& user_id = tag_values[2];
409       LoadRecordKey key(client_ip_and_token, user_id);
410       LoadRecordValue value = LoadRecordValue(start_count);
411       {
412         grpc_core::MutexLock lock(&store_mu_);
413         load_data_store_.MergeRow(host, key, value);
414       }
415     }
416   }
417 }
418 
ProcessViewDataCallEnd(const CensusViewProvider::ViewDataMap & view_data_map)419 void LoadReporter::ProcessViewDataCallEnd(
420     const CensusViewProvider::ViewDataMap& view_data_map) {
421   uint64_t total_end_count = 0;
422   uint64_t total_error_count = 0;
423   auto it = view_data_map.find(kViewEndCount);
424   if (it != view_data_map.end()) {
425     for (const auto& p : it->second.int_data()) {
426       const std::vector<std::string>& tag_values = p.first;
427       const uint64_t end_count = static_cast<uint64_t>(p.second);
428       const std::string& client_ip_and_token = tag_values[0];
429       const std::string& host = tag_values[1];
430       const std::string& user_id = tag_values[2];
431       const std::string& status = tag_values[3];
432       // This is due to a bug reported internally of Java server load reporting
433       // implementation.
434       // TODO(juanlishen): Check whether this situation happens in OSS C++.
435       if (client_ip_and_token.empty()) {
436         VLOG(2) << "Skipping processing Opencensus record with empty "
437                    "client_ip_and_token tag.";
438         continue;
439       }
440       LoadRecordKey key(client_ip_and_token, user_id);
441       const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
442           view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
443           tag_values);
444       const uint64_t bytes_received =
445           CensusViewProvider::GetRelatedViewDataRowInt(
446               view_data_map, kViewEndBytesReceived,
447               sizeof(kViewEndBytesReceived) - 1, tag_values);
448       const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
449           view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
450           tag_values);
451       uint64_t ok_count = 0;
452       uint64_t error_count = 0;
453       total_end_count += end_count;
454       if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
455         ok_count = end_count;
456       } else {
457         error_count = end_count;
458         total_error_count += end_count;
459       }
460       LoadRecordValue value = LoadRecordValue(
461           0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
462       {
463         grpc_core::MutexLock lock(&store_mu_);
464         load_data_store_.MergeRow(host, key, value);
465       }
466     }
467   }
468   AppendNewFeedbackRecord(total_end_count, total_error_count);
469 }
470 
ProcessViewDataOtherCallMetrics(const CensusViewProvider::ViewDataMap & view_data_map)471 void LoadReporter::ProcessViewDataOtherCallMetrics(
472     const CensusViewProvider::ViewDataMap& view_data_map) {
473   auto it = view_data_map.find(kViewOtherCallMetricCount);
474   if (it != view_data_map.end()) {
475     for (const auto& p : it->second.int_data()) {
476       const std::vector<std::string>& tag_values = p.first;
477       const int64_t num_calls = p.second;
478       const std::string& client_ip_and_token = tag_values[0];
479       const std::string& host = tag_values[1];
480       const std::string& user_id = tag_values[2];
481       const std::string& metric_name = tag_values[3];
482       LoadRecordKey key(client_ip_and_token, user_id);
483       const double total_metric_value =
484           CensusViewProvider::GetRelatedViewDataRowDouble(
485               view_data_map, kViewOtherCallMetricValue,
486               sizeof(kViewOtherCallMetricValue) - 1, tag_values);
487       LoadRecordValue value = LoadRecordValue(
488           metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
489       {
490         grpc_core::MutexLock lock(&store_mu_);
491         load_data_store_.MergeRow(host, key, value);
492       }
493     }
494   }
495 }
496 
FetchAndSample()497 void LoadReporter::FetchAndSample() {
498   VLOG(2) << "[LR " << this
499           << "] Starts fetching Census view data and sampling LB feedback "
500              "record.";
501   CensusViewProvider::ViewDataMap view_data_map =
502       census_view_provider_->FetchViewData();
503   ProcessViewDataCallStart(view_data_map);
504   ProcessViewDataCallEnd(view_data_map);
505   ProcessViewDataOtherCallMetrics(view_data_map);
506 }
507 
508 }  // namespace load_reporter
509 }  // namespace grpc
510