• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     https://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
#include <assert.h>
#include <cstdint>
#include <optional>
#include <utility>

#include "compactor_stack.h"
22 
namespace dist_proc {
namespace aggregation {
namespace internal {

class CompactorStack;

// Performs reservoir sampling to select, uniformly at random, one out of every
// capacity() items that are added to it. The selected item is added to the
// compactor stack, and sampling then continues with the next capacity() items.
//
// Serves as a replacement for num_replaced_levels() levels of size 2 of the
// compactor stack, while using only constant memory.
class KllSampler {
public:
    // Creates a sampler that passes its selected items to `compactor_stack`.
    // `compactor_stack` must be non-null (checked with assert). The pointer is
    // stored but never deleted by this class, so the stack is expected to
    // outlive the sampler.
    //
    // Marked explicit so that a CompactorStack* is never silently converted
    // into a KllSampler.
    explicit KllSampler(CompactorStack* compactor_stack) : compactor_stack_(compactor_stack) {
        assert(compactor_stack != nullptr);
        Reset();
    }

    // Resets the sampling state. Defined out of line; the constructor calls it
    // to establish the initial state.
    void Reset();

    // Adds an item to the sampler with weight one.
    void Add(int64_t item);

    // Adds an item to the sampler with weight >= 1. Does nothing if weight <= 0.
    void AddWithWeight(int64_t item, int weight);

    // Defined out of line. NOTE(review): by its name this grows capacity();
    // confirm its effect on num_replaced_levels() and on an in-progress sample.
    void DoubleCapacity();

    // Number of added items out of which a single item is selected.
    int64_t capacity() const {
        return capacity_;
    }

    // Returns the currently selected item together with its weight, or
    // std::nullopt if the weight is zero (no item is currently held).
    std::optional<std::pair<int64_t, int>> sampled_item_and_weight() const {
        if (item_weight_ == 0) {
            return std::nullopt;
        }
        // item_weight_ is stored as int64_t but this accessor exposes an int.
        // The narrowing used to happen implicitly inside the pair conversion;
        // make it explicit and catch silent truncation in debug builds.
        const int weight = static_cast<int>(item_weight_);
        assert(static_cast<int64_t>(weight) == item_weight_);
        return std::make_pair(sampled_item_, weight);
    }

    // Number of size-2 compactor-stack levels that this sampler replaces.
    int num_replaced_levels() const {
        return num_replaced_levels_;
    }

private:
    // Defined out of line. Per its name, hands the selected item to
    // compactor_stack_ and starts a new sampling round.
    void AddSampleToCompactorStackAndRestart();

    // In-class initializers guarantee every member has a defined value even
    // before Reset() runs; Reset() (called from the constructor) is expected
    // to overwrite them with the real initial state.

    // Item currently selected in this sampling round.
    int64_t sampled_item_ = 0;
    // Weight reported alongside sampled_item_; 0 means no item is held.
    int64_t item_weight_ = 0;
    // See capacity().
    int64_t capacity_ = 0;
    // See num_replaced_levels().
    int num_replaced_levels_ = 0;
    // Non-owning; the compactor stack that selected items are added to.
    CompactorStack* compactor_stack_ = nullptr;
};

}  // namespace internal
}  // namespace aggregation
}  // namespace dist_proc
79