• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "ValueMetricProducer.h"
21 
22 #include <kll.h>
23 #include <limits.h>
24 #include <stdlib.h>
25 
26 #include "FieldValue.h"
27 #include "HashableDimensionKey.h"
28 #include "guardrail/StatsdStats.h"
29 #include "metrics/parsing_utils/metrics_manager_util.h"
30 #include "stats_log_util.h"
31 #include "stats_util.h"
32 
33 using android::util::FIELD_COUNT_REPEATED;
34 using android::util::FIELD_TYPE_BOOL;
35 using android::util::FIELD_TYPE_INT32;
36 using android::util::FIELD_TYPE_INT64;
37 using android::util::FIELD_TYPE_MESSAGE;
38 using android::util::ProtoOutputStream;
39 using dist_proc::aggregation::KllQuantile;
40 using std::optional;
41 using std::shared_ptr;
42 using std::unique_ptr;
43 using std::unordered_map;
44 using std::vector;
45 
46 namespace android {
47 namespace os {
48 namespace statsd {
49 
// Proto field ids written by this producer. They are grouped by the message they belong to.

// StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_TIME_BASE = 9;
constexpr int FIELD_ID_BUCKET_SIZE = 10;
constexpr int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
constexpr int FIELD_ID_IS_ACTIVE = 14;

// *MetricDataWrapper.
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;

// SkippedBuckets.
constexpr int FIELD_ID_SKIPPED_START_MILLIS = 3;
constexpr int FIELD_ID_SKIPPED_END_MILLIS = 4;
constexpr int FIELD_ID_SKIPPED_DROP_EVENT = 5;

// DumpEvent.
constexpr int FIELD_ID_BUCKET_DROP_REASON = 1;
constexpr int FIELD_ID_DROP_TIME = 2;

// *MetricData.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_BUCKET_INFO = 3;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
constexpr int FIELD_ID_SLICE_BY_STATE = 6;
71 
72 template <typename AggregatedValue, typename DimExtras>
ValueMetricProducer(const int64_t & metricId,const ConfigKey & key,const uint64_t protoHash,const PullOptions & pullOptions,const BucketOptions & bucketOptions,const WhatOptions & whatOptions,const ConditionOptions & conditionOptions,const StateOptions & stateOptions,const ActivationOptions & activationOptions,const GuardrailOptions & guardrailOptions)73 ValueMetricProducer<AggregatedValue, DimExtras>::ValueMetricProducer(
74         const int64_t& metricId, const ConfigKey& key, const uint64_t protoHash,
75         const PullOptions& pullOptions, const BucketOptions& bucketOptions,
76         const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
77         const StateOptions& stateOptions, const ActivationOptions& activationOptions,
78         const GuardrailOptions& guardrailOptions)
79     : MetricProducer(metricId, key, bucketOptions.timeBaseNs, conditionOptions.conditionIndex,
80                      conditionOptions.initialConditionCache, conditionOptions.conditionWizard,
81                      protoHash, activationOptions.eventActivationMap,
82                      activationOptions.eventDeactivationMap, stateOptions.slicedStateAtoms,
83                      stateOptions.stateGroupMap, bucketOptions.splitBucketForAppUpgrade),
84       mWhatMatcherIndex(whatOptions.whatMatcherIndex),
85       mEventMatcherWizard(whatOptions.matcherWizard),
86       mPullerManager(pullOptions.pullerManager),
87       mFieldMatchers(whatOptions.fieldMatchers),
88       mPullAtomId(pullOptions.pullAtomId),
89       mMinBucketSizeNs(bucketOptions.minBucketSizeNs),
90       mDimensionSoftLimit(guardrailOptions.dimensionSoftLimit),
91       mDimensionHardLimit(guardrailOptions.dimensionHardLimit),
92       mCurrentBucketIsSkipped(false),
93       mConditionCorrectionThresholdNs(bucketOptions.conditionCorrectionThresholdNs) {
94     // TODO(b/185722221): inject directly via initializer list in MetricProducer.
95     mBucketSizeNs = bucketOptions.bucketSizeNs;
96 
97     // TODO(b/185770171): inject dimensionsInWhat related fields via constructor.
98     if (whatOptions.dimensionsInWhat.field() > 0) {
99         translateFieldMatcher(whatOptions.dimensionsInWhat, &mDimensionsInWhat);
100     }
101     mContainANYPositionInDimensionsInWhat = whatOptions.containsAnyPositionInDimensionsInWhat;
102     mShouldUseNestedDimensions = whatOptions.shouldUseNestedDimensions;
103 
104     if (conditionOptions.conditionLinks.size() > 0) {
105         for (const auto& link : conditionOptions.conditionLinks) {
106             Metric2Condition mc;
107             mc.conditionId = link.condition();
108             translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
109             translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
110             mMetric2ConditionLinks.push_back(mc);
111         }
112 
113         // TODO(b/185770739): use !mMetric2ConditionLinks.empty() instead
114         mConditionSliced = true;
115     }
116 
117     for (const auto& stateLink : stateOptions.stateLinks) {
118         Metric2State ms;
119         ms.stateAtomId = stateLink.state_atom_id();
120         translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
121         translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
122         mMetric2StateLinks.push_back(ms);
123     }
124 
125     const int64_t numBucketsForward = calcBucketsForwardCount(bucketOptions.startTimeNs);
126     mCurrentBucketNum = numBucketsForward;
127 
128     flushIfNeededLocked(bucketOptions.startTimeNs);
129 
130     if (isPulled()) {
131         mPullerManager->RegisterReceiver(mPullAtomId, mConfigKey, this, getCurrentBucketEndTimeNs(),
132                                          mBucketSizeNs);
133     }
134 
135     // Only do this for partial buckets like first bucket. All other buckets should use
136     // flushIfNeeded to adjust start and end to bucket boundaries.
137     // Adjust start for partial bucket
138     mCurrentBucketStartTimeNs = bucketOptions.startTimeNs;
139     mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs, mCurrentBucketStartTimeNs);
140 
141     // Now that activations are processed, start the condition timer if needed.
142     mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
143                                        mCurrentBucketStartTimeNs);
144 }
145 
146 template <typename AggregatedValue, typename DimExtras>
~ValueMetricProducer()147 ValueMetricProducer<AggregatedValue, DimExtras>::~ValueMetricProducer() {
148     VLOG("~ValueMetricProducer() called");
149     if (isPulled()) {
150         mPullerManager->UnRegisterReceiver(mPullAtomId, mConfigKey, this);
151     }
152 }
153 
154 template <typename AggregatedValue, typename DimExtras>
onStatsdInitCompleted(const int64_t & eventTimeNs)155 void ValueMetricProducer<AggregatedValue, DimExtras>::onStatsdInitCompleted(
156         const int64_t& eventTimeNs) {
157     lock_guard<mutex> lock(mMutex);
158 
159     if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
160         pullAndMatchEventsLocked(eventTimeNs);
161     }
162     flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
163 }
164 
165 template <typename AggregatedValue, typename DimExtras>
notifyAppUpgradeInternalLocked(const int64_t eventTimeNs)166 void ValueMetricProducer<AggregatedValue, DimExtras>::notifyAppUpgradeInternalLocked(
167         const int64_t eventTimeNs) {
168     if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
169         pullAndMatchEventsLocked(eventTimeNs);
170     }
171     flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
172 }
173 
174 template <typename AggregatedValue, typename DimExtras>
175 optional<InvalidConfigReason>
onConfigUpdatedLocked(const StatsdConfig & config,const int configIndex,const int metricIndex,const vector<sp<AtomMatchingTracker>> & allAtomMatchingTrackers,const unordered_map<int64_t,int> & oldAtomMatchingTrackerMap,const unordered_map<int64_t,int> & newAtomMatchingTrackerMap,const sp<EventMatcherWizard> & matcherWizard,const vector<sp<ConditionTracker>> & allConditionTrackers,const unordered_map<int64_t,int> & conditionTrackerMap,const sp<ConditionWizard> & wizard,const unordered_map<int64_t,int> & metricToActivationMap,unordered_map<int,vector<int>> & trackerToMetricMap,unordered_map<int,vector<int>> & conditionToMetricMap,unordered_map<int,vector<int>> & activationAtomTrackerToMetricMap,unordered_map<int,vector<int>> & deactivationAtomTrackerToMetricMap,vector<int> & metricsWithActivation)176 ValueMetricProducer<AggregatedValue, DimExtras>::onConfigUpdatedLocked(
177         const StatsdConfig& config, const int configIndex, const int metricIndex,
178         const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
179         const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
180         const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
181         const sp<EventMatcherWizard>& matcherWizard,
182         const vector<sp<ConditionTracker>>& allConditionTrackers,
183         const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
184         const unordered_map<int64_t, int>& metricToActivationMap,
185         unordered_map<int, vector<int>>& trackerToMetricMap,
186         unordered_map<int, vector<int>>& conditionToMetricMap,
187         unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
188         unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
189         vector<int>& metricsWithActivation) {
190     optional<InvalidConfigReason> invalidConfigReason = MetricProducer::onConfigUpdatedLocked(
191             config, configIndex, metricIndex, allAtomMatchingTrackers, oldAtomMatchingTrackerMap,
192             newAtomMatchingTrackerMap, matcherWizard, allConditionTrackers, conditionTrackerMap,
193             wizard, metricToActivationMap, trackerToMetricMap, conditionToMetricMap,
194             activationAtomTrackerToMetricMap, deactivationAtomTrackerToMetricMap,
195             metricsWithActivation);
196     if (invalidConfigReason.has_value()) {
197         return invalidConfigReason;
198     }
199     // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
200     const int64_t atomMatcherId = getWhatAtomMatcherIdForMetric(config, configIndex);
201     invalidConfigReason = handleMetricWithAtomMatchingTrackers(
202             atomMatcherId, mMetricId, metricIndex, /*enforceOneAtom=*/false,
203             allAtomMatchingTrackers, newAtomMatchingTrackerMap, trackerToMetricMap,
204             mWhatMatcherIndex);
205     if (invalidConfigReason.has_value()) {
206         return invalidConfigReason;
207     }
208     const optional<int64_t>& conditionIdOpt = getConditionIdForMetric(config, configIndex);
209     const ConditionLinks& conditionLinks = getConditionLinksForMetric(config, configIndex);
210     if (conditionIdOpt.has_value()) {
211         invalidConfigReason = handleMetricWithConditions(
212                 conditionIdOpt.value(), mMetricId, metricIndex, conditionTrackerMap, conditionLinks,
213                 allConditionTrackers, mConditionTrackerIndex, conditionToMetricMap);
214         if (invalidConfigReason.has_value()) {
215             return invalidConfigReason;
216         }
217     }
218     sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
219     mEventMatcherWizard = matcherWizard;
220     return nullopt;
221 }
222 
223 template <typename AggregatedValue, typename DimExtras>
onStateChanged(int64_t eventTimeNs,int32_t atomId,const HashableDimensionKey & primaryKey,const FieldValue & oldState,const FieldValue & newState)224 void ValueMetricProducer<AggregatedValue, DimExtras>::onStateChanged(
225         int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
226         const FieldValue& oldState, const FieldValue& newState) {
227     // TODO(b/189353769): Acquire lock.
228     VLOG("ValueMetricProducer %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
229          (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
230          oldState.mValue.int_value, newState.mValue.int_value);
231 
232     FieldValue oldStateCopy = oldState;
233     FieldValue newStateCopy = newState;
234     mapStateValue(atomId, &oldStateCopy);
235     mapStateValue(atomId, &newStateCopy);
236 
237     // If old and new states are in the same StateGroup, then we do not need to
238     // pull for this state change.
239     if (oldStateCopy == newStateCopy) {
240         return;
241     }
242 
243     // If condition is not true or metric is not active, we do not need to pull
244     // for this state change.
245     if (mCondition != ConditionState::kTrue || !mIsActive) {
246         return;
247     }
248 
249     if (isEventLateLocked(eventTimeNs)) {
250         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
251              (long long)mCurrentBucketStartTimeNs);
252         invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
253         return;
254     }
255 
256     if (isPulled()) {
257         mStateChangePrimaryKey.first = atomId;
258         mStateChangePrimaryKey.second = primaryKey;
259         // TODO(b/185796114): pass mStateChangePrimaryKey as an argument to
260         // pullAndMatchEventsLocked
261         pullAndMatchEventsLocked(eventTimeNs);
262         mStateChangePrimaryKey.first = 0;
263         mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
264     }
265     flushIfNeededLocked(eventTimeNs);
266 }
267 
268 template <typename AggregatedValue, typename DimExtras>
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTime)269 void ValueMetricProducer<AggregatedValue, DimExtras>::onSlicedConditionMayChangeLocked(
270         bool overallCondition, const int64_t eventTime) {
271     VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
272 }
273 
274 template <typename AggregatedValue, typename DimExtras>
dropDataLocked(const int64_t dropTimeNs)275 void ValueMetricProducer<AggregatedValue, DimExtras>::dropDataLocked(const int64_t dropTimeNs) {
276     StatsdStats::getInstance().noteBucketDropped(mMetricId);
277 
278     // The current partial bucket is not flushed and does not require a pull,
279     // so the data is still valid.
280     flushIfNeededLocked(dropTimeNs);
281     clearPastBucketsLocked(dropTimeNs);
282 }
283 
284 template <typename AggregatedValue, typename DimExtras>
clearPastBucketsLocked(const int64_t dumpTimeNs)285 void ValueMetricProducer<AggregatedValue, DimExtras>::clearPastBucketsLocked(
286         const int64_t dumpTimeNs) {
287     mPastBuckets.clear();
288     mSkippedBuckets.clear();
289 }
290 
291 template <typename AggregatedValue, typename DimExtras>
onDumpReportLocked(const int64_t dumpTimeNs,const bool includeCurrentPartialBucket,const bool eraseData,const DumpLatency dumpLatency,set<string> * strSet,ProtoOutputStream * protoOutput)292 void ValueMetricProducer<AggregatedValue, DimExtras>::onDumpReportLocked(
293         const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, const bool eraseData,
294         const DumpLatency dumpLatency, set<string>* strSet, ProtoOutputStream* protoOutput) {
295     VLOG("metric %lld dump report now...", (long long)mMetricId);
296 
297     // Pulled metrics need to pull before flushing, which is why they do not call flushIfNeeded.
298     // TODO: b/249823426 see if we can pull and call flushIfneeded for pulled value metrics.
299     if (!isPulled()) {
300         flushIfNeededLocked(dumpTimeNs);
301     }
302     if (includeCurrentPartialBucket) {
303         // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
304         // current bucket will have incomplete data and the next will have the wrong snapshot to do
305         // a diff against. If the condition is false, we are fine since the base data is reset and
306         // we are not tracking anything.
307         if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
308             switch (dumpLatency) {
309                 case FAST:
310                     invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
311                     break;
312                 case NO_TIME_CONSTRAINTS:
313                     pullAndMatchEventsLocked(dumpTimeNs);
314                     break;
315             }
316         }
317         flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
318     }
319 
320     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
321     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
322 
323     if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
324         return;
325     }
326     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
327     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
328     // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
329     if (!mShouldUseNestedDimensions) {
330         if (!mDimensionsInWhat.empty()) {
331             uint64_t dimenPathToken =
332                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
333             writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
334             protoOutput->end(dimenPathToken);
335         }
336     }
337 
338     const auto& [metricTypeFieldId, bucketNumFieldId, startBucketMsFieldId, endBucketMsFieldId,
339                  conditionTrueNsFieldId,
340                  conditionCorrectionNsFieldId] = getDumpProtoFields();
341 
342     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | metricTypeFieldId);
343 
344     for (const auto& skippedBucket : mSkippedBuckets) {
345         uint64_t wrapperToken =
346                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
347         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
348                            (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
349         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
350                            (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
351         for (const auto& dropEvent : skippedBucket.dropEvents) {
352             uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
353                                                          FIELD_ID_SKIPPED_DROP_EVENT);
354             protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
355             protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
356                                (long long)(NanoToMillis(dropEvent.dropTimeNs)));
357             protoOutput->end(dropEventToken);
358         }
359         protoOutput->end(wrapperToken);
360     }
361 
362     for (const auto& [metricDimensionKey, buckets] : mPastBuckets) {
363         VLOG("  dimension key %s", metricDimensionKey.toString().c_str());
364         uint64_t wrapperToken =
365                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
366 
367         // First fill dimension.
368         if (mShouldUseNestedDimensions) {
369             uint64_t dimensionToken =
370                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
371             writeDimensionToProto(metricDimensionKey.getDimensionKeyInWhat(), strSet, protoOutput);
372             protoOutput->end(dimensionToken);
373         } else {
374             writeDimensionLeafNodesToProto(metricDimensionKey.getDimensionKeyInWhat(),
375                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, strSet, protoOutput);
376         }
377 
378         // Then fill slice_by_state.
379         for (auto state : metricDimensionKey.getStateValuesKey().getValues()) {
380             uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
381                                                      FIELD_ID_SLICE_BY_STATE);
382             writeStateToProto(state, protoOutput);
383             protoOutput->end(stateToken);
384         }
385 
386         // Then fill bucket_info (*BucketInfo).
387         for (const auto& bucket : buckets) {
388             uint64_t bucketInfoToken = protoOutput->start(
389                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
390 
391             if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
392                 protoOutput->write(FIELD_TYPE_INT64 | startBucketMsFieldId,
393                                    (long long)NanoToMillis(bucket.mBucketStartNs));
394                 protoOutput->write(FIELD_TYPE_INT64 | endBucketMsFieldId,
395                                    (long long)NanoToMillis(bucket.mBucketEndNs));
396             } else {
397                 protoOutput->write(FIELD_TYPE_INT64 | bucketNumFieldId,
398                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
399             }
400             // We only write the condition timer value if the metric has a
401             // condition and/or is sliced by state.
402             // If the metric is sliced by state, the condition timer value is
403             // also sliced by state to reflect time spent in that state.
404             if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
405                 protoOutput->write(FIELD_TYPE_INT64 | conditionTrueNsFieldId,
406                                    (long long)bucket.mConditionTrueNs);
407             }
408 
409             if (conditionCorrectionNsFieldId) {
410                 // We write the condition correction value when below conditions are true:
411                 // - if metric is pulled
412                 // - if it is enabled by metric configuration via dedicated field,
413                 //   see condition_correction_threshold_nanos
414                 // - if the abs(value) >= condition_correction_threshold_nanos
415 
416                 if (isPulled() && mConditionCorrectionThresholdNs &&
417                     (abs(bucket.mConditionCorrectionNs) >= mConditionCorrectionThresholdNs)) {
418                     protoOutput->write(FIELD_TYPE_INT64 | conditionCorrectionNsFieldId.value(),
419                                        (long long)bucket.mConditionCorrectionNs);
420                 }
421             }
422 
423             for (int i = 0; i < (int)bucket.aggIndex.size(); i++) {
424                 VLOG("\t bucket [%lld - %lld]", (long long)bucket.mBucketStartNs,
425                      (long long)bucket.mBucketEndNs);
426                 int sampleSize = !bucket.sampleSizes.empty() ? bucket.sampleSizes[i] : 0;
427                 writePastBucketAggregateToProto(bucket.aggIndex[i], bucket.aggregates[i],
428                                                 sampleSize, protoOutput);
429             }
430             protoOutput->end(bucketInfoToken);
431         }
432         protoOutput->end(wrapperToken);
433     }
434     protoOutput->end(protoToken);
435 
436     VLOG("metric %lld done with dump report...", (long long)mMetricId);
437     if (eraseData) {
438         mPastBuckets.clear();
439         mSkippedBuckets.clear();
440     }
441 }
442 
443 template <typename AggregatedValue, typename DimExtras>
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)444 void ValueMetricProducer<AggregatedValue, DimExtras>::invalidateCurrentBucket(
445         const int64_t dropTimeNs, const BucketDropReason reason) {
446     if (!mCurrentBucketIsSkipped) {
447         // Only report to StatsdStats once per invalid bucket.
448         StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
449     }
450 
451     skipCurrentBucket(dropTimeNs, reason);
452 }
453 
454 template <typename AggregatedValue, typename DimExtras>
skipCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)455 void ValueMetricProducer<AggregatedValue, DimExtras>::skipCurrentBucket(
456         const int64_t dropTimeNs, const BucketDropReason reason) {
457     if (!mIsActive) {
458         // Don't keep track of skipped buckets if metric is not active.
459         return;
460     }
461 
462     if (!maxDropEventsReached()) {
463         mCurrentSkippedBucket.dropEvents.push_back(buildDropEvent(dropTimeNs, reason));
464     }
465     mCurrentBucketIsSkipped = true;
466 }
467 
468 // Handle active state change. Active state change is *mostly* treated like a condition change:
469 // - drop bucket if active state change event arrives too late
470 // - if condition is true, pull data on active state changes
471 // - ConditionTimer tracks changes based on AND of condition and active state.
472 template <typename AggregatedValue, typename DimExtras>
onActiveStateChangedLocked(const int64_t eventTimeNs,const bool isActive)473 void ValueMetricProducer<AggregatedValue, DimExtras>::onActiveStateChangedLocked(
474         const int64_t eventTimeNs, const bool isActive) {
475     const bool eventLate = isEventLateLocked(eventTimeNs);
476     if (eventLate) {
477         // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
478         StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
479         invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
480     }
481 
482     if (ConditionState::kTrue != mCondition) {
483         // Call parent method before early return.
484         MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);
485         return;
486     }
487 
488     // Pull on active state changes.
489     if (!eventLate) {
490         if (isPulled()) {
491             pullAndMatchEventsLocked(eventTimeNs);
492         }
493 
494         onActiveStateChangedInternalLocked(eventTimeNs, isActive);
495     }
496 
497     // Once any pulls are processed, call through to parent method which might flush the current
498     // bucket.
499     MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);
500 
501     // Let condition timer know of new active state.
502     mConditionTimer.onConditionChanged(isActive, eventTimeNs);
503 
504     updateCurrentSlicedBucketConditionTimers(isActive, eventTimeNs);
505 }
506 
507 template <typename AggregatedValue, typename DimExtras>
onConditionChangedLocked(const bool condition,const int64_t eventTimeNs)508 void ValueMetricProducer<AggregatedValue, DimExtras>::onConditionChangedLocked(
509         const bool condition, const int64_t eventTimeNs) {
510     const bool eventLate = isEventLateLocked(eventTimeNs);
511 
512     const ConditionState newCondition = eventLate   ? ConditionState::kUnknown
513                                         : condition ? ConditionState::kTrue
514                                                     : ConditionState::kFalse;
515     const ConditionState oldCondition = mCondition;
516 
517     if (!mIsActive) {
518         mCondition = newCondition;
519         return;
520     }
521 
522     // If the event arrived late, mark the bucket as invalid and skip the event.
523     if (eventLate) {
524         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
525              (long long)mCurrentBucketStartTimeNs);
526         StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
527         StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
528         invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
529         mCondition = newCondition;
530         mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
531         updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
532         return;
533     }
534 
535     // If the previous condition was unknown, mark the bucket as invalid
536     // because the bucket will contain partial data. For example, the condition
537     // change might happen close to the end of the bucket and we might miss a
538     // lot of data.
539     // We still want to pull to set the base for diffed metrics.
540     if (oldCondition == ConditionState::kUnknown) {
541         invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
542     }
543 
544     // Pull and match for the following condition change cases:
545     // unknown/false -> true - condition changed
546     // true -> false - condition changed
547     // true -> true - old condition was true so we can flush the bucket at the
548     // end if needed.
549     //
550     // We don’t need to pull for unknown -> false or false -> false.
551     //
552     // onConditionChangedLocked might happen on bucket boundaries if this is
553     // called before #onDataPulled.
554     if (isPulled() &&
555         (newCondition == ConditionState::kTrue || oldCondition == ConditionState::kTrue)) {
556         pullAndMatchEventsLocked(eventTimeNs);
557     }
558 
559     onConditionChangedInternalLocked(oldCondition, newCondition, eventTimeNs);
560 
561     // Update condition state after pulling.
562     mCondition = newCondition;
563 
564     flushIfNeededLocked(eventTimeNs);
565 
566     mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
567     updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
568 }
569 
570 template <typename AggregatedValue, typename DimExtras>
updateCurrentSlicedBucketConditionTimers(bool newCondition,int64_t eventTimeNs)571 void ValueMetricProducer<AggregatedValue, DimExtras>::updateCurrentSlicedBucketConditionTimers(
572         bool newCondition, int64_t eventTimeNs) {
573     if (mSlicedStateAtoms.empty()) {
574         return;
575     }
576 
577     // Utilize the current state key of each DimensionsInWhat key to determine
578     // which condition timers to update.
579     //
580     // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
581     for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mDimInfos) {
582         // If the new condition is true, turn ON the condition timer only if
583         // the DimensionInWhat key was present in the data.
584         mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
585                                                 dimensionInWhatInfo.currentState)]
586                 .conditionTimer.onConditionChanged(
587                         newCondition && dimensionInWhatInfo.hasCurrentState, eventTimeNs);
588     }
589 }
590 
591 template <typename AggregatedValue, typename DimExtras>
dumpStatesLocked(FILE * out,bool verbose) const592 void ValueMetricProducer<AggregatedValue, DimExtras>::dumpStatesLocked(FILE* out,
593                                                                        bool verbose) const {
594     if (mCurrentSlicedBucket.size() == 0) {
595         return;
596     }
597 
598     fprintf(out, "ValueMetricProducer %lld dimension size %lu\n", (long long)mMetricId,
599             (unsigned long)mCurrentSlicedBucket.size());
600     if (verbose) {
601         for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
602             for (const Interval& interval : currentBucket.intervals) {
603                 fprintf(out, "\t(what)%s\t(states)%s  (aggregate)%s\n",
604                         metricDimensionKey.getDimensionKeyInWhat().toString().c_str(),
605                         metricDimensionKey.getStateValuesKey().toString().c_str(),
606                         aggregatedValueToString(interval.aggregate).c_str());
607             }
608         }
609     }
610 }
611 
612 template <typename AggregatedValue, typename DimExtras>
hasReachedGuardRailLimit() const613 bool ValueMetricProducer<AggregatedValue, DimExtras>::hasReachedGuardRailLimit() const {
614     return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
615 }
616 
617 template <typename AggregatedValue, typename DimExtras>
hitGuardRailLocked(const MetricDimensionKey & newKey)618 bool ValueMetricProducer<AggregatedValue, DimExtras>::hitGuardRailLocked(
619         const MetricDimensionKey& newKey) {
620     // ===========GuardRail==============
621     // 1. Report the tuple count if the tuple count > soft limit
622     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
623         return false;
624     }
625     if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
626         size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
627         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
628         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
629         if (hasReachedGuardRailLimit()) {
630             if (!mHasHitGuardrail) {
631                 ALOGE("ValueMetricProducer %lld dropping data for dimension key %s",
632                       (long long)mMetricId, newKey.toString().c_str());
633                 mHasHitGuardrail = true;
634             }
635             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
636             return true;
637         }
638     }
639 
640     return false;
641 }
642 
643 template <typename AggregatedValue, typename DimExtras>
onMatchedLogEventInternalLocked(const size_t matcherIndex,const MetricDimensionKey & eventKey,const ConditionKey & conditionKey,bool condition,const LogEvent & event,const map<int,HashableDimensionKey> & statePrimaryKeys)644 void ValueMetricProducer<AggregatedValue, DimExtras>::onMatchedLogEventInternalLocked(
645         const size_t matcherIndex, const MetricDimensionKey& eventKey,
646         const ConditionKey& conditionKey, bool condition, const LogEvent& event,
647         const map<int, HashableDimensionKey>& statePrimaryKeys) {
648     // Skip this event if a state change occurred for a different primary key.
649     auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
650     // Check that both the atom id and the primary key are equal.
651     if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
652         VLOG("ValueMetric skip event with primary key %s because state change primary key "
653              "is %s",
654              it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
655         return;
656     }
657 
658     const int64_t eventTimeNs = event.GetElapsedTimestampNs();
659     if (isEventLateLocked(eventTimeNs)) {
660         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
661              (long long)mCurrentBucketStartTimeNs);
662         return;
663     }
664 
665     const auto whatKey = eventKey.getDimensionKeyInWhat();
666     mMatchedMetricDimensionKeys.insert(whatKey);
667 
668     if (!isPulled()) {
669         // Only flushing for pushed because for pulled metrics, we need to do a pull first.
670         flushIfNeededLocked(eventTimeNs);
671     }
672 
673     if (canSkipLogEventLocked(eventKey, condition, eventTimeNs, statePrimaryKeys)) {
674         return;
675     }
676 
677     if (hitGuardRailLocked(eventKey)) {
678         return;
679     }
680 
681     const auto& returnVal = mDimInfos.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
682     DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
683     const HashableDimensionKey& oldStateKey = dimensionsInWhatInfo.currentState;
684     CurrentBucket& currentBucket = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
685 
686     // Ensure we turn on the condition timer in the case where dimensions
687     // were missing on a previous pull due to a state change.
688     const auto stateKey = eventKey.getStateValuesKey();
689     const bool stateChange = oldStateKey != stateKey || !dimensionsInWhatInfo.hasCurrentState;
690 
691     // We need to get the intervals stored with the previous state key so we can
692     // close these value intervals.
693     vector<Interval>& intervals = currentBucket.intervals;
694     if (intervals.size() < mFieldMatchers.size()) {
695         VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
696         intervals.resize(mFieldMatchers.size());
697     }
698 
699     dimensionsInWhatInfo.hasCurrentState = true;
700     dimensionsInWhatInfo.currentState = stateKey;
701 
702     dimensionsInWhatInfo.seenNewData |= aggregateFields(eventTimeNs, eventKey, event, intervals,
703                                                         dimensionsInWhatInfo.dimExtras);
704 
705     // State change.
706     if (!mSlicedStateAtoms.empty() && stateChange) {
707         // Turn OFF the condition timer for the previous state key.
708         currentBucket.conditionTimer.onConditionChanged(false, eventTimeNs);
709 
710         // Turn ON the condition timer for the new state key.
711         mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
712                 .conditionTimer.onConditionChanged(true, eventTimeNs);
713     }
714 }
715 
716 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
717 // if mCondition and mIsActive are true!
718 template <typename AggregatedValue, typename DimExtras>
flushIfNeededLocked(const int64_t & eventTimeNs)719 void ValueMetricProducer<AggregatedValue, DimExtras>::flushIfNeededLocked(
720         const int64_t& eventTimeNs) {
721     const int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
722     if (eventTimeNs < currentBucketEndTimeNs) {
723         VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
724              (long long)(currentBucketEndTimeNs));
725         return;
726     }
727     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
728     int64_t nextBucketStartTimeNs =
729             currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
730     flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
731 }
732 
733 template <typename AggregatedValue, typename DimExtras>
calcBucketsForwardCount(const int64_t eventTimeNs) const734 int64_t ValueMetricProducer<AggregatedValue, DimExtras>::calcBucketsForwardCount(
735         const int64_t eventTimeNs) const {
736     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
737     if (eventTimeNs < currentBucketEndTimeNs) {
738         return 0;
739     }
740     return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
741 }
742 
743 template <typename AggregatedValue, typename DimExtras>
flushCurrentBucketLocked(const int64_t & eventTimeNs,const int64_t & nextBucketStartTimeNs)744 void ValueMetricProducer<AggregatedValue, DimExtras>::flushCurrentBucketLocked(
745         const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) {
746     if (mCondition == ConditionState::kUnknown) {
747         StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
748         invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
749     }
750 
751     VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
752          (int)mCurrentSlicedBucket.size());
753 
754     closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
755     initNextSlicedBucket(nextBucketStartTimeNs);
756 
757     // Update the condition timer again, in case we skipped buckets.
758     mConditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
759 
760     // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
761     // by state. Otherwise, the "global" condition timer will be used.
762     if (!mSlicedStateAtoms.empty()) {
763         for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
764             currentBucket.conditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
765         }
766     }
767     mCurrentBucketNum += calcBucketsForwardCount(eventTimeNs);
768 }
769 
770 template <typename AggregatedValue, typename DimExtras>
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)771 void ValueMetricProducer<AggregatedValue, DimExtras>::closeCurrentBucket(
772         const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs) {
773     const int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
774     int64_t bucketEndTimeNs = fullBucketEndTimeNs;
775     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
776 
777     if (multipleBucketsSkipped(numBucketsForward)) {
778         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
779         StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
780         // Something went wrong. Maybe the device was sleeping for a long time. It is better
781         // to mark the current bucket as invalid. The last pull might have been successful though.
782         invalidateCurrentBucket(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
783 
784         // End the bucket at the next bucket start time so the entire interval is skipped.
785         bucketEndTimeNs = nextBucketStartTimeNs;
786     } else if (eventTimeNs < fullBucketEndTimeNs) {
787         bucketEndTimeNs = eventTimeNs;
788     }
789 
790     // Close the current bucket
791     const auto [globalConditionDurationNs, globalConditionCorrectionNs] =
792             mConditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
793 
794     bool isBucketLargeEnough = bucketEndTimeNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
795     if (!isBucketLargeEnough) {
796         skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
797     }
798     if (!mCurrentBucketIsSkipped) {
799         bool bucketHasData = false;
800         // The current bucket is large enough to keep.
801         for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
802             PastBucket<AggregatedValue> bucket =
803                     buildPartialBucket(bucketEndTimeNs, currentBucket.intervals);
804             if (bucket.aggIndex.empty()) {
805                 continue;
806             }
807             bucketHasData = true;
808             if (!mSlicedStateAtoms.empty()) {
809                 const auto [conditionDurationNs, conditionCorrectionNs] =
810                         currentBucket.conditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
811                 bucket.mConditionTrueNs = conditionDurationNs;
812                 bucket.mConditionCorrectionNs = conditionCorrectionNs;
813             } else {
814                 bucket.mConditionTrueNs = globalConditionDurationNs;
815                 bucket.mConditionCorrectionNs = globalConditionCorrectionNs;
816             }
817 
818             auto& bucketList = mPastBuckets[metricDimensionKey];
819             bucketList.push_back(std::move(bucket));
820         }
821         if (!bucketHasData) {
822             skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
823         }
824     }
825 
826     if (mCurrentBucketIsSkipped) {
827         mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
828         mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTimeNs;
829         mSkippedBuckets.push_back(mCurrentSkippedBucket);
830     }
831 
832     // This means that the current bucket was not flushed before a forced bucket split.
833     // This can happen if an app update or a dump report with includeCurrentPartialBucket is
834     // requested before we get a chance to flush the bucket due to receiving new data, either from
835     // the statsd socket or the StatsPullerManager.
836     if (bucketEndTimeNs < nextBucketStartTimeNs) {
837         SkippedBucket bucketInGap;
838         bucketInGap.bucketStartTimeNs = bucketEndTimeNs;
839         bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
840         bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
841         mSkippedBuckets.emplace_back(bucketInGap);
842     }
843 }
844 
845 template <typename AggregatedValue, typename DimExtras>
initNextSlicedBucket(int64_t nextBucketStartTimeNs)846 void ValueMetricProducer<AggregatedValue, DimExtras>::initNextSlicedBucket(
847         int64_t nextBucketStartTimeNs) {
848     StatsdStats::getInstance().noteBucketCount(mMetricId);
849     if (mSlicedStateAtoms.empty()) {
850         mCurrentSlicedBucket.clear();
851     } else {
852         for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
853             bool obsolete = true;
854             for (auto& interval : it->second.intervals) {
855                 interval.sampleSize = 0;
856             }
857 
858             // When slicing by state, only delete the MetricDimensionKey when the
859             // state key in the MetricDimensionKey is not the current state key.
860             const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
861             const auto& currentDimInfoItr = mDimInfos.find(dimensionInWhatKey);
862 
863             if ((currentDimInfoItr != mDimInfos.end()) &&
864                 (it->first.getStateValuesKey() == currentDimInfoItr->second.currentState)) {
865                 obsolete = false;
866             }
867             if (obsolete) {
868                 it = mCurrentSlicedBucket.erase(it);
869             } else {
870                 it++;
871             }
872         }
873     }
874     for (auto it = mDimInfos.begin(); it != mDimInfos.end();) {
875         if (!it->second.seenNewData) {
876             it = mDimInfos.erase(it);
877         } else {
878             it->second.seenNewData = false;
879             it++;
880         }
881     }
882 
883     mCurrentBucketIsSkipped = false;
884     mCurrentSkippedBucket.reset();
885 
886     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
887     // Reset mHasHitGuardrail boolean since bucket was reset
888     mHasHitGuardrail = false;
889     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
890          (long long)mCurrentBucketStartTimeNs);
891 }
892 
893 // Explicit template instantiations
894 template class ValueMetricProducer<Value, vector<optional<Value>>>;
895 template class ValueMetricProducer<unique_ptr<KllQuantile>, Empty>;
896 
897 }  // namespace statsd
898 }  // namespace os
899 }  // namespace android
900