1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "NumericValueMetricProducer.h"
21
22 #include <limits.h>
23 #include <stdlib.h>
24
25 #include "guardrail/StatsdStats.h"
26 #include "metrics/parsing_utils/metrics_manager_util.h"
27 #include "stats_log_util.h"
28
29 using android::util::FIELD_COUNT_REPEATED;
30 using android::util::FIELD_TYPE_BOOL;
31 using android::util::FIELD_TYPE_DOUBLE;
32 using android::util::FIELD_TYPE_INT32;
33 using android::util::FIELD_TYPE_INT64;
34 using android::util::FIELD_TYPE_MESSAGE;
35 using android::util::FIELD_TYPE_STRING;
36 using android::util::ProtoOutputStream;
37 using std::map;
38 using std::optional;
39 using std::shared_ptr;
40 using std::string;
41 using std::unordered_map;
42
43 namespace android {
44 namespace os {
45 namespace statsd {
46
// Proto field numbers below must stay in sync with the corresponding
// messages in frameworks/proto_logging (StatsLogReport / ValueBucketInfo).
// for StatsLogReport
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUE_SAMPLESIZE = 4;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;
const int FIELD_ID_CONDITION_CORRECTION_NS = 11;

// Default diff bases used when mUseZeroDefaultBase is enabled and a dimension
// key has no base of its own yet (see aggregateFields()).
const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE(0.0);
63
64 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
// Constructs the producer from the ValueMetric config proto. The option
// structs bundle the dependencies (pullers, bucket timing, what/condition/
// state links, activations, guardrails) consumed by the ValueMetricProducer
// base class.
NumericValueMetricProducer::NumericValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions)
    : ValueMetricProducer(metric.id(), key, protoHash, pullOptions, bucketOptions, whatOptions,
                          conditionOptions, stateOptions, activationOptions, guardrailOptions),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      // Sample sizes are reported by default only for AVG, where the count is
      // needed to interpret the aggregate.
      mIncludeSampleSize(metric.has_include_sample_size()
                                 ? metric.include_sample_size()
                                 : metric.aggregation_type() == ValueMetric_AggregationType_AVG),
      // Pulled metrics diff successive pulls by default; pushed ones do not.
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : isPulled()),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      // No global base until the first successful pull seeds one.
      mHasGlobalBase(false),
      mMaxPullDelayNs(metric.has_max_pull_delay_sec() ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs) {
    // TODO(b/186677791): Use initializer list to initialize mUploadThreshold.
    if (metric.has_threshold()) {
        mUploadThreshold = metric.threshold();
    }
}
90
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)91 void NumericValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
92 const BucketDropReason reason) {
93 ValueMetricProducer::invalidateCurrentBucket(dropTimeNs, reason);
94
95 switch (reason) {
96 case BucketDropReason::DUMP_REPORT_REQUESTED:
97 case BucketDropReason::EVENT_IN_WRONG_BUCKET:
98 case BucketDropReason::CONDITION_UNKNOWN:
99 case BucketDropReason::PULL_FAILED:
100 case BucketDropReason::PULL_DELAYED:
101 case BucketDropReason::DIMENSION_GUARDRAIL_REACHED:
102 resetBase();
103 break;
104 default:
105 break;
106 }
107 }
108
resetBase()109 void NumericValueMetricProducer::resetBase() {
110 for (auto& [_, dimInfo] : mDimInfos) {
111 for (optional<Value>& base : dimInfo.dimExtras) {
112 base.reset();
113 }
114 }
115 mHasGlobalBase = false;
116 }
117
// Serializes one aggregate of a past bucket as a repeated `values` submessage:
// the value-field index, optionally the sample size, and the aggregate itself
// (as int64 or double depending on the Value's runtime type).
void NumericValueMetricProducer::writePastBucketAggregateToProto(
        const int aggIndex, const Value& value, const int sampleSize,
        ProtoOutputStream* const protoOutput) const {
    uint64_t valueToken =
            protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, aggIndex);
    if (mIncludeSampleSize) {
        protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_SAMPLESIZE, sampleSize);
    }
    if (value.getType() == LONG) {
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG, (long long)value.long_value);
        VLOG("\t\t value %d: %lld", aggIndex, (long long)value.long_value);
    } else if (value.getType() == DOUBLE) {
        protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, value.double_value);
        VLOG("\t\t value %d: %.2f", aggIndex, value.double_value);
    } else {
        // Non-numeric aggregates are silently skipped (only the wrapper and
        // index are emitted); this should not happen for well-formed configs.
        VLOG("Wrong value type for ValueMetric output: %d", value.getType());
    }
    protoOutput->end(valueToken);
}
138
onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)139 void NumericValueMetricProducer::onActiveStateChangedInternalLocked(const int64_t eventTimeNs,
140 const bool isActive) {
141 // When active state changes from true to false for pulled metric, clear diff base but don't
142 // reset other counters as we may accumulate more value in the bucket.
143 if (mUseDiff && !isActive) {
144 resetBase();
145 }
146 }
147
148 // Only called when mIsActive and the event is NOT too late.
onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)149 void NumericValueMetricProducer::onConditionChangedInternalLocked(const ConditionState oldCondition,
150 const ConditionState newCondition,
151 const int64_t eventTimeNs) {
152 // For metrics that use diff, when condition changes from true to false,
153 // clear diff base but don't reset other counts because we may accumulate
154 // more value in the bucket.
155 if (mUseDiff &&
156 (oldCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)) {
157 resetBase();
158 }
159 }
160
prepareFirstBucketLocked()161 void NumericValueMetricProducer::prepareFirstBucketLocked() {
162 // Kicks off the puller immediately if condition is true and diff based.
163 if (mIsActive && isPulled() && mCondition == ConditionState::kTrue && mUseDiff) {
164 pullAndMatchEventsLocked(mCurrentBucketStartTimeNs);
165 }
166 }
167
pullAndMatchEventsLocked(const int64_t timestampNs)168 void NumericValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
169 vector<shared_ptr<LogEvent>> allData;
170 if (!mPullerManager->Pull(mPullAtomId, mConfigKey, timestampNs, &allData)) {
171 ALOGE("Stats puller failed for tag: %d at %lld", mPullAtomId, (long long)timestampNs);
172 invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
173 return;
174 }
175
176 accumulateEvents(allData, timestampNs, timestampNs);
177 }
178
// Snaps currentTimeNs down to the most recent bucket boundary at or before it
// (i.e. the end time of the previous bucket), relative to mTimeBaseNs.
int64_t NumericValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    const int64_t elapsedSinceBase = currentTimeNs - mTimeBaseNs;
    const int64_t completedBuckets = elapsedSinceBase / mBucketSizeNs;
    return mTimeBaseNs + completedBuckets * mBucketSizeNs;
}
182
183 // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
184 // to be delayed. Other events like condition changes or app upgrade which are not based on
185 // AlarmManager might have arrived earlier and close the bucket.
// Handles data arriving from an asynchronous (alarm-scheduled) pull. Takes
// mMutex; safe to call from the puller callback thread. Pull failures drop
// the bucket; successful pulls are attributed either to the current bucket
// (late pull mid-bucket) or snapped to the previous bucket boundary.
void NumericValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                              PullResult pullResult, int64_t originalPullTimeNs) {
    lock_guard<mutex> lock(mMutex);
    // Data pulled while the condition is false is ignored entirely; only the
    // flush below still runs.
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (pullResult == PullResult::PULL_RESULT_FAIL) {
            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
        } else if (pullResult == PullResult::PULL_RESULT_SUCCESS) {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs);
            } else {
                // For scheduled pulled data, the effective event time is snap to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute to the previous bucket end. If the sleep was long but not very
                // long, we will be in the immediate next bucket. Previous bucket may get a
                // larger number as we pull at a later time than real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before sleep. In
                // this case, if the diff base will be cleared and this new data will serve as
                // new diff base.
                int64_t bucketEndTimeNs = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTimeNs);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTimeNs);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTimeNs when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}
221
// Adds the value fields of newEvent into the aggregate event in eventValues.
// eventValues.second and newValueIndices hold, per value-field matcher, the
// index of that field inside the respective event's FieldValue list (-1 when
// the field was absent). Only fields present in BOTH events are combined.
void NumericValueMetricProducer::combineValueFields(pair<LogEvent, vector<int>>& eventValues,
                                                    const LogEvent& newEvent,
                                                    const vector<int>& newValueIndices) const {
    // Index vectors are produced by the same filterValues() call shape, so a
    // size mismatch indicates a programming error; bail out defensively.
    if (eventValues.second.size() != newValueIndices.size()) {
        ALOGE("NumericValueMetricProducer value indices sizes don't match");
        return;
    }
    vector<FieldValue>* const aggregateFieldValues = eventValues.first.getMutableValues();
    const vector<FieldValue>& newFieldValues = newEvent.getValues();
    for (size_t i = 0; i < eventValues.second.size(); ++i) {
        // Sum in place into the aggregate event's FieldValue.
        if (newValueIndices[i] != -1 && eventValues.second[i] != -1) {
            (*aggregateFieldValues)[eventValues.second[i]].mValue +=
                    newFieldValues[newValueIndices[i]].mValue;
        }
    }
}
238
239 // Process events retrieved from a pull.
// Process events retrieved from a pull.
// originalPullTimeNs is when the pull actually happened; eventElapsedTimeNs is
// the (possibly boundary-snapped) time the events are attributed to. Validates
// timing, optionally pre-aggregates by dimension for diffing, dispatches each
// event through the normal matched-event path, then prunes dimensions absent
// from this pull and applies the dimension guardrail.
void NumericValueMetricProducer::accumulateEvents(const vector<shared_ptr<LogEvent>>& allData,
                                                  int64_t originalPullTimeNs,
                                                  int64_t eventElapsedTimeNs) {
    // A pull attributed to a time before the current bucket start cannot be
    // applied; drop the bucket since its data would be incomplete.
    if (isEventLateLocked(eventElapsedTimeNs)) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        return;
    }

    // A pull that completed too long after it was requested is not
    // representative of the bucket; drop the bucket.
    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullAtomId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullAtomId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullAtomId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
        return;
    }

    mMatchedMetricDimensionKeys.clear();
    if (mUseDiff) {
        // An extra aggregation step is needed to sum values with matching dimensions
        // before calculating the diff between sums of consecutive pulls.
        std::unordered_map<HashableDimensionKey, pair<LogEvent, vector<int>>> aggregateEvents;
        for (const auto& data : allData) {
            if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) !=
                MatchingState::kMatched) {
                continue;
            }

            // Get dimensions_in_what key and value indices.
            HashableDimensionKey dimensionsInWhat;
            vector<int> valueIndices(mFieldMatchers.size(), -1);
            // NOTE(review): a filterValues() failure is only counted in stats;
            // the event is still aggregated with -1 indices. Presumably
            // intentional best-effort handling — confirm before changing.
            if (!filterValues(mDimensionsInWhat, mFieldMatchers, data->getValues(),
                              dimensionsInWhat, valueIndices)) {
                StatsdStats::getInstance().noteBadValueType(mMetricId);
            }

            // Store new event in map or combine values in existing event.
            auto it = aggregateEvents.find(dimensionsInWhat);
            if (it == aggregateEvents.end()) {
                aggregateEvents.emplace(std::piecewise_construct,
                                        std::forward_as_tuple(dimensionsInWhat),
                                        std::forward_as_tuple(*data, valueIndices));
            } else {
                combineValueFields(it->second, *data, valueIndices);
            }
        }

        // Dispatch one combined event per dimension key, attributed to
        // eventElapsedTimeNs rather than the raw pull time.
        for (auto& [dimKey, eventInfo] : aggregateEvents) {
            eventInfo.first.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, eventInfo.first);
        }
    } else {
        // Non-diffed metrics: forward each matching event individually.
        for (const auto& data : allData) {
            LogEvent localCopy = *data;
            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
                MatchingState::kMatched) {
                localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
            }
        }
    }

    // If a key that is:
    // 1. Tracked in mCurrentSlicedBucket and
    // 2. A superset of the current mStateChangePrimaryKey
    // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
    // then we clear the data from mDimInfos to reset the base and current state key.
    for (auto& [metricDimensionKey, currentValueBucket] : mCurrentSlicedBucket) {
        const auto& whatKey = metricDimensionKey.getDimensionKeyInWhat();
        bool presentInPulledData =
                mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData &&
            containsLinkedStateValues(whatKey, mStateChangePrimaryKey.second, mMetric2StateLinks,
                                      mStateChangePrimaryKey.first)) {
            auto it = mDimInfos.find(whatKey);
            if (it != mDimInfos.end()) {
                mDimInfos.erase(it);
            }
            // Turn OFF condition timer for keys not present in pulled data.
            currentValueBucket.conditionTimer.onConditionChanged(false, eventElapsedTimeNs);
        }
    }
    mMatchedMetricDimensionKeys.clear();
    // A successful pull (re-)establishes the global base for diffing.
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
        mCurrentSlicedBucket.clear();
    }
}
343
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey)344 bool NumericValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
345 // ===========GuardRail==============
346 // 1. Report the tuple count if the tuple count > soft limit
347 if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
348 return false;
349 }
350 if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
351 size_t newTupleCount = mCurrentFullBucket.size() + 1;
352 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
353 if (newTupleCount > mDimensionHardLimit) {
354 if (!mHasHitGuardrail) {
355 ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
356 (long long)mMetricId, newKey.toString().c_str());
357 mHasHitGuardrail = true;
358 }
359 return true;
360 }
361 }
362
363 return false;
364 }
365
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)366 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
367 for (const FieldValue& value : event.getValues()) {
368 if (value.mField.matches(matcher)) {
369 switch (value.mValue.type) {
370 case INT:
371 ret.setLong(value.mValue.int_value);
372 break;
373 case LONG:
374 ret.setLong(value.mValue.long_value);
375 break;
376 case FLOAT:
377 ret.setDouble(value.mValue.float_value);
378 break;
379 case DOUBLE:
380 ret.setDouble(value.mValue.double_value);
381 break;
382 default:
383 return false;
384 break;
385 }
386 return true;
387 }
388 }
389 return false;
390 }
391
// Folds one (possibly pre-aggregated) event into the per-key intervals.
// For diffed metrics the stored per-field bases are updated and the diff
// (per mValueDirection) is aggregated instead of the raw value. Returns
// whether any new data was seen. Also drives anomaly detection on the first
// value field when the data is trustworthy.
bool NumericValueMetricProducer::aggregateFields(const int64_t eventTimeNs,
                                                 const MetricDimensionKey& eventKey,
                                                 const LogEvent& event, vector<Interval>& intervals,
                                                 ValueBases& bases) {
    // Lazily grow the base storage to one optional base per value field.
    if (bases.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of bases to %zu", mFieldMatchers.size());
        bases.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;
    bool seenNewData = false;
    for (size_t i = 0; i < mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = intervals[i];
        interval.aggIndex = i;
        optional<Value>& base = bases[i];
        Value value;
        // A missing/non-numeric field aborts processing of the remaining
        // fields but keeps whatever was already aggregated.
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %zu from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return seenNewData;
        }
        seenNewData = true;
        if (mUseDiff) {
            if (!base.has_value()) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has global base. This key does not.
                    // Optionally use zero as base.
                    base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                } else {
                    // no base. just update base and return.
                    base = value;

                    // If we're missing a base, do not use anomaly detection on incomplete data
                    useAnomalyDetection = false;

                    // Continue (instead of return) here in order to set base value for other bases
                    continue;
                }
            }
            // Compute the diff against the base according to the configured
            // direction; direction violations either fall back to the raw
            // value (use_absolute_value_on_reset) or re-seed the base.
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= base.value()) {
                        diff = value - base.value();
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (base.value() >= value) {
                        diff = base.value() - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullAtomId);
                        base = value;
                        // If we've got bad data, do not use anomaly detection
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - base.value();
                    break;
                default:
                    break;
            }
            // The raw value becomes the new base; the diff is aggregated.
            base = value;
            value = diff;
        }

        if (interval.hasValue()) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // for AVG, we add up and take average when flushing the bucket
                    case ValueMetric::AVG:
                    interval.aggregate += value;
                    break;
                case ValueMetric::MIN:
                    interval.aggregate = min(value, interval.aggregate);
                    break;
                case ValueMetric::MAX:
                    interval.aggregate = max(value, interval.aggregate);
                    break;
                default:
                    break;
            }
        } else {
            // First sample for this interval seeds the aggregate directly.
            interval.aggregate = value;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct and we have not skipped the bucket due
    // to MULTIPLE_BUCKETS_SKIPPED.
    if (useAnomalyDetection && !multipleBucketsSkipped(calcBucketsForwardCount(eventTimeNs))) {
        // TODO: propgate proper values down stream when anomaly support doubles
        long wholeBucketVal = intervals[0].aggregate.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
    return seenNewData;
}
516
buildPartialBucket(int64_t bucketEndTimeNs,vector<Interval> & intervals)517 PastBucket<Value> NumericValueMetricProducer::buildPartialBucket(int64_t bucketEndTimeNs,
518 vector<Interval>& intervals) {
519 PastBucket<Value> bucket;
520 bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
521 bucket.mBucketEndNs = bucketEndTimeNs;
522
523 // The first value field acts as a "gatekeeper" - if it does not pass the specified threshold,
524 // then all interval values are discarded for this bucket.
525 if (intervals.empty() || (intervals[0].hasValue() && !valuePassesThreshold(intervals[0]))) {
526 return bucket;
527 }
528
529 for (const Interval& interval : intervals) {
530 // skip the output if the diff is zero
531 if (!interval.hasValue() ||
532 (mSkipZeroDiffOutput && mUseDiff && interval.aggregate.isZero())) {
533 continue;
534 }
535
536 bucket.aggIndex.push_back(interval.aggIndex);
537 bucket.aggregates.push_back(getFinalValue(interval));
538 if (mIncludeSampleSize) {
539 bucket.sampleSizes.push_back(interval.sampleSize);
540 }
541 }
542 return bucket;
543 }
544
545 // Also invalidates current bucket if multiple buckets have been skipped
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)546 void NumericValueMetricProducer::closeCurrentBucket(const int64_t eventTimeNs,
547 const int64_t nextBucketStartTimeNs) {
548 ValueMetricProducer::closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
549 if (mAnomalyTrackers.size() > 0) {
550 appendToFullBucket(eventTimeNs > getCurrentBucketEndTimeNs());
551 }
552 }
553
initNextSlicedBucket(int64_t nextBucketStartTimeNs)554 void NumericValueMetricProducer::initNextSlicedBucket(int64_t nextBucketStartTimeNs) {
555 ValueMetricProducer::initNextSlicedBucket(nextBucketStartTimeNs);
556
557 // If we do not have a global base when the condition is true,
558 // we will have incomplete bucket for the next bucket.
559 if (mUseDiff && !mHasGlobalBase && mCondition) {
560 // TODO(b/188878815): mCurrentBucketIsSkipped should probably be set to true here.
561 mCurrentBucketIsSkipped = false;
562 }
563 }
564
// Accumulates the just-closed (partial) bucket into mCurrentFullBucket and,
// once a full bucket boundary is reached, hands the accumulated per-key
// totals to the anomaly trackers. Invalid (skipped) buckets contribute
// nothing and invalidate any accumulated full-bucket data at a boundary.
void NumericValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
    if (mCurrentBucketIsSkipped) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(metricDimensionKey) ||
                    currentBucket.intervals.empty()) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
            for (const auto& [metricDimensionKey, value] : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(metricDimensionKey, value, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr && !currentBucket.intervals.empty()) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = currentBucket.intervals[0];
                        if (interval.hasValue()) {
                            tracker->addPastBucket(metricDimensionKey,
                                                   interval.aggregate.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            if (!currentBucket.intervals.empty()) {
                // TODO: fix this when anomaly can accept double values
                auto& interval = currentBucket.intervals[0];
                if (interval.hasValue()) {
                    mCurrentFullBucket[metricDimensionKey] += interval.aggregate.long_value;
                }
            }
        }
    }
}
626
byteSizeLocked() const627 size_t NumericValueMetricProducer::byteSizeLocked() const {
628 size_t totalSize = 0;
629 for (const auto& [_, buckets] : mPastBuckets) {
630 totalSize += buckets.size() * kBucketSize;
631 // TODO(b/189283526): Add bytes used to store PastBucket.aggIndex vector
632 }
633 return totalSize;
634 }
635
valuePassesThreshold(const Interval & interval) const636 bool NumericValueMetricProducer::valuePassesThreshold(const Interval& interval) const {
637 if (mUploadThreshold == nullopt) {
638 return true;
639 }
640
641 Value finalValue = getFinalValue(interval);
642
643 double doubleValue =
644 finalValue.type == LONG ? (double)finalValue.long_value : finalValue.double_value;
645 switch (mUploadThreshold->value_comparison_case()) {
646 case UploadThreshold::kLtInt:
647 return doubleValue < (double)mUploadThreshold->lt_int();
648 case UploadThreshold::kGtInt:
649 return doubleValue > (double)mUploadThreshold->gt_int();
650 case UploadThreshold::kLteInt:
651 return doubleValue <= (double)mUploadThreshold->lte_int();
652 case UploadThreshold::kGteInt:
653 return doubleValue >= (double)mUploadThreshold->gte_int();
654 case UploadThreshold::kLtFloat:
655 return doubleValue <= (double)mUploadThreshold->lt_float();
656 case UploadThreshold::kGtFloat:
657 return doubleValue >= (double)mUploadThreshold->gt_float();
658 default:
659 ALOGE("Value metric no upload threshold type used");
660 return false;
661 }
662 }
663
getFinalValue(const Interval & interval) const664 Value NumericValueMetricProducer::getFinalValue(const Interval& interval) const {
665 if (mAggregationType != ValueMetric::AVG) {
666 return interval.aggregate;
667 } else {
668 double sum = interval.aggregate.type == LONG ? (double)interval.aggregate.long_value
669 : interval.aggregate.double_value;
670 return Value((double)sum / interval.sampleSize);
671 }
672 }
673
// Supplies the proto field numbers the shared dump-report writer uses when
// serializing this metric's StatsLogReport section.
NumericValueMetricProducer::DumpProtoFields NumericValueMetricProducer::getDumpProtoFields() const {
    return {FIELD_ID_VALUE_METRICS,
            FIELD_ID_BUCKET_NUM,
            FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
            FIELD_ID_CONDITION_TRUE_NS,
            FIELD_ID_CONDITION_CORRECTION_NS};
}
682
683 } // namespace statsd
684 } // namespace os
685 } // namespace android
686