1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
#include "src/core/util/tdigest.h"

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <string>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
21
namespace grpc_core {

namespace {

// Canonical NaN returned by queries (Cdf/Quantile) on an empty digest.
constexpr double kNan = std::numeric_limits<double>::quiet_NaN();
// Hard upper bound on the compression factor; see BoundedCompression().
constexpr double kMaxCompression = 1e6;
// Pi, used by the asin/sin based scale function of the merging t-digest.
constexpr double kPi = 3.14159265358979323846;
29
30 // Returns the minimum of compression and kMaxCompression.
BoundedCompression(double compression)31 double BoundedCompression(double compression) {
32 static_assert(8 * kMaxCompression < std::numeric_limits<int64_t>::max(),
33 "kMaxCompression must be smaller than max_int64/8.");
34 return std::min(kMaxCompression, compression);
35 }
36
37 // Returns the maximum centroids that can be generated by the merging t-digest.
MaxCentroids(double compression)38 size_t MaxCentroids(double compression) {
39 compression = BoundedCompression(compression);
40 return 2 * static_cast<size_t>(std::ceil(compression));
41 }
42
LinearInterpolate(double val1,double val2,double weight1,double weight2)43 double LinearInterpolate(double val1, double val2, double weight1,
44 double weight2) {
45 DCHECK_GE(weight1, 0);
46 DCHECK_GE(weight2, 0);
47 DCHECK_GT(weight1 + weight2, 0);
48 return (val1 * weight1 + val2 * weight2) / (weight1 + weight2);
49 }
50
51 } // namespace
52
TDigest(double compression)53 TDigest::TDigest(double compression) { Reset(compression); }
54
Reset(double compression)55 void TDigest::Reset(double compression) {
56 compression_ = BoundedCompression(compression);
57 // Set the default batch_size to 4 times the number of centroids.
58 batch_size_ = static_cast<int64_t>(4 * MaxCentroids(compression_));
59 DCHECK(compression_ == 0.0 || batch_size_ > 0);
60 centroids_.reserve(MaxCentroids(compression_) + batch_size_);
61 centroids_.clear();
62 merged_ = 0;
63 unmerged_ = 0;
64 min_ = std::numeric_limits<double>::max();
65 max_ = std::numeric_limits<double>::lowest();
66 sum_ = 0;
67 count_ = 0;
68 }
69
Add(double val,int64_t count)70 void TDigest::Add(double val, int64_t count) {
71 if (count == 0) {
72 return;
73 }
74 // Single sample is considered discrete.
75 UpdateStats(/*min=*/val, /*max=*/val, /*sum=*/val * count, count);
76 AddUnmergedCentroid(CentroidPod(val, count));
77 }
78
AddUnmergedCentroid(const CentroidPod & centroid)79 void TDigest::AddUnmergedCentroid(const CentroidPod& centroid) {
80 DCHECK_LT(unmerged_, batch_size_);
81
82 centroids_.push_back(centroid);
83 ++unmerged_;
84 if (unmerged_ == batch_size_) {
85 DoMerge();
86 }
87 }
88
Merge(const TDigest & that)89 void TDigest::Merge(const TDigest& that) {
90 if (compression_ == 0.0) {
91 Reset(that.Compression());
92 }
93
94 UpdateStats(that.Min(), that.Max(), that.Sum(), that.Count());
95
96 for (const auto& centroid : that.centroids_) {
97 AddUnmergedCentroid(centroid);
98 }
99 }
100
QuantileToCentroid(double quantile) const101 double TDigest::QuantileToCentroid(double quantile) const {
102 return compression_ * (std::asin(2 * quantile - 1) + kPi / 2) / kPi;
103 }
104
CentroidToQuantile(double centroid) const105 double TDigest::CentroidToQuantile(double centroid) const {
106 centroid = std::min(centroid, compression_);
107 return (sin(centroid * kPi / compression_ - kPi / 2) + 1) / 2;
108 }
109
// TODO(ysseung): Maybe try bi-directional merge to lower 1p error rate. Error
// rates are consistently higher at 1p. This is likely due to greedy merge from
// left. But we care 99p more and this may be just fine.
//
// Compresses the centroid buffer: sorts all centroids by mean, then greedily
// merges neighbors from the left as long as the accumulated count stays
// within the per-centroid budget given by the scale function.
void TDigest::DoMerge() {
  if (unmerged_ == 0) {
    return;
  }

  // We first sort the centroids, and assume the first centroid is merged,
  // and the rest are unmerged.
  DCHECK(!centroids_.empty());
  std::sort(centroids_.begin(), centroids_.end());
  unmerged_ += merged_ - 1;
  merged_ = 1;

  const int64_t total_count = count_;

  double q0 = 0;
  // This is actually S * q_{limit} in the paper, not exactly q_limit.
  // But, keeping the scaled value results in eliminating the division in the
  // hotpath. Also, it is closer to the reference implementation.
  double q_limit = total_count * CentroidToQuantile(q0 + 1);

  // When non-discrete, the sum value may change due to floating point errors
  // every time centroids are merged. We must correct this each time by keeping
  // it as much in sync with current centroids as possible to keep this error
  // bounded.
  sum_ = 0;

  // `last_merged` is the centroid currently being accumulated into;
  // everything before it is final. `merged_count` is the running cumulative
  // count through `last_merged`.
  auto last_merged = centroids_.begin();
  auto first_unmerged = last_merged + 1;
  int64_t merged_count = last_merged->count;
  for (; unmerged_ > 0; --unmerged_, ++first_unmerged) {
    // Simply merge, if the last merged centroid has enough room for the last
    // unmerged element.
    if (first_unmerged->count + merged_count <= q_limit) {
      // Note that here we use the Welford's method, and
      // count must be updated before mean.
      last_merged->count += first_unmerged->count;
      last_merged->mean +=
          ((first_unmerged->mean - last_merged->mean) * first_unmerged->count) /
          last_merged->count;
      merged_count += first_unmerged->count;
      continue;
    }

    // Now we need to move onto the next centroid to merge the first unmerged.
    q0 = QuantileToCentroid(static_cast<double>(merged_count) / total_count);
    q_limit = total_count * CentroidToQuantile(q0 + 1);
    merged_count += first_unmerged->count;
    // The previous centroid is final now; fold it into the rebuilt sum.
    sum_ += last_merged->mean * last_merged->count;
    ++merged_;
    ++last_merged;
    *last_merged = *first_unmerged;
  }
  // Account for the final centroid left open when the loop exits.
  sum_ += last_merged->mean * last_merged->count;

  unmerged_ = 0;
  centroids_.resize(merged_);
  if (!centroids_.empty()) {
    // Keep min_/max_ consistent with the outermost centroid means.
    min_ = std::min(min_, centroids_.front().mean);
    max_ = std::max(max_, centroids_.back().mean);
  }
  DCHECK_LE(centroids_.size(), MaxCentroids(compression_));
}
175
176 // We use linear interpolation between mid points of centroids when calculating
177 // Cdf() and Percentile(). All unmerged centoirds are merged first so that they
178 // are strongly ordered, then we use linear interpolation with points:
179 //
180 // (percentile, value) = (0, min), (count[0] / 2, mean[0]), ..
181 // ((count[i-1]+count[i])/2, mean[i]), ..
182 // (count[last], max)
183 //
184 // the CDF from centroids with interpolation points marked with '*':
185 //
186 // count
187 // |
188 // +c[2]| --------*
189 // (=tot)| | |
190 // | * |
191 // +c[1]| --------| |
192 // | | |
193 // | * |
194 // +c[0]| -----| |
195 // | | |
196 // | * |
197 // | | |
198 // 0 *----------------------------- value
199 // min m[0] m[1] m[2] max
200 //
Cdf(double val)201 double TDigest::Cdf(double val) {
202 DoMerge();
203
204 if (merged_ == 0) {
205 return kNan;
206 }
207
208 if (val < min_) {
209 return 0;
210 }
211
212 // We diverge from the spec here. If value == max == min, we return 1.
213 if (val >= max_) {
214 return 1;
215 }
216 DCHECK_NE(min_, max_);
217
218 if (merged_ == 1) {
219 return (val - min_) / (min_ - max_);
220 }
221
222 if (val < centroids_[0].mean) {
223 return LinearInterpolate(
224 0.0, static_cast<double>(centroids_[0].count) / count_ / 2.0,
225 centroids_[0].mean - val, val - min_);
226 }
227
228 if (val >= centroids_.back().mean) {
229 return LinearInterpolate(
230 1.0 - static_cast<double>(centroids_.back().count) / count_ / 2.0, 1,
231 max_ - val, val - centroids_.back().mean);
232 }
233
234 double accum_count = centroids_[0].count / 2.0;
235 for (size_t i = 0; i < centroids_.size(); ++i) {
236 if (centroids_[i].mean == val) {
237 double prev_accum_count = accum_count;
238 // We may have centroids of the same mean. We need to sum their counts
239 // and then interpolate.
240 for (; centroids_[i + 1].mean == val; ++i) {
241 accum_count += centroids_[i].count + centroids_[i + 1].count;
242 }
243 return (prev_accum_count + accum_count) / 2.0 / count_;
244 }
245 if (centroids_[i].mean <= val && val < centroids_[i + 1].mean) {
246 auto mean1 = centroids_[i].mean;
247 auto mean2 = centroids_[i + 1].mean;
248 double mean_ratio;
249 // guard against double madness.
250 if (mean2 <= mean1) {
251 mean_ratio = 1;
252 } else {
253 mean_ratio = (val - mean1) / (mean2 - mean1);
254 }
255 double delta_count =
256 (centroids_[i].count + centroids_[i + 1].count) / 2.0;
257 return (accum_count + delta_count * mean_ratio) / count_;
258 }
259 accum_count += (centroids_[i].count + centroids_[i + 1].count) / 2.0;
260 }
261
262 LOG(DFATAL) << "Cannot measure CDF for: " << val;
263 return kNan;
264 }
265
Quantile(double quantile)266 double TDigest::Quantile(double quantile) {
267 DCHECK_LE(quantile, 1);
268 DCHECK_GE(quantile, 0);
269
270 DoMerge();
271
272 if (merged_ == 0) {
273 return kNan;
274 }
275 if (merged_ == 1) {
276 return centroids_[0].mean;
277 }
278
279 const double quantile_count = quantile * count_;
280 double prev_count = 0;
281 double prev_val = min_;
282 double this_count = centroids_[0].count / 2.0;
283 double this_val = centroids_[0].mean;
284
285 for (size_t i = 0; i < centroids_.size(); ++i) {
286 if (quantile_count < this_count) {
287 break;
288 }
289
290 prev_count = this_count;
291 prev_val = this_val;
292
293 if (i == centroids_.size() - 1) {
294 // Interpolate between max and the last centroid.
295 this_count = count_;
296 this_val = max_;
297 } else {
298 this_count += (centroids_[i].count + centroids_[i + 1].count) / 2.0;
299 this_val = centroids_[i + 1].mean;
300 }
301 }
302
303 return LinearInterpolate(prev_val, this_val, this_count - quantile_count,
304 quantile_count - prev_count);
305 }
306
ToString()307 std::string TDigest::ToString() {
308 std::string str = absl::StrCat(compression_);
309 if (count_ <= 1) {
310 if (count_ == 0) {
311 // Note the string representation serializes min/max = 0 when empty.
312 return absl::StrAppendFormat(&str, "/0/0/0/0");
313 }
314 return absl::StrAppendFormat(&str, "/%0.17g", centroids_.front().mean);
315 }
316
317 DoMerge();
318
319 absl::StrAppendFormat(&str, "/%0.17g/%0.17g/%0.17g/%d", min_, max_, sum_,
320 count_);
321
322 for (auto& centroid : centroids_) {
323 absl::StrAppendFormat(&str, "/%0.17g:%d", centroid.mean, centroid.count);
324 }
325 return str;
326 }
327
FromString(absl::string_view string)328 absl::Status TDigest::FromString(absl::string_view string) {
329 // Accept an empty string as 'not set'.
330 // Although ToString() never produces an empty string, an empty string is
331 // still expected when a t-Digest is missing.
332 if (string.empty()) {
333 Reset(0);
334 return absl::OkStatus();
335 }
336
337 const std::vector<absl::string_view> tokens = absl::StrSplit(string, '/');
338 auto iter = tokens.begin();
339
340 // First token (compression and discrete).
341 if (iter == tokens.end() || iter->empty()) {
342 return absl::InvalidArgumentError("No compression/discrete.");
343 }
344
345 double double_val;
346 if (!absl::SimpleAtod(iter->substr(0, iter->length()), &double_val) ||
347 double_val < 0) {
348 return absl::InvalidArgumentError(
349 absl::StrCat("Invalid double_val/discrete: ", *iter));
350 }
351
352 Reset(double_val);
353
354 if (++iter == tokens.end()) {
355 return absl::InvalidArgumentError("Unexpected end of string.");
356 }
357
358 // Single-valued t-Digest.
359 if ((iter + 1) == tokens.end()) {
360 if (!absl::SimpleAtod(*iter, &double_val)) {
361 return absl::InvalidArgumentError(
362 absl::StrCat("Invalid single-value: ", *iter));
363 }
364 Add(double_val, 1);
365 return absl::OkStatus();
366 }
367
368 // Parse min/max/sum/count.
369 double min = 0.0, max = 0.0, sum = 0.0;
370 int64_t count = 0;
371 if (iter == tokens.end() || !absl::SimpleAtod(*iter, &min) ||
372 ++iter == tokens.end() || !absl::SimpleAtod(*iter, &max) ||
373 ++iter == tokens.end() || !absl::SimpleAtod(*iter, &sum) ||
374 ++iter == tokens.end() || !absl::SimpleAtoi(*iter, &count)) {
375 return absl::InvalidArgumentError("Invalid min, max, sum, or count.");
376 }
377
378 // Empty. Note the string representation serializes min/max = 0 when empty.
379 if (++iter == tokens.end()) {
380 if (min != 0 || max != 0 || count != 0 || sum != 0) {
381 return absl::InvalidArgumentError(
382 "Empty t-Digest with non-zero min, max, sum, or count.");
383 }
384 return absl::OkStatus();
385 }
386
387 // Parse centroids.
388 int64_t int_val = 0;
389
390 for (; iter != tokens.end(); ++iter) {
391 const auto pos = iter->find_first_of(':');
392 if (pos == absl::string_view::npos ||
393 !absl::SimpleAtod(iter->substr(0, pos), &double_val) ||
394 !absl::SimpleAtoi(iter->substr(pos + 1), &int_val)) {
395 return absl::InvalidArgumentError(
396 absl::StrCat("Invalid centroid: ", *iter));
397 }
398 Add(double_val, int_val);
399 }
400
401 DoMerge();
402 min_ = min;
403 max_ = max;
404
405 if (centroids_.empty()) {
406 return absl::OkStatus();
407 }
408
409 // Validate min/max/sum/count.
410 DCHECK_LT(std::abs(sum - sum_), 1e-10) << "Invalid sum value.";
411
412 if (count != count_) {
413 return absl::InvalidArgumentError("Invalid count value.");
414 }
415
416 return absl::OkStatus();
417 }
418
MemUsageBytes() const419 size_t TDigest::MemUsageBytes() const {
420 return sizeof(TDigest) + (sizeof(CentroidPod) * centroids_.capacity());
421 }
422
423 } // namespace grpc_core
424