// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_UTIL_TDIGEST_H
16 #define GRPC_SRC_CORE_UTIL_TDIGEST_H
17 
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/strings/string_view.h"
26 
27 namespace grpc_core {
28 
29 // Represents a t-digest [1].
30 //
31 // t-digest is a structure that can store accurate accumulation of quantiles
32 // and other rank-based statistics, over a stream of data.
33 //
34 // There are different flavors of t-digest, but here we only implement a merging
35 // t-digest.
36 //
37 // Users can add values to a t-digest, and also merge t-digests.
38 //
39 // [1] Ted Dunning and Otmar Ertl, "COMPUTING EXTREMELY ACCURATE QUANTILES USING
40 //     t-DIGESTS", go/tdigest.
41 //
42 // Note on thread-safety: This class provides no thread-safety guarantee. Access
43 // to the methods of this class must be synchronized externally by the user.
44 class TDigest final {
45  public:
46   TDigest(const TDigest&) = delete;
47   TDigest(TDigest&&) = delete;
48 
49   TDigest& operator=(const TDigest&) = delete;
50   TDigest& operator=(TDigest&&) = delete;
51 
52   // Creates a t-digest with the given compression factor (aka delta).
53   //
54   // The number of centroids kept in a t-digest is in O(compression).
55   // A t-digest should keep less than 2*compression.
56   explicit TDigest(double compression);
57 
58   void Reset(double compression);
59 
60   // Adds `count` number of `val` to t-digest.
61   void Add(double val, int64_t count);
62 
63   // Adds a single value with a count of 1 to the t-digest.
Add(double val)64   void Add(double val) { Add(val, 1); }
65 
66   // Merges `that` t-digest into `this` t-digest.
67   void Merge(const TDigest& that);
68 
69   // Returns an approximate quantile of values stored in the t-digest. Inclusive
70   // i.e. largest value that <= quantile.
71   //
72   // `quantile` can be any real value between 0 and 1. For example, 0.99 would
73   // return the 99th percentile.
74   double Quantile(double quantile);
75 
76   // Returns the cumulative probability corresponding to the given value.
77   // Inclusive i.e. probabiliy that <= val.
78   double Cdf(double val);
79 
80   // Returns the minimum of all values added to the t-digest.
Min()81   double Min() const { return min_; }
82 
83   // Returns the maximum of all values added to the t-digest.
Max()84   double Max() const { return max_; }
85 
86   // Returns the sum of all values added to the t-digest.
Sum()87   double Sum() const { return sum_; }
88 
89   // Returns the count of all values added to the t-digest.
Count()90   int64_t Count() const { return count_; }
91 
92   // Returns the compression factor of the t-digest.
Compression()93   double Compression() const { return compression_; }
94 
95   // Returns the string representation of this t-digest. The string format is
96   // external and compatible with all implementations of this library.
97   std::string ToString();
98 
99   // Restores the t-digest from the string representation.
100   // Returns an error if `string` is mal-formed where the state of this t-digest
101   // is undefined.
102   absl::Status FromString(absl::string_view string);
103 
104   // Returns the (approximate) size in bytes of storing this t-digest in RAM.
105   // Useful when a TDigest is used as the accumulator in a Flume AccumulateFn.
106   size_t MemUsageBytes() const;
107 
Swap(TDigest & that)108   void Swap(TDigest& that) {
109     std::swap(compression_, that.compression_);
110     std::swap(batch_size_, that.batch_size_);
111     std::swap(centroids_, that.centroids_);
112     std::swap(merged_, that.merged_);
113     std::swap(unmerged_, that.unmerged_);
114     std::swap(min_, that.min_);
115     std::swap(max_, that.max_);
116     std::swap(sum_, that.sum_);
117     std::swap(count_, that.count_);
118   }
119 
120  private:
121   // Centroid the primitive construct in t-digest.
122   // A centroid has a mean and a count.
123   struct CentroidPod {
CentroidPodCentroidPod124     CentroidPod() : CentroidPod(0, 0) {}
CentroidPodCentroidPod125     CentroidPod(double mean, int64_t count) : mean(mean), count(count) {}
126 
127     double mean;
128     int64_t count;
129 
130     bool operator<(const CentroidPod& that) const {
131       // For centroids with the same mean, we want to have the centroids
132       // with a larger mass in front of the queue.
133       //
134       // See http://github.com/tdunning/t-digest/issues/78 for the discussion.
135       return mean < that.mean || (mean == that.mean && count > that.count);
136     }
137   };
138 
139   // Adds a centroid to the unmerged list, and merge the unemerged centroids
140   // when we have `batch_size` of unmerged centroids.
141   void AddUnmergedCentroid(const CentroidPod& centroid);
142 
143   // Merges the batch of unmerged points and centroids.
144   //
145   // This is an in-place implementation of the progressive merging algorithm,
146   // and does work solely using the centroids_ vector.
147   void DoMerge();
148 
149   // Converts a quantile to the approximate centroid index.
150   //
151   // This is the k(q,delta) function in the t-digest paper.
152   // See Figure 1 for more details.
153   double QuantileToCentroid(double quantile) const;
154 
155   // Converts a centroid index to an approximate quantile.
156   //
157   // This is the _inverse_ of k(q,delta) function in the t-digest paper.
158   // See Figure 1 for more details.
159   double CentroidToQuantile(double centroid) const;
160 
161   // Updates min, max, sum, count.
UpdateStats(double min,double max,double sum,int64_t count)162   void UpdateStats(double min, double max, double sum, int64_t count) {
163     if (count <= 0) return;
164     if (min < min_) min_ = min;
165     if (max > max_) max_ = max;
166     count_ += count;
167     sum_ += sum;
168   }
169 
170   // Compression factor (aka delta).
171   //
172   // When zero, to be determined from the first merge.
173   double compression_;
174   // Maximum number of unmerged elements.
175   int64_t batch_size_;
176 
177   // All centroids merged and unmerged. Unmerged centroids can actually be a
178   // value or a centroid.
179   std::vector<CentroidPod> centroids_;
180   // Number of centroids that are already merged.
181   int64_t merged_;
182   // Number of centroids and values that are added but not merged yet.
183   int64_t unmerged_;
184 
185   // Minimum of all values and centroid means.
186   double min_;
187   // Maximum of all values and centroid means.
188   double max_;
189   // Sum of all values and centroid means added.
190   double sum_;
191   // Count of all values and centroids added.
192   int64_t count_;
193 };
194 
195 }  // namespace grpc_core
196 
197 #endif  // GRPC_SRC_CORE_UTIL_TDIGEST_H
198