• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 
26 #include <atomic>
27 #include <chrono>
28 #include <cstdint>
29 #include <deque>
30 #include <memory>
31 #include <string>
32 #include <unordered_map>
33 #include <utility>
34 #include <vector>
35 
36 #include <google/protobuf/repeated_ptr_field.h>
37 
38 #include "opencensus/stats/stats.h"
39 #include "opencensus/tags/tag_key.h"
40 
41 #include "src/core/lib/gprpp/sync.h"
42 #include "src/cpp/server/load_reporter/load_data_store.h"
43 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
44 
45 // IWYU pragma: no_include <ratio>
46 
47 namespace grpc {
48 namespace load_reporter {
49 
50 // The interface to get the Census stats. Abstracted for mocking.
51 class CensusViewProvider {
52  public:
53   // Maps from the view name to the view data.
54   using ViewDataMap =
55       std::unordered_map<std::string, ::opencensus::stats::ViewData>;
56   // Maps from the view name to the view descriptor.
57   using ViewDescriptorMap =
58       std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>;
59 
60   CensusViewProvider();
61   virtual ~CensusViewProvider() = default;
62 
63   // Fetches the view data accumulated since last fetching, and returns it as a
64   // map from the view name to the view data.
65   virtual ViewDataMap FetchViewData() = 0;
66 
67   // A helper function that gets a row with the input tag values from the view
68   // data. Only used when we know that row must exist because we have seen a row
69   // with the same tag values in a related view data. Several ViewData's are
70   // considered related if their views are based on the measures that are always
71   // recorded at the same time.
72   static double GetRelatedViewDataRowDouble(
73       const ViewDataMap& view_data_map, const char* view_name,
74       size_t view_name_len, const std::vector<std::string>& tag_values);
75   static uint64_t GetRelatedViewDataRowInt(
76       const ViewDataMap& view_data_map, const char* view_name,
77       size_t view_name_len, const std::vector<std::string>& tag_values);
78 
79  protected:
view_descriptor_map()80   const ViewDescriptorMap& view_descriptor_map() const {
81     return view_descriptor_map_;
82   }
83 
84  private:
85   ViewDescriptorMap view_descriptor_map_;
86   // Tag keys.
87   ::opencensus::tags::TagKey tag_key_token_;
88   ::opencensus::tags::TagKey tag_key_host_;
89   ::opencensus::tags::TagKey tag_key_user_id_;
90   ::opencensus::tags::TagKey tag_key_status_;
91   ::opencensus::tags::TagKey tag_key_metric_name_;
92 };
93 
94 // The default implementation fetches the real stats from Census.
95 class CensusViewProviderDefaultImpl : public CensusViewProvider {
96  public:
97   CensusViewProviderDefaultImpl();
98 
99   ViewDataMap FetchViewData() override;
100 
101  private:
102   std::unordered_map<std::string, ::opencensus::stats::View> view_map_;
103 };
104 
// Abstract source of CPU statistics. It is an interface so that tests can
// substitute a mock implementation.
class CpuStatsProvider {
 public:
  // One CPU sample: `first` is the amount used, `second` the total amount.
  using CpuStatsSample = std::pair<std::uint64_t, std::uint64_t>;

  virtual ~CpuStatsProvider() = default;

  // Returns the cumulative used CPU (first) and the total CPU resource
  // (second).
  virtual CpuStatsSample GetCpuStats() = 0;
};
116 
117 // The default implementation reads CPU jiffies from the system to calculate CPU
118 // utilization.
119 class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
120  public:
121   CpuStatsSample GetCpuStats() override;
122 };
123 
124 // Maintains all the load data and load reporting streams.
125 class LoadReporter {
126  public:
127   // TODO(juanlishen): Allow config for providers from users.
LoadReporter(uint32_t feedback_sample_window_seconds,std::unique_ptr<CensusViewProvider> census_view_provider,std::unique_ptr<CpuStatsProvider> cpu_stats_provider)128   LoadReporter(uint32_t feedback_sample_window_seconds,
129                std::unique_ptr<CensusViewProvider> census_view_provider,
130                std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
131       : feedback_sample_window_seconds_(feedback_sample_window_seconds),
132         census_view_provider_(std::move(census_view_provider)),
133         cpu_stats_provider_(std::move(cpu_stats_provider)) {
134     // Append the initial record so that the next real record can have a base.
135     AppendNewFeedbackRecord(0, 0);
136   }
137 
138   // Fetches the latest data from Census and merge it into the data store.
139   // Also adds a new sample to the LB feedback sliding window.
140   // Thread-unsafe. (1). The access to the load data store and feedback records
141   // has locking. (2). The access to the Census view provider and CPU stats
142   // provider lacks locking, but we only access these two members in this method
143   // (in testing, we also access them when setting up expectation). So the
144   // invocations of this method must be serialized.
145   void FetchAndSample();
146 
147   // Generates a report for that host and balancer. The report contains
148   // all the stats data accumulated between the last report (i.e., the last
149   // consumption) and the last fetch from Census (i.e., the last production).
150   // Thread-safe.
151   ::google::protobuf::RepeatedPtrField<grpc::lb::v1::Load> GenerateLoads(
152       const std::string& hostname, const std::string& lb_id);
153 
154   // The feedback is calculated from the stats data recorded in the sliding
155   // window. Outdated records are discarded.
156   // Thread-safe.
157   grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
158 
159   // Wrapper around LoadDataStore::ReportStreamCreated.
160   // Thread-safe.
161   void ReportStreamCreated(const std::string& hostname,
162                            const std::string& lb_id,
163                            const std::string& load_key);
164 
165   // Wrapper around LoadDataStore::ReportStreamClosed.
166   // Thread-safe.
167   void ReportStreamClosed(const std::string& hostname,
168                           const std::string& lb_id);
169 
170   // Generates a unique LB ID of length kLbIdLength. Returns an empty string
171   // upon failure. Thread-safe.
172   std::string GenerateLbId();
173 
174   // Accessors only for testing.
census_view_provider()175   CensusViewProvider* census_view_provider() {
176     return census_view_provider_.get();
177   }
cpu_stats_provider()178   CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
179 
180  private:
181   struct LoadBalancingFeedbackRecord {
182     std::chrono::system_clock::time_point end_time;
183     uint64_t rpcs;
184     uint64_t errors;
185     uint64_t cpu_usage;
186     uint64_t cpu_limit;
187 
LoadBalancingFeedbackRecordLoadBalancingFeedbackRecord188     LoadBalancingFeedbackRecord(
189         const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
190         uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
191         : end_time(end_time),
192           rpcs(rpcs),
193           errors(errors),
194           cpu_usage(cpu_usage),
195           cpu_limit(cpu_limit) {}
196   };
197 
198   // Finds the view data about starting call from the view_data_map and merges
199   // the data to the load data store.
200   void ProcessViewDataCallStart(
201       const CensusViewProvider::ViewDataMap& view_data_map);
202   // Finds the view data about ending call from the view_data_map and merges the
203   // data to the load data store.
204   void ProcessViewDataCallEnd(
205       const CensusViewProvider::ViewDataMap& view_data_map);
206   // Finds the view data about the customized call metrics from the
207   // view_data_map and merges the data to the load data store.
208   void ProcessViewDataOtherCallMetrics(
209       const CensusViewProvider::ViewDataMap& view_data_map);
210 
IsRecordInWindow(const LoadBalancingFeedbackRecord & record,std::chrono::system_clock::time_point now)211   bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
212                         std::chrono::system_clock::time_point now) {
213     return record.end_time > now - feedback_sample_window_seconds_;
214   }
215 
216   void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
217 
218   // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
219   // it to the load.
220   void AttachOrphanLoadId(grpc::lb::v1::Load* load,
221                           const PerBalancerStore& per_balancer_store);
222 
223   std::atomic<int64_t> next_lb_id_{0};
224   const std::chrono::seconds feedback_sample_window_seconds_;
225   grpc_core::Mutex feedback_mu_;
226   std::deque<LoadBalancingFeedbackRecord> feedback_records_;
227   // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
228   // too expensive.
229   grpc_core::Mutex store_mu_;
230   LoadDataStore load_data_store_;
231   std::unique_ptr<CensusViewProvider> census_view_provider_;
232   std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
233 };
234 
235 }  // namespace load_reporter
236 }  // namespace grpc
237 
238 #endif  // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
239