1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <atomic> 25 #include <chrono> 26 #include <deque> 27 #include <vector> 28 29 #include <grpc/support/log.h> 30 #include <grpcpp/impl/codegen/config.h> 31 32 #include "src/core/lib/gprpp/sync.h" 33 #include "src/cpp/server/load_reporter/load_data_store.h" 34 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h" 35 36 #include "opencensus/stats/stats.h" 37 #include "opencensus/tags/tag_key.h" 38 39 namespace grpc { 40 namespace load_reporter { 41 42 // The interface to get the Census stats. Abstracted for mocking. 43 class CensusViewProvider { 44 public: 45 // Maps from the view name to the view data. 46 using ViewDataMap = 47 std::unordered_map<std::string, ::opencensus::stats::ViewData>; 48 // Maps from the view name to the view descriptor. 49 using ViewDescriptorMap = 50 std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>; 51 52 CensusViewProvider(); 53 virtual ~CensusViewProvider() = default; 54 55 // Fetches the view data accumulated since last fetching, and returns it as a 56 // map from the view name to the view data. 57 virtual ViewDataMap FetchViewData() = 0; 58 59 // A helper function that gets a row with the input tag values from the view 60 // data. Only used when we know that row must exist because we have seen a row 61 // with the same tag values in a related view data. Several ViewData's are 62 // considered related if their views are based on the measures that are always 63 // recorded at the same time. 64 static double GetRelatedViewDataRowDouble( 65 const ViewDataMap& view_data_map, const char* view_name, 66 size_t view_name_len, const std::vector<std::string>& tag_values); 67 static uint64_t GetRelatedViewDataRowInt( 68 const ViewDataMap& view_data_map, const char* view_name, 69 size_t view_name_len, const std::vector<std::string>& tag_values); 70 71 protected: view_descriptor_map()72 const ViewDescriptorMap& view_descriptor_map() const { 73 return view_descriptor_map_; 74 } 75 76 private: 77 ViewDescriptorMap view_descriptor_map_; 78 // Tag keys. 79 ::opencensus::tags::TagKey tag_key_token_; 80 ::opencensus::tags::TagKey tag_key_host_; 81 ::opencensus::tags::TagKey tag_key_user_id_; 82 ::opencensus::tags::TagKey tag_key_status_; 83 ::opencensus::tags::TagKey tag_key_metric_name_; 84 }; 85 86 // The default implementation fetches the real stats from Census. 87 class CensusViewProviderDefaultImpl : public CensusViewProvider { 88 public: 89 CensusViewProviderDefaultImpl(); 90 91 ViewDataMap FetchViewData() override; 92 93 private: 94 std::unordered_map<std::string, ::opencensus::stats::View> view_map_; 95 }; 96 97 // The interface to get the CPU stats. Abstracted for mocking. 98 class CpuStatsProvider { 99 public: 100 // The used and total amounts of CPU usage. 101 using CpuStatsSample = std::pair<uint64_t, uint64_t>; 102 103 virtual ~CpuStatsProvider() = default; 104 105 // Gets the cumulative used CPU and total CPU resource. 106 virtual CpuStatsSample GetCpuStats() = 0; 107 }; 108 109 // The default implementation reads CPU jiffies from the system to calculate CPU 110 // utilization. 111 class CpuStatsProviderDefaultImpl : public CpuStatsProvider { 112 public: 113 CpuStatsSample GetCpuStats() override; 114 }; 115 116 // Maintains all the load data and load reporting streams. 117 class LoadReporter { 118 public: 119 // TODO(juanlishen): Allow config for providers from users. LoadReporter(uint32_t feedback_sample_window_seconds,std::unique_ptr<CensusViewProvider> census_view_provider,std::unique_ptr<CpuStatsProvider> cpu_stats_provider)120 LoadReporter(uint32_t feedback_sample_window_seconds, 121 std::unique_ptr<CensusViewProvider> census_view_provider, 122 std::unique_ptr<CpuStatsProvider> cpu_stats_provider) 123 : feedback_sample_window_seconds_(feedback_sample_window_seconds), 124 census_view_provider_(std::move(census_view_provider)), 125 cpu_stats_provider_(std::move(cpu_stats_provider)) { 126 // Append the initial record so that the next real record can have a base. 127 AppendNewFeedbackRecord(0, 0); 128 } 129 130 // Fetches the latest data from Census and merge it into the data store. 131 // Also adds a new sample to the LB feedback sliding window. 132 // Thread-unsafe. (1). The access to the load data store and feedback records 133 // has locking. (2). The access to the Census view provider and CPU stats 134 // provider lacks locking, but we only access these two members in this method 135 // (in testing, we also access them when setting up expectation). So the 136 // invocations of this method must be serialized. 137 void FetchAndSample(); 138 139 // Generates a report for that host and balancer. The report contains 140 // all the stats data accumulated between the last report (i.e., the last 141 // consumption) and the last fetch from Census (i.e., the last production). 142 // Thread-safe. 143 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads( 144 const std::string& hostname, const std::string& lb_id); 145 146 // The feedback is calculated from the stats data recorded in the sliding 147 // window. Outdated records are discarded. 148 // Thread-safe. 149 ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback(); 150 151 // Wrapper around LoadDataStore::ReportStreamCreated. 152 // Thread-safe. 153 void ReportStreamCreated(const std::string& hostname, 154 const std::string& lb_id, 155 const std::string& load_key); 156 157 // Wrapper around LoadDataStore::ReportStreamClosed. 158 // Thread-safe. 159 void ReportStreamClosed(const std::string& hostname, 160 const std::string& lb_id); 161 162 // Generates a unique LB ID of length kLbIdLength. Returns an empty string 163 // upon failure. Thread-safe. 164 std::string GenerateLbId(); 165 166 // Accessors only for testing. census_view_provider()167 CensusViewProvider* census_view_provider() { 168 return census_view_provider_.get(); 169 } cpu_stats_provider()170 CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); } 171 172 private: 173 struct LoadBalancingFeedbackRecord { 174 std::chrono::system_clock::time_point end_time; 175 uint64_t rpcs; 176 uint64_t errors; 177 uint64_t cpu_usage; 178 uint64_t cpu_limit; 179 LoadBalancingFeedbackRecordLoadBalancingFeedbackRecord180 LoadBalancingFeedbackRecord( 181 const std::chrono::system_clock::time_point& end_time, uint64_t rpcs, 182 uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit) 183 : end_time(end_time), 184 rpcs(rpcs), 185 errors(errors), 186 cpu_usage(cpu_usage), 187 cpu_limit(cpu_limit) {} 188 }; 189 190 // Finds the view data about starting call from the view_data_map and merges 191 // the data to the load data store. 192 void ProcessViewDataCallStart( 193 const CensusViewProvider::ViewDataMap& view_data_map); 194 // Finds the view data about ending call from the view_data_map and merges the 195 // data to the load data store. 196 void ProcessViewDataCallEnd( 197 const CensusViewProvider::ViewDataMap& view_data_map); 198 // Finds the view data about the customized call metrics from the 199 // view_data_map and merges the data to the load data store. 200 void ProcessViewDataOtherCallMetrics( 201 const CensusViewProvider::ViewDataMap& view_data_map); 202 IsRecordInWindow(const LoadBalancingFeedbackRecord & record,std::chrono::system_clock::time_point now)203 bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record, 204 std::chrono::system_clock::time_point now) { 205 return record.end_time > now - feedback_sample_window_seconds_; 206 } 207 208 void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors); 209 210 // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches 211 // it to the load. 212 void AttachOrphanLoadId(::grpc::lb::v1::Load* load, 213 const PerBalancerStore& per_balancer_store); 214 215 std::atomic<int64_t> next_lb_id_{0}; 216 const std::chrono::seconds feedback_sample_window_seconds_; 217 grpc_core::Mutex feedback_mu_; 218 std::deque<LoadBalancingFeedbackRecord> feedback_records_; 219 // TODO(juanlishen): Lock in finer grain. Locking the whole store may be 220 // too expensive. 221 grpc_core::Mutex store_mu_; 222 LoadDataStore load_data_store_; 223 std::unique_ptr<CensusViewProvider> census_view_provider_; 224 std::unique_ptr<CpuStatsProvider> cpu_stats_provider_; 225 }; 226 227 } // namespace load_reporter 228 } // namespace grpc 229 230 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H 231