/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "NumericValueMetricProducer.h"

#include <limits.h>
#include <stdlib.h>

#include "guardrail/StatsdStats.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::map;
using std::optional;
using std::shared_ptr;
using std::string;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUE_SAMPLESIZE = 4;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE(0.0);

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : metric.aggregation_type() == ValueMetric_AggregationType_AVG),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}

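// Invalidates the in-flight bucket and, for drop reasons that leave the pulled data in an
// inconsistent state (dump requests, wrong-bucket events, unknown condition, failed or
// delayed pulls, and the dimension guardrail), also clears the diff bases so the next
// successful pull establishes a fresh baseline.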
void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                         const BucketDropReason reason) {
    ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);

    switch (reason) {
        case BucketDropReason::DUMP_REPORT_REQUESTED:
        case BucketDropReason::EVENT_IN_WRONG_BUCKET:
        case BucketDropReason::CONDITION_UNKNOWN:
        case BucketDropReason::PULL_FAILED:
        case BucketDropReason::PULL_DELAYED:
        case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
            resetBase();
            break;
        default:
            break;
    }
}

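// Clears the stored diff base for every tracked dimension and marks the global base as
// invalid, forcing the next successful pull to establish a new baseline.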
void NumericValueMetricProducer::resetBase() {
    for (auto& [_, dimInfo] : mDimInfos) {
        for (optional<Value>& base : dimInfo.dimExtras) {
            base.reset();
        }
    }
    mHasGlobalBase = false;
}

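// Serializes one aggregated value into a repeated `values` entry of ValueBucketInfo:
// the value-field index, the optional sample size, and the aggregate itself, written as
// either an int64 or a double depending on the value's runtime type.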
void NumericValueMetricProducer::writePastBucketAggregateToProto(
        const int aggIndex, const Value& value, const int sampleSize,
        ProtoOutputStream* const protoOutput) const {
    uint64_t valueToken =
            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
    if (mIncludeSampleSize) {
        protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
    }
    if (value.getType() == LONG) {
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)value.long_value);
        VLOG("\t\t value %d: %lld", aggIndex, (long long)value.long_value);
    } else if (value.getType() == DOUBLE) {
        protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, value.double_value);
        VLOG("\t\t value %d: %.2f", aggIndex, value.double_value);
    } else {
        VLOG("Wrong value type for ValueMetric output: %d", value.getType());
    }
    protoOutput->end(valueToken);
}

void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
                                                                    const bool isActive) {
    // When the active state changes from true to false for a pulled metric, clear the diff
    // base but don't reset other counters, as we may accumulate more values in the bucket.
    if (mUseDiff && !isActive) {
        resetBase();
    }
}

// Only called when mIsActive and the event is NOT too late.
void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
                                                                  const ConditionState newCondition,
                                                                  const int64_t eventTimeNs) {
    // For metrics that use diff, when condition changes from true to false,
    // clear diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }
}

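// Primes the first bucket: a diff-based pulled metric needs an initial pull so that the
// first real bucket has a base to diff against.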
void NumericValueMetricProducer::prepareFirstBucketLocked() {
    // Kicks off the puller immediately if condition is true and diff based.
    if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

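// Pulls the atom synchronously at the given timestamp and runs the results through event
// matching. A failed pull invalidates the current bucket, since the bucket would otherwise
// be missing data.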
void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

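// Snaps currentTimeNs down to the most recent bucket boundary relative to mTimeBaseNs.
// Integer division truncates, so e.g. with mTimeBaseNs = 0 and mBucketSizeNs = 10,
// currentTimeNs = 25 yields 20.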
int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrades, which are not based on
// AlarmManager, might have arrived earlier and closed the bucket.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap it to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute the data to the previous bucket end. If the sleep was long but not
                // very long, we will be in the immediately following bucket. The previous bucket
                // may get a larger number since we pull at a later time than the real bucket end.
                //
                // If the sleep was very long, we will have skipped more than one bucket when we
                // wake. In this case, the diff base will be cleared and this new data will serve
                // as the new diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

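// Sums the value fields of newEvent into the aggregate event that shares its dimension key.
// eventValues pairs the running aggregate LogEvent with the indices of its value fields;
// newValueIndices holds the corresponding indices in newEvent, with -1 marking a field
// that was not matched.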
void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
                                                    const LogEvent& newEvent,
                                                    const vector<int>& newValueIndices) const {
    if (eventValues.second.size() != newValueIndices.size()) {
        ALOGE("NumericValueMetricProducer value indices sizes don't match");
        return;
    }
    vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
    const vector<FieldValue>& newFieldValues = newEvent.getValues();
    for (size_t i = 0; i < eventValues.second.size(); ++i) {
        if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
            (*aggregateFieldValues)[eventValues.second[i]].mValue +=
                    newFieldValues[newValueIndices[i]].mValue;
        }
    }
}

// Process events retrieved from a pull.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket, which means we will not have a complete
        // view of what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) !=
                MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mFieldMatchers.size(), -1);
            if (!filterValues(mDimensionsInWhat, mFieldMatchers, data->getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(*data, valueIndices));
            } else {
                combineValueFields(it->second, *data, valueIndices);
            }
        }

        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        for (const auto& data : allData) {
            LogEvent localCopy = *data;
            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
                MatchingState::kMatched) {
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys),
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF the condition timer for keys not present in the pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data, which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true, since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}

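// Returns true if adding newKey to mCurrentFullBucket would exceed the hard dimension
// limit, in which case the caller should drop the data for this key. The error is logged
// only once per bucket (mHasHitGuardrail).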
bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            if (!mHasHitGuardrail) {
                ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                      (long long)mMetricId, newKey.toString().c_str());
                mHasHitGuardrail = true;
            }
            return true;
        }
    }

    return false;
}

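// Extracts the field matched by `matcher` from the event into `ret`, widening INT/LONG to
// a long and FLOAT/DOUBLE to a double. Returns false if the field is absent or has a
// non-numeric type.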
bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
    for (const FieldValue& value : event.getValues()) {
        if (value.mField.matches(matcher)) {
            switch (value.mValue.type) {
                case INT:
                    ret.setLong(value.mValue.int_value);
                    break;
                case LONG:
                    ret.setLong(value.mValue.long_value);
                    break;
                case FLOAT:
                    ret.setDouble(value.mValue.float_value);
                    break;
                case DOUBLE:
                    ret.setDouble(value.mValue.double_value);
                    break;
                default:
                    return false;
            }
            return true;
        }
    }
    return false;
}

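// Aggregates the event's value fields into the per-dimension intervals. For diff-based
// metrics, the stored base is first subtracted according to mValueDirection; the result is
// then combined into the interval using the configured aggregation (SUM/AVG/MIN/MAX).
// Also feeds the first value into anomaly detection, unless a base was missing or the data
// looked bad. Returns whether any new data was seen.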
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 ValueBases& bases) {
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        optional<Value>& base = bases[i];
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }
        seenNewData = true;
        if (mUseDiff) {
            if (!base.has_value()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has a global base. This key does not.
                    // Optionally use zero as the base.
                    base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                } else {
                    // No base yet; just record this value as the base.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data.
                    useAnomalyDetection = false;

                    // Continue (instead of return) here in order to set the base value for the
                    // remaining fields.
                    continue;
                }
            }
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= base.value()) {
                        diff = value - base.value();
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection.
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (base.value() >= value) {
                        diff = base.value() - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection.
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - base.value();
                    break;
                default:
                    break;
            }
            base = value;
            value = diff;
        }

        if (interval.hasValue()) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // For AVG, we add up and take the average when flushing the bucket.
                case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                default:
                    break;
            }
        } else {
            interval.aggregate = value;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket
    // due to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values downstream when anomaly detection supports doubles.
        long wholeBucketVal = intervals[0].aggregate.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}

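// Assembles the PastBucket for [mCurrentBucketStartTimeNs, bucketEndTimeNs) from the given
// intervals, applying the upload-threshold gatekeeper and the skip-zero-diff rule.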
PastBucket<Value> NumericValueMetricProducer::buildPartialBucket(int64_t bucketEndTimeNs,
                                                                 vector<Interval>& intervals) {
    PastBucket<Value> bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTimeNs;

    // The first value field acts as a "gatekeeper": if it does not pass the specified threshold,
    // then all interval values are discarded for this bucket.
    if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
        return bucket;
    }

    for (const Interval& interval : intervals) {
        // Skip the output if the diff is zero.
        if (!interval.hasValue() ||
            (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
            continue;
        }

        bucket.aggIndex.push_back(interval.aggIndex);
        bucket.aggregates.push_back(getFinalValue(interval));
        if (mIncludeSampleSize) {
            bucket.sampleSizes.push_back(interval.sampleSize);
        }
    }
    return bucket;
}

// Also invalidates the current bucket if multiple buckets have been skipped.
void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
                                                    const int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
    if (mAnomalyTrackers.size() > 0) {
        appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
    }
}

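// In addition to the base-class reset, handles the case where a diff-based metric enters
// the next bucket without a global base; that next bucket may then be incomplete.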
void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);

    // If we do not have a global base when the condition is true,
    // we will have an incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
        mCurrentBucketIsSkipped = false;
    }
}

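// Forwards bucket aggregates to the anomaly trackers. Partial buckets are accumulated into
// mCurrentFullBucket until a full bucket boundary is reached; skipped (invalid) buckets are
// discarded instead of being forwarded.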
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // The current bucket is invalid, so we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If the full bucket is reached, send to the anomaly tracker.
        // Accumulate partial buckets with the current value and then send to the anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly detection can accept double values.
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly detection can accept double values.
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            tracker->addPastBucket(metricDimensionKey,
                                                   interval.aggregate.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate the partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly detection can accept double values.
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
        }
    }
}

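// Approximates the bytes needed to store this metric's past buckets, which statsd uses
// when checking data-size guardrails.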
size_t NumericValueMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& [_, buckets] : mPastBuckets) {
        totalSize += buckets.size() * kBucketSize;
        // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
    }
    return totalSize;
}

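// Returns whether the interval's final value satisfies the configured upload threshold.
// When no threshold is configured, every value passes.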
bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    Value finalValue = getFinalValue(interval);

    double doubleValue =
            finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < (double)mUploadThreshold->lt_int();
        case UploadThreshold::kGtInt:
            return doubleValue > (double)mUploadThreshold->gt_int();
        case UploadThreshold::kLteInt:
            return doubleValue <= (double)mUploadThreshold->lte_int();
        case UploadThreshold::kGteInt:
            return doubleValue >= (double)mUploadThreshold->gte_int();
        case UploadThreshold::kLtFloat:
            // Strict comparisons, matching the kLtInt/kGtInt cases above.
            return doubleValue < (double)mUploadThreshold->lt_float();
        case UploadThreshold::kGtFloat:
            return doubleValue > (double)mUploadThreshold->gt_float();
        default:
            ALOGE("Value metric no upload threshold type used");
            return false;
    }
}

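// Returns the value to report for an interval: the raw aggregate for all aggregation types
// except AVG, which divides the running sum by the sample count and always yields a double.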
Value NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
    if (mAggregationType != ValueMetric::AVG) {
        return interval.aggregate;
    } else {
        double sum = interval.aggregate.type == LONG ? (double)interval.aggregate.long_value
                                                     : interval.aggregate.double_value;
        return Value(sum / interval.sampleSize);
    }
}

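// Supplies the ValueMetric-specific proto field IDs that the shared dump-report code in
// the ValueMetricProducer base class writes into the StatsLogReport.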
NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    return {FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS};
}

}  // namespace statsd
}  // namespace os
}  // namespace android