/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define STATSD_DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "NumericValueMetricProducer.h"

#include <limits.h>
#include <stdlib.h>

#include "guardrail/StatsdStats.h"
#include "metrics/parsing_utils/metrics_manager_util.h"
#include "stats_log_util.h"

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::map;
using std::optional;
using std::shared_ptr;
using std::string;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE(0.0);

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}

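// Invalidates the current bucket via the base class and, for drop reasons that leave the stored
// diff bases unreliable, also resets the bases.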
void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
                                                         const BucketDropReason reason) {
    ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);

    switch (reason) {
        case BucketDropReason::DUMP_REPORT_REQUESTED:
        case BucketDropReason::EVENT_IN_WRONG_BUCKET:
        case BucketDropReason::CONDITION_UNKNOWN:
        case BucketDropReason::PULL_FAILED:
        case BucketDropReason::PULL_DELAYED:
        case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
            resetBase();
            break;
        default:
            break;
    }
}

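// Clears the stored diff base for every tracked dimension and drops the global base so that the
// next pull establishes a fresh baseline.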
void NumericValueMetricProducer::resetBase() {
    for (auto& [_, dimInfo] : mDimInfos) {
        for (optional<Value>& base : dimInfo.dimExtras) {
            base.reset();
        }
    }
    mHasGlobalBase = false;
}

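// Writes one aggregated value as a repeated ValueBucketInfo.values entry: the value index followed
// by either the long or the double payload, depending on the value type.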
void NumericValueMetricProducer::writePastBucketAggregateToProto(
        const int aggIndex, const Value& value, ProtoOutputStream* const protoOutput) const {
    uint64_t valueToken =
            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
    if (value.getType() == LONG) {
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)value.long_value);
        VLOG("\t\t value %d: %lld", aggIndex, (long long)value.long_value);
    } else if (value.getType() == DOUBLE) {
        protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, value.double_value);
        VLOG("\t\t value %d: %.2f", aggIndex, value.double_value);
    } else {
        VLOG("Wrong value type for ValueMetric output: %d", value.getType());
    }
    protoOutput->end(valueToken);
}

void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs) {
    // When the active state changes from true to false for a pulled metric, clear the diff base
    // but don't reset other counters, as we may accumulate more value in the bucket.
    if (mUseDiff && !mIsActive) {
        resetBase();
    }
}

// Only called when mIsActive and the event is NOT too late.
void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
                                                                  const ConditionState newCondition,
                                                                  const int64_t eventTimeNs) {
    // For metrics that use diff, when condition changes from true to false,
    // clear diff base but don't reset other counts because we may accumulate
    // more value in the bucket.
    if (mUseDiff &&
        (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
        resetBase();
    }
}

void NumericValueMetricProducer::prepareFirstBucketLocked() {
    // Kicks off the puller immediately if condition is true and diff based.
    if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
    }
}

void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
    vector<shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs);
}

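// Returns the most recent bucket boundary at or before currentTimeNs, i.e. the end time of the
// previous bucket.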
int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager, and these pulls are likely
// to be delayed. Other events, such as condition changes or an app upgrade, are not based on
// AlarmManager and might have arrived earlier, already closing the bucket.
void NumericValueMetricProducer::onDataPulled(const vector<shared_ptr<LogEvent>>& allData,
                                              bool pullSuccess, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute the data to the previous bucket end. If the sleep was long but not
                // very long, we will be in the immediately following bucket. The previous bucket
                // may get a larger number since we pull at a later time than the real bucket end.
                //
                // If the sleep was very long, more than one bucket may have been skipped. In
                // that case, the diff base will be cleared and this new data will serve as the
                // new diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

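// Adds the value fields of newEvent into the aggregate event stored in eventValues. eventValues
// pairs the aggregate LogEvent with the indices of its value fields; newValueIndices holds the
// matching indices within newEvent.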
void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
                                                    const LogEvent& newEvent,
                                                    const vector<int>& newValueIndices) const {
    if (eventValues.second.size() != newValueIndices.size()) {
        ALOGE("NumericValueMetricProducer value indices sizes don't match");
        return;
    }
    vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
    const vector<FieldValue>& newFieldValues = newEvent.getValues();
    for (size_t i = 0; i < eventValues.second.size(); ++i) {
        if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
            (*aggregateFieldValues)[eventValues.second[i]].mValue +=
                    newFieldValues[newValueIndices[i]].mValue;
        }
    }
}

// Process events retrieved from a pull.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) !=
                MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mFieldMatchers.size(), -1);
            if (!filterValues(mDimensionsInWhat, mFieldMatchers, data->getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(*data, valueIndices));
            } else {
                combineValueFields(it->second, *data, valueIndices);
            }
        }

        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        for (const auto& data : allData) {
            LogEvent localCopy = *data;
            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
                MatchingState::kMatched) {
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedMetricDimensionKeys),
    // then we clear its data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}

bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(
        const MetricDimensionKey& newKey) const {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                  (long long)mMetricId, newKey.toString().c_str());
            return true;
        }
    }

    return false;
}

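// Extracts the field selected by matcher from the event into ret, promoting INT/LONG to a long
// value and FLOAT/DOUBLE to a double value. Returns false if the field is missing or has an
// unsupported type.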
bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
    for (const FieldValue& value : event.getValues()) {
        if (value.mField.matches(matcher)) {
            switch (value.mValue.type) {
                case INT:
                    ret.setLong(value.mValue.int_value);
                    break;
                case LONG:
                    ret.setLong(value.mValue.long_value);
                    break;
                case FLOAT:
                    ret.setDouble(value.mValue.float_value);
                    break;
                case DOUBLE:
                    ret.setDouble(value.mValue.double_value);
                    break;
                default:
                    return false;
                    break;
            }
            return true;
        }
    }
    return false;
}

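// Aggregates the value fields of one event into the per-dimension intervals. When mUseDiff is set,
// the stored bases are used to compute diffs and are then updated to the new values. Returns
// whether any new data was seen for this event.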
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 ValueBases& bases) {
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        optional<Value>& base = bases[i];
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }
        seenNewData = true;
        if (mUseDiff) {
            if (!base.has_value()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                } else {
                    // no base. just update base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= base.value()) {
                        diff = value - base.value();
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (base.value() >= value) {
                        diff = base.value() - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - base.value();
                    break;
                default:
                    break;
            }
            base = value;
            value = diff;
        }

        if (interval.hasValue()) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                default:
                    break;
            }
        } else {
            interval.aggregate = value;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propagate proper values downstream when anomaly detection supports doubles
        long wholeBucketVal = intervals[0].aggregate.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}

PastBucket<Value> NumericValueMetricProducer::buildPartialBucket(int64_t bucketEndTimeNs,
                                                                 vector<Interval>& intervals) {
    PastBucket<Value> bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTimeNs;

    // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
    // then all interval values are discarded for this bucket.
    if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
        return bucket;
    }

    for (const Interval& interval : intervals) {
        // skip the output if the diff is zero
        if (!interval.hasValue() ||
            (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
            continue;
        }

        bucket.aggIndex.push_back(interval.aggIndex);
        bucket.aggregates.push_back(getFinalValue(interval));
    }
    return bucket;
}

// Also invalidates current bucket if multiple buckets have been skipped
void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
                                                    const int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
    if (mAnomalyTrackers.size() > 0) {
        appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
    }
}

void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
    ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);

    // If we do not have a global base when the condition is true,
    // the next bucket will be incomplete.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
        mCurrentBucketIsSkipped = false;
    }
}

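// Folds the bucket that was just closed into mCurrentFullBucket and, once a full bucket boundary
// is reached, forwards the accumulated values to the anomaly trackers.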
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            tracker->addPastBucket(metricDimensionKey,
                                                   interval.aggregate.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
        }
    }
}

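// Approximates the bytes used by past bucket storage as the number of stored buckets times
// kBucketSize.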
size_t NumericValueMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& [_, buckets] : mPastBuckets) {
        totalSize += buckets.size() * kBucketSize;
        // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
    }
    return totalSize;
}

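// Returns true when no upload threshold is configured, or when the interval's final value
// satisfies the configured comparison.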
bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
    if (mUploadThreshold == nullopt) {
        return true;
    }

    Value finalValue = getFinalValue(interval);

    double doubleValue =
            finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
    switch (mUploadThreshold->value_comparison_case()) {
        case UploadThreshold::kLtInt:
            return doubleValue < (double)mUploadThreshold->lt_int();
        case UploadThreshold::kGtInt:
            return doubleValue > (double)mUploadThreshold->gt_int();
        case UploadThreshold::kLteInt:
            return doubleValue <= (double)mUploadThreshold->lte_int();
        case UploadThreshold::kGteInt:
            return doubleValue >= (double)mUploadThreshold->gte_int();
        case UploadThreshold::kLtFloat:
            return doubleValue <= (double)mUploadThreshold->lt_float();
        case UploadThreshold::kGtFloat:
            return doubleValue >= (double)mUploadThreshold->gt_float();
        default:
            ALOGE("Value metric no upload threshold type used");
            return false;
    }
}

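// Returns the value to report for an interval: the raw aggregate, or the mean (sum / sampleSize)
// when the aggregation type is AVG.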
Value NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
    if (mAggregationType != ValueMetric::AVG) {
        return interval.aggregate;
    } else {
        double sum = interval.aggregate.type == LONG ? (double)interval.aggregate.long_value
                                                     : interval.aggregate.double_value;
        return Value((double)sum / interval.sampleSize);
    }
}

NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    return {FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS};
}

}  // namespace statsd
}  // namespace os
}  // namespace android