1 /* 2 * Copyright 2021 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #pragma once 17 18 #include <assert.h> 19 #include <optional> 20 21 #include "compactor_stack.h" 22 23 namespace dist_proc { 24 namespace aggregation { 25 namespace internal { 26 27 class CompactorStack; 28 29 // Class that does reservoir sampling to uniformly-at-random select one out of 30 // capacity() items that are added to it. The selected item is added to the 31 // compactor stack and sampling continues with the next capacity_ items. 32 // 33 // Serves as an replacement of num_replaced_levels levels of size 2 of the 34 // compactor stack, while only using constant memory. 35 class KllSampler { 36 public: KllSampler(CompactorStack * compactor_stack)37 KllSampler(CompactorStack* compactor_stack) : compactor_stack_(compactor_stack) { 38 assert(compactor_stack != nullptr); 39 Reset(); 40 } 41 42 void Reset(); 43 44 // Adds an item to the sampler with weight one. 45 void Add(int64_t item); 46 47 // Adds an item to the sampler with weight >= 1. Does nothing if weight <= 0. 48 void AddWithWeight(int64_t item, int weight); 49 50 void DoubleCapacity(); 51 capacity()52 int64_t capacity() const { 53 return capacity_; 54 } 55 sampled_item_and_weight()56 std::optional<std::pair<int64_t, int>> sampled_item_and_weight() const { 57 if (item_weight_ == 0) { 58 return std::nullopt; 59 } 60 return std::make_pair(sampled_item_, item_weight_); 61 } 62 num_replaced_levels()63 int num_replaced_levels() const { 64 return num_replaced_levels_; 65 } 66 67 private: 68 void AddSampleToCompactorStackAndRestart(); 69 int64_t sampled_item_; 70 int64_t item_weight_; 71 int64_t capacity_; 72 int num_replaced_levels_; 73 CompactorStack* compactor_stack_; 74 }; 75 76 } // namespace internal 77 } // namespace aggregation 78 } // namespace dist_proc 79