• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/server/load_reporter/load_data_store.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <stdint.h>
23 #include <stdio.h>
24 
#include <cstdlib>
#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
29 
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "src/core/lib/iomgr/socket_utils.h"
33 #include "src/cpp/server/load_reporter/constants.h"
34 
35 namespace grpc {
36 namespace load_reporter {
37 
38 // Some helper functions.
39 namespace {
40 
// Given a map from type K to a set of value type V, finds the set associated
// with the given key and erases the value from the set. If the set becomes
// empty, also erases the key-set pair. Returns true if the value was present
// and has been erased; returns false if the key or the value was not found.
template <typename K, typename V>
bool UnorderedMapOfSetEraseKeyValue(std::unordered_map<K, std::set<V>>& map,
                                    const K& key, const V& value) {
  auto it = map.find(key);
  if (it == map.end()) {
    return false;
  }
  const size_t erased = it->second.erase(value);
  // Never leave an empty set behind for this key.
  if (it->second.empty()) {
    map.erase(it);
  }
  return erased != 0;
}
58 
// Given a map from type K to a set of value type V, removes the given key and
// the associated set, and returns the set (moved out of the map). Returns an
// empty set if the key is not found.
template <typename K, typename V>
std::set<V> UnorderedMapOfSetExtract(std::unordered_map<K, std::set<V>>& map,
                                     const K& key) {
  auto it = map.find(key);
  if (it == map.end()) {
    return {};
  }
  std::set<V> values = std::move(it->second);
  map.erase(it);
  return values;
}
73 
// Returns a pointer to an element of `container` chosen by advancing from
// begin() by `std::rand() % container.size()` steps. The container must be
// non-empty (enforced by CHECK). The returned pointer refers into the
// container and is only valid while that element is alive.
template <typename C>
const typename C::value_type* RandomElement(const C& container) {
  CHECK(!container.empty());
  const auto offset = std::rand() % container.size();
  auto pos = container.begin();
  std::advance(pos, offset);
  return &*pos;
}
82 
83 }  // namespace
84 
LoadRecordKey(const std::string & client_ip_and_token,std::string user_id)85 LoadRecordKey::LoadRecordKey(const std::string& client_ip_and_token,
86                              std::string user_id)
87     : user_id_(std::move(user_id)) {
88   CHECK_GE(client_ip_and_token.size(), 2u);
89   int ip_hex_size;
90   CHECK(sscanf(client_ip_and_token.substr(0, 2).c_str(), "%d", &ip_hex_size) ==
91         1);
92   CHECK(ip_hex_size == 0 || ip_hex_size == kIpv4AddressLength ||
93         ip_hex_size == kIpv6AddressLength);
94   size_t cur_pos = 2;
95   client_ip_hex_ = client_ip_and_token.substr(cur_pos, ip_hex_size);
96   cur_pos += ip_hex_size;
97   if (client_ip_and_token.size() - cur_pos < kLbIdLength) {
98     lb_id_ = kInvalidLbId;
99     lb_tag_ = "";
100   } else {
101     lb_id_ = client_ip_and_token.substr(cur_pos, kLbIdLength);
102     lb_tag_ = client_ip_and_token.substr(cur_pos + kLbIdLength);
103   }
104 }
105 
GetClientIpBytes() const106 std::string LoadRecordKey::GetClientIpBytes() const {
107   if (client_ip_hex_.empty()) {
108     return "";
109   } else if (client_ip_hex_.size() == kIpv4AddressLength) {
110     uint32_t ip_bytes;
111     if (sscanf(client_ip_hex_.c_str(), "%x", &ip_bytes) != 1) {
112       LOG(ERROR) << "Can't parse client IP (" << client_ip_hex_
113                  << ") from a hex string to an integer.";
114       return "";
115     }
116     ip_bytes = grpc_htonl(ip_bytes);
117     return std::string(reinterpret_cast<const char*>(&ip_bytes),
118                        sizeof(ip_bytes));
119   } else if (client_ip_hex_.size() == kIpv6AddressLength) {
120     uint32_t ip_bytes[4];
121     for (size_t i = 0; i < 4; ++i) {
122       if (sscanf(client_ip_hex_.substr(i * 8, (i + 1) * 8).c_str(), "%x",
123                  ip_bytes + i) != 1) {
124         LOG(ERROR) << "Can't parse client IP part ("
125                    << client_ip_hex_.substr(i * 8, (i + 1) * 8)
126                    << ") from a hex string to an integer.";
127         return "";
128       }
129       ip_bytes[i] = grpc_htonl(ip_bytes[i]);
130     }
131     return std::string(reinterpret_cast<const char*>(ip_bytes),
132                        sizeof(ip_bytes));
133   } else {
134     GPR_UNREACHABLE_CODE(return "");
135   }
136 }
137 
LoadRecordValue(std::string metric_name,uint64_t num_calls,double total_metric_value)138 LoadRecordValue::LoadRecordValue(std::string metric_name, uint64_t num_calls,
139                                  double total_metric_value) {
140   call_metrics_.emplace(std::move(metric_name),
141                         CallMetricValue(num_calls, total_metric_value));
142 }
143 
MergeRow(const LoadRecordKey & key,const LoadRecordValue & value)144 void PerBalancerStore::MergeRow(const LoadRecordKey& key,
145                                 const LoadRecordValue& value) {
146   // During suspension, the load data received will be dropped.
147   if (!suspended_) {
148     load_record_map_[key].MergeFrom(value);
149     VLOG(2) << "[PerBalancerStore " << this
150             << "] Load data merged (Key: " << key.ToString()
151             << ", Value: " << value.ToString() << ").";
152   } else {
153     VLOG(2) << "[PerBalancerStore " << this
154             << "] Load data dropped (Key: " << key.ToString()
155             << ", Value: " << value.ToString() << ").";
156   }
157   // We always keep track of num_calls_in_progress_, so that when this
158   // store is resumed, we still have a correct value of
159   // num_calls_in_progress_.
160   CHECK(static_cast<int64_t>(num_calls_in_progress_) +
161             value.GetNumCallsInProgressDelta() >=
162         0);
163   num_calls_in_progress_ += value.GetNumCallsInProgressDelta();
164 }
165 
Suspend()166 void PerBalancerStore::Suspend() {
167   suspended_ = true;
168   load_record_map_.clear();
169   VLOG(2) << "[PerBalancerStore " << this << "] Suspended.";
170 }
171 
Resume()172 void PerBalancerStore::Resume() {
173   suspended_ = false;
174   VLOG(2) << "[PerBalancerStore " << this << "] Resumed.";
175 }
176 
GetNumCallsInProgressForReport()177 uint64_t PerBalancerStore::GetNumCallsInProgressForReport() {
178   CHECK(!suspended_);
179   last_reported_num_calls_in_progress_ = num_calls_in_progress_;
180   return num_calls_in_progress_;
181 }
182 
ReportStreamCreated(const std::string & lb_id,const std::string & load_key)183 void PerHostStore::ReportStreamCreated(const std::string& lb_id,
184                                        const std::string& load_key) {
185   CHECK(lb_id != kInvalidLbId);
186   SetUpForNewLbId(lb_id, load_key);
187   // Prior to this one, there was no load balancer receiving report, so we may
188   // have unassigned orphaned stores to assign to this new balancer.
189   // TODO(juanlishen): If the load key of this new stream is the same with
190   // some previously adopted orphan store, we may want to take the orphan to
191   // this stream. Need to discuss with LB team.
192   if (assigned_stores_.size() == 1) {
193     for (const auto& p : per_balancer_stores_) {
194       const std::string& other_lb_id = p.first;
195       const std::unique_ptr<PerBalancerStore>& orphaned_store = p.second;
196       if (other_lb_id != lb_id) {
197         orphaned_store->Resume();
198         AssignOrphanedStore(orphaned_store.get(), lb_id);
199       }
200     }
201   }
202   // The first connected balancer will adopt the kInvalidLbId.
203   if (per_balancer_stores_.size() == 1) {
204     SetUpForNewLbId(kInvalidLbId, "");
205     ReportStreamClosed(kInvalidLbId);
206   }
207 }
208 
ReportStreamClosed(const std::string & lb_id)209 void PerHostStore::ReportStreamClosed(const std::string& lb_id) {
210   auto it_store_for_gone_lb = per_balancer_stores_.find(lb_id);
211   CHECK(it_store_for_gone_lb != per_balancer_stores_.end());
212   // Remove this closed stream from our records.
213   CHECK(UnorderedMapOfSetEraseKeyValue(load_key_to_receiving_lb_ids_,
214                                        it_store_for_gone_lb->second->load_key(),
215                                        lb_id));
216   std::set<PerBalancerStore*> orphaned_stores =
217       UnorderedMapOfSetExtract(assigned_stores_, lb_id);
218   // The stores that were assigned to this balancer are orphaned now. They
219   // should be re-assigned to other balancers which are still receiving reports.
220   for (PerBalancerStore* orphaned_store : orphaned_stores) {
221     const std::string* new_receiver = nullptr;
222     auto it = load_key_to_receiving_lb_ids_.find(orphaned_store->load_key());
223     if (it != load_key_to_receiving_lb_ids_.end()) {
224       // First, try to pick from the active balancers with the same load key.
225       new_receiver = RandomElement(it->second);
226     } else if (!assigned_stores_.empty()) {
227       // If failed, pick from all the remaining active balancers.
228       new_receiver = &(RandomElement(assigned_stores_)->first);
229     }
230     if (new_receiver != nullptr) {
231       AssignOrphanedStore(orphaned_store, *new_receiver);
232     } else {
233       // Load data for an LB ID that can't be assigned to any stream should
234       // be dropped.
235       orphaned_store->Suspend();
236     }
237   }
238 }
239 
FindPerBalancerStore(const std::string & lb_id) const240 PerBalancerStore* PerHostStore::FindPerBalancerStore(
241     const std::string& lb_id) const {
242   return per_balancer_stores_.find(lb_id) != per_balancer_stores_.end()
243              ? per_balancer_stores_.find(lb_id)->second.get()
244              : nullptr;
245 }
246 
GetAssignedStores(const std::string & lb_id) const247 const std::set<PerBalancerStore*>* PerHostStore::GetAssignedStores(
248     const std::string& lb_id) const {
249   auto it = assigned_stores_.find(lb_id);
250   if (it == assigned_stores_.end()) return nullptr;
251   return &(it->second);
252 }
253 
AssignOrphanedStore(PerBalancerStore * orphaned_store,const std::string & new_receiver)254 void PerHostStore::AssignOrphanedStore(PerBalancerStore* orphaned_store,
255                                        const std::string& new_receiver) {
256   auto it = assigned_stores_.find(new_receiver);
257   CHECK(it != assigned_stores_.end());
258   it->second.insert(orphaned_store);
259   LOG(INFO) << "[PerHostStore " << this << "] Re-assigned orphaned store ("
260             << orphaned_store << ") with original LB ID of "
261             << orphaned_store->lb_id() << " to new receiver " << new_receiver;
262 }
263 
SetUpForNewLbId(const std::string & lb_id,const std::string & load_key)264 void PerHostStore::SetUpForNewLbId(const std::string& lb_id,
265                                    const std::string& load_key) {
266   // The top-level caller (i.e., LoadReportService) should guarantee the
267   // lb_id is unique for each reporting stream.
268   CHECK(per_balancer_stores_.find(lb_id) == per_balancer_stores_.end());
269   CHECK(assigned_stores_.find(lb_id) == assigned_stores_.end());
270   load_key_to_receiving_lb_ids_[load_key].insert(lb_id);
271   std::unique_ptr<PerBalancerStore> per_balancer_store(
272       new PerBalancerStore(lb_id, load_key));
273   assigned_stores_[lb_id] = {per_balancer_store.get()};
274   per_balancer_stores_[lb_id] = std::move(per_balancer_store);
275 }
276 
FindPerBalancerStore(const string & hostname,const string & lb_id) const277 PerBalancerStore* LoadDataStore::FindPerBalancerStore(
278     const string& hostname, const string& lb_id) const {
279   auto it = per_host_stores_.find(hostname);
280   if (it != per_host_stores_.end()) {
281     const PerHostStore& per_host_store = it->second;
282     return per_host_store.FindPerBalancerStore(lb_id);
283   } else {
284     return nullptr;
285   }
286 }
287 
MergeRow(const std::string & hostname,const LoadRecordKey & key,const LoadRecordValue & value)288 void LoadDataStore::MergeRow(const std::string& hostname,
289                              const LoadRecordKey& key,
290                              const LoadRecordValue& value) {
291   PerBalancerStore* per_balancer_store =
292       FindPerBalancerStore(hostname, key.lb_id());
293   if (per_balancer_store != nullptr) {
294     per_balancer_store->MergeRow(key, value);
295     return;
296   }
297   // Unknown LB ID. Track it until its number of in-progress calls drops to
298   // zero.
299   int64_t in_progress_delta = value.GetNumCallsInProgressDelta();
300   if (in_progress_delta != 0) {
301     auto it_tracker = unknown_balancer_id_trackers_.find(key.lb_id());
302     if (it_tracker == unknown_balancer_id_trackers_.end()) {
303       VLOG(2) << "[LoadDataStore " << this
304               << "] Start tracking unknown balancer (lb_id_: " << key.lb_id()
305               << ").";
306       unknown_balancer_id_trackers_.insert(
307           {key.lb_id(), static_cast<uint64_t>(in_progress_delta)});
308     } else if ((it_tracker->second += in_progress_delta) == 0) {
309       unknown_balancer_id_trackers_.erase(it_tracker);
310       VLOG(2) << "[LoadDataStore " << this
311               << "] Stop tracking unknown balancer (lb_id_: " << key.lb_id()
312               << ").";
313     }
314   }
315 }
316 
GetAssignedStores(const std::string & hostname,const std::string & lb_id)317 const std::set<PerBalancerStore*>* LoadDataStore::GetAssignedStores(
318     const std::string& hostname, const std::string& lb_id) {
319   auto it = per_host_stores_.find(hostname);
320   if (it == per_host_stores_.end()) return nullptr;
321   return it->second.GetAssignedStores(lb_id);
322 }
323 
ReportStreamCreated(const std::string & hostname,const std::string & lb_id,const std::string & load_key)324 void LoadDataStore::ReportStreamCreated(const std::string& hostname,
325                                         const std::string& lb_id,
326                                         const std::string& load_key) {
327   per_host_stores_[hostname].ReportStreamCreated(lb_id, load_key);
328 }
329 
ReportStreamClosed(const std::string & hostname,const std::string & lb_id)330 void LoadDataStore::ReportStreamClosed(const std::string& hostname,
331                                        const std::string& lb_id) {
332   auto it_per_host_store = per_host_stores_.find(hostname);
333   CHECK(it_per_host_store != per_host_stores_.end());
334   it_per_host_store->second.ReportStreamClosed(lb_id);
335 }
336 
337 }  // namespace load_reporter
338 }  // namespace grpc
339