1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "NumericValueMetricProducer.h"
21
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "guardrail/StatsdStats.h"
26 #include "metrics/parsing_utils/metrics_manager_util.h"
27 #include "stats_log_util.h"
28
29 using android::util::FIELD_COUNT_REPEATED;
30 using android::util::FIELD_TYPE_BOOL;
31 using android::util::FIELD_TYPE_DOUBLE;
32 using android::util::FIELD_TYPE_INT32;
33 using android::util::FIELD_TYPE_INT64;
34 using android::util::FIELD_TYPE_MESSAGE;
35 using android::util::FIELD_TYPE_STRING;
36 using android::util::ProtoOutputStream;
37 using std::map;
38 using std::optional;
39 using std::shared_ptr;
40 using std::string;
41 using std::unordered_map;
42
43 namespace android {
44 namespace os {
45 namespace statsd {
46
// Proto field number inside StatsLogReport.
const int FIELD_ID_VALUE_METRICS = 7;
// Proto field numbers inside ValueBucketInfo.
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

// Reusable zero values used as the default diff base when use_zero_default_base is set.
const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE(0.0);
62
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
//
// Constructs the producer from the ValueMetric config proto; the various *Options structs are
// forwarded to the ValueMetricProducer base, while the numeric-specific knobs (diffing,
// aggregation type, direction, thresholds, pull-delay limit) are captured here.
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      // Pulled metrics diff consecutive pulls by default; pushed metrics do not.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}
86
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)87 void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
88 const BucketDropReason reason) {
89 ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);
90
91 switch (reason) {
92 case BucketDropReason::DUMP_REPORT_REQUESTED:
93 case BucketDropReason::EVENT_IN_WRONG_BUCKET:
94 case BucketDropReason::CONDITION_UNKNOWN:
95 case BucketDropReason::PULL_FAILED:
96 case BucketDropReason::PULL_DELAYED:
97 case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
98 resetBase();
99 break;
100 default:
101 break;
102 }
103 }
104
resetBase()105 void NumericValueMetricProducer::resetBase() {
106 for (auto& [_, dimInfo] : mDimInfos) {
107 for (optional<Value>& base : dimInfo.dimExtras) {
108 base.reset();
109 }
110 }
111 mHasGlobalBase = false;
112 }
113
writePastBucketAggregateToProto(const int aggIndex,const Value & value,ProtoOutputStream * const protoOutput) const114 void NumericValueMetricProducer::writePastBucketAggregateToProto(
115 const int aggIndex, const Value& value, ProtoOutputStream* const protoOutput) const {
116 uint64_t valueToken =
117 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
118 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
119 if (value.getType() == LONG) {
120 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)value.long_value);
121 VLOG("\t\t value %d: %lld", aggIndex, (long long)value.long_value);
122 } else if (value.getType() == DOUBLE) {
123 protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, value.double_value);
124 VLOG("\t\t value %d: %.2f", aggIndex, value.double_value);
125 } else {
126 VLOG("Wrong value type for ValueMetric output: %d", value.getType());
127 }
128 protoOutput->end(valueToken);
129 }
130
onActiveStateChangedInternalLocked(const int64_t eventTimeNs)131 void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs) {
132 // When active state changes from true to false for pulled metric, clear diff base but don't
133 // reset other counters as we may accumulate more value in the bucket.
134 if (mUseDiff && !mIsActive) {
135 resetBase();
136 }
137 }
138
139 // Only called when mIsActive and the event is NOT too late.
onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)140 void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
141 const ConditionState newCondition,
142 const int64_t eventTimeNs) {
143 // For metrics that use diff, when condition changes from true to false,
144 // clear diff base but don't reset other counts because we may accumulate
145 // more value in the bucket.
146 if (mUseDiff &&
147 (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
148 resetBase();
149 }
150 }
151
prepareFirstBucketLocked()152 void NumericValueMetricProducer::prepareFirstBucketLocked() {
153 // Kicks off the puller immediately if condition is true and diff based.
154 if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
155 pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
156 }
157 }
158
pullAndMatchEventsLocked(const int64_t timestampNs)159 void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
160 vector<shared_ptr<LogEvent>> allData;
161 if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
162 ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
163 invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
164 return;
165 }
166
167 accumulateEvents(allData, timestampNs, timestampNs);
168 }
169
calcPreviousBucketEndTime(const int64_t currentTimeNs)170 int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
171 return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
172 }
173
174 // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
175 // to be delayed. Other events like condition changes or app upgrade which are not based on
176 // AlarmManager might have arrived earlier and close the bucket.
onDataPulled(const vector<shared_ptr<LogEvent>> & allData,bool pullSuccess,int64_t originalPullTimeNs)177 void NumericValueMetricProducer::onDataPulled(const vector<shared_ptr<LogEvent>>& allData,
178 bool pullSuccess, int64_t originalPullTimeNs) {
179 lock_guard<mutex> lock(mMutex);
180 if (mCondition == ConditionState::kTrue) {
181 // If the pull failed, we won't be able to compute a diff.
182 if (!pullSuccess) {
183 invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
184 } else {
185 bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
186 if (isEventLate) {
187 // If the event is late, we are in the middle of a bucket. Just
188 // process the data without trying to snap the data to the nearest bucket.
189 accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
190 } else {
191 // For scheduled pulled data, the effective event time is snap to the nearest
192 // bucket end. In the case of waking up from a deep sleep state, we will
193 // attribute to the previous bucket end. If the sleep was long but not very
194 // long, we will be in the immediate next bucket. Previous bucket may get a
195 // larger number as we pull at a later time than real bucket end.
196 //
197 // If the sleep was very long, we skip more than one bucket before sleep. In
198 // this case, if the diff base will be cleared and this new data will serve as
199 // new diff base.
200 int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
201 StatsdStats::getInstance().noteBucketBoundaryDelayNs(
202 mMetricId, originalPullTimeNs - bucketEndTimeNs);
203 accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
204 }
205 }
206 }
207
208 // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
209 // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
210 flushIfNeededLocked(originalPullTimeNs);
211 }
212
combineValueFields(pair<LogEvent,vector<int>> & eventValues,const LogEvent & newEvent,const vector<int> & newValueIndices) const213 void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
214 const LogEvent& newEvent,
215 const vector<int>& newValueIndices) const {
216 if (eventValues.second.size() != newValueIndices.size()) {
217 ALOGE("NumericValueMetricProducer value indices sizes don't match");
218 return;
219 }
220 vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
221 const vector<FieldValue>& newFieldValues = newEvent.getValues();
222 for (size_t i = 0; i < eventValues.second.size(); ++i) {
223 if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
224 (*aggregateFieldValues)[eventValues.second[i]].mValue +=
225 newFieldValues[newValueIndices[i]].mValue;
226 }
227 }
228 }
229
// Process events retrieved from a pull.
//
// Validates the pull (not late relative to the current bucket, not delayed beyond
// mMaxPullDelayNs), aggregates matched events into the current bucket (summing per-dimension
// first when diffing), clears state for dimensions absent from the pulled data, and enforces
// the dimension guardrail. `originalPullTimeNs` is when the pull actually completed;
// `eventElapsedTimeNs` is the (possibly boundary-snapped) timestamp attributed to the events.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // Reject pulls that completed too long after they were requested.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) !=
                MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mFieldMatchers.size(), -1);
            if (!filterValues(mDimensionsInWhat, mFieldMatchers, data->getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(*data, valueIndices));
            } else {
                combineValueFields(it->second, *data, valueIndices);
            }
        }

        // Feed each per-dimension aggregate through the normal matched-event path.
        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        for (const auto& data : allData) {
            LogEvent localCopy = *data;
            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
                MatchingState::kMatched) {
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A successful pull establishes a global base for all tracked dimensions.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
334
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey) const335 bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(
336 const MetricDimensionKey& newKey) const {
337 // ===========GuardRail==============
338 // 1. Report the tuple count if the tuple count > soft limit
339 if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
340 return false;
341 }
342 if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
343 size_t newTupleCount = mCurrentFullBucket.size() + 1;
344 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
345 if (newTupleCount > mDimensionHardLimit) {
346 ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
347 (long long)mMetricId, newKey.toString().c_str());
348 return true;
349 }
350 }
351
352 return false;
353 }
354
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)355 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
356 for (const FieldValue& value : event.getValues()) {
357 if (value.mField.matches(matcher)) {
358 switch (value.mValue.type) {
359 case INT:
360 ret.setLong(value.mValue.int_value);
361 break;
362 case LONG:
363 ret.setLong(value.mValue.long_value);
364 break;
365 case FLOAT:
366 ret.setDouble(value.mValue.float_value);
367 break;
368 case DOUBLE:
369 ret.setDouble(value.mValue.double_value);
370 break;
371 default:
372 return false;
373 break;
374 }
375 return true;
376 }
377 }
378 return false;
379 }
380
// Aggregates every configured value field of `event` into `intervals` for dimension `eventKey`.
// When diffing (mUseDiff), the per-field delta against the stored `bases` is computed first —
// honoring mValueDirection and mUseAbsoluteValueOnReset on counter resets — and the raw value
// becomes the new base. The (possibly diffed) value is then folded into the interval according
// to mAggregationType. Returns whether any usable data was seen. May also feed anomaly
// trackers when the data is complete enough to trust.
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 ValueBases& bases) {
    // Lazily grow the base storage to one optional base per configured value field.
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        optional<Value>& base = bases[i];
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }
        seenNewData = true;
        if (mUseDiff) {
            if (!base.has_value()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                } else {
                    // no base. just update base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            // Compute the delta from the previous pull, respecting the configured direction.
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= base.value()) {
                        diff = value - base.value();
                    } else if (mUseAbsoluteValueOnReset) {
                        // Counter reset: treat the new raw value itself as the delta.
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (base.value() >= value) {
                        diff = base.value() - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - base.value();
                    break;
                default:
                    break;
            }
            base = value;
            value = diff;
        }

        // Fold the (possibly diffed) value into the interval's running aggregate.
        if (interval.hasValue()) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                    case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                default:
                    break;
            }
        } else {
            interval.aggregate = value;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propgate proper values down stream when anomaly support doubles
        long wholeBucketVal = intervals[0].aggregate.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}
505
buildPartialBucket(int64_t bucketEndTimeNs,vector<Interval> & intervals)506 PastBucket<Value> NumericValueMetricProducer::buildPartialBucket(int64_t bucketEndTimeNs,
507 vector<Interval>& intervals) {
508 PastBucket<Value> bucket;
509 bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
510 bucket.mBucketEndNs = bucketEndTimeNs;
511
512 // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
513 // then all interval values are discarded for this bucket.
514 if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
515 return bucket;
516 }
517
518 for (const Interval& interval : intervals) {
519 // skip the output if the diff is zero
520 if (!interval.hasValue() ||
521 (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
522 continue;
523 }
524
525 bucket.aggIndex.push_back(interval.aggIndex);
526 bucket.aggregates.push_back(getFinalValue(interval));
527 }
528 return bucket;
529 }
530
531 // Also invalidates current bucket if multiple buckets have been skipped
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)532 void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
533 const int64_t nextBucketStartTimeNs) {
534 ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
535 if (mAnomalyTrackers.size() > 0) {
536 appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
537 }
538 }
539
initNextSlicedBucket(int64_t nextBucketStartTimeNs)540 void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
541 ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);
542
543 // If we do not have a global base when the condition is true,
544 // we will have incomplete bucket for the next bucket.
545 if (mUseDiff && !mHasGlobalBase && mCondition) {
546 // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
547 mCurrentBucketIsSkipped = false;
548 }
549 }
550
// Accumulates the just-closed (possibly partial) bucket into mCurrentFullBucket for anomaly
// detection. `isFullBucketReached` indicates the closed bucket ended at a real (full) bucket
// boundary; in that case the accumulated totals are handed to the anomaly trackers and cleared.
// Skipped/invalid buckets contribute nothing.
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            tracker->addPastBucket(metricDimensionKey,
                                                   interval.aggregate.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
        }
    }
}
612
byteSizeLocked() const613 size_t NumericValueMetricProducer::byteSizeLocked() const {
614 size_t totalSize = 0;
615 for (const auto& [_, buckets] : mPastBuckets) {
616 totalSize += buckets.size() * kBucketSize;
617 // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
618 }
619 return totalSize;
620 }
621
valuePassesThreshold(const Interval & interval) const622 bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
623 if (mUploadThreshold == nullopt) {
624 return true;
625 }
626
627 Value finalValue = getFinalValue(interval);
628
629 double doubleValue =
630 finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
631 switch (mUploadThreshold->value_comparison_case()) {
632 case UploadThreshold::kLtInt:
633 return doubleValue < (double)mUploadThreshold->lt_int();
634 case UploadThreshold::kGtInt:
635 return doubleValue > (double)mUploadThreshold->gt_int();
636 case UploadThreshold::kLteInt:
637 return doubleValue <= (double)mUploadThreshold->lte_int();
638 case UploadThreshold::kGteInt:
639 return doubleValue >= (double)mUploadThreshold->gte_int();
640 case UploadThreshold::kLtFloat:
641 return doubleValue <= (double)mUploadThreshold->lt_float();
642 case UploadThreshold::kGtFloat:
643 return doubleValue >= (double)mUploadThreshold->gt_float();
644 default:
645 ALOGE("Value metric no upload threshold type used");
646 return false;
647 }
648 }
649
getFinalValue(const Interval & interval) const650 Value NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
651 if (mAggregationType != ValueMetric::AVG) {
652 return interval.aggregate;
653 } else {
654 double sum = interval.aggregate.type == LONG ? (double)interval.aggregate.long_value
655 : interval.aggregate.double_value;
656 return Value((double)sum / interval.sampleSize);
657 }
658 }
659
getDumpProtoFields() const660 NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
661 return {FIELD_ID_VALUE_METRICS,
662 FIELD_ID_BUCKET_NUM,
663 FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
664 FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
665 FIELD_ID_CONDITION_TRUE_NS,
666 FIELD_ID_CONDITION_CORRECTION_NS};
667 }
668
669 } // namespace statsd
670 } // namespace os
671 } // namespace android
672