1 // 2 // 3 // Copyright 2018 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stddef.h> 25 26 #include <atomic> 27 #include <chrono> 28 #include <cstdint> 29 #include <deque> 30 #include <memory> 31 #include <string> 32 #include <unordered_map> 33 #include <utility> 34 #include <vector> 35 36 #include <google/protobuf/repeated_ptr_field.h> 37 38 #include "opencensus/stats/stats.h" 39 #include "opencensus/tags/tag_key.h" 40 41 #include "src/core/lib/gprpp/sync.h" 42 #include "src/cpp/server/load_reporter/load_data_store.h" 43 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" 44 45 // IWYU pragma: no_include <ratio> 46 47 namespace grpc { 48 namespace load_reporter { 49 50 // The interface to get the Census stats. Abstracted for mocking. 51 class CensusViewProvider { 52 public: 53 // Maps from the view name to the view data. 54 using ViewDataMap = 55 std::unordered_map<std::string, ::opencensus::stats::ViewData>; 56 // Maps from the view name to the view descriptor. 57 using ViewDescriptorMap = 58 std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>; 59 60 CensusViewProvider(); 61 virtual ~CensusViewProvider() = default; 62 63 // Fetches the view data accumulated since last fetching, and returns it as a 64 // map from the view name to the view data. 65 virtual ViewDataMap FetchViewData() = 0; 66 67 // A helper function that gets a row with the input tag values from the view 68 // data. Only used when we know that row must exist because we have seen a row 69 // with the same tag values in a related view data. Several ViewData's are 70 // considered related if their views are based on the measures that are always 71 // recorded at the same time. 72 static double GetRelatedViewDataRowDouble( 73 const ViewDataMap& view_data_map, const char* view_name, 74 size_t view_name_len, const std::vector<std::string>& tag_values); 75 static uint64_t GetRelatedViewDataRowInt( 76 const ViewDataMap& view_data_map, const char* view_name, 77 size_t view_name_len, const std::vector<std::string>& tag_values); 78 79 protected: view_descriptor_map()80 const ViewDescriptorMap& view_descriptor_map() const { 81 return view_descriptor_map_; 82 } 83 84 private: 85 ViewDescriptorMap view_descriptor_map_; 86 // Tag keys. 87 ::opencensus::tags::TagKey tag_key_token_; 88 ::opencensus::tags::TagKey tag_key_host_; 89 ::opencensus::tags::TagKey tag_key_user_id_; 90 ::opencensus::tags::TagKey tag_key_status_; 91 ::opencensus::tags::TagKey tag_key_metric_name_; 92 }; 93 94 // The default implementation fetches the real stats from Census. 95 class CensusViewProviderDefaultImpl : public CensusViewProvider { 96 public: 97 CensusViewProviderDefaultImpl(); 98 99 ViewDataMap FetchViewData() override; 100 101 private: 102 std::unordered_map<std::string, ::opencensus::stats::View> view_map_; 103 }; 104 105 // The interface to get the CPU stats. Abstracted for mocking. 106 class CpuStatsProvider { 107 public: 108 // The used and total amounts of CPU usage. 109 using CpuStatsSample = std::pair<uint64_t, uint64_t>; 110 111 virtual ~CpuStatsProvider() = default; 112 113 // Gets the cumulative used CPU and total CPU resource. 114 virtual CpuStatsSample GetCpuStats() = 0; 115 }; 116 117 // The default implementation reads CPU jiffies from the system to calculate CPU 118 // utilization. 119 class CpuStatsProviderDefaultImpl : public CpuStatsProvider { 120 public: 121 CpuStatsSample GetCpuStats() override; 122 }; 123 124 // Maintains all the load data and load reporting streams. 125 class LoadReporter { 126 public: 127 // TODO(juanlishen): Allow config for providers from users. LoadReporter(uint32_t feedback_sample_window_seconds,std::unique_ptr<CensusViewProvider> census_view_provider,std::unique_ptr<CpuStatsProvider> cpu_stats_provider)128 LoadReporter(uint32_t feedback_sample_window_seconds, 129 std::unique_ptr<CensusViewProvider> census_view_provider, 130 std::unique_ptr<CpuStatsProvider> cpu_stats_provider) 131 : feedback_sample_window_seconds_(feedback_sample_window_seconds), 132 census_view_provider_(std::move(census_view_provider)), 133 cpu_stats_provider_(std::move(cpu_stats_provider)) { 134 // Append the initial record so that the next real record can have a base. 135 AppendNewFeedbackRecord(0, 0); 136 } 137 138 // Fetches the latest data from Census and merge it into the data store. 139 // Also adds a new sample to the LB feedback sliding window. 140 // Thread-unsafe. (1). The access to the load data store and feedback records 141 // has locking. (2). The access to the Census view provider and CPU stats 142 // provider lacks locking, but we only access these two members in this method 143 // (in testing, we also access them when setting up expectation). So the 144 // invocations of this method must be serialized. 145 void FetchAndSample(); 146 147 // Generates a report for that host and balancer. The report contains 148 // all the stats data accumulated between the last report (i.e., the last 149 // consumption) and the last fetch from Census (i.e., the last production). 150 // Thread-safe. 151 ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load> GenerateLoads( 152 const std::string& hostname, const std::string& lb_id); 153 154 // The feedback is calculated from the stats data recorded in the sliding 155 // window. Outdated records are discarded. 156 // Thread-safe. 157 grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback(); 158 159 // Wrapper around LoadDataStore::ReportStreamCreated. 160 // Thread-safe. 161 void ReportStreamCreated(const std::string& hostname, 162 const std::string& lb_id, 163 const std::string& load_key); 164 165 // Wrapper around LoadDataStore::ReportStreamClosed. 166 // Thread-safe. 167 void ReportStreamClosed(const std::string& hostname, 168 const std::string& lb_id); 169 170 // Generates a unique LB ID of length kLbIdLength. Returns an empty string 171 // upon failure. Thread-safe. 172 std::string GenerateLbId(); 173 174 // Accessors only for testing. census_view_provider()175 CensusViewProvider* census_view_provider() { 176 return census_view_provider_.get(); 177 } cpu_stats_provider()178 CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); } 179 180 private: 181 struct LoadBalancingFeedbackRecord { 182 std::chrono::system_clock::time_point end_time; 183 uint64_t rpcs; 184 uint64_t errors; 185 uint64_t cpu_usage; 186 uint64_t cpu_limit; 187 LoadBalancingFeedbackRecordLoadBalancingFeedbackRecord188 LoadBalancingFeedbackRecord( 189 const std::chrono::system_clock::time_point& end_time, uint64_t rpcs, 190 uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit) 191 : end_time(end_time), 192 rpcs(rpcs), 193 errors(errors), 194 cpu_usage(cpu_usage), 195 cpu_limit(cpu_limit) {} 196 }; 197 198 // Finds the view data about starting call from the view_data_map and merges 199 // the data to the load data store. 200 void ProcessViewDataCallStart( 201 const CensusViewProvider::ViewDataMap& view_data_map); 202 // Finds the view data about ending call from the view_data_map and merges the 203 // data to the load data store. 204 void ProcessViewDataCallEnd( 205 const CensusViewProvider::ViewDataMap& view_data_map); 206 // Finds the view data about the customized call metrics from the 207 // view_data_map and merges the data to the load data store. 208 void ProcessViewDataOtherCallMetrics( 209 const CensusViewProvider::ViewDataMap& view_data_map); 210 IsRecordInWindow(const LoadBalancingFeedbackRecord & record,std::chrono::system_clock::time_point now)211 bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record, 212 std::chrono::system_clock::time_point now) { 213 return record.end_time > now - feedback_sample_window_seconds_; 214 } 215 216 void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors); 217 218 // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches 219 // it to the load. 220 void AttachOrphanLoadId(grpc::lb::v1::Load* load, 221 const PerBalancerStore& per_balancer_store); 222 223 std::atomic<int64_t> next_lb_id_{0}; 224 const std::chrono::seconds feedback_sample_window_seconds_; 225 grpc_core::Mutex feedback_mu_; 226 std::deque<LoadBalancingFeedbackRecord> feedback_records_; 227 // TODO(juanlishen): Lock in finer grain. Locking the whole store may be 228 // too expensive. 229 grpc_core::Mutex store_mu_; 230 LoadDataStore load_data_store_; 231 std::unique_ptr<CensusViewProvider> census_view_provider_; 232 std::unique_ptr<CpuStatsProvider> cpu_stats_provider_; 233 }; 234 235 } // namespace load_reporter 236 } // namespace grpc 237 238 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 239