• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "KllMetricProducer.h"
21 
22 #include <limits.h>
23 #include <stdlib.h>
24 
25 #include "guardrail/StatsdStats.h"
26 #include "metrics/parsing_utils/metrics_manager_util.h"
27 #include "stats_log_util.h"
28 
29 using android::util::FIELD_COUNT_REPEATED;
30 using android::util::FIELD_TYPE_BYTES;
31 using android::util::FIELD_TYPE_INT32;
32 using android::util::FIELD_TYPE_MESSAGE;
33 using android::util::ProtoOutputStream;
34 using std::map;
35 using std::nullopt;
36 using std::optional;
37 using std::shared_ptr;
38 using std::string;
39 using std::unordered_map;
40 using zetasketch::android::AggregatorStateProto;
41 
42 namespace android {
43 namespace os {
44 namespace statsd {
45 
46 // for StatsLogReport
47 const int FIELD_ID_KLL_METRICS = 16;
48 // for KllBucketInfo
49 const int FIELD_ID_SKETCH_INDEX = 1;
50 const int FIELD_ID_KLL_SKETCH = 2;
51 const int FIELD_ID_SKETCHES = 3;
52 const int FIELD_ID_BUCKET_NUM = 4;
53 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
54 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
55 const int FIELD_ID_CONDITION_TRUE_NS = 7;
56 
KllMetricProducer(const ConfigKey & key,const KllMetric & metric,const uint64_t protoHash,const PullOptions & pullOptions,const BucketOptions & bucketOptions,const WhatOptions & whatOptions,const ConditionOptions & conditionOptions,const StateOptions & stateOptions,const ActivationOptions & activationOptions,const GuardrailOptions & guardrailOptions)57 KllMetricProducer::KllMetricProducer(const ConfigKey& key, const KllMetric& metric,
58                                      const uint64_t protoHash, const PullOptions& pullOptions,
59                                      const BucketOptions& bucketOptions,
60                                      const WhatOptions& whatOptions,
61                                      const ConditionOptions& conditionOptions,
62                                      const StateOptions& stateOptions,
63                                      const ActivationOptions& activationOptions,
64                                      const GuardrailOptions& guardrailOptions)
65     : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
66                           conditionOptions, stateOptions, activationOptions, guardrailOptions) {
67 }
68 
getDumpProtoFields() const69 KllMetricProducer::DumpProtoFields KllMetricProducer::getDumpProtoFields() const {
70     return {FIELD_ID_KLL_METRICS,
71             FIELD_ID_BUCKET_NUM,
72             FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
73             FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
74             FIELD_ID_CONDITION_TRUE_NS,
75             /*conditionCorrectionNsFieldId=*/nullopt};
76 }
77 
writePastBucketAggregateToProto(const int aggIndex,const unique_ptr<KllQuantile> & kll,const int sampleSize,ProtoOutputStream * const protoOutput) const78 void KllMetricProducer::writePastBucketAggregateToProto(
79         const int aggIndex, const unique_ptr<KllQuantile>& kll, const int sampleSize,
80         ProtoOutputStream* const protoOutput) const {
81     uint64_t sketchesToken =
82             protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKETCHES);
83     protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_SKETCH_INDEX, aggIndex);
84 
85     // TODO(b/186737273): Serialize directly to ProtoOutputStream
86     const AggregatorStateProto& aggProto = kll->SerializeToProto();
87     const size_t numBytes = aggProto.ByteSizeLong();
88     const unique_ptr<char[]> buffer(new char[numBytes]);
89     aggProto.SerializeToArray(&buffer[0], numBytes);
90     protoOutput->write(FIELD_TYPE_BYTES | FIELD_ID_KLL_SKETCH, &buffer[0], numBytes);
91 
92     VLOG("\t\t sketch %d: %zu bytes", aggIndex, numBytes);
93     protoOutput->end(sketchesToken);
94 }
95 
getInt64ValueFromEvent(const LogEvent & event,const Matcher & matcher)96 optional<int64_t> getInt64ValueFromEvent(const LogEvent& event, const Matcher& matcher) {
97     for (const FieldValue& value : event.getValues()) {
98         if (value.mField.matches(matcher)) {
99             switch (value.mValue.type) {
100                 case INT:
101                     return {value.mValue.int_value};
102                 case LONG:
103                     return {value.mValue.long_value};
104                 default:
105                     return nullopt;
106             }
107         }
108     }
109     return nullopt;
110 }
111 
aggregateFields(const int64_t eventTimeNs,const MetricDimensionKey & eventKey,const LogEvent & event,vector<Interval> & intervals,Empty & empty)112 bool KllMetricProducer::aggregateFields(const int64_t eventTimeNs,
113                                         const MetricDimensionKey& eventKey, const LogEvent& event,
114                                         vector<Interval>& intervals, Empty& empty) {
115     bool seenNewData = false;
116     for (size_t i = 0; i < mFieldMatchers.size(); i++) {
117         const Matcher& matcher = mFieldMatchers[i];
118         Interval& interval = intervals[i];
119         interval.aggIndex = i;
120         const optional<int64_t> valueOpt = getInt64ValueFromEvent(event, matcher);
121         if (!valueOpt) {
122             VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
123             StatsdStats::getInstance().noteBadValueType(mMetricId);
124             return seenNewData;
125         }
126 
127         // interval.aggregate can be nullptr from cases:
128         // 1. Initialization from default construction of Interval struct.
129         // 2. Ownership of the unique_ptr<KllQuantile> at interval.aggregate being transferred to
130         // PastBucket after flushing.
131         if (!interval.aggregate) {
132             interval.aggregate = KllQuantile::Create();
133         }
134         seenNewData = true;
135         interval.aggregate->Add(valueOpt.value());
136         interval.sampleSize += 1;
137     }
138     return seenNewData;
139 }
140 
buildPartialBucket(int64_t bucketEndTimeNs,vector<Interval> & intervals)141 PastBucket<unique_ptr<KllQuantile>> KllMetricProducer::buildPartialBucket(
142         int64_t bucketEndTimeNs, vector<Interval>& intervals) {
143     PastBucket<unique_ptr<KllQuantile>> bucket;
144     bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
145     bucket.mBucketEndNs = bucketEndTimeNs;
146     for (Interval& interval : intervals) {
147         if (interval.hasValue()) {
148             bucket.aggIndex.push_back(interval.aggIndex);
149             // Transfer ownership of unique_ptr<KllQuantile> from interval.aggregate to
150             // bucket.aggregates vector. interval.aggregate is guaranteed to be nullptr after this.
151             bucket.aggregates.push_back(std::move(interval.aggregate));
152         }
153     }
154     return bucket;
155 }
156 
byteSizeLocked() const157 size_t KllMetricProducer::byteSizeLocked() const {
158     size_t totalSize = 0;
159     for (const auto& [_, buckets] : mPastBuckets) {
160         totalSize += buckets.size() * kBucketSize;
161         for (const auto& bucket : buckets) {
162             static const size_t kIntSize = sizeof(int);
163             totalSize += bucket.aggIndex.size() * kIntSize;
164             if (!bucket.aggregates.empty()) {
165                 static const size_t kInt64Size = sizeof(int64_t);
166                 // Assume sketch size is the same for all aggregations in a bucket.
167                 totalSize += bucket.aggregates.size() * kInt64Size *
168                              bucket.aggregates[0]->num_stored_values();
169             }
170         }
171     }
172     return totalSize;
173 }
174 
175 }  // namespace statsd
176 }  // namespace os
177 }  // namespace android
178