1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "KllMetricProducer.h"
21
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "guardrail/StatsdStats.h"
26 #include "metrics/parsing_utils/metrics_manager_util.h"
27 #include "stats_log_util.h"
28
29 using android::util::FIELD_COUNT_REPEATED;
30 using android::util::FIELD_TYPE_BYTES;
31 using android::util::FIELD_TYPE_INT32;
32 using android::util::FIELD_TYPE_MESSAGE;
33 using android::util::ProtoOutputStream;
34 using std::map;
35 using std::nullopt;
36 using std::optional;
37 using std::shared_ptr;
38 using std::string;
39 using std::unordered_map;
40 using zetasketch::android::AggregatorStateProto;
41
42 namespace android {
43 namespace os {
44 namespace statsd {
45
46 // for StatsLogReport
47 const int FIELD_ID_KLL_METRICS = 16;
48 // for KllBucketInfo
49 const int FIELD_ID_SKETCH_INDEX = 1;
50 const int FIELD_ID_KLL_SKETCH = 2;
51 const int FIELD_ID_SKETCHES = 3;
52 const int FIELD_ID_BUCKET_NUM = 4;
53 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
54 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
55 const int FIELD_ID_CONDITION_TRUE_NS = 7;
56
KllMetricProducer(const ConfigKey & key,const KllMetric & metric,const uint64_t protoHash,const PullOptions & pullOptions,const BucketOptions & bucketOptions,const WhatOptions & whatOptions,const ConditionOptions & conditionOptions,const StateOptions & stateOptions,const ActivationOptions & activationOptions,const GuardrailOptions & guardrailOptions)57 KllMetricProducer::KllMetricProducer(const ConfigKey& key, const KllMetric& metric,
58 const uint64_t protoHash, const PullOptions& pullOptions,
59 const BucketOptions& bucketOptions,
60 const WhatOptions& whatOptions,
61 const ConditionOptions& conditionOptions,
62 const StateOptions& stateOptions,
63 const ActivationOptions& activationOptions,
64 const GuardrailOptions& guardrailOptions)
65 : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
66 conditionOptions, stateOptions, activationOptions, guardrailOptions) {
67 }
68
getDumpProtoFields() const69 KllMetricProducer::DumpProtoFields KllMetricProducer::getDumpProtoFields() const {
70 return {FIELD_ID_KLL_METRICS,
71 FIELD_ID_BUCKET_NUM,
72 FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
73 FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
74 FIELD_ID_CONDITION_TRUE_NS,
75 /*conditionCorrectionNsFieldId=*/nullopt};
76 }
77
writePastBucketAggregateToProto(const int aggIndex,const unique_ptr<KllQuantile> & kll,const int sampleSize,ProtoOutputStream * const protoOutput) const78 void KllMetricProducer::writePastBucketAggregateToProto(
79 const int aggIndex, const unique_ptr<KllQuantile>& kll, const int sampleSize,
80 ProtoOutputStream* const protoOutput) const {
81 uint64_t sketchesToken =
82 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKETCHES);
83 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_SKETCH_INDEX, aggIndex);
84
85 // TODO(b/186737273): Serialize directly to ProtoOutputStream
86 const AggregatorStateProto& aggProto = kll->SerializeToProto();
87 const size_t numBytes = aggProto.ByteSizeLong();
88 const unique_ptr<char[]> buffer(new char[numBytes]);
89 aggProto.SerializeToArray(&buffer[0], numBytes);
90 protoOutput->write(FIELD_TYPE_BYTES | FIELD_ID_KLL_SKETCH, &buffer[0], numBytes);
91
92 VLOG("\t\t sketch %d: %zu bytes", aggIndex, numBytes);
93 protoOutput->end(sketchesToken);
94 }
95
getInt64ValueFromEvent(const LogEvent & event,const Matcher & matcher)96 optional<int64_t> getInt64ValueFromEvent(const LogEvent& event, const Matcher& matcher) {
97 for (const FieldValue& value : event.getValues()) {
98 if (value.mField.matches(matcher)) {
99 switch (value.mValue.type) {
100 case INT:
101 return {value.mValue.int_value};
102 case LONG:
103 return {value.mValue.long_value};
104 default:
105 return nullopt;
106 }
107 }
108 }
109 return nullopt;
110 }
111
aggregateFields(const int64_t eventTimeNs,const MetricDimensionKey & eventKey,const LogEvent & event,vector<Interval> & intervals,Empty & empty)112 bool KllMetricProducer::aggregateFields(const int64_t eventTimeNs,
113 const MetricDimensionKey& eventKey, const LogEvent& event,
114 vector<Interval>& intervals, Empty& empty) {
115 bool seenNewData = false;
116 for (size_t i = 0; i < mFieldMatchers.size(); i++) {
117 const Matcher& matcher = mFieldMatchers[i];
118 Interval& interval = intervals[i];
119 interval.aggIndex = i;
120 const optional<int64_t> valueOpt = getInt64ValueFromEvent(event, matcher);
121 if (!valueOpt) {
122 VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
123 StatsdStats::getInstance().noteBadValueType(mMetricId);
124 return seenNewData;
125 }
126
127 // interval.aggregate can be nullptr from cases:
128 // 1. Initialization from default construction of Interval struct.
129 // 2. Ownership of the unique_ptr<KllQuantile> at interval.aggregate being transferred to
130 // PastBucket after flushing.
131 if (!interval.aggregate) {
132 interval.aggregate = KllQuantile::Create();
133 }
134 seenNewData = true;
135 interval.aggregate->Add(valueOpt.value());
136 interval.sampleSize += 1;
137 }
138 return seenNewData;
139 }
140
buildPartialBucket(int64_t bucketEndTimeNs,vector<Interval> & intervals)141 PastBucket<unique_ptr<KllQuantile>> KllMetricProducer::buildPartialBucket(
142 int64_t bucketEndTimeNs, vector<Interval>& intervals) {
143 PastBucket<unique_ptr<KllQuantile>> bucket;
144 bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
145 bucket.mBucketEndNs = bucketEndTimeNs;
146 for (Interval& interval : intervals) {
147 if (interval.hasValue()) {
148 bucket.aggIndex.push_back(interval.aggIndex);
149 // Transfer ownership of unique_ptr<KllQuantile> from interval.aggregate to
150 // bucket.aggregates vector. interval.aggregate is guaranteed to be nullptr after this.
151 bucket.aggregates.push_back(std::move(interval.aggregate));
152 }
153 }
154 return bucket;
155 }
156
byteSizeLocked() const157 size_t KllMetricProducer::byteSizeLocked() const {
158 size_t totalSize = 0;
159 for (const auto& [_, buckets] : mPastBuckets) {
160 totalSize += buckets.size() * kBucketSize;
161 for (const auto& bucket : buckets) {
162 static const size_t kIntSize = sizeof(int);
163 totalSize += bucket.aggIndex.size() * kIntSize;
164 if (!bucket.aggregates.empty()) {
165 static const size_t kInt64Size = sizeof(int64_t);
166 // Assume sketch size is the same for all aggregations in a bucket.
167 totalSize += bucket.aggregates.size() * kInt64Size *
168 bucket.aggregates[0]->num_stored_values();
169 }
170 }
171 }
172 return totalSize;
173 }
174
175 } // namespace statsd
176 } // namespace os
177 } // namespace android
178