1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "kll.h"
18
19 #include <cstdint>
20 #include <memory>
21
22 #include "aggregator.pb.h"
23 #include "compactor_stack.h"
24 #include "encoding/encoder.h"
25 #include "kll-quantiles.pb.h"
26
27 namespace dist_proc {
28 namespace aggregation {
29
30 using zetasketch::android::AggregatorStateProto;
31
Create(std::string * error)32 std::unique_ptr<KllQuantile> KllQuantile::Create(std::string* error) {
33 return Create(KllQuantileOptions(), error);
34 }
35
Create(const KllQuantileOptions & options,std::string * error)36 std::unique_ptr<KllQuantile> KllQuantile::Create(const KllQuantileOptions& options,
37 std::string* error) {
38 if (options.k() < 0) {
39 if (error != nullptr) {
40 *error = "k has to be >= 0";
41 }
42 return nullptr;
43 }
44 return std::unique_ptr<KllQuantile>(
45 new KllQuantile(options.inv_eps(), options.inv_delta(), options.k(), options.random()));
46 }
47
Add(const int64_t value)48 void KllQuantile::Add(const int64_t value) {
49 compactor_stack_.Add(value);
50 UpdateMin(value);
51 UpdateMax(value);
52 num_values_++;
53 }
54
AddWeighted(int64_t value,int weight)55 void KllQuantile::AddWeighted(int64_t value, int weight) {
56 if (weight > 0) {
57 compactor_stack_.AddWithWeight(value, weight);
58 UpdateMin(value);
59 UpdateMax(value);
60 num_values_ += weight;
61 }
62 }
63
SerializeToProto()64 AggregatorStateProto KllQuantile::SerializeToProto() {
65 AggregatorStateProto aggregator_state;
66
67 aggregator_state.set_type(zetasketch::android::KLL_QUANTILES);
68 aggregator_state.set_num_values(num_values_);
69 aggregator_state.set_value_type(zetasketch::android::DefaultOpsType::INT64);
70
71 zetasketch::android::KllQuantilesStateProto* quantile_state =
72 aggregator_state.MutableExtension(zetasketch::android::kll_quantiles_state);
73
74 quantile_state->set_k(compactor_stack_.k());
75 quantile_state->set_inv_eps(inv_eps_);
76
77 if (num_values_ == 0) {
78 return aggregator_state;
79 }
80
81 // Encode min/max.
82 encoding::Encoder::AppendToString(min_, quantile_state->mutable_min());
83 encoding::Encoder::AppendToString(max_, quantile_state->mutable_max());
84
85 // Sort compactors before encoding them, to only do sorting work once (vs.
86 // every time a sketch is read and extracted or merged), and to reduce sketch
87 // cardinality, which saves space in e.g. column store dictionaries.
88 compactor_stack_.SortCompactorContents();
89
90 // Encode compactors.
91 const std::vector<std::vector<int64_t>>& compactors = compactor_stack_.compactors();
92 quantile_state->mutable_compactors()->Reserve(compactors.size());
93
94 for (const auto& compactor : compactors) {
95 encoding::Encoder::SerializeToPackedStringAll(
96 compactor.begin(), compactor.end(),
97 quantile_state
98 ->add_compactors() // Adds one compactor to the compactors field.
99 ->mutable_packed_values());
100 }
101
102 // Encode sampler.
103 if (compactor_stack_.IsSamplerOn()) {
104 const auto& sampled_item_and_weight = compactor_stack_.sampled_item_and_weight();
105 if (sampled_item_and_weight.has_value()) {
106 encoding::Encoder::AppendToString(
107 sampled_item_and_weight->first,
108 quantile_state->mutable_sampler()->mutable_sampled_item());
109 quantile_state->mutable_sampler()->set_sampled_weight(sampled_item_and_weight->second);
110 }
111 quantile_state->mutable_sampler()->set_log_capacity(compactor_stack_.lowest_active_level());
112 }
113
114 return aggregator_state;
115 }
116
UpdateMin(int64_t value)117 void KllQuantile::UpdateMin(int64_t value) {
118 if (num_values_ == 0 || min_ > value) {
119 min_ = value;
120 }
121 }
122
UpdateMax(int64_t value)123 void KllQuantile::UpdateMax(int64_t value) {
124 if (num_values_ == 0 || max_ < value) {
125 max_ = value;
126 }
127 }
128
Reset()129 void KllQuantile::Reset() {
130 num_values_ = 0;
131 compactor_stack_.Reset();
132 }
133
134 } // namespace aggregation
135 } // namespace dist_proc
136