• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "ValueMetricProducer.h"
21 #include "../guardrail/StatsdStats.h"
22 #include "../stats_log_util.h"
23 
24 #include <cutils/log.h>
25 #include <limits.h>
26 #include <stdlib.h>
27 
28 using android::util::FIELD_COUNT_REPEATED;
29 using android::util::FIELD_TYPE_BOOL;
30 using android::util::FIELD_TYPE_DOUBLE;
31 using android::util::FIELD_TYPE_INT32;
32 using android::util::FIELD_TYPE_INT64;
33 using android::util::FIELD_TYPE_MESSAGE;
34 using android::util::FIELD_TYPE_STRING;
35 using android::util::ProtoOutputStream;
36 using std::list;
37 using std::make_pair;
38 using std::make_shared;
39 using std::map;
40 using std::shared_ptr;
41 using std::unique_ptr;
42 using std::unordered_map;
43 
44 namespace android {
45 namespace os {
46 namespace statsd {
47 
// Proto field numbers used when serializing this metric's report. They are grouped by the
// message they belong to; the numeric values must stay in sync with the proto definitions.

// StatsLogReport
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_VALUE_METRICS = 7;
constexpr int FIELD_ID_TIME_BASE = 9;
constexpr int FIELD_ID_BUCKET_SIZE = 10;
constexpr int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
constexpr int FIELD_ID_DIMENSION_PATH_IN_CONDITION = 12;
constexpr int FIELD_ID_IS_ACTIVE = 14;

// ValueMetricDataWrapper
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;
constexpr int FIELD_ID_SKIPPED_START_MILLIS = 3;
constexpr int FIELD_ID_SKIPPED_END_MILLIS = 4;

// ValueMetricData
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_DIMENSION_IN_CONDITION = 2;
constexpr int FIELD_ID_BUCKET_INFO = 3;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;

// ValueBucketInfo
constexpr int FIELD_ID_VALUE_INDEX = 1;
constexpr int FIELD_ID_VALUE_LONG = 2;
constexpr int FIELD_ID_VALUE_DOUBLE = 3;
constexpr int FIELD_ID_VALUES = 9;
constexpr int FIELD_ID_BUCKET_NUM = 4;
constexpr int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
constexpr int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
constexpr int FIELD_ID_CONDITION_TRUE_NS = 10;
76 
77 const Value ZERO_LONG((int64_t)0);
78 const Value ZERO_DOUBLE((int64_t)0);
79 
80 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
ValueMetricProducer(const ConfigKey & key,const ValueMetric & metric,const int conditionIndex,const sp<ConditionWizard> & conditionWizard,const int whatMatcherIndex,const sp<EventMatcherWizard> & matcherWizard,const int pullTagId,const int64_t timeBaseNs,const int64_t startTimeNs,const sp<StatsPullerManager> & pullerManager)81 ValueMetricProducer::ValueMetricProducer(
82         const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
83         const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
84         const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
85         const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
86     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
87       mWhatMatcherIndex(whatMatcherIndex),
88       mEventMatcherWizard(matcherWizard),
89       mPullerManager(pullerManager),
90       mPullTagId(pullTagId),
91       mIsPulled(pullTagId != -1),
92       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
93       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
94                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
95                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
96                                   : StatsdStats::kDimensionKeySizeSoftLimit),
97       mDimensionHardLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
98                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
99                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
100                                   : StatsdStats::kDimensionKeySizeHardLimit),
101       mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
102       mAggregationType(metric.aggregation_type()),
103       mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
104       mValueDirection(metric.value_direction()),
105       mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
106       mUseZeroDefaultBase(metric.use_zero_default_base()),
107       mHasGlobalBase(false),
108       mCurrentBucketIsInvalid(false),
109       mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
110                                                       : StatsdStats::kPullMaxDelayNs),
111       mSplitBucketForAppUpgrade(metric.split_bucket_for_app_upgrade()),
112       // Condition timer will be set in prepareFirstBucketLocked.
113       mConditionTimer(false, timeBaseNs) {
114     int64_t bucketSizeMills = 0;
115     if (metric.has_bucket()) {
116         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
117     } else {
118         bucketSizeMills = TimeUnitToBucketSizeInMillis(ONE_HOUR);
119     }
120 
121     mBucketSizeNs = bucketSizeMills * 1000000;
122 
123     translateFieldMatcher(metric.value_field(), &mFieldMatchers);
124 
125     if (metric.has_dimensions_in_what()) {
126         translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
127         mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
128     }
129 
130     if (metric.has_dimensions_in_condition()) {
131         translateFieldMatcher(metric.dimensions_in_condition(), &mDimensionsInCondition);
132     }
133 
134     if (metric.links().size() > 0) {
135         for (const auto& link : metric.links()) {
136             Metric2Condition mc;
137             mc.conditionId = link.condition();
138             translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
139             translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
140             mMetric2ConditionLinks.push_back(mc);
141         }
142     }
143 
144     mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
145     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
146                           HasPositionALL(metric.dimensions_in_condition());
147 
148     int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
149     mCurrentBucketNum += numBucketsForward;
150 
151     flushIfNeededLocked(startTimeNs);
152 
153     if (mIsPulled) {
154         mPullerManager->RegisterReceiver(mPullTagId, this, getCurrentBucketEndTimeNs(),
155                                          mBucketSizeNs);
156     }
157 
158     // Only do this for partial buckets like first bucket. All other buckets should use
159     // flushIfNeeded to adjust start and end to bucket boundaries.
160     // Adjust start for partial bucket
161     mCurrentBucketStartTimeNs = startTimeNs;
162     mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs);
163     VLOG("value metric %lld created. bucket size %lld start_time: %lld", (long long)metric.id(),
164          (long long)mBucketSizeNs, (long long)mTimeBaseNs);
165 }
166 
~ValueMetricProducer()167 ValueMetricProducer::~ValueMetricProducer() {
168     VLOG("~ValueMetricProducer() called");
169     if (mIsPulled) {
170         mPullerManager->UnRegisterReceiver(mPullTagId, this);
171     }
172 }
173 
prepareFirstBucketLocked()174 void ValueMetricProducer::prepareFirstBucketLocked() {
175     // Kicks off the puller immediately if condition is true and diff based.
176     if (mIsActive && mIsPulled && mCondition == ConditionState::kTrue && mUseDiff) {
177         pullAndMatchEventsLocked(mCurrentBucketStartTimeNs, mCondition);
178     }
179     // Now that activations are processed, start the condition timer if needed.
180     mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
181                                        mCurrentBucketStartTimeNs);
182 }
183 
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTime)184 void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
185                                                            const int64_t eventTime) {
186     VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
187 }
188 
dropDataLocked(const int64_t dropTimeNs)189 void ValueMetricProducer::dropDataLocked(const int64_t dropTimeNs) {
190     StatsdStats::getInstance().noteBucketDropped(mMetricId);
191     // We are going to flush the data without doing a pull first so we need to invalidte the data.
192     bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
193     if (pullNeeded) {
194         invalidateCurrentBucket();
195     }
196     flushIfNeededLocked(dropTimeNs);
197     clearPastBucketsLocked(dropTimeNs);
198 }
199 
clearPastBucketsLocked(const int64_t dumpTimeNs)200 void ValueMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
201     mPastBuckets.clear();
202     mSkippedBuckets.clear();
203 }
204 
onDumpReportLocked(const int64_t dumpTimeNs,const bool include_current_partial_bucket,const bool erase_data,const DumpLatency dumpLatency,std::set<string> * str_set,ProtoOutputStream * protoOutput)205 void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
206                                              const bool include_current_partial_bucket,
207                                              const bool erase_data,
208                                              const DumpLatency dumpLatency,
209                                              std::set<string> *str_set,
210                                              ProtoOutputStream* protoOutput) {
211     VLOG("metric %lld dump report now...", (long long)mMetricId);
212     if (include_current_partial_bucket) {
213         // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
214         // current bucket will have incomplete data and the next will have the wrong snapshot to do
215         // a diff against. If the condition is false, we are fine since the base data is reset and
216         // we are not tracking anything.
217         bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
218         if (pullNeeded) {
219             switch (dumpLatency) {
220                 case FAST:
221                     invalidateCurrentBucket();
222                     break;
223                 case NO_TIME_CONSTRAINTS:
224                     pullAndMatchEventsLocked(dumpTimeNs, mCondition);
225                     break;
226             }
227         }
228         flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
229     }
230     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
231     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
232 
233     if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
234         return;
235     }
236     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
237     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
238     // Fills the dimension path if not slicing by ALL.
239     if (!mSliceByPositionALL) {
240         if (!mDimensionsInWhat.empty()) {
241             uint64_t dimenPathToken =
242                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
243             writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
244             protoOutput->end(dimenPathToken);
245         }
246         if (!mDimensionsInCondition.empty()) {
247             uint64_t dimenPathToken =
248                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_CONDITION);
249             writeDimensionPathToProto(mDimensionsInCondition, protoOutput);
250             protoOutput->end(dimenPathToken);
251         }
252     }
253 
254     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
255 
256     for (const auto& pair : mSkippedBuckets) {
257         uint64_t wrapperToken =
258                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
259         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
260                            (long long)(NanoToMillis(pair.first)));
261         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
262                            (long long)(NanoToMillis(pair.second)));
263         protoOutput->end(wrapperToken);
264     }
265 
266     for (const auto& pair : mPastBuckets) {
267         const MetricDimensionKey& dimensionKey = pair.first;
268         VLOG("  dimension key %s", dimensionKey.toString().c_str());
269         uint64_t wrapperToken =
270                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
271 
272         // First fill dimension.
273         if (mSliceByPositionALL) {
274             uint64_t dimensionToken =
275                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
276             writeDimensionToProto(dimensionKey.getDimensionKeyInWhat(), str_set, protoOutput);
277             protoOutput->end(dimensionToken);
278             if (dimensionKey.hasDimensionKeyInCondition()) {
279                 uint64_t dimensionInConditionToken =
280                         protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_CONDITION);
281                 writeDimensionToProto(dimensionKey.getDimensionKeyInCondition(), str_set,
282                                       protoOutput);
283                 protoOutput->end(dimensionInConditionToken);
284             }
285         } else {
286             writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInWhat(),
287                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
288             if (dimensionKey.hasDimensionKeyInCondition()) {
289                 writeDimensionLeafNodesToProto(dimensionKey.getDimensionKeyInCondition(),
290                                                FIELD_ID_DIMENSION_LEAF_IN_CONDITION, str_set,
291                                                protoOutput);
292             }
293         }
294 
295         // Then fill bucket_info (ValueBucketInfo).
296         for (const auto& bucket : pair.second) {
297             uint64_t bucketInfoToken = protoOutput->start(
298                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
299 
300             if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
301                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_ELAPSED_MILLIS,
302                                    (long long)NanoToMillis(bucket.mBucketStartNs));
303                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_ELAPSED_MILLIS,
304                                    (long long)NanoToMillis(bucket.mBucketEndNs));
305             } else {
306                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
307                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
308             }
309             // only write the condition timer value if the metric has a condition.
310             if (mConditionTrackerIndex >= 0) {
311                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
312                                    (long long)bucket.mConditionTrueNs);
313             }
314             for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
315                 int index = bucket.valueIndex[i];
316                 const Value& value = bucket.values[i];
317                 uint64_t valueToken = protoOutput->start(
318                         FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
319                 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
320                                    index);
321                 if (value.getType() == LONG) {
322                     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
323                                        (long long)value.long_value);
324                     VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
325                          (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
326                 } else if (value.getType() == DOUBLE) {
327                     protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
328                                        value.double_value);
329                     VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
330                          (long long)bucket.mBucketEndNs, index, value.double_value);
331                 } else {
332                     VLOG("Wrong value type for ValueMetric output: %d", value.getType());
333                 }
334                 protoOutput->end(valueToken);
335             }
336             protoOutput->end(bucketInfoToken);
337         }
338         protoOutput->end(wrapperToken);
339     }
340     protoOutput->end(protoToken);
341 
342     VLOG("metric %lld dump report now...", (long long)mMetricId);
343     if (erase_data) {
344         mPastBuckets.clear();
345         mSkippedBuckets.clear();
346     }
347 }
348 
invalidateCurrentBucketWithoutResetBase()349 void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
350     if (!mCurrentBucketIsInvalid) {
351         // Only report once per invalid bucket.
352         StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
353     }
354     mCurrentBucketIsInvalid = true;
355 }
356 
invalidateCurrentBucket()357 void ValueMetricProducer::invalidateCurrentBucket() {
358     invalidateCurrentBucketWithoutResetBase();
359     resetBase();
360 }
361 
resetBase()362 void ValueMetricProducer::resetBase() {
363     for (auto& slice : mCurrentSlicedBucket) {
364         for (auto& interval : slice.second) {
365             interval.hasBase = false;
366         }
367     }
368     mHasGlobalBase = false;
369 }
370 
371 // Handle active state change. Active state change is treated like a condition change:
372 // - drop bucket if active state change event arrives too late
373 // - if condition is true, pull data on active state changes
374 // - ConditionTimer tracks changes based on AND of condition and active state.
onActiveStateChangedLocked(const int64_t & eventTimeNs)375 void ValueMetricProducer::onActiveStateChangedLocked(const int64_t& eventTimeNs) {
376     bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
377     if (ConditionState::kTrue == mCondition && isEventTooLate) {
378         // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
379         invalidateCurrentBucket();
380     }
381 
382     // Call parent method once we've verified the validity of current bucket.
383     MetricProducer::onActiveStateChangedLocked(eventTimeNs);
384 
385     if (ConditionState::kTrue != mCondition) {
386         return;
387     }
388 
389     // Pull on active state changes.
390     if (!isEventTooLate) {
391         if (mIsPulled) {
392             pullAndMatchEventsLocked(eventTimeNs, mCondition);
393         }
394         // When active state changes from true to false, clear diff base but don't
395         // reset other counters as we may accumulate more value in the bucket.
396         if (mUseDiff && !mIsActive) {
397             resetBase();
398         }
399     }
400 
401     flushIfNeededLocked(eventTimeNs);
402 
403     // Let condition timer know of new active state.
404     mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);
405 }
406 
onConditionChangedLocked(const bool condition,const int64_t eventTimeNs)407 void ValueMetricProducer::onConditionChangedLocked(const bool condition,
408                                                    const int64_t eventTimeNs) {
409     ConditionState newCondition = condition ? ConditionState::kTrue : ConditionState::kFalse;
410     bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
411 
412     if (mIsActive) {
413         if (isEventTooLate) {
414             VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
415                  (long long)mCurrentBucketStartTimeNs);
416             StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
417             invalidateCurrentBucket();
418         } else {
419             if (mCondition == ConditionState::kUnknown) {
420                 // If the condition was unknown, we mark the bucket as invalid since the bucket will
421                 // contain partial data. For instance, the condition change might happen close to
422                 // the end of the bucket and we might miss lots of data.
423                 //
424                 // We still want to pull to set the base.
425                 invalidateCurrentBucket();
426             }
427 
428             // Pull on condition changes.
429             bool conditionChanged =
430                     (mCondition == ConditionState::kTrue && newCondition == ConditionState::kFalse)
431                     || (mCondition == ConditionState::kFalse &&
432                             newCondition == ConditionState::kTrue);
433             // We do not need to pull when we go from unknown to false.
434             //
435             // We also pull if the condition was already true in order to be able to flush the
436             // bucket at the end if needed.
437             //
438             // onConditionChangedLocked might happen on bucket boundaries if this is called before
439             // #onDataPulled.
440             if (mIsPulled && (conditionChanged || condition)) {
441                 pullAndMatchEventsLocked(eventTimeNs, newCondition);
442             }
443 
444             // When condition change from true to false, clear diff base but don't
445             // reset other counters as we may accumulate more value in the bucket.
446             if (mUseDiff && mCondition == ConditionState::kTrue
447                     && newCondition == ConditionState::kFalse) {
448                 resetBase();
449             }
450         }
451     }
452 
453     mCondition = isEventTooLate ? initialCondition(mConditionTrackerIndex) : newCondition;
454 
455     if (mIsActive) {
456         flushIfNeededLocked(eventTimeNs);
457         mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
458     }
459 }
460 
pullAndMatchEventsLocked(const int64_t timestampNs,ConditionState condition)461 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs,
462         ConditionState condition) {
463     vector<std::shared_ptr<LogEvent>> allData;
464     if (!mPullerManager->Pull(mPullTagId, &allData)) {
465         ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
466         invalidateCurrentBucket();
467         return;
468     }
469 
470     accumulateEvents(allData, timestampNs, timestampNs, condition);
471 }
472 
calcPreviousBucketEndTime(const int64_t currentTimeNs)473 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
474     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
475 }
476 
477 // By design, statsd pulls data at bucket boundaries using AlarmManager. These pulls are likely
478 // to be delayed. Other events like condition changes or app upgrade which are not based on
479 // AlarmManager might have arrived earlier and close the bucket.
onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & allData,bool pullSuccess,int64_t originalPullTimeNs)480 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
481                                        bool pullSuccess, int64_t originalPullTimeNs) {
482     std::lock_guard<std::mutex> lock(mMutex);
483         if (mCondition == ConditionState::kTrue) {
484             // If the pull failed, we won't be able to compute a diff.
485             if (!pullSuccess) {
486                 invalidateCurrentBucket();
487             } else {
488                 bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
489                 if (isEventLate) {
490                     // If the event is late, we are in the middle of a bucket. Just
491                     // process the data without trying to snap the data to the nearest bucket.
492                     accumulateEvents(allData, originalPullTimeNs, originalPullTimeNs, mCondition);
493                 } else {
494                     // For scheduled pulled data, the effective event time is snap to the nearest
495                     // bucket end. In the case of waking up from a deep sleep state, we will
496                     // attribute to the previous bucket end. If the sleep was long but not very
497                     // long, we will be in the immediate next bucket. Previous bucket may get a
498                     // larger number as we pull at a later time than real bucket end.
499                     //
500                     // If the sleep was very long, we skip more than one bucket before sleep. In
501                     // this case, if the diff base will be cleared and this new data will serve as
502                     // new diff base.
503                     int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
504                     StatsdStats::getInstance().noteBucketBoundaryDelayNs(
505                             mMetricId, originalPullTimeNs - bucketEndTime);
506                     accumulateEvents(allData, originalPullTimeNs, bucketEndTime, mCondition);
507                 }
508             }
509         }
510 
511     // We can probably flush the bucket. Since we used bucketEndTime when calling
512     // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
513     flushIfNeededLocked(originalPullTimeNs);
514 }
515 
accumulateEvents(const std::vector<std::shared_ptr<LogEvent>> & allData,int64_t originalPullTimeNs,int64_t eventElapsedTimeNs,ConditionState condition)516 void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
517                                            int64_t originalPullTimeNs, int64_t eventElapsedTimeNs,
518                                            ConditionState condition) {
519     bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
520     if (isEventLate) {
521         VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
522              (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
523         StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
524         invalidateCurrentBucket();
525         return;
526     }
527 
528     const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
529     StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
530     if (pullDelayNs > mMaxPullDelayNs) {
531         ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
532               (long long)mMaxPullDelayNs);
533         StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
534         // We are missing one pull from the bucket which means we will not have a complete view of
535         // what's going on.
536         invalidateCurrentBucket();
537         return;
538     }
539 
540     if (allData.size() == 0) {
541         VLOG("Data pulled is empty");
542         StatsdStats::getInstance().noteEmptyData(mPullTagId);
543     }
544 
545     mMatchedMetricDimensionKeys.clear();
546     for (const auto& data : allData) {
547         LogEvent localCopy = data->makeCopy();
548         if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
549             MatchingState::kMatched) {
550             localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
551             onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
552         }
553     }
554     // If the new pulled data does not contains some keys we track in our intervals, we need to
555     // reset the base.
556     for (auto& slice : mCurrentSlicedBucket) {
557         bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
558                 != mMatchedMetricDimensionKeys.end();
559         if (!presentInPulledData) {
560             for (auto& interval : slice.second) {
561                 interval.hasBase = false;
562             }
563         }
564     }
565     mMatchedMetricDimensionKeys.clear();
566     mHasGlobalBase = true;
567 
568     // If we reach the guardrail, we might have dropped some data which means the bucket is
569     // incomplete.
570     //
571     // The base also needs to be reset. If we do not have the full data, we might
572     // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
573     // might be missing from mCurrentSlicedBucket.
574     if (hasReachedGuardRailLimit()) {
575         invalidateCurrentBucket();
576         mCurrentSlicedBucket.clear();
577     }
578 }
579 
dumpStatesLocked(FILE * out,bool verbose) const580 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
581     if (mCurrentSlicedBucket.size() == 0) {
582         return;
583     }
584 
585     fprintf(out, "ValueMetric %lld dimension size %lu\n", (long long)mMetricId,
586             (unsigned long)mCurrentSlicedBucket.size());
587     if (verbose) {
588         for (const auto& it : mCurrentSlicedBucket) {
589           for (const auto& interval : it.second) {
590             fprintf(out, "\t(what)%s\t(condition)%s  (value)%s\n",
591                     it.first.getDimensionKeyInWhat().toString().c_str(),
592                     it.first.getDimensionKeyInCondition().toString().c_str(),
593                     interval.value.toString().c_str());
594           }
595         }
596     }
597 }
598 
hasReachedGuardRailLimit() const599 bool ValueMetricProducer::hasReachedGuardRailLimit() const {
600     return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
601 }
602 
hitGuardRailLocked(const MetricDimensionKey & newKey)603 bool ValueMetricProducer::hitGuardRailLocked(const MetricDimensionKey& newKey) {
604     // ===========GuardRail==============
605     // 1. Report the tuple count if the tuple count > soft limit
606     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
607         return false;
608     }
609     if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
610         size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
611         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
612         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
613         if (hasReachedGuardRailLimit()) {
614             ALOGE("ValueMetric %lld dropping data for dimension key %s", (long long)mMetricId,
615                   newKey.toString().c_str());
616             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
617             return true;
618         }
619     }
620 
621     return false;
622 }
623 
hitFullBucketGuardRailLocked(const MetricDimensionKey & newKey)624 bool ValueMetricProducer::hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey) {
625     // ===========GuardRail==============
626     // 1. Report the tuple count if the tuple count > soft limit
627     if (mCurrentFullBucket.find(newKey) != mCurrentFullBucket.end()) {
628         return false;
629     }
630     if (mCurrentFullBucket.size() > mDimensionSoftLimit - 1) {
631         size_t newTupleCount = mCurrentFullBucket.size() + 1;
632         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
633         if (newTupleCount > mDimensionHardLimit) {
634             ALOGE("ValueMetric %lld dropping data for full bucket dimension key %s",
635                   (long long)mMetricId,
636                   newKey.toString().c_str());
637             return true;
638         }
639     }
640 
641     return false;
642 }
643 
getDoubleOrLong(const LogEvent & event,const Matcher & matcher,Value & ret)644 bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
645     for (const FieldValue& value : event.getValues()) {
646         if (value.mField.matches(matcher)) {
647             switch (value.mValue.type) {
648                 case INT:
649                     ret.setLong(value.mValue.int_value);
650                     break;
651                 case LONG:
652                     ret.setLong(value.mValue.long_value);
653                     break;
654                 case FLOAT:
655                     ret.setDouble(value.mValue.float_value);
656                     break;
657                 case DOUBLE:
658                     ret.setDouble(value.mValue.double_value);
659                     break;
660                 default:
661                     break;
662             }
663             return true;
664         }
665     }
666     return false;
667 }
668 
onMatchedLogEventInternalLocked(const size_t matcherIndex,const MetricDimensionKey & eventKey,const ConditionKey & conditionKey,bool condition,const LogEvent & event)669 void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
670                                                           const MetricDimensionKey& eventKey,
671                                                           const ConditionKey& conditionKey,
672                                                           bool condition, const LogEvent& event) {
673     int64_t eventTimeNs = event.GetElapsedTimestampNs();
674     if (eventTimeNs < mCurrentBucketStartTimeNs) {
675         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
676              (long long)mCurrentBucketStartTimeNs);
677         return;
678     }
679     mMatchedMetricDimensionKeys.insert(eventKey);
680 
681     if (!mIsPulled) {
682         // We cannot flush without doing a pull first.
683         flushIfNeededLocked(eventTimeNs);
684     }
685 
686     // We should not accumulate the data for pushed metrics when the condition is false.
687     bool shouldSkipForPushMetric = !mIsPulled && !condition;
688     // For pulled metrics, there are two cases:
689     // - to compute diffs, we need to process all the state changes
690     // - for non-diffs metrics, we should ignore the data if the condition wasn't true. If we have a
691     // state change from
692     //     + True -> True: we should process the data, it might be a bucket boundary
693     //     + True -> False: we als need to process the data.
694     bool shouldSkipForPulledMetric = mIsPulled && !mUseDiff
695             && mCondition != ConditionState::kTrue;
696     if (shouldSkipForPushMetric || shouldSkipForPulledMetric) {
697         VLOG("ValueMetric skip event because condition is false");
698         return;
699     }
700 
701     if (hitGuardRailLocked(eventKey)) {
702         return;
703     }
704     vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
705     if (multiIntervals.size() < mFieldMatchers.size()) {
706         VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
707         multiIntervals.resize(mFieldMatchers.size());
708     }
709 
710     // We only use anomaly detection under certain cases.
711     // N.B.: The anomaly detection cases were modified in order to fix an issue with value metrics
712     // containing multiple values. We tried to retain all previous behaviour, but we are unsure the
713     // previous behaviour was correct. At the time of the fix, anomaly detection had no owner.
714     // Whoever next works on it should look into the cases where it is triggered in this function.
715     // Discussion here: http://ag/6124370.
716     bool useAnomalyDetection = true;
717 
718     for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
719         const Matcher& matcher = mFieldMatchers[i];
720         Interval& interval = multiIntervals[i];
721         interval.valueIndex = i;
722         Value value;
723         if (!getDoubleOrLong(event, matcher, value)) {
724             VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
725             StatsdStats::getInstance().noteBadValueType(mMetricId);
726             return;
727         }
728         interval.seenNewData = true;
729 
730         if (mUseDiff) {
731             if (!interval.hasBase) {
732                 if (mHasGlobalBase && mUseZeroDefaultBase) {
733                     // The bucket has global base. This key does not.
734                     // Optionally use zero as base.
735                     interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
736                     interval.hasBase = true;
737                 } else {
738                     // no base. just update base and return.
739                     interval.base = value;
740                     interval.hasBase = true;
741                     // If we're missing a base, do not use anomaly detection on incomplete data
742                     useAnomalyDetection = false;
743                     // Continue (instead of return) here in order to set interval.base and
744                     // interval.hasBase for other intervals
745                     continue;
746                 }
747             }
748             Value diff;
749             switch (mValueDirection) {
750                 case ValueMetric::INCREASING:
751                     if (value >= interval.base) {
752                         diff = value - interval.base;
753                     } else if (mUseAbsoluteValueOnReset) {
754                         diff = value;
755                     } else {
756                         VLOG("Unexpected decreasing value");
757                         StatsdStats::getInstance().notePullDataError(mPullTagId);
758                         interval.base = value;
759                         // If we've got bad data, do not use anomaly detection
760                         useAnomalyDetection = false;
761                         continue;
762                     }
763                     break;
764                 case ValueMetric::DECREASING:
765                     if (interval.base >= value) {
766                         diff = interval.base - value;
767                     } else if (mUseAbsoluteValueOnReset) {
768                         diff = value;
769                     } else {
770                         VLOG("Unexpected increasing value");
771                         StatsdStats::getInstance().notePullDataError(mPullTagId);
772                         interval.base = value;
773                         // If we've got bad data, do not use anomaly detection
774                         useAnomalyDetection = false;
775                         continue;
776                     }
777                     break;
778                 case ValueMetric::ANY:
779                     diff = value - interval.base;
780                     break;
781                 default:
782                     break;
783             }
784             interval.base = value;
785             value = diff;
786         }
787 
788         if (interval.hasValue) {
789             switch (mAggregationType) {
790                 case ValueMetric::SUM:
791                     // for AVG, we add up and take average when flushing the bucket
792                 case ValueMetric::AVG:
793                     interval.value += value;
794                     break;
795                 case ValueMetric::MIN:
796                     interval.value = std::min(value, interval.value);
797                     break;
798                 case ValueMetric::MAX:
799                     interval.value = std::max(value, interval.value);
800                     break;
801                 default:
802                     break;
803             }
804         } else {
805             interval.value = value;
806             interval.hasValue = true;
807         }
808         interval.sampleSize += 1;
809     }
810 
811     // Only trigger the tracker if all intervals are correct
812     if (useAnomalyDetection) {
813         // TODO: propgate proper values down stream when anomaly support doubles
814         long wholeBucketVal = multiIntervals[0].value.long_value;
815         auto prev = mCurrentFullBucket.find(eventKey);
816         if (prev != mCurrentFullBucket.end()) {
817             wholeBucketVal += prev->second;
818         }
819         for (auto& tracker : mAnomalyTrackers) {
820             tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, mMetricId, eventKey,
821                                              wholeBucketVal);
822         }
823     }
824 }
825 
826 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
827 // if mCondition is true!
flushIfNeededLocked(const int64_t & eventTimeNs)828 void ValueMetricProducer::flushIfNeededLocked(const int64_t& eventTimeNs) {
829     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
830     if (eventTimeNs < currentBucketEndTimeNs) {
831         VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
832              (long long)(currentBucketEndTimeNs));
833         return;
834     }
835     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
836     int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
837     flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
838 }
839 
calcBucketsForwardCount(const int64_t & eventTimeNs) const840 int64_t ValueMetricProducer::calcBucketsForwardCount(const int64_t& eventTimeNs) const {
841     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
842     if (eventTimeNs < currentBucketEndTimeNs) {
843         return 0;
844     }
845     return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
846 }
847 
flushCurrentBucketLocked(const int64_t & eventTimeNs,const int64_t & nextBucketStartTimeNs)848 void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
849                                                    const int64_t& nextBucketStartTimeNs) {
850     if (mCondition == ConditionState::kUnknown) {
851         StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
852     }
853 
854     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
855     if (numBucketsForward > 1) {
856         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
857         StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
858         // Something went wrong. Maybe the device was sleeping for a long time. It is better
859         // to mark the current bucket as invalid. The last pull might have been successful through.
860         invalidateCurrentBucketWithoutResetBase();
861     }
862 
863     VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
864          (int)mCurrentSlicedBucket.size());
865     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
866     int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
867     // Close the current bucket.
868     int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
869     bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
870     if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
871         // The current bucket is large enough to keep.
872         for (const auto& slice : mCurrentSlicedBucket) {
873             ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
874             bucket.mConditionTrueNs = conditionTrueDuration;
875             // it will auto create new vector of ValuebucketInfo if the key is not found.
876             if (bucket.valueIndex.size() > 0) {
877                 auto& bucketList = mPastBuckets[slice.first];
878                 bucketList.push_back(bucket);
879             }
880         }
881     } else {
882         mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
883     }
884 
885     appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
886     initCurrentSlicedBucket(nextBucketStartTimeNs);
887     // Update the condition timer again, in case we skipped buckets.
888     mConditionTimer.newBucketStart(nextBucketStartTimeNs);
889     mCurrentBucketNum += numBucketsForward;
890 }
891 
buildPartialBucket(int64_t bucketEndTime,const std::vector<Interval> & intervals)892 ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
893                                                     const std::vector<Interval>& intervals) {
894     ValueBucket bucket;
895     bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
896     bucket.mBucketEndNs = bucketEndTime;
897     for (const auto& interval : intervals) {
898         if (interval.hasValue) {
899             // skip the output if the diff is zero
900             if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
901                 continue;
902             }
903             bucket.valueIndex.push_back(interval.valueIndex);
904             if (mAggregationType != ValueMetric::AVG) {
905                 bucket.values.push_back(interval.value);
906             } else {
907                 double sum = interval.value.type == LONG ? (double)interval.value.long_value
908                                                          : interval.value.double_value;
909                 bucket.values.push_back(Value((double)sum / interval.sampleSize));
910             }
911         }
912     }
913     return bucket;
914 }
915 
initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)916 void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) {
917     StatsdStats::getInstance().noteBucketCount(mMetricId);
918     // Cleanup data structure to aggregate values.
919     for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
920         bool obsolete = true;
921         for (auto& interval : it->second) {
922             interval.hasValue = false;
923             interval.sampleSize = 0;
924             if (interval.seenNewData) {
925                 obsolete = false;
926             }
927             interval.seenNewData = false;
928         }
929 
930         if (obsolete) {
931             it = mCurrentSlicedBucket.erase(it);
932         } else {
933             it++;
934         }
935     }
936 
937     mCurrentBucketIsInvalid = false;
938     // If we do not have a global base when the condition is true,
939     // we will have incomplete bucket for the next bucket.
940     if (mUseDiff && !mHasGlobalBase && mCondition) {
941         mCurrentBucketIsInvalid = false;
942     }
943     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
944     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
945          (long long)mCurrentBucketStartTimeNs);
946 }
947 
appendToFullBucket(int64_t eventTimeNs,int64_t fullBucketEndTimeNs)948 void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
949     bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
950     if (mCurrentBucketIsInvalid) {
951         if (isFullBucketReached) {
952             // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
953             mCurrentFullBucket.clear();
954         }
955         // Current bucket is invalid, we do not add it to the full bucket.
956         return;
957     }
958 
959     if (isFullBucketReached) {  // If full bucket, send to anomaly tracker.
960         // Accumulate partial buckets with current value and then send to anomaly tracker.
961         if (mCurrentFullBucket.size() > 0) {
962             for (const auto& slice : mCurrentSlicedBucket) {
963                 if (hitFullBucketGuardRailLocked(slice.first)) {
964                     continue;
965                 }
966                 // TODO: fix this when anomaly can accept double values
967                 auto& interval = slice.second[0];
968                 if (interval.hasValue) {
969                     mCurrentFullBucket[slice.first] += interval.value.long_value;
970                 }
971             }
972             for (const auto& slice : mCurrentFullBucket) {
973                 for (auto& tracker : mAnomalyTrackers) {
974                     if (tracker != nullptr) {
975                         tracker->addPastBucket(slice.first, slice.second, mCurrentBucketNum);
976                     }
977                 }
978             }
979             mCurrentFullBucket.clear();
980         } else {
981             // Skip aggregating the partial buckets since there's no previous partial bucket.
982             for (const auto& slice : mCurrentSlicedBucket) {
983                 for (auto& tracker : mAnomalyTrackers) {
984                     if (tracker != nullptr) {
985                         // TODO: fix this when anomaly can accept double values
986                         auto& interval = slice.second[0];
987                         if (interval.hasValue) {
988                             tracker->addPastBucket(slice.first, interval.value.long_value,
989                                                    mCurrentBucketNum);
990                         }
991                     }
992                 }
993             }
994         }
995     } else {
996         // Accumulate partial bucket.
997         for (const auto& slice : mCurrentSlicedBucket) {
998             // TODO: fix this when anomaly can accept double values
999             auto& interval = slice.second[0];
1000             if (interval.hasValue) {
1001                 mCurrentFullBucket[slice.first] += interval.value.long_value;
1002             }
1003         }
1004     }
1005 }
1006 
byteSizeLocked() const1007 size_t ValueMetricProducer::byteSizeLocked() const {
1008     size_t totalSize = 0;
1009     for (const auto& pair : mPastBuckets) {
1010         totalSize += pair.second.size() * kBucketSize;
1011     }
1012     return totalSize;
1013 }
1014 
1015 }  // namespace statsd
1016 }  // namespace os
1017 }  // namespace android
1018