1 // 2 // 3 // Copyright 2018 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 21 22 #include <google/protobuf/repeated_ptr_field.h> 23 #include <grpc/support/port_platform.h> 24 #include <stddef.h> 25 26 #include <atomic> 27 #include <chrono> 28 #include <cstdint> 29 #include <deque> 30 #include <memory> 31 #include <string> 32 #include <unordered_map> 33 #include <utility> 34 #include <vector> 35 36 #include "opencensus/stats/stats.h" 37 #include "opencensus/tags/tag_key.h" 38 #include "src/core/util/sync.h" 39 #include "src/cpp/server/load_reporter/load_data_store.h" 40 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" 41 42 // IWYU pragma: no_include <ratio> 43 44 namespace grpc { 45 namespace load_reporter { 46 47 // The interface to get the Census stats. Abstracted for mocking. 48 class CensusViewProvider { 49 public: 50 // Maps from the view name to the view data. 51 using ViewDataMap = 52 std::unordered_map<std::string, ::opencensus::stats::ViewData>; 53 // Maps from the view name to the view descriptor. 54 using ViewDescriptorMap = 55 std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>; 56 57 CensusViewProvider(); 58 virtual ~CensusViewProvider() = default; 59 60 // Fetches the view data accumulated since last fetching, and returns it as a 61 // map from the view name to the view data. 62 virtual ViewDataMap FetchViewData() = 0; 63 64 // A helper function that gets a row with the input tag values from the view 65 // data. Only used when we know that row must exist because we have seen a row 66 // with the same tag values in a related view data. Several ViewData's are 67 // considered related if their views are based on the measures that are always 68 // recorded at the same time. 69 static double GetRelatedViewDataRowDouble( 70 const ViewDataMap& view_data_map, const char* view_name, 71 size_t view_name_len, const std::vector<std::string>& tag_values); 72 static uint64_t GetRelatedViewDataRowInt( 73 const ViewDataMap& view_data_map, const char* view_name, 74 size_t view_name_len, const std::vector<std::string>& tag_values); 75 76 protected: view_descriptor_map()77 const ViewDescriptorMap& view_descriptor_map() const { 78 return view_descriptor_map_; 79 } 80 81 private: 82 ViewDescriptorMap view_descriptor_map_; 83 // Tag keys. 84 ::opencensus::tags::TagKey tag_key_token_; 85 ::opencensus::tags::TagKey tag_key_host_; 86 ::opencensus::tags::TagKey tag_key_user_id_; 87 ::opencensus::tags::TagKey tag_key_status_; 88 ::opencensus::tags::TagKey tag_key_metric_name_; 89 }; 90 91 // The default implementation fetches the real stats from Census. 92 class CensusViewProviderDefaultImpl : public CensusViewProvider { 93 public: 94 CensusViewProviderDefaultImpl(); 95 96 ViewDataMap FetchViewData() override; 97 98 private: 99 std::unordered_map<std::string, ::opencensus::stats::View> view_map_; 100 }; 101 102 // The interface to get the CPU stats. Abstracted for mocking. 103 class CpuStatsProvider { 104 public: 105 // The used and total amounts of CPU usage. 106 using CpuStatsSample = std::pair<uint64_t, uint64_t>; 107 108 virtual ~CpuStatsProvider() = default; 109 110 // Gets the cumulative used CPU and total CPU resource. 111 virtual CpuStatsSample GetCpuStats() = 0; 112 }; 113 114 // The default implementation reads CPU jiffies from the system to calculate CPU 115 // utilization. 116 class CpuStatsProviderDefaultImpl : public CpuStatsProvider { 117 public: 118 CpuStatsSample GetCpuStats() override; 119 }; 120 121 // Maintains all the load data and load reporting streams. 122 class LoadReporter { 123 public: 124 // TODO(juanlishen): Allow config for providers from users. LoadReporter(uint32_t feedback_sample_window_seconds,std::unique_ptr<CensusViewProvider> census_view_provider,std::unique_ptr<CpuStatsProvider> cpu_stats_provider)125 LoadReporter(uint32_t feedback_sample_window_seconds, 126 std::unique_ptr<CensusViewProvider> census_view_provider, 127 std::unique_ptr<CpuStatsProvider> cpu_stats_provider) 128 : feedback_sample_window_seconds_(feedback_sample_window_seconds), 129 census_view_provider_(std::move(census_view_provider)), 130 cpu_stats_provider_(std::move(cpu_stats_provider)) { 131 // Append the initial record so that the next real record can have a base. 132 AppendNewFeedbackRecord(0, 0); 133 } 134 135 // Fetches the latest data from Census and merge it into the data store. 136 // Also adds a new sample to the LB feedback sliding window. 137 // Thread-unsafe. (1). The access to the load data store and feedback records 138 // has locking. (2). The access to the Census view provider and CPU stats 139 // provider lacks locking, but we only access these two members in this method 140 // (in testing, we also access them when setting up expectation). So the 141 // invocations of this method must be serialized. 142 void FetchAndSample(); 143 144 // Generates a report for that host and balancer. The report contains 145 // all the stats data accumulated between the last report (i.e., the last 146 // consumption) and the last fetch from Census (i.e., the last production). 147 // Thread-safe. 148 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load> GenerateLoads( 149 const std::string& hostname, const std::string& lb_id); 150 151 // The feedback is calculated from the stats data recorded in the sliding 152 // window. Outdated records are discarded. 153 // Thread-safe. 154 grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback(); 155 156 // Wrapper around LoadDataStore::ReportStreamCreated. 157 // Thread-safe. 158 void ReportStreamCreated(const std::string& hostname, 159 const std::string& lb_id, 160 const std::string& load_key); 161 162 // Wrapper around LoadDataStore::ReportStreamClosed. 163 // Thread-safe. 164 void ReportStreamClosed(const std::string& hostname, 165 const std::string& lb_id); 166 167 // Generates a unique LB ID of length kLbIdLength. Returns an empty string 168 // upon failure. Thread-safe. 169 std::string GenerateLbId(); 170 171 // Accessors only for testing. census_view_provider()172 CensusViewProvider* census_view_provider() { 173 return census_view_provider_.get(); 174 } cpu_stats_provider()175 CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); } 176 177 private: 178 struct LoadBalancingFeedbackRecord { 179 std::chrono::system_clock::time_point end_time; 180 uint64_t rpcs; 181 uint64_t errors; 182 uint64_t cpu_usage; 183 uint64_t cpu_limit; 184 LoadBalancingFeedbackRecordLoadBalancingFeedbackRecord185 LoadBalancingFeedbackRecord( 186 const std::chrono::system_clock::time_point& end_time, uint64_t rpcs, 187 uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit) 188 : end_time(end_time), 189 rpcs(rpcs), 190 errors(errors), 191 cpu_usage(cpu_usage), 192 cpu_limit(cpu_limit) {} 193 }; 194 195 // Finds the view data about starting call from the view_data_map and merges 196 // the data to the load data store. 197 void ProcessViewDataCallStart( 198 const CensusViewProvider::ViewDataMap& view_data_map); 199 // Finds the view data about ending call from the view_data_map and merges the 200 // data to the load data store. 201 void ProcessViewDataCallEnd( 202 const CensusViewProvider::ViewDataMap& view_data_map); 203 // Finds the view data about the customized call metrics from the 204 // view_data_map and merges the data to the load data store. 205 void ProcessViewDataOtherCallMetrics( 206 const CensusViewProvider::ViewDataMap& view_data_map); 207 IsRecordInWindow(const LoadBalancingFeedbackRecord & record,std::chrono::system_clock::time_point now)208 bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record, 209 std::chrono::system_clock::time_point now) { 210 return record.end_time > now - feedback_sample_window_seconds_; 211 } 212 213 void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors); 214 215 // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches 216 // it to the load. 217 void AttachOrphanLoadId(grpc::lb::v1::Load* load, 218 const PerBalancerStore& per_balancer_store); 219 220 std::atomic<int64_t> next_lb_id_{0}; 221 const std::chrono::seconds feedback_sample_window_seconds_; 222 grpc_core::Mutex feedback_mu_; 223 std::deque<LoadBalancingFeedbackRecord> feedback_records_; 224 // TODO(juanlishen): Lock in finer grain. Locking the whole store may be 225 // too expensive. 226 grpc_core::Mutex store_mu_; 227 LoadDataStore load_data_store_; 228 std::unique_ptr<CensusViewProvider> census_view_provider_; 229 std::unique_ptr<CpuStatsProvider> cpu_stats_provider_; 230 }; 231 232 } // namespace load_reporter 233 } // namespace grpc 234 235 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 236