/* * Copyright 2021 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include "random_generator.h" #include "sampler.h" namespace dist_proc { namespace aggregation { namespace internal { class KllSampler; // Hierarchy of compactors, which store items from the stream and 'compact' // them when necessary (i.e., keep every second item in a sorted compactor) // and add them to the compactor one level up. class CompactorStack { public: CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random); CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random); ~CompactorStack(); // Initialize or reset the compactor stack and all counters and thresholds. void Reset(); void Add(const int64_t value); // Adds an item to the compactor stack with weight >= 1. // Does nothing if weight <= 0. void AddWithWeight(int64_t value, int weight); // Ensures that the contents of each compactor are sorted. void SortCompactorContents(); // Target capacity of compactor with index h. If this capacity is exceeded, // the compactor will be lazily compacted in one of the next CompactStack() // runs. I.e., this capacity can be temorarily exceeded. int TargetCapacityAtLevel(int h) const; void DoubleSamplerCapacity(); int num_stored_items() const; std::optional> sampled_item_and_weight() const; // Returns the lowest active level in the compactor stack, which is identical // with the number of replaced levels, or log2(sampler_capacity()). int lowest_active_level() const; int64_t sampler_capacity() const; // For testing bool IsSamplerOn() const { return sampler_ != nullptr; } const std::vector>& compactors() const { return compactors_; } RandomGenerator* random() { return random_; } int k() const { return k_; } private: void ClearCompactors(); // Adds a new compactor at the highest level. To be called when the currently // topmost compactor is full. void AddLevel(); // Called when at least one level in the compactor stack is above capacity. // Iterates from bottom to top through the compactors and compacts the // first one that is over its capacity by halving its contents and adding // them to the compactor one level higher. void CompactStack(); void CompactLevel(int level); // To compact the items in a compactor to roughly half the size, // sorts the items and adds every even or odd item (determined randomly) // to the up_compactor. void Halve(std::vector* down_compactor, std::vector* up_compactor); std::vector> compactors_; int k_; const double c_ = 2.0 / 3.0; int overall_capacity_; int num_items_in_compactors_; RandomGenerator* random_; std::unique_ptr sampler_; }; } // namespace internal } // namespace aggregation } // namespace dist_proc