1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_UTIL_TDIGEST_H 16 #define GRPC_SRC_CORE_UTIL_TDIGEST_H 17 18 #include <cstddef> 19 #include <cstdint> 20 #include <memory> 21 #include <optional> 22 #include <string> 23 24 #include "absl/status/status.h" 25 #include "absl/strings/string_view.h" 26 27 namespace grpc_core { 28 29 // Represents a t-digest [1]. 30 // 31 // t-digest is a structure that can store accurate accumulation of quantiles 32 // and other rank-based statistics, over a stream of data. 33 // 34 // There are different flavors of t-digest, but here we only implement a merging 35 // t-digest. 36 // 37 // Users can add values to a t-digest, and also merge t-digests. 38 // 39 // [1] Ted Dunning and Otmar Ertl, "COMPUTING EXTREMELY ACCURATE QUANTILES USING 40 // t-DIGESTS", go/tdigest. 41 // 42 // Note on thread-safety: This class provides no thread-safety guarantee. Access 43 // to the methods of this class must be synchronized externally by the user. 44 class TDigest final { 45 public: 46 TDigest(const TDigest&) = delete; 47 TDigest(TDigest&&) = delete; 48 49 TDigest& operator=(const TDigest&) = delete; 50 TDigest& operator=(TDigest&&) = delete; 51 52 // Creates a t-digest with the given compression factor (aka delta). 53 // 54 // The number of centroids kept in a t-digest is in O(compression). 55 // A t-digest should keep less than 2*compression. 56 explicit TDigest(double compression); 57 58 void Reset(double compression); 59 60 // Adds `count` number of `val` to t-digest. 61 void Add(double val, int64_t count); 62 63 // Adds a single value with a count of 1 to the t-digest. Add(double val)64 void Add(double val) { Add(val, 1); } 65 66 // Merges `that` t-digest into `this` t-digest. 67 void Merge(const TDigest& that); 68 69 // Returns an approximate quantile of values stored in the t-digest. Inclusive 70 // i.e. largest value that <= quantile. 71 // 72 // `quantile` can be any real value between 0 and 1. For example, 0.99 would 73 // return the 99th percentile. 74 double Quantile(double quantile); 75 76 // Returns the cumulative probability corresponding to the given value. 77 // Inclusive i.e. probabiliy that <= val. 78 double Cdf(double val); 79 80 // Returns the minimum of all values added to the t-digest. Min()81 double Min() const { return min_; } 82 83 // Returns the maximum of all values added to the t-digest. Max()84 double Max() const { return max_; } 85 86 // Returns the sum of all values added to the t-digest. Sum()87 double Sum() const { return sum_; } 88 89 // Returns the count of all values added to the t-digest. Count()90 int64_t Count() const { return count_; } 91 92 // Returns the compression factor of the t-digest. Compression()93 double Compression() const { return compression_; } 94 95 // Returns the string representation of this t-digest. The string format is 96 // external and compatible with all implementations of this library. 97 std::string ToString(); 98 99 // Restores the t-digest from the string representation. 100 // Returns an error if `string` is mal-formed where the state of this t-digest 101 // is undefined. 102 absl::Status FromString(absl::string_view string); 103 104 // Returns the (approximate) size in bytes of storing this t-digest in RAM. 105 // Useful when a TDigest is used as the accumulator in a Flume AccumulateFn. 106 size_t MemUsageBytes() const; 107 Swap(TDigest & that)108 void Swap(TDigest& that) { 109 std::swap(compression_, that.compression_); 110 std::swap(batch_size_, that.batch_size_); 111 std::swap(centroids_, that.centroids_); 112 std::swap(merged_, that.merged_); 113 std::swap(unmerged_, that.unmerged_); 114 std::swap(min_, that.min_); 115 std::swap(max_, that.max_); 116 std::swap(sum_, that.sum_); 117 std::swap(count_, that.count_); 118 } 119 120 private: 121 // Centroid the primitive construct in t-digest. 122 // A centroid has a mean and a count. 123 struct CentroidPod { CentroidPodCentroidPod124 CentroidPod() : CentroidPod(0, 0) {} CentroidPodCentroidPod125 CentroidPod(double mean, int64_t count) : mean(mean), count(count) {} 126 127 double mean; 128 int64_t count; 129 130 bool operator<(const CentroidPod& that) const { 131 // For centroids with the same mean, we want to have the centroids 132 // with a larger mass in front of the queue. 133 // 134 // See http://github.com/tdunning/t-digest/issues/78 for the discussion. 135 return mean < that.mean || (mean == that.mean && count > that.count); 136 } 137 }; 138 139 // Adds a centroid to the unmerged list, and merge the unemerged centroids 140 // when we have `batch_size` of unmerged centroids. 141 void AddUnmergedCentroid(const CentroidPod& centroid); 142 143 // Merges the batch of unmerged points and centroids. 144 // 145 // This is an in-place implementation of the progressive merging algorithm, 146 // and does work solely using the centroids_ vector. 147 void DoMerge(); 148 149 // Converts a quantile to the approximate centroid index. 150 // 151 // This is the k(q,delta) function in the t-digest paper. 152 // See Figure 1 for more details. 153 double QuantileToCentroid(double quantile) const; 154 155 // Converts a centroid index to an approximate quantile. 156 // 157 // This is the _inverse_ of k(q,delta) function in the t-digest paper. 158 // See Figure 1 for more details. 159 double CentroidToQuantile(double centroid) const; 160 161 // Updates min, max, sum, count. UpdateStats(double min,double max,double sum,int64_t count)162 void UpdateStats(double min, double max, double sum, int64_t count) { 163 if (count <= 0) return; 164 if (min < min_) min_ = min; 165 if (max > max_) max_ = max; 166 count_ += count; 167 sum_ += sum; 168 } 169 170 // Compression factor (aka delta). 171 // 172 // When zero, to be determined from the first merge. 173 double compression_; 174 // Maximum number of unmerged elements. 175 int64_t batch_size_; 176 177 // All centroids merged and unmerged. Unmerged centroids can actually be a 178 // value or a centroid. 179 std::vector<CentroidPod> centroids_; 180 // Number of centroids that are already merged. 181 int64_t merged_; 182 // Number of centroids and values that are added but not merged yet. 183 int64_t unmerged_; 184 185 // Minimum of all values and centroid means. 186 double min_; 187 // Maximum of all values and centroid means. 188 double max_; 189 // Sum of all values and centroid means added. 190 double sum_; 191 // Count of all values and centroids added. 192 int64_t count_; 193 }; 194 195 } // namespace grpc_core 196 197 #endif // GRPC_SRC_CORE_UTIL_TDIGEST_H 198