1 /* 2 * Copyright 2021 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <algorithm> 20 #include <cmath> 21 #include <cstdint> 22 #include <memory> 23 #include <utility> 24 #include <vector> 25 26 #include "random_generator.h" 27 #include "sampler.h" 28 29 namespace dist_proc { 30 namespace aggregation { 31 namespace internal { 32 33 class KllSampler; 34 35 // Hierarchy of compactors, which store items from the stream and 'compact' 36 // them when necessary (i.e., keep every second item in a sorted compactor) 37 // and add them to the compactor one level up. 38 class CompactorStack { 39 public: 40 CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random); 41 CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random); 42 ~CompactorStack(); 43 44 // Initialize or reset the compactor stack and all counters and thresholds. 45 void Reset(); 46 47 void Add(const int64_t value); 48 49 // Adds an item to the compactor stack with weight >= 1. 50 // Does nothing if weight <= 0. 51 void AddWithWeight(int64_t value, int weight); 52 53 // Ensures that the contents of each compactor are sorted. 54 void SortCompactorContents(); 55 56 // Target capacity of compactor with index h. If this capacity is exceeded, 57 // the compactor will be lazily compacted in one of the next CompactStack() 58 // runs. I.e., this capacity can be temorarily exceeded. 59 int TargetCapacityAtLevel(int h) const; 60 61 void DoubleSamplerCapacity(); 62 63 int num_stored_items() const; 64 65 std::optional<std::pair<const int64_t, int64_t>> sampled_item_and_weight() const; 66 67 // Returns the lowest active level in the compactor stack, which is identical 68 // with the number of replaced levels, or log2(sampler_capacity()). 69 int lowest_active_level() const; 70 71 int64_t sampler_capacity() const; 72 73 // For testing IsSamplerOn()74 bool IsSamplerOn() const { 75 return sampler_ != nullptr; 76 } 77 compactors()78 const std::vector<std::vector<int64_t>>& compactors() const { 79 return compactors_; 80 } 81 random()82 RandomGenerator* random() { 83 return random_; 84 } 85 k()86 int k() const { 87 return k_; 88 } 89 90 private: 91 void ClearCompactors(); 92 93 // Adds a new compactor at the highest level. To be called when the currently 94 // topmost compactor is full. 95 void AddLevel(); 96 97 // Called when at least one level in the compactor stack is above capacity. 98 // Iterates from bottom to top through the compactors and compacts the 99 // first one that is over its capacity by halving its contents and adding 100 // them to the compactor one level higher. 101 void CompactStack(); 102 103 void CompactLevel(int level); 104 105 // To compact the items in a compactor to roughly half the size, 106 // sorts the items and adds every even or odd item (determined randomly) 107 // to the up_compactor. 108 void Halve(std::vector<int64_t>* down_compactor, std::vector<int64_t>* up_compactor); 109 110 std::vector<std::vector<int64_t>> compactors_; 111 int k_; 112 const double c_ = 2.0 / 3.0; 113 int overall_capacity_; 114 int num_items_in_compactors_; 115 RandomGenerator* random_; 116 std::unique_ptr<KllSampler> sampler_; 117 }; 118 119 } // namespace internal 120 } // namespace aggregation 121 } // namespace dist_proc 122