1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include "compactor_stack.h"

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "random_generator.h"
#include "sampler.h"
22
23 namespace dist_proc {
24 namespace aggregation {
25 namespace internal {
26
CompactorStack(int64_t inv_eps,int64_t inv_delta,RandomGenerator * random)27 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random)
28 : CompactorStack(inv_eps, inv_delta, 0, random) {
29 }
30
CompactorStack(int64_t inv_eps,int64_t inv_delta,int k,RandomGenerator * random)31 CompactorStack::CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random)
32 : random_(random) {
33 if (k != 0) {
34 k_ = k;
35 } else {
36 // k = 1/eps * sqrt(log_2(1/delta)) - taken from proof of Thm 1.
37 double raw_k = inv_eps * std::sqrt(std::log2(inv_delta));
38 k_ = std::pow(2, std::lround(std::log2(raw_k)));
39 }
40 Reset();
41 }
42
~CompactorStack()43 CompactorStack::~CompactorStack() {
44 ClearCompactors();
45 }
46
47 // Initialize or reset the compactor stack and all counters and thresholds.
Reset()48 void CompactorStack::Reset() {
49 overall_capacity_ = 0;
50 ClearCompactors();
51 sampler_ = nullptr;
52 AddLevel();
53 }
54
Add(const int64_t value)55 void CompactorStack::Add(const int64_t value) {
56 if (sampler_ == nullptr) {
57 compactors_[0].push_back(value);
58 num_items_in_compactors_++;
59 CompactStack();
60 } else {
61 sampler_->Add(value);
62 }
63 }
64
65 // Adds an item to the compactor stack with weight >= 1.
66 // Does nothing if weight <= 0.
AddWithWeight(int64_t value,int weight)67 void CompactorStack::AddWithWeight(int64_t value, int weight) {
68 if (weight > 0) {
69 int remaining_weight = weight;
70 size_t level_to_add = 0;
71 if (sampler_ != nullptr) {
72 sampler_->AddWithWeight(value, remaining_weight % sampler_->capacity());
73 remaining_weight /= sampler_->capacity();
74 level_to_add = sampler_->num_replaced_levels();
75 }
76 while (remaining_weight != 0) {
77 if (level_to_add >= compactors_.size()) {
78 AddLevel();
79 }
80 if ((remaining_weight & 1) != 0) {
81 compactors_[level_to_add].push_back(value);
82 num_items_in_compactors_++;
83 }
84 remaining_weight >>= 1;
85 level_to_add++;
86 }
87 CompactStack();
88 }
89 }
90
SortCompactorContents()91 void CompactorStack::SortCompactorContents() {
92 for (std::vector<int64_t>& compactor : compactors_) {
93 std::sort(compactor.begin(), compactor.end());
94 }
95 }
96
ClearCompactors()97 void CompactorStack::ClearCompactors() {
98 compactors_.clear();
99 num_items_in_compactors_ = 0;
100 }
101
AddLevel()102 void CompactorStack::AddLevel() {
103 compactors_.resize(compactors_.size() + 1);
104
105 int cap_at_lowest_active_level = TargetCapacityAtLevel(lowest_active_level());
106 // All levels i get capacity that previously level i-1 had, except the
107 // (previous) lowest active level, which gets a new smaller capacity.
108 // Overall capacity changes by that amount.
109 overall_capacity_ += cap_at_lowest_active_level;
110
111 if (cap_at_lowest_active_level == 0) {
112 DoubleSamplerCapacity();
113 }
114 }
115
CompactStack()116 void CompactorStack::CompactStack() {
117 while (num_items_in_compactors_ >= overall_capacity_) {
118 for (size_t i = 0; i < compactors_.size(); i++) {
119 if (!compactors_[i].empty() &&
120 static_cast<int>(compactors_[i].size()) >= TargetCapacityAtLevel(i)) {
121 CompactLevel(i);
122 if (num_items_in_compactors_ < overall_capacity_) {
123 break;
124 }
125 }
126 }
127 }
128 }
129
CompactLevel(int level)130 void CompactorStack::CompactLevel(int level) {
131 if (level == static_cast<int>(compactors_.size()) - 1) {
132 AddLevel();
133 }
134 Halve(&compactors_[level], &compactors_[level + 1]);
135 std::vector<int64_t>().swap(compactors_[level]);
136 }
137
138 // To compact the items in a compactor to roughly half the size,
139 // sorts the items and adds every even or odd item (determined randomly)
140 // to the up_compactor.
Halve(std::vector<int64_t> * down_compactor,std::vector<int64_t> * up_compactor)141 void CompactorStack::Halve(std::vector<int64_t>* down_compactor,
142 std::vector<int64_t>* up_compactor) {
143 std::sort(down_compactor->begin(), down_compactor->end());
144 double half_of_items = down_compactor->size() / static_cast<double>(2);
145 bool keep_even_items = (random_->UnbiasedUniform(2) == 0);
146 num_items_in_compactors_ -= static_cast<int>(keep_even_items ? std::floor(half_of_items)
147 : std::ceil(half_of_items));
148
149 bool even = true;
150
151 for (size_t i = 0; i < down_compactor->size(); i++) {
152 if (even == keep_even_items) {
153 up_compactor->push_back((*down_compactor)[i]);
154 }
155 even = !even;
156 }
157 down_compactor->clear();
158 }
159
TargetCapacityAtLevel(int h) const160 int CompactorStack::TargetCapacityAtLevel(int h) const {
161 int num_stack_levels = compactors_.size();
162
163 int raw_capacity = static_cast<int>(std::ceil(std::pow(c_, num_stack_levels - h - 1) * k_));
164
165 // If the capacity is two or less, the level will be replaced by the
166 // sampler.
167 return raw_capacity > 2 ? raw_capacity : 0;
168 }
169
DoubleSamplerCapacity()170 void CompactorStack::DoubleSamplerCapacity() {
171 int prev_lowest_active_level = lowest_active_level();
172 if (sampler_ != nullptr) {
173 sampler_->DoubleCapacity();
174 } else {
175 sampler_ = std::make_unique<KllSampler>(this);
176 }
177
178 CompactLevel(prev_lowest_active_level);
179 }
180
num_stored_items() const181 int CompactorStack::num_stored_items() const {
182 if (sampler_ == nullptr) {
183 return num_items_in_compactors_;
184 } else {
185 return num_items_in_compactors_ +
186 ((sampler_->sampled_item_and_weight().has_value()) ? 1 : 0);
187 }
188 }
189
sampled_item_and_weight() const190 std::optional<std::pair<const int64_t, int64_t>> CompactorStack::sampled_item_and_weight() const {
191 if (sampler_ != nullptr) {
192 return sampler_->sampled_item_and_weight();
193 } else {
194 return std::nullopt;
195 }
196 }
197
sampler_capacity() const198 int64_t CompactorStack::sampler_capacity() const {
199 return sampler_ ? sampler_->capacity() : 1; // capacity = 1 to denote the empty sampler.
200 }
201
lowest_active_level() const202 int CompactorStack::lowest_active_level() const {
203 return sampler_ ? sampler_->num_replaced_levels() : 0;
204 }
205
206 } // namespace internal
207 } // namespace aggregation
208 } // namespace dist_proc
209