• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/util/tdigest.h"
16 
17 #include "absl/log/check.h"
18 #include "absl/log/log.h"
19 #include "absl/strings/str_cat.h"
20 #include "absl/strings/str_split.h"
21 
22 namespace grpc_core {
23 
24 namespace {
25 
26 constexpr double kNan = std::numeric_limits<double>::quiet_NaN();
27 constexpr double kMaxCompression = 1e6;
28 constexpr double kPi = 3.14159265358979323846;
29 
30 // Returns the minimum of compression and kMaxCompression.
BoundedCompression(double compression)31 double BoundedCompression(double compression) {
32   static_assert(8 * kMaxCompression < std::numeric_limits<int64_t>::max(),
33                 "kMaxCompression must be smaller than max_int64/8.");
34   return std::min(kMaxCompression, compression);
35 }
36 
37 // Returns the maximum centroids that can be generated by the merging t-digest.
MaxCentroids(double compression)38 size_t MaxCentroids(double compression) {
39   compression = BoundedCompression(compression);
40   return 2 * static_cast<size_t>(std::ceil(compression));
41 }
42 
LinearInterpolate(double val1,double val2,double weight1,double weight2)43 double LinearInterpolate(double val1, double val2, double weight1,
44                          double weight2) {
45   DCHECK_GE(weight1, 0);
46   DCHECK_GE(weight2, 0);
47   DCHECK_GT(weight1 + weight2, 0);
48   return (val1 * weight1 + val2 * weight2) / (weight1 + weight2);
49 }
50 
51 }  // namespace
52 
TDigest(double compression)53 TDigest::TDigest(double compression) { Reset(compression); }
54 
Reset(double compression)55 void TDigest::Reset(double compression) {
56   compression_ = BoundedCompression(compression);
57   // Set the default batch_size to 4 times the number of centroids.
58   batch_size_ = static_cast<int64_t>(4 * MaxCentroids(compression_));
59   DCHECK(compression_ == 0.0 || batch_size_ > 0);
60   centroids_.reserve(MaxCentroids(compression_) + batch_size_);
61   centroids_.clear();
62   merged_ = 0;
63   unmerged_ = 0;
64   min_ = std::numeric_limits<double>::max();
65   max_ = std::numeric_limits<double>::lowest();
66   sum_ = 0;
67   count_ = 0;
68 }
69 
Add(double val,int64_t count)70 void TDigest::Add(double val, int64_t count) {
71   if (count == 0) {
72     return;
73   }
74   // Single sample is considered discrete.
75   UpdateStats(/*min=*/val, /*max=*/val, /*sum=*/val * count, count);
76   AddUnmergedCentroid(CentroidPod(val, count));
77 }
78 
AddUnmergedCentroid(const CentroidPod & centroid)79 void TDigest::AddUnmergedCentroid(const CentroidPod& centroid) {
80   DCHECK_LT(unmerged_, batch_size_);
81 
82   centroids_.push_back(centroid);
83   ++unmerged_;
84   if (unmerged_ == batch_size_) {
85     DoMerge();
86   }
87 }
88 
Merge(const TDigest & that)89 void TDigest::Merge(const TDigest& that) {
90   if (compression_ == 0.0) {
91     Reset(that.Compression());
92   }
93 
94   UpdateStats(that.Min(), that.Max(), that.Sum(), that.Count());
95 
96   for (const auto& centroid : that.centroids_) {
97     AddUnmergedCentroid(centroid);
98   }
99 }
100 
QuantileToCentroid(double quantile) const101 double TDigest::QuantileToCentroid(double quantile) const {
102   return compression_ * (std::asin(2 * quantile - 1) + kPi / 2) / kPi;
103 }
104 
CentroidToQuantile(double centroid) const105 double TDigest::CentroidToQuantile(double centroid) const {
106   centroid = std::min(centroid, compression_);
107   return (sin(centroid * kPi / compression_ - kPi / 2) + 1) / 2;
108 }
109 
110 // TODO(ysseung): Maybe try bi-directional merge to lower 1p error rate. Error
111 // rates are consistently higher at 1p. This is likely due to greedy merge from
112 // left. But we care 99p more and this may be just fine.
DoMerge()113 void TDigest::DoMerge() {
114   if (unmerged_ == 0) {
115     return;
116   }
117 
118   // We first sort the centroids, and assume the first centroid is merged,
119   // and the rest are unmerged.
120   DCHECK(!centroids_.empty());
121   std::sort(centroids_.begin(), centroids_.end());
122   unmerged_ += merged_ - 1;
123   merged_ = 1;
124 
125   const int64_t total_count = count_;
126 
127   double q0 = 0;
128   // This is actually S * q_{limit} in the paper, not exactly q_limit.
129   // But, keeping the scaled value results in eliminating the division in the
130   // hotpath. Also, it is closer to the reference implementation.
131   double q_limit = total_count * CentroidToQuantile(q0 + 1);
132 
133   // When non-discrete, the sum value may change due to floating point errors
134   // every time centroids are merged. We must correct this each time by keeping
135   // it as much in sync with current centroids as possible to keep this error
136   // bounded.
137   sum_ = 0;
138 
139   auto last_merged = centroids_.begin();
140   auto first_unmerged = last_merged + 1;
141   int64_t merged_count = last_merged->count;
142   for (; unmerged_ > 0; --unmerged_, ++first_unmerged) {
143     // Simply merge, if the last merged centroid has enough room for the last
144     // unmerged element.
145     if (first_unmerged->count + merged_count <= q_limit) {
146       // Note that here we use the Welford's method, and
147       // count must be updated before mean.
148       last_merged->count += first_unmerged->count;
149       last_merged->mean +=
150           ((first_unmerged->mean - last_merged->mean) * first_unmerged->count) /
151           last_merged->count;
152       merged_count += first_unmerged->count;
153       continue;
154     }
155 
156     // Now we need to move onto the next centroid to merge the first unmerged.
157     q0 = QuantileToCentroid(static_cast<double>(merged_count) / total_count);
158     q_limit = total_count * CentroidToQuantile(q0 + 1);
159     merged_count += first_unmerged->count;
160     sum_ += last_merged->mean * last_merged->count;
161     ++merged_;
162     ++last_merged;
163     *last_merged = *first_unmerged;
164   }
165   sum_ += last_merged->mean * last_merged->count;
166 
167   unmerged_ = 0;
168   centroids_.resize(merged_);
169   if (!centroids_.empty()) {
170     min_ = std::min(min_, centroids_.front().mean);
171     max_ = std::max(max_, centroids_.back().mean);
172   }
173   DCHECK_LE(centroids_.size(), MaxCentroids(compression_));
174 }
175 
176 // We use linear interpolation between mid points of centroids when calculating
177 // Cdf() and Percentile(). All unmerged centoirds are merged first so that they
178 // are strongly ordered, then we use linear interpolation with points:
179 //
180 //   (percentile, value) = (0, min), (count[0] / 2, mean[0]), ..
181 //                         ((count[i-1]+count[i])/2, mean[i]), ..
182 //                         (count[last], max)
183 //
184 // the CDF from centroids with interpolation points marked with '*':
185 //
186 //    count
187 //       |
188 //  +c[2]|                  --------*
189 // (=tot)|                  |       |
190 //       |                  *       |
191 //  +c[1]|          --------|       |
192 //       |          |               |
193 //       |          *               |
194 //  +c[0]|     -----|               |
195 //       |     |                    |
196 //       |     *                    |
197 //       |     |                    |
198 //     0 *----------------------------- value
199 //      min  m[0] m[1]     m[2]    max
200 //
Cdf(double val)201 double TDigest::Cdf(double val) {
202   DoMerge();
203 
204   if (merged_ == 0) {
205     return kNan;
206   }
207 
208   if (val < min_) {
209     return 0;
210   }
211 
212   // We diverge from the spec here. If value == max == min, we return 1.
213   if (val >= max_) {
214     return 1;
215   }
216   DCHECK_NE(min_, max_);
217 
218   if (merged_ == 1) {
219     return (val - min_) / (min_ - max_);
220   }
221 
222   if (val < centroids_[0].mean) {
223     return LinearInterpolate(
224         0.0, static_cast<double>(centroids_[0].count) / count_ / 2.0,
225         centroids_[0].mean - val, val - min_);
226   }
227 
228   if (val >= centroids_.back().mean) {
229     return LinearInterpolate(
230         1.0 - static_cast<double>(centroids_.back().count) / count_ / 2.0, 1,
231         max_ - val, val - centroids_.back().mean);
232   }
233 
234   double accum_count = centroids_[0].count / 2.0;
235   for (size_t i = 0; i < centroids_.size(); ++i) {
236     if (centroids_[i].mean == val) {
237       double prev_accum_count = accum_count;
238       // We may have centroids of the same mean. We need to sum their counts
239       // and then interpolate.
240       for (; centroids_[i + 1].mean == val; ++i) {
241         accum_count += centroids_[i].count + centroids_[i + 1].count;
242       }
243       return (prev_accum_count + accum_count) / 2.0 / count_;
244     }
245     if (centroids_[i].mean <= val && val < centroids_[i + 1].mean) {
246       auto mean1 = centroids_[i].mean;
247       auto mean2 = centroids_[i + 1].mean;
248       double mean_ratio;
249       // guard against double madness.
250       if (mean2 <= mean1) {
251         mean_ratio = 1;
252       } else {
253         mean_ratio = (val - mean1) / (mean2 - mean1);
254       }
255       double delta_count =
256           (centroids_[i].count + centroids_[i + 1].count) / 2.0;
257       return (accum_count + delta_count * mean_ratio) / count_;
258     }
259     accum_count += (centroids_[i].count + centroids_[i + 1].count) / 2.0;
260   }
261 
262   LOG(DFATAL) << "Cannot measure CDF for: " << val;
263   return kNan;
264 }
265 
Quantile(double quantile)266 double TDigest::Quantile(double quantile) {
267   DCHECK_LE(quantile, 1);
268   DCHECK_GE(quantile, 0);
269 
270   DoMerge();
271 
272   if (merged_ == 0) {
273     return kNan;
274   }
275   if (merged_ == 1) {
276     return centroids_[0].mean;
277   }
278 
279   const double quantile_count = quantile * count_;
280   double prev_count = 0;
281   double prev_val = min_;
282   double this_count = centroids_[0].count / 2.0;
283   double this_val = centroids_[0].mean;
284 
285   for (size_t i = 0; i < centroids_.size(); ++i) {
286     if (quantile_count < this_count) {
287       break;
288     }
289 
290     prev_count = this_count;
291     prev_val = this_val;
292 
293     if (i == centroids_.size() - 1) {
294       // Interpolate between max and the last centroid.
295       this_count = count_;
296       this_val = max_;
297     } else {
298       this_count += (centroids_[i].count + centroids_[i + 1].count) / 2.0;
299       this_val = centroids_[i + 1].mean;
300     }
301   }
302 
303   return LinearInterpolate(prev_val, this_val, this_count - quantile_count,
304                            quantile_count - prev_count);
305 }
306 
ToString()307 std::string TDigest::ToString() {
308   std::string str = absl::StrCat(compression_);
309   if (count_ <= 1) {
310     if (count_ == 0) {
311       // Note the string representation serializes min/max = 0 when empty.
312       return absl::StrAppendFormat(&str, "/0/0/0/0");
313     }
314     return absl::StrAppendFormat(&str, "/%0.17g", centroids_.front().mean);
315   }
316 
317   DoMerge();
318 
319   absl::StrAppendFormat(&str, "/%0.17g/%0.17g/%0.17g/%d", min_, max_, sum_,
320                         count_);
321 
322   for (auto& centroid : centroids_) {
323     absl::StrAppendFormat(&str, "/%0.17g:%d", centroid.mean, centroid.count);
324   }
325   return str;
326 }
327 
FromString(absl::string_view string)328 absl::Status TDigest::FromString(absl::string_view string) {
329   // Accept an empty string as 'not set'.
330   // Although ToString() never produces an empty string, an empty string is
331   // still expected when a t-Digest is missing.
332   if (string.empty()) {
333     Reset(0);
334     return absl::OkStatus();
335   }
336 
337   const std::vector<absl::string_view> tokens = absl::StrSplit(string, '/');
338   auto iter = tokens.begin();
339 
340   // First token (compression and discrete).
341   if (iter == tokens.end() || iter->empty()) {
342     return absl::InvalidArgumentError("No compression/discrete.");
343   }
344 
345   double double_val;
346   if (!absl::SimpleAtod(iter->substr(0, iter->length()), &double_val) ||
347       double_val < 0) {
348     return absl::InvalidArgumentError(
349         absl::StrCat("Invalid double_val/discrete: ", *iter));
350   }
351 
352   Reset(double_val);
353 
354   if (++iter == tokens.end()) {
355     return absl::InvalidArgumentError("Unexpected end of string.");
356   }
357 
358   // Single-valued t-Digest.
359   if ((iter + 1) == tokens.end()) {
360     if (!absl::SimpleAtod(*iter, &double_val)) {
361       return absl::InvalidArgumentError(
362           absl::StrCat("Invalid single-value: ", *iter));
363     }
364     Add(double_val, 1);
365     return absl::OkStatus();
366   }
367 
368   // Parse min/max/sum/count.
369   double min = 0.0, max = 0.0, sum = 0.0;
370   int64_t count = 0;
371   if (iter == tokens.end() || !absl::SimpleAtod(*iter, &min) ||
372       ++iter == tokens.end() || !absl::SimpleAtod(*iter, &max) ||
373       ++iter == tokens.end() || !absl::SimpleAtod(*iter, &sum) ||
374       ++iter == tokens.end() || !absl::SimpleAtoi(*iter, &count)) {
375     return absl::InvalidArgumentError("Invalid min, max, sum, or count.");
376   }
377 
378   // Empty. Note the string representation serializes min/max = 0 when empty.
379   if (++iter == tokens.end()) {
380     if (min != 0 || max != 0 || count != 0 || sum != 0) {
381       return absl::InvalidArgumentError(
382           "Empty t-Digest with non-zero min, max, sum, or count.");
383     }
384     return absl::OkStatus();
385   }
386 
387   // Parse centroids.
388   int64_t int_val = 0;
389 
390   for (; iter != tokens.end(); ++iter) {
391     const auto pos = iter->find_first_of(':');
392     if (pos == absl::string_view::npos ||
393         !absl::SimpleAtod(iter->substr(0, pos), &double_val) ||
394         !absl::SimpleAtoi(iter->substr(pos + 1), &int_val)) {
395       return absl::InvalidArgumentError(
396           absl::StrCat("Invalid centroid: ", *iter));
397     }
398     Add(double_val, int_val);
399   }
400 
401   DoMerge();
402   min_ = min;
403   max_ = max;
404 
405   if (centroids_.empty()) {
406     return absl::OkStatus();
407   }
408 
409   // Validate min/max/sum/count.
410   DCHECK_LT(std::abs(sum - sum_), 1e-10) << "Invalid sum value.";
411 
412   if (count != count_) {
413     return absl::InvalidArgumentError("Invalid count value.");
414   }
415 
416   return absl::OkStatus();
417 }
418 
MemUsageBytes() const419 size_t TDigest::MemUsageBytes() const {
420   return sizeof(TDigest) + (sizeof(CentroidPod) * centroids_.capacity());
421 }
422 
423 }  // namespace grpc_core
424