• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
21 
#include <grpc/support/port_platform.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config.h>

#include "src/core/lib/gprpp/sync.h"
#include "src/cpp/server/load_reporter/load_data_store.h"
#include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"

#include "opencensus/stats/stats.h"
#include "opencensus/tags/tag_key.h"
38 
39 namespace grpc {
40 namespace load_reporter {
41 
42 // The interface to get the Census stats. Abstracted for mocking.
43 class CensusViewProvider {
44  public:
45   // Maps from the view name to the view data.
46   using ViewDataMap =
47       std::unordered_map<std::string, ::opencensus::stats::ViewData>;
48   // Maps from the view name to the view descriptor.
49   using ViewDescriptorMap =
50       std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>;
51 
52   CensusViewProvider();
53   virtual ~CensusViewProvider() = default;
54 
55   // Fetches the view data accumulated since last fetching, and returns it as a
56   // map from the view name to the view data.
57   virtual ViewDataMap FetchViewData() = 0;
58 
59   // A helper function that gets a row with the input tag values from the view
60   // data. Only used when we know that row must exist because we have seen a row
61   // with the same tag values in a related view data. Several ViewData's are
62   // considered related if their views are based on the measures that are always
63   // recorded at the same time.
64   static double GetRelatedViewDataRowDouble(
65       const ViewDataMap& view_data_map, const char* view_name,
66       size_t view_name_len, const std::vector<std::string>& tag_values);
67   static uint64_t GetRelatedViewDataRowInt(
68       const ViewDataMap& view_data_map, const char* view_name,
69       size_t view_name_len, const std::vector<std::string>& tag_values);
70 
71  protected:
view_descriptor_map()72   const ViewDescriptorMap& view_descriptor_map() const {
73     return view_descriptor_map_;
74   }
75 
76  private:
77   ViewDescriptorMap view_descriptor_map_;
78   // Tag keys.
79   ::opencensus::tags::TagKey tag_key_token_;
80   ::opencensus::tags::TagKey tag_key_host_;
81   ::opencensus::tags::TagKey tag_key_user_id_;
82   ::opencensus::tags::TagKey tag_key_status_;
83   ::opencensus::tags::TagKey tag_key_metric_name_;
84 };
85 
86 // The default implementation fetches the real stats from Census.
87 class CensusViewProviderDefaultImpl : public CensusViewProvider {
88  public:
89   CensusViewProviderDefaultImpl();
90 
91   ViewDataMap FetchViewData() override;
92 
93  private:
94   std::unordered_map<std::string, ::opencensus::stats::View> view_map_;
95 };
96 
// Abstract source of CPU statistics. It is an interface so that tests can
// provide a mock.
class CpuStatsProvider {
 public:
  // One CPU sample: `first` is the amount of CPU used and `second` is the
  // total amount of CPU.
  using CpuStatsSample = std::pair<uint64_t, uint64_t>;

  virtual ~CpuStatsProvider() = default;

  // Returns the cumulative (used, total) CPU amounts.
  virtual auto GetCpuStats() -> CpuStatsSample = 0;
};
108 
109 // The default implementation reads CPU jiffies from the system to calculate CPU
110 // utilization.
111 class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
112  public:
113   CpuStatsSample GetCpuStats() override;
114 };
115 
116 // Maintains all the load data and load reporting streams.
117 class LoadReporter {
118  public:
119   // TODO(juanlishen): Allow config for providers from users.
LoadReporter(uint32_t feedback_sample_window_seconds,std::unique_ptr<CensusViewProvider> census_view_provider,std::unique_ptr<CpuStatsProvider> cpu_stats_provider)120   LoadReporter(uint32_t feedback_sample_window_seconds,
121                std::unique_ptr<CensusViewProvider> census_view_provider,
122                std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
123       : feedback_sample_window_seconds_(feedback_sample_window_seconds),
124         census_view_provider_(std::move(census_view_provider)),
125         cpu_stats_provider_(std::move(cpu_stats_provider)) {
126     // Append the initial record so that the next real record can have a base.
127     AppendNewFeedbackRecord(0, 0);
128   }
129 
130   // Fetches the latest data from Census and merge it into the data store.
131   // Also adds a new sample to the LB feedback sliding window.
132   // Thread-unsafe. (1). The access to the load data store and feedback records
133   // has locking. (2). The access to the Census view provider and CPU stats
134   // provider lacks locking, but we only access these two members in this method
135   // (in testing, we also access them when setting up expectation). So the
136   // invocations of this method must be serialized.
137   void FetchAndSample();
138 
139   // Generates a report for that host and balancer. The report contains
140   // all the stats data accumulated between the last report (i.e., the last
141   // consumption) and the last fetch from Census (i.e., the last production).
142   // Thread-safe.
143   ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
144       const std::string& hostname, const std::string& lb_id);
145 
146   // The feedback is calculated from the stats data recorded in the sliding
147   // window. Outdated records are discarded.
148   // Thread-safe.
149   ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
150 
151   // Wrapper around LoadDataStore::ReportStreamCreated.
152   // Thread-safe.
153   void ReportStreamCreated(const std::string& hostname,
154                            const std::string& lb_id,
155                            const std::string& load_key);
156 
157   // Wrapper around LoadDataStore::ReportStreamClosed.
158   // Thread-safe.
159   void ReportStreamClosed(const std::string& hostname,
160                           const std::string& lb_id);
161 
162   // Generates a unique LB ID of length kLbIdLength. Returns an empty string
163   // upon failure. Thread-safe.
164   std::string GenerateLbId();
165 
166   // Accessors only for testing.
census_view_provider()167   CensusViewProvider* census_view_provider() {
168     return census_view_provider_.get();
169   }
cpu_stats_provider()170   CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
171 
172  private:
173   struct LoadBalancingFeedbackRecord {
174     std::chrono::system_clock::time_point end_time;
175     uint64_t rpcs;
176     uint64_t errors;
177     uint64_t cpu_usage;
178     uint64_t cpu_limit;
179 
LoadBalancingFeedbackRecordLoadBalancingFeedbackRecord180     LoadBalancingFeedbackRecord(
181         const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
182         uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
183         : end_time(end_time),
184           rpcs(rpcs),
185           errors(errors),
186           cpu_usage(cpu_usage),
187           cpu_limit(cpu_limit) {}
188   };
189 
190   // Finds the view data about starting call from the view_data_map and merges
191   // the data to the load data store.
192   void ProcessViewDataCallStart(
193       const CensusViewProvider::ViewDataMap& view_data_map);
194   // Finds the view data about ending call from the view_data_map and merges the
195   // data to the load data store.
196   void ProcessViewDataCallEnd(
197       const CensusViewProvider::ViewDataMap& view_data_map);
198   // Finds the view data about the customized call metrics from the
199   // view_data_map and merges the data to the load data store.
200   void ProcessViewDataOtherCallMetrics(
201       const CensusViewProvider::ViewDataMap& view_data_map);
202 
IsRecordInWindow(const LoadBalancingFeedbackRecord & record,std::chrono::system_clock::time_point now)203   bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
204                         std::chrono::system_clock::time_point now) {
205     return record.end_time > now - feedback_sample_window_seconds_;
206   }
207 
208   void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
209 
210   // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
211   // it to the load.
212   void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
213                           const PerBalancerStore& per_balancer_store);
214 
215   std::atomic<int64_t> next_lb_id_{0};
216   const std::chrono::seconds feedback_sample_window_seconds_;
217   grpc_core::Mutex feedback_mu_;
218   std::deque<LoadBalancingFeedbackRecord> feedback_records_;
219   // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
220   // too expensive.
221   grpc_core::Mutex store_mu_;
222   LoadDataStore load_data_store_;
223   std::unique_ptr<CensusViewProvider> census_view_provider_;
224   std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
225 };
226 
227 }  // namespace load_reporter
228 }  // namespace grpc
229 
230 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
231