/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define DEBUG false  // STOPSHIP if true
#include "Log.h"

#include "ValueMetricProducer.h"
#include "../guardrail/StatsdStats.h"
#include "../stats_log_util.h"

#include <cutils/log.h>
#include <limits.h>
#include <stdlib.h>

using android::util::FIELD_COUNT_REPEATED;
using android::util::FIELD_TYPE_BOOL;
using android::util::FIELD_TYPE_DOUBLE;
using android::util::FIELD_TYPE_INT32;
using android::util::FIELD_TYPE_INT64;
using android::util::FIELD_TYPE_MESSAGE;
using android::util::FIELD_TYPE_STRING;
using android::util::ProtoOutputStream;
using std::list;
using std::make_pair;
using std::make_shared;
using std::map;
using std::shared_ptr;
using std::unique_ptr;
using std::unordered_map;

namespace android {
namespace os {
namespace statsd {

// for StatsLogReport
const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
const int FIELD_ID_TIME_BASE = 9;
const int FIELD_ID_BUCKET_SIZE = 10;
const int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
const int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
const int FIELD_ID_IS_ACTIVE = 14;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START_MILLIS = 3;
const int FIELD_ID_SKIPPED_END_MILLIS = 4;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
const int FIELD_ID_VALUE_DOUBLE = 3;
const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
const int FIELD_ID_CONDITION_TRUE_NS = 10;

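// Zero-valued bases, used as the default base for keys missing from a pull when
// use_zero_default_base is set.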
const Value ZERO_LONG((int64_t)0);
const Value ZERO_DOUBLE((double)0);

// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer::ValueMetricProducer(
        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
        const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
        const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
    : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
      mWhatMatcherIndex(whatMatcherIndex),
      mEventMatcherWizard(matcherWizard),
      mPullerManager(pullerManager),
      mPullTagId(pullTagId),
      mIsPulled(pullTagId != -1),
      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
      mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
                                  : StatsdStats::kDimensionKeySizeSoftLimit),
      mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                          StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                  ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                  : StatsdStats::kDimensionKeySizeHardLimit),
      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
      mAggregationType(metric.aggregation_type()),
      mUseDiff(metric.has_use_diff() ? metric.use_diff() : mIsPulled),
      mValueDirection(metric.value_direction()),
      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
      mUseZeroDefaultBase(metric.use_zero_default_base()),
      mHasGlobalBase(false),
      mCurrentBucketIsInvalid(false),
      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
                                                      : StatsdStats::kPullMaxDelayNs),
      mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
      // Condition timer will be set in prepareFirstBucketLocked.
      mConditionTimer(false, timeBaseNs) {
    int64_t bucketSizeMillis = 0;
    if (metric.has_bucket()) {
        bucketSizeMillis = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
    } else {
        bucketSizeMillis = TimeUnitToBucketSizeInMillis(ONE_HOUR);
    }

    mBucketSizeNs = bucketSizeMillis * 1000000;

    translateFieldMatcher(metric.value_field(), &mFieldMatchers);

    if (metric.has_dimensions_in_what()) {
        translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
        mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
    }

    if (metric.has_dimensions_in_condition()) {
        translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
    }

    if (metric.links().size() > 0) {
        for (const auto& link : metric.links()) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }
    }

    mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
    mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
                          HasPositionALL(metric.dimensions_in_condition());

    int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
    mCurrentBucketNum += numBucketsForward;

    flushIfNeededLocked(startTimeNs);

    if (mIsPulled) {
        mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only adjust the start for partial buckets like the first bucket. All other buckets
    // should use flushIfNeeded to adjust start and end to bucket boundaries.
    mCurrentBucketStartTimeNs = startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);
    VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
         (long long)mBucketSizeNs, (long long)mTimeBaseNs);
}

ValueMetricProducer::~ValueMetricProducer() {
    VLOG("~ValueMetricProducer() called");
    if (mIsPulled) {
        mPullerManager->UnRegisterReceiver(mPullTagId, this);
    }
}

void ValueMetricProducer::prepareFirstBucketLocked() {
    // Kicks off the puller immediately if condition is true and diff based.
    if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
        pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition);
    }
    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);
}

void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
                                                           const int64_t eventTime) {
    VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
}

void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
    StatsdStats::getInstance().noteBucketDropped(mMetricId);
    // We are going to flush the data without doing a pull first, so we need to invalidate the
    // data.
    bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
    if (pullNeeded) {
        invalidateCurrentBucket();
    }
    flushIfNeededLocked(dropTimeNs);
    clearPastBucketsLocked(dropTimeNs);
}

void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
    mPastBuckets.clear();
    mSkippedBuckets.clear();
}

void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                             const bool include_current_partial_bucket,
                                             const bool erase_data,
                                             const DumpLatency dumpLatency,
                                             std::set<string>* str_set,
                                             ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);
    if (include_current_partial_bucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
        if (pullNeeded) {
            switch (dumpLatency) {
                case FAST:
                    invalidateCurrentBucket();
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs, mCondition);
                    break;
            }
        }
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by ALL.
    if (!mSliceByPositionALL) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
        if (!mDimensionsInCondition.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
            writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);

    for (const auto& pair : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(pair.first)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(pair.second)));
        protoOutput->end(wrapperToken);
    }

    for (const auto& pair : mPastBuckets) {
        const MetricDimensionKey& dimensionKey = pair.first;
        VLOG("  dimension key %s", dimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mSliceByPositionALL) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
            protoOutput->end(dimensionToken);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                uint64_t dimensionInConditionToken =
                        protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
                writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
                                      protoOutput);
                protoOutput->end(dimensionInConditionToken);
            }
        } else {
            writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
            if (dimensionKey.hasDimensionKeyInCondition()) {
                writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
                                               FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
                                               protoOutput);
            }
        }

        // Then fill bucket_info (ValueBucketInfo).
        for (const auto& bucket : pair.second) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // Only write the condition timer value if the metric has a condition.
            if (mConditionTrackerIndex >= 0) {
                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
                                   (long long)bucket.mConditionTrueNs);
            }
            for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
                int index = bucket.valueIndex[i];
                const Value& value = bucket.values[i];
                uint64_t valueToken = protoOutput->start(
                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX, index);
                if (value.getType() == LONG) {
                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
                                       (long long)value.long_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
                } else if (value.getType() == DOUBLE) {
                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
                                       value.double_value);
                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
                         (long long)bucket.mBucketEndNs, index, value.double_value);
                } else {
                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
                }
                protoOutput->end(valueToken);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld dump report done...", (long long)mMetricId);
    if (erase_data) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}

void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
    if (!mCurrentBucketIsInvalid) {
        // Only report once per invalid bucket.
        StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
    }
    mCurrentBucketIsInvalid = true;
}

void ValueMetricProducer::invalidateCurrentBucket() {
    invalidateCurrentBucketWithoutResetBase();
    resetBase();
}

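// Clearing the bases forces the next pulled values to be treated as fresh baselines instead
// of producing diffs against stale data.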
void ValueMetricProducer::resetBase() {
    for (auto& slice : mCurrentSlicedBucket) {
        for (auto& interval : slice.second) {
            interval.hasBase = false;
        }
    }
    mHasGlobalBase = false;
}

// Handle active state change. Active state change is treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;
    if (ConditionState::kTrue == mCondition && isEventTooLate) {
        // Drop the bucket because the event arrived too late, i.e. we are missing data for this
        // bucket.
        invalidateCurrentBucket();
    }

    // Call the parent method once we've verified the validity of the current bucket.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    if (!isEventTooLate) {
        if (mIsPulled) {
            pullAndMatchEventsLocked(eventTimeNs, mCondition);
        }
        // When the active state changes from true to false, clear the diff base but don't
        // reset other counters, as we may accumulate more value in the bucket.
        if (mUseDiff && !mIsActive) {
            resetBase();
        }
    }

    flushIfNeededLocked(eventTimeNs);

    // Let the condition timer know of the new active state.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);
}

void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                   const int64_t eventTimeNs) {
    ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
    bool isEventTooLate = eventTimeNs < mCurrentBucketStartTimeNs;

    if (mIsActive) {
        if (isEventTooLate) {
            VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
                 (long long)mCurrentBucketStartTimeNs);
            StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
            invalidateCurrentBucket();
        } else {
            if (mCondition == ConditionState::kUnknown) {
                // If the condition was unknown, we mark the bucket as invalid since the bucket will
                // contain partial data. For instance, the condition change might happen close to
                // the end of the bucket and we might miss lots of data.
                //
                // We still want to pull to set the base.
                invalidateCurrentBucket();
            }

            // Pull on condition changes.
            bool conditionChanged =
                    (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)
                    || (mCondition == ConditionState::kFalse &&
                        newCondition == ConditionState::kTrue);
            // We do not need to pull when we go from unknown to false.
            //
            // We also pull if the condition was already true in order to be able to flush the
            // bucket at the end if needed.
            //
            // onConditionChangedLocked might happen on bucket boundaries if this is called before
            // #onDataPulled.
            if (mIsPulled && (conditionChanged || condition)) {
                pullAndMatchEventsLocked(eventTimeNs, newCondition);
            }

            // When the condition changes from true to false, clear the diff base but don't
            // reset other counters, as we may accumulate more value in the bucket.
            if (mUseDiff && mCondition == ConditionState::kTrue
                    && newCondition == ConditionState::kFalse) {
                resetBase();
            }
        }
    }

    mCondition = isEventTooLate ? initialCondition(mConditionTrackerIndex) : newCondition;

    if (mIsActive) {
        flushIfNeededLocked(eventTimeNs);
        mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
    }
}

void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs,
                                                   ConditionState condition) {
    vector<std::shared_ptr<LogEvent>> allData;
    if (!mPullerManager->Pull(mPullTagId, &allData)) {
        ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
        invalidateCurrentBucket();
        return;
    }

    accumulateEvents(allData, timestampNs, timestampNs, condition);
}

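// Worked example (hypothetical numbers): with mTimeBaseNs = 0 and a 10 min bucket size,
// a pull at t = 25 min maps to 0 + (25 / 10) * 10 = 20 min, i.e. the most recent bucket
// boundary at or before currentTimeNs.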
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}

// By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
// to be delayed. Other events like condition changes or app upgrade which are not based on
// AlarmManager might have arrived earlier and closed the bucket.
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                       bool pullSuccess, int64_t originalPullTimeNs) {
    std::lock_guard<std::mutex> lock(mMutex);
    if (mCondition == ConditionState::kTrue) {
        // If the pull failed, we won't be able to compute a diff.
        if (!pullSuccess) {
            invalidateCurrentBucket();
        } else {
            bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
            if (isEventLate) {
                // If the event is late, we are in the middle of a bucket. Just
                // process the data without trying to snap the data to the nearest bucket.
                accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition);
            } else {
                // For scheduled pulled data, the effective event time is snapped to the nearest
                // bucket end. In the case of waking up from a deep sleep state, we will
                // attribute the data to the previous bucket end. If the sleep was long but not
                // very long, we will be in the immediate next bucket. The previous bucket may
                // get a larger number as we pull at a later time than the real bucket end.
                //
                // If the sleep was very long, we skip more than one bucket before the pull. In
                // this case, the diff base will be cleared and this new data will serve as the
                // new diff base.
                int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
                StatsdStats::getInstance().noteBucketBoundaryDelayNs(
                        mMetricId, originalPullTimeNs - bucketEndTime);
                accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition);
            }
        }
    }

    // We can probably flush the bucket. Since we used bucketEndTime when calling
    // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
    flushIfNeededLocked(originalPullTimeNs);
}

void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
                                           ConditionState condition) {
    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
    if (isEventLate) {
        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket();
        return;
    }

    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
    if (pullDelayNs > mMaxPullDelayNs) {
        ALOGE("Pull finished too late for atom %d, longer than %lld", mPullTagId,
              (long long)mMaxPullDelayNs);
        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
        // We are missing one pull from the bucket which means we will not have a complete view of
        // what's going on.
        invalidateCurrentBucket();
        return;
    }

    if (allData.size() == 0) {
        VLOG("Data pulled is empty");
        StatsdStats::getInstance().noteEmptyData(mPullTagId);
    }

    mMatchedMetricDimensionKeys.clear();
    for (const auto& data : allData) {
        LogEvent localCopy = data->makeCopy();
        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
            MatchingState::kMatched) {
            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
        }
    }
    // If the new pulled data does not contain some keys we track in our intervals, we need to
    // reset the base.
    for (auto& slice : mCurrentSlicedBucket) {
        bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
                != mMatchedMetricDimensionKeys.end();
        if (!presentInPulledData) {
            for (auto& interval : slice.second) {
                interval.hasBase = false;
            }
        }
    }
    mMatchedMetricDimensionKeys.clear();
    mHasGlobalBase = true;

    // If we reach the guardrail, we might have dropped some data which means the bucket is
    // incomplete.
    //
    // The base also needs to be reset. If we do not have the full data, we might
    // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
    // might be missing from mCurrentSlicedBucket.
    if (hasReachedGuardRailLimit()) {
        invalidateCurrentBucket();
        mCurrentSlicedBucket.clear();
    }
}

void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
    if (mCurrentSlicedBucket.size() == 0) {
        return;
    }

    fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
            (unsigned long)mCurrentSlicedBucket.size());
    if (verbose) {
        for (const auto& it : mCurrentSlicedBucket) {
            for (const auto& interval : it.second) {
                fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n",
                        it.first.getDimensionKeyInWhat().toString().c_str(),
                        it.first.getDimensionKeyInCondition().toString().c_str(),
                        interval.value.toString().c_str());
            }
        }
    }
}

bool ValueMetricProducer::hasReachedGuardRailLimit() const {
    return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
}

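// Guardrail behavior: past the soft limit the dimension count is reported to StatsdStats;
// once the hard limit is reached, data for new keys is dropped.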
bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
        return false;
    }
    if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
        StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (hasReachedGuardRailLimit()) {
            ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
                  newKey.toString().c_str());
            StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
            return true;
        }
    }

    return false;
}

bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
    // ===========GuardRail==============
    // 1. Report the tuple count if the tuple count > soft limit
    if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
        return false;
    }
    if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
        size_t newTupleCount = mCurrentFullBucket.size() + 1;
        // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
        if (newTupleCount > mDimensionHardLimit) {
            ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
                  (long long)mMetricId,
                  newKey.toString().c_str());
            return true;
        }
    }

    return false;
}

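// INT and FLOAT fields are widened to LONG and DOUBLE respectively, so downstream
// aggregation only ever deals with those two value types.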
bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
    for (const FieldValue& value : event.getValues()) {
        if (value.mField.matches(matcher)) {
            switch (value.mValue.type) {
                case INT:
                    ret.setLong(value.mValue.int_value);
                    break;
                case LONG:
                    ret.setLong(value.mValue.long_value);
                    break;
                case FLOAT:
                    ret.setDouble(value.mValue.float_value);
                    break;
                case DOUBLE:
                    ret.setDouble(value.mValue.double_value);
                    break;
                default:
                    break;
            }
            return true;
        }
    }
    return false;
}

void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
                                                          const MetricDimensionKey& eventKey,
                                                          const ConditionKey& conditionKey,
                                                          bool condition, const LogEvent& event) {
    int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (eventTimeNs < mCurrentBucketStartTimeNs) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }
    mMatchedMetricDimensionKeys.insert(eventKey);

    if (!mIsPulled) {
        // For pulled metrics, we cannot flush without doing a pull first, so only flush here
        // for pushed metrics.
        flushIfNeededLocked(eventTimeNs);
    }

    // We should not accumulate the data for pushed metrics when the condition is false.
    bool shouldSkipForPushMetric = !mIsPulled && !condition;
    // For pulled metrics, there are two cases:
    // - to compute diffs, we need to process all the state changes
    // - for non-diff metrics, we should ignore the data if the condition wasn't true. If we have a
    //   state change from
    //     + True -> True: we should process the data, it might be a bucket boundary
    //     + True -> False: we also need to process the data.
    bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
            && mCondition != ConditionState::kTrue;
    if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
        VLOG("ValueMetric skip event because condition is false");
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }
    vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
    if (multiIntervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        multiIntervals.resize(mFieldMatchers.size());
    }

    // We only use anomaly detection under certain cases.
    // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
    // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
    // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
    // Whoever next works on it should look into the cases where it is triggered in this function.
    // Discussion here: http://ag/6124370.
    bool useAnomalyDetection = true;

    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
        const Matcher& matcher = mFieldMatchers[i];
        Interval& interval = multiIntervals[i];
        interval.valueIndex = i;
        Value value;
        if (!getDoubleOrLong(event, matcher, value)) {
            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
            StatsdStats::getInstance().noteBadValueType(mMetricId);
            return;
        }
        interval.seenNewData = true;

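        // In diff mode below, interval.base keeps the previous raw reading, and 'value' is
        // rewritten to the delta since that reading before it is aggregated.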
        if (mUseDiff) {
            if (!interval.hasBase) {
                if (mHasGlobalBase && mUseZeroDefaultBase) {
                    // The bucket has a global base. This key does not.
                    // Optionally use zero as the base.
                    interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
                    interval.hasBase = true;
                } else {
                    // No base; just update the base and return.
                    interval.base = value;
                    interval.hasBase = true;
                    // If we're missing a base, do not use anomaly detection on incomplete data.
                    useAnomalyDetection = false;
                    // Continue (instead of return) here in order to set interval.base and
                    // interval.hasBase for the other intervals.
                    continue;
                }
            }
            Value diff;
            switch (mValueDirection) {
                case ValueMetric::INCREASING:
                    if (value >= interval.base) {
                        diff = value - interval.base;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected decreasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        interval.base = value;
                        // If we've got bad data, do not use anomaly detection.
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::DECREASING:
                    if (interval.base >= value) {
                        diff = interval.base - value;
                    } else if (mUseAbsoluteValueOnReset) {
                        diff = value;
                    } else {
                        VLOG("Unexpected increasing value");
                        StatsdStats::getInstance().notePullDataError(mPullTagId);
                        interval.base = value;
                        // If we've got bad data, do not use anomaly detection.
                        useAnomalyDetection = false;
                        continue;
                    }
                    break;
                case ValueMetric::ANY:
                    diff = value - interval.base;
                    break;
                default:
                    break;
            }
            interval.base = value;
            value = diff;
        }

        if (interval.hasValue) {
            switch (mAggregationType) {
                case ValueMetric::SUM:
                    // For AVG, we add up and take the average when flushing the bucket.
                case ValueMetric::AVG:
                    interval.value += value;
                    break;
                case ValueMetric::MIN:
                    interval.value = std::min(value, interval.value);
                    break;
                case ValueMetric::MAX:
                    interval.value = std::max(value, interval.value);
                    break;
                default:
                    break;
            }
        } else {
            interval.value = value;
            interval.hasValue = true;
        }
        interval.sampleSize += 1;
    }

    // Only trigger the tracker if all intervals are correct.
    if (useAnomalyDetection) {
        // TODO: propagate proper values downstream when anomaly supports doubles
        long wholeBucketVal = multiIntervals[0].value.long_value;
        auto prev = mCurrentFullBucket.find(eventKey);
        if (prev != mCurrentFullBucket.end()) {
            wholeBucketVal += prev->second;
        }
        for (auto& tracker : mAnomalyTrackers) {
            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
                                             wholeBucketVal);
        }
    }
}

// For pulled metrics, we always need to make sure we do a pull before flushing the bucket
// if mCondition is true!
void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
             (long long)(currentBucketEndTimeNs));
        return;
    }
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    int64_t nextBucketStartTimeNs =
            currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
}

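// Worked example (hypothetical numbers): with a 10 min bucket ending at t = 60 min, an
// event at t = 75 min yields 1 + (75 - 60) / 10 = 2 buckets forward: the current bucket
// plus one fully skipped bucket.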
int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
    int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
    if (eventTimeNs < currentBucketEndTimeNs) {
        return 0;
    }
    return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
}

void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
                                                   const int64_t& nextBucketStartTimeNs) {
    if (mCondition == ConditionState::kUnknown) {
        StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
    }

    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    if (numBucketsForward > 1) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful,
        // though.
        invalidateCurrentBucketWithoutResetBase();
    }

    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
         (int)mCurrentSlicedBucket.size());
    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
    // Close the current bucket.
    int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
    bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
        // The current bucket is large enough to keep.
        for (const auto& slice : mCurrentSlicedBucket) {
            ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
            bucket.mConditionTrueNs = conditionTrueDuration;
            if (bucket.valueIndex.size() > 0) {
                // operator[] auto-creates a new bucket list if the key is not found.
                auto& bucketList = mPastBuckets[slice.first];
                bucketList.push_back(bucket);
            }
        }
    } else {
        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
    }

    appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
    initCurrentSlicedBucket(nextBucketStartTimeNs);
    // Update the condition timer again, in case we skipped buckets.
    mConditionTimer.newBucketStart(nextBucketStartTimeNs);
    mCurrentBucketNum += numBucketsForward;
}

ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
                                                    const std::vector<Interval>& intervals) {
    ValueBucket bucket;
    bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
    bucket.mBucketEndNs = bucketEndTime;
    for (const auto& interval : intervals) {
        if (interval.hasValue) {
            // Skip the output if the diff is zero.
            if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
                continue;
            }
            bucket.valueIndex.push_back(interval.valueIndex);
            if (mAggregationType != ValueMetric::AVG) {
                bucket.values.push_back(interval.value);
            } else {
                double sum = interval.value.type == LONG ? (double)interval.value.long_value
                                                         : interval.value.double_value;
                bucket.values.push_back(Value(sum / interval.sampleSize));
            }
        }
    }
    return bucket;
}

void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
    StatsdStats::getInstance().noteBucketCount(mMetricId);
    // Cleanup data structure to aggregate values.
    for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
        bool obsolete = true;
        for (auto& interval : it->second) {
            interval.hasValue = false;
            interval.sampleSize = 0;
            if (interval.seenNewData) {
                obsolete = false;
            }
            interval.seenNewData = false;
        }

        if (obsolete) {
            it = mCurrentSlicedBucket.erase(it);
        } else {
            it++;
        }
    }

    mCurrentBucketIsInvalid = false;
    // If we do not have a global base when the condition is true,
    // we will have an incomplete bucket for the next bucket.
    if (mUseDiff && !mHasGlobalBase && mCondition) {
        mCurrentBucketIsInvalid = true;
    }
    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
         (long long)mCurrentBucketStartTimeNs);
}

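// Full buckets feed anomaly detection: values from partial buckets (e.g. created by app
// upgrades) accumulate in mCurrentFullBucket until a full bucket boundary is crossed, and
// only then are they handed to the anomaly trackers.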
void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
    bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
    if (mCurrentBucketIsInvalid) {
        if (isFullBucketReached) {
            // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
            mCurrentFullBucket.clear();
        }
        // Current bucket is invalid, we do not add it to the full bucket.
        return;
    }

    if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
        // Accumulate partial buckets with current value and then send to anomaly tracker.
        if (mCurrentFullBucket.size() > 0) {
            for (const auto& slice : mCurrentSlicedBucket) {
                if (hitFullBucketGuardRailLocked(slice.first)) {
                    continue;
                }
                // TODO: fix this when anomaly can accept double values
                auto& interval = slice.second[0];
                if (interval.hasValue) {
                    mCurrentFullBucket[slice.first] += interval.value.long_value;
                }
            }
            for (const auto& slice : mCurrentFullBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
                    }
                }
            }
            mCurrentFullBucket.clear();
        } else {
            // Skip aggregating the partial buckets since there's no previous partial bucket.
            for (const auto& slice : mCurrentSlicedBucket) {
                for (auto& tracker : mAnomalyTrackers) {
                    if (tracker != nullptr) {
                        // TODO: fix this when anomaly can accept double values
                        auto& interval = slice.second[0];
                        if (interval.hasValue) {
                            tracker->addPastBucket(slice.first, interval.value.long_value,
                                                   mCurrentBucketNum);
                        }
                    }
                }
            }
        }
    } else {
        // Accumulate partial bucket.
        for (const auto& slice : mCurrentSlicedBucket) {
            // TODO: fix this when anomaly can accept double values
            auto& interval = slice.second[0];
            if (interval.hasValue) {
                mCurrentFullBucket[slice.first] += interval.value.long_value;
            }
        }
    }
}

size_t ValueMetricProducer::byteSizeLocked() const {
    size_t totalSize = 0;
    for (const auto& pair : mPastBuckets) {
        totalSize += pair.second.size() * kBucketSize;
    }
    return totalSize;
}

}  // namespace statsd
}  // namespace os
}  // namespace android