• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define STATSD_DEBUG false  // STOPSHIP if true
18 #include "Log.h"
19 
20 #include "ValueMetricProducer.h"
21 
22 #include <kll.h>
23 #include <limits.h>
24 #include <stdlib.h>
25 
26 #include "FieldValue.h"
27 #include "HashableDimensionKey.h"
28 #include "guardrail/StatsdStats.h"
29 #include "metrics/parsing_utils/metrics_manager_util.h"
30 #include "stats_log_util.h"
31 #include "stats_util.h"
32 
33 using android::util::FIELD_COUNT_REPEATED;
34 using android::util::FIELD_TYPE_BOOL;
35 using android::util::FIELD_TYPE_INT32;
36 using android::util::FIELD_TYPE_INT64;
37 using android::util::FIELD_TYPE_MESSAGE;
38 using android::util::ProtoOutputStream;
39 using dist_proc::aggregation::KllQuantile;
40 using std::optional;
41 using std::shared_ptr;
42 using std::unique_ptr;
43 using std::unordered_map;
44 using std::vector;
45 
46 namespace android {
47 namespace os {
48 namespace statsd {
49 
// Proto field numbers used by this file when serializing a metric report.
// Each group below is scoped to one message, so the same number may appear
// in more than one group.

// Fields of StatsLogReport.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_TIME_BASE = 9;
constexpr int FIELD_ID_BUCKET_SIZE = 10;
constexpr int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
constexpr int FIELD_ID_IS_ACTIVE = 14;

// Fields of the *MetricDataWrapper message.
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;

// Fields of a skipped-bucket entry.
constexpr int FIELD_ID_SKIPPED_START_MILLIS = 3;
constexpr int FIELD_ID_SKIPPED_END_MILLIS = 4;
constexpr int FIELD_ID_SKIPPED_DROP_EVENT = 5;

// Fields of the drop-event entry nested inside a skipped bucket.
constexpr int FIELD_ID_BUCKET_DROP_REASON = 1;
constexpr int FIELD_ID_DROP_TIME = 2;

// Fields of the *MetricData message.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_BUCKET_INFO = 3;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
constexpr int FIELD_ID_SLICE_BY_STATE = 6;

71 
// Constructs a value metric producer from grouped option structs.
//
// Shared configuration (time base, condition index/cache/wizard, activation maps,
// sliced state atoms, state group map, app-upgrade bucket splitting) is forwarded to
// the MetricProducer base. The remaining options are copied into members, the
// dimension / condition-link / state-link field matchers are translated, and the
// first (possibly partial) bucket is set up to start at bucketOptions.startTimeNs.
//
// The statements in the body are order-sensitive: the bucket number is computed and
// flushIfNeededLocked() runs before the pull receiver is registered, and the
// partial-bucket start time is applied only after both.
template <typename AggregatedValue, typename DimExtras>
ValueMetricProducer<AggregatedValue, DimExtras>::ValueMetricProducer(
        const int64_t& metricId, const ConfigKey& key, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions)
    : MetricProducer(metricId, key, bucketOptions.timeBaseNs, conditionOptions.conditionIndex,
                     conditionOptions.initialConditionCache, conditionOptions.conditionWizard,
                     protoHash, activationOptions.eventActivationMap,
                     activationOptions.eventDeactivationMap, stateOptions.slicedStateAtoms,
                     stateOptions.stateGroupMap, bucketOptions.splitBucketForAppUpgrade),
      mWhatMatcherIndex(whatOptions.whatMatcherIndex),
      mEventMatcherWizard(whatOptions.matcherWizard),
      mPullerManager(pullOptions.pullerManager),
      mFieldMatchers(whatOptions.fieldMatchers),
      mPullAtomId(pullOptions.pullAtomId),
      mMinBucketSizeNs(bucketOptions.minBucketSizeNs),
      mDimensionSoftLimit(guardrailOptions.dimensionSoftLimit),
      mDimensionHardLimit(guardrailOptions.dimensionHardLimit),
      mCurrentBucketIsSkipped(false),
      // Condition timer will be set later within the constructor after pulling events
      mConditionTimer(false, bucketOptions.timeBaseNs),
      mConditionCorrectionThresholdNs(bucketOptions.conditionCorrectionThresholdNs) {
    // TODO(b/185722221): inject directly via initializer list in MetricProducer.
    mBucketSizeNs = bucketOptions.bucketSizeNs;

    // TODO(b/185770171): inject dimensionsInWhat related fields via constructor.
    // Only translate the dimension matcher when one was actually configured.
    if (whatOptions.dimensionsInWhat.field() > 0) {
        translateFieldMatcher(whatOptions.dimensionsInWhat, &mDimensionsInWhat);
    }
    mContainANYPositionInDimensionsInWhat = whatOptions.containsAnyPositionInDimensionsInWhat;
    mShouldUseNestedDimensions = whatOptions.shouldUseNestedDimensions;

    // Convert each configured condition link into its translated form; the presence
    // of any link marks the condition as sliced.
    if (conditionOptions.conditionLinks.size() > 0) {
        for (const auto& link : conditionOptions.conditionLinks) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }

        // TODO(b/185770739): use !mMetric2ConditionLinks.empty() instead
        mConditionSliced = true;
    }

    // Convert each configured state link into its translated form.
    for (const auto& stateLink : stateOptions.stateLinks) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    // Position the producer on the bucket that contains startTimeNs.
    const int64_t numBucketsForward = calcBucketsForwardCount(bucketOptions.startTimeNs);
    mCurrentBucketNum = numBucketsForward;

    flushIfNeededLocked(bucketOptions.startTimeNs);

    // Pulled metrics register with the puller manager, passing the current bucket end
    // time and the bucket size; the destructor performs the matching unregistration.
    if (isPulled()) {
        mPullerManager->RegisterReceiver(mPullAtomId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = bucketOptions.startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs, mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);
}
147 
148 template <typename AggregatedValue, typename DimExtras>
~ValueMetricProducer()149 ValueMetricProducer<AggregatedValue, DimExtras>::~ValueMetricProducer() {
150     VLOG("~ValueMetricProducer() called");
151     if (isPulled()) {
152         mPullerManager->UnRegisterReceiver(mPullAtomId, mConfigKey, this);
153     }
154 }
155 
156 template <typename AggregatedValue, typename DimExtras>
onStatsdInitCompleted(const int64_t & eventTimeNs)157 void ValueMetricProducer<AggregatedValue, DimExtras>::onStatsdInitCompleted(
158         const int64_t& eventTimeNs) {
159     lock_guard<mutex> lock(mMutex);
160 
161     // TODO(b/188837487): Add mIsActive check
162 
163     if (isPulled() && mCondition == ConditionState::kTrue) {
164         pullAndMatchEventsLocked(eventTimeNs);
165     }
166     flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
167 }
168 
169 template <typename AggregatedValue, typename DimExtras>
notifyAppUpgradeInternalLocked(const int64_t eventTimeNs)170 void ValueMetricProducer<AggregatedValue, DimExtras>::notifyAppUpgradeInternalLocked(
171         const int64_t eventTimeNs) {
172     // TODO(b/188837487): Add mIsActive check
173     if (isPulled() && mCondition == ConditionState::kTrue) {
174         pullAndMatchEventsLocked(eventTimeNs);
175     }
176     flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
177 }
178 
179 template <typename AggregatedValue, typename DimExtras>
onConfigUpdatedLocked(const StatsdConfig & config,const int configIndex,const int metricIndex,const vector<sp<AtomMatchingTracker>> & allAtomMatchingTrackers,const unordered_map<int64_t,int> & oldAtomMatchingTrackerMap,const unordered_map<int64_t,int> & newAtomMatchingTrackerMap,const sp<EventMatcherWizard> & matcherWizard,const vector<sp<ConditionTracker>> & allConditionTrackers,const unordered_map<int64_t,int> & conditionTrackerMap,const sp<ConditionWizard> & wizard,const unordered_map<int64_t,int> & metricToActivationMap,unordered_map<int,vector<int>> & trackerToMetricMap,unordered_map<int,vector<int>> & conditionToMetricMap,unordered_map<int,vector<int>> & activationAtomTrackerToMetricMap,unordered_map<int,vector<int>> & deactivationAtomTrackerToMetricMap,vector<int> & metricsWithActivation)180 bool ValueMetricProducer<AggregatedValue, DimExtras>::onConfigUpdatedLocked(
181         const StatsdConfig& config, const int configIndex, const int metricIndex,
182         const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
183         const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
184         const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
185         const sp<EventMatcherWizard>& matcherWizard,
186         const vector<sp<ConditionTracker>>& allConditionTrackers,
187         const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
188         const unordered_map<int64_t, int>& metricToActivationMap,
189         unordered_map<int, vector<int>>& trackerToMetricMap,
190         unordered_map<int, vector<int>>& conditionToMetricMap,
191         unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
192         unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
193         vector<int>& metricsWithActivation) {
194     if (!MetricProducer::onConfigUpdatedLocked(
195                 config, configIndex, metricIndex, allAtomMatchingTrackers,
196                 oldAtomMatchingTrackerMap, newAtomMatchingTrackerMap, matcherWizard,
197                 allConditionTrackers, conditionTrackerMap, wizard, metricToActivationMap,
198                 trackerToMetricMap, conditionToMetricMap, activationAtomTrackerToMetricMap,
199                 deactivationAtomTrackerToMetricMap, metricsWithActivation)) {
200         return false;
201     }
202 
203     // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
204     const int64_t atomMatcherId = getWhatAtomMatcherIdForMetric(config, configIndex);
205     if (!handleMetricWithAtomMatchingTrackers(atomMatcherId, metricIndex, /*enforceOneAtom=*/false,
206                                               allAtomMatchingTrackers, newAtomMatchingTrackerMap,
207                                               trackerToMetricMap, mWhatMatcherIndex)) {
208         return false;
209     }
210 
211     const optional<int64_t>& conditionIdOpt = getConditionIdForMetric(config, configIndex);
212     const ConditionLinks& conditionLinks = getConditionLinksForMetric(config, configIndex);
213     if (conditionIdOpt.has_value() &&
214         !handleMetricWithConditions(conditionIdOpt.value(), metricIndex, conditionTrackerMap,
215                                     conditionLinks, allConditionTrackers, mConditionTrackerIndex,
216                                     conditionToMetricMap)) {
217         return false;
218     }
219 
220     sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
221     mEventMatcherWizard = matcherWizard;
222     return true;
223 }
224 
225 template <typename AggregatedValue, typename DimExtras>
onStateChanged(int64_t eventTimeNs,int32_t atomId,const HashableDimensionKey & primaryKey,const FieldValue & oldState,const FieldValue & newState)226 void ValueMetricProducer<AggregatedValue, DimExtras>::onStateChanged(
227         int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
228         const FieldValue& oldState, const FieldValue& newState) {
229     // TODO(b/189353769): Acquire lock.
230     VLOG("ValueMetricProducer %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
231          (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
232          oldState.mValue.int_value, newState.mValue.int_value);
233 
234     FieldValue oldStateCopy = oldState;
235     FieldValue newStateCopy = newState;
236     mapStateValue(atomId, &oldStateCopy);
237     mapStateValue(atomId, &newStateCopy);
238 
239     // If old and new states are in the same StateGroup, then we do not need to
240     // pull for this state change.
241     if (oldStateCopy == newStateCopy) {
242         return;
243     }
244 
245     // If condition is not true or metric is not active, we do not need to pull
246     // for this state change.
247     if (mCondition != ConditionState::kTrue || !mIsActive) {
248         return;
249     }
250 
251     if (isEventLateLocked(eventTimeNs)) {
252         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
253              (long long)mCurrentBucketStartTimeNs);
254         invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
255         return;
256     }
257 
258     if (isPulled()) {
259         mStateChangePrimaryKey.first = atomId;
260         mStateChangePrimaryKey.second = primaryKey;
261         // TODO(b/185796114): pass mStateChangePrimaryKey as an argument to
262         // pullAndMatchEventsLocked
263         pullAndMatchEventsLocked(eventTimeNs);
264         mStateChangePrimaryKey.first = 0;
265         mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
266     }
267     flushIfNeededLocked(eventTimeNs);
268 }
269 
270 template <typename AggregatedValue, typename DimExtras>
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTime)271 void ValueMetricProducer<AggregatedValue, DimExtras>::onSlicedConditionMayChangeLocked(
272         bool overallCondition, const int64_t eventTime) {
273     VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
274 }
275 
276 template <typename AggregatedValue, typename DimExtras>
dropDataLocked(const int64_t dropTimeNs)277 void ValueMetricProducer<AggregatedValue, DimExtras>::dropDataLocked(const int64_t dropTimeNs) {
278     StatsdStats::getInstance().noteBucketDropped(mMetricId);
279 
280     // The current partial bucket is not flushed and does not require a pull,
281     // so the data is still valid.
282     flushIfNeededLocked(dropTimeNs);
283     clearPastBucketsLocked(dropTimeNs);
284 }
285 
286 template <typename AggregatedValue, typename DimExtras>
clearPastBucketsLocked(const int64_t dumpTimeNs)287 void ValueMetricProducer<AggregatedValue, DimExtras>::clearPastBucketsLocked(
288         const int64_t dumpTimeNs) {
289     mPastBuckets.clear();
290     mSkippedBuckets.clear();
291 }
292 
293 template <typename AggregatedValue, typename DimExtras>
onDumpReportLocked(const int64_t dumpTimeNs,const bool includeCurrentPartialBucket,const bool eraseData,const DumpLatency dumpLatency,set<string> * strSet,ProtoOutputStream * protoOutput)294 void ValueMetricProducer<AggregatedValue, DimExtras>::onDumpReportLocked(
295         const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, const bool eraseData,
296         const DumpLatency dumpLatency, set<string>* strSet, ProtoOutputStream* protoOutput) {
297     VLOG("metric %lld dump report now...", (long long)mMetricId);
298 
299     // TODO(b/188837487): Add mIsActive check
300 
301     if (includeCurrentPartialBucket) {
302         // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
303         // current bucket will have incomplete data and the next will have the wrong snapshot to do
304         // a diff against. If the condition is false, we are fine since the base data is reset and
305         // we are not tracking anything.
306         if (isPulled() && mCondition == ConditionState::kTrue) {
307             switch (dumpLatency) {
308                 case FAST:
309                     invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
310                     break;
311                 case NO_TIME_CONSTRAINTS:
312                     pullAndMatchEventsLocked(dumpTimeNs);
313                     break;
314             }
315         }
316         flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
317     }
318 
319     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
320     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
321 
322     if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
323         return;
324     }
325     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
326     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
327     // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
328     if (!mShouldUseNestedDimensions) {
329         if (!mDimensionsInWhat.empty()) {
330             uint64_t dimenPathToken =
331                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
332             writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
333             protoOutput->end(dimenPathToken);
334         }
335     }
336 
337     const auto& [metricTypeFieldId, bucketNumFieldId, startBucketMsFieldId, endBucketMsFieldId,
338                  conditionTrueNsFieldId,
339                  conditionCorrectionNsFieldId] = getDumpProtoFields();
340 
341     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | metricTypeFieldId);
342 
343     for (const auto& skippedBucket : mSkippedBuckets) {
344         uint64_t wrapperToken =
345                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
346         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
347                            (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
348         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
349                            (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
350         for (const auto& dropEvent : skippedBucket.dropEvents) {
351             uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
352                                                          FIELD_ID_SKIPPED_DROP_EVENT);
353             protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
354             protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
355                                (long long)(NanoToMillis(dropEvent.dropTimeNs)));
356             protoOutput->end(dropEventToken);
357         }
358         protoOutput->end(wrapperToken);
359     }
360 
361     for (const auto& [metricDimensionKey, buckets] : mPastBuckets) {
362         VLOG("  dimension key %s", metricDimensionKey.toString().c_str());
363         uint64_t wrapperToken =
364                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
365 
366         // First fill dimension.
367         if (mShouldUseNestedDimensions) {
368             uint64_t dimensionToken =
369                     protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
370             writeDimensionToProto(metricDimensionKey.getDimensionKeyInWhat(), strSet, protoOutput);
371             protoOutput->end(dimensionToken);
372         } else {
373             writeDimensionLeafNodesToProto(metricDimensionKey.getDimensionKeyInWhat(),
374                                            FIELD_ID_DIMENSION_LEAF_IN_WHAT, strSet, protoOutput);
375         }
376 
377         // Then fill slice_by_state.
378         for (auto state : metricDimensionKey.getStateValuesKey().getValues()) {
379             uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
380                                                      FIELD_ID_SLICE_BY_STATE);
381             writeStateToProto(state, protoOutput);
382             protoOutput->end(stateToken);
383         }
384 
385         // Then fill bucket_info (*BucketInfo).
386         for (const auto& bucket : buckets) {
387             uint64_t bucketInfoToken = protoOutput->start(
388                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
389 
390             if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
391                 protoOutput->write(FIELD_TYPE_INT64 | startBucketMsFieldId,
392                                    (long long)NanoToMillis(bucket.mBucketStartNs));
393                 protoOutput->write(FIELD_TYPE_INT64 | endBucketMsFieldId,
394                                    (long long)NanoToMillis(bucket.mBucketEndNs));
395             } else {
396                 protoOutput->write(FIELD_TYPE_INT64 | bucketNumFieldId,
397                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
398             }
399             // We only write the condition timer value if the metric has a
400             // condition and/or is sliced by state.
401             // If the metric is sliced by state, the condition timer value is
402             // also sliced by state to reflect time spent in that state.
403             if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
404                 protoOutput->write(FIELD_TYPE_INT64 | conditionTrueNsFieldId,
405                                    (long long)bucket.mConditionTrueNs);
406             }
407 
408             if (conditionCorrectionNsFieldId) {
409                 // We write the condition correction value when below conditions are true:
410                 // - if metric is pulled
411                 // - if it is enabled by metric configuration via dedicated field,
412                 //   see condition_correction_threshold_nanos
413                 // - if the abs(value) >= condition_correction_threshold_nanos
414 
415                 if (isPulled() && mConditionCorrectionThresholdNs &&
416                     (abs(bucket.mConditionCorrectionNs) >= mConditionCorrectionThresholdNs)) {
417                     protoOutput->write(FIELD_TYPE_INT64 | conditionCorrectionNsFieldId.value(),
418                                        (long long)bucket.mConditionCorrectionNs);
419                 }
420             }
421 
422             for (int i = 0; i < (int)bucket.aggIndex.size(); i++) {
423                 VLOG("\t bucket [%lld - %lld]", (long long)bucket.mBucketStartNs,
424                      (long long)bucket.mBucketEndNs);
425                 writePastBucketAggregateToProto(bucket.aggIndex[i], bucket.aggregates[i],
426                                                 protoOutput);
427             }
428             protoOutput->end(bucketInfoToken);
429         }
430         protoOutput->end(wrapperToken);
431     }
432     protoOutput->end(protoToken);
433 
434     VLOG("metric %lld done with dump report...", (long long)mMetricId);
435     if (eraseData) {
436         mPastBuckets.clear();
437         mSkippedBuckets.clear();
438     }
439 }
440 
441 template <typename AggregatedValue, typename DimExtras>
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)442 void ValueMetricProducer<AggregatedValue, DimExtras>::invalidateCurrentBucket(
443         const int64_t dropTimeNs, const BucketDropReason reason) {
444     if (!mCurrentBucketIsSkipped) {
445         // Only report to StatsdStats once per invalid bucket.
446         StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
447     }
448 
449     skipCurrentBucket(dropTimeNs, reason);
450 }
451 
452 template <typename AggregatedValue, typename DimExtras>
skipCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)453 void ValueMetricProducer<AggregatedValue, DimExtras>::skipCurrentBucket(
454         const int64_t dropTimeNs, const BucketDropReason reason) {
455     if (!maxDropEventsReached()) {
456         mCurrentSkippedBucket.dropEvents.push_back(buildDropEvent(dropTimeNs, reason));
457     }
458     mCurrentBucketIsSkipped = true;
459 }
460 
461 // Handle active state change. Active state change is treated like a condition change:
462 // - drop bucket if active state change event arrives too late
463 // - if condition is true, pull data on active state changes
464 // - ConditionTimer tracks changes based on AND of condition and active state.
465 template <typename AggregatedValue, typename DimExtras>
onActiveStateChangedLocked(const int64_t eventTimeNs)466 void ValueMetricProducer<AggregatedValue, DimExtras>::onActiveStateChangedLocked(
467         const int64_t eventTimeNs) {
468     const bool eventLate = isEventLateLocked(eventTimeNs);
469     if (eventLate) {
470         // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
471         StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
472         invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
473     }
474 
475     // Call parent method once we've verified the validity of current bucket.
476     MetricProducer::onActiveStateChangedLocked(eventTimeNs);
477 
478     if (ConditionState::kTrue != mCondition) {
479         return;
480     }
481 
482     // Pull on active state changes.
483     if (!eventLate) {
484         if (isPulled()) {
485             pullAndMatchEventsLocked(eventTimeNs);
486         }
487 
488         onActiveStateChangedInternalLocked(eventTimeNs);
489     }
490 
491     flushIfNeededLocked(eventTimeNs);
492 
493     // Let condition timer know of new active state.
494     mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);
495 
496     updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs);
497 }
498 
// Handles a change of the metric's (unsliced) condition at eventTimeNs.
//
// The new condition is kUnknown for a late event, otherwise kTrue/kFalse according
// to `condition`.
// - Inactive metric: only mCondition is updated.
// - Late event: the current bucket is invalidated, mCondition and the condition
//   timers are updated, and no pull happens.
// - Otherwise: the bucket is invalidated if the previous condition was unknown, a
//   pulled metric pulls when either the old or the new condition is true,
//   subclass-specific handling runs, mCondition is updated, the bucket is flushed if
//   needed, and the condition timers are updated.
//
// The order of pull -> internal handler -> mCondition update -> flush -> timers is
// deliberate; see the inline comments.
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onConditionChangedLocked(
        const bool condition, const int64_t eventTimeNs) {
    const bool eventLate = isEventLateLocked(eventTimeNs);

    const ConditionState newCondition = eventLate   ? ConditionState::kUnknown
                                        : condition ? ConditionState::kTrue
                                                    : ConditionState::kFalse;
    const ConditionState oldCondition = mCondition;

    // An inactive metric only remembers the condition; no buckets or timers are touched.
    if (!mIsActive) {
        mCondition = newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (eventLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = newCondition;
        // NOTE(review): newCondition is a ConditionState (kUnknown on this path), but
        // updateCurrentSlicedBucketConditionTimers() declares a bool parameter and
        // mConditionTimer.onConditionChanged() is passed bools elsewhere in this file,
        // so the enum is implicitly converted here. If kUnknown has a nonzero
        // underlying value this starts the timers — confirm that is intended.
        mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
        updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    // We still want to pull to set the base for diffed metrics.
    if (oldCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don't need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (isPulled() &&
        (newCondition == ConditionState::kTrue || oldCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    // Subclass-specific handling runs while mCondition still holds the old value.
    onConditionChangedInternalLocked(oldCondition, newCondition, eventTimeNs);

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);

    // On this path newCondition is kTrue or kFalse; it is still converted to bool
    // implicitly at these two calls.
    mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
    updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
}
561 
562 template <typename AggregatedValue, typename DimExtras>
updateCurrentSlicedBucketConditionTimers(bool newCondition,int64_t eventTimeNs)563 void ValueMetricProducer<AggregatedValue, DimExtras>::updateCurrentSlicedBucketConditionTimers(
564         bool newCondition, int64_t eventTimeNs) {
565     if (mSlicedStateAtoms.empty()) {
566         return;
567     }
568 
569     // Utilize the current state key of each DimensionsInWhat key to determine
570     // which condition timers to update.
571     //
572     // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
573     for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mDimInfos) {
574         // If the new condition is true, turn ON the condition timer only if
575         // the DimensionInWhat key was present in the data.
576         mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
577                                                 dimensionInWhatInfo.currentState)]
578                 .conditionTimer.onConditionChanged(
579                         newCondition && dimensionInWhatInfo.hasCurrentState, eventTimeNs);
580     }
581 }
582 
583 template <typename AggregatedValue, typename DimExtras>
dumpStatesLocked(FILE * out,bool verbose) const584 void ValueMetricProducer<AggregatedValue, DimExtras>::dumpStatesLocked(FILE* out,
585                                                                        bool verbose) const {
586     if (mCurrentSlicedBucket.size() == 0) {
587         return;
588     }
589 
590     fprintf(out, "ValueMetricProducer %lld dimension size %lu\n", (long long)mMetricId,
591             (unsigned long)mCurrentSlicedBucket.size());
592     if (verbose) {
593         for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
594             for (const Interval& interval : currentBucket.intervals) {
595                 fprintf(out, "\t(what)%s\t(states)%s  (aggregate)%s\n",
596                         metricDimensionKey.getDimensionKeyInWhat().toString().c_str(),
597                         metricDimensionKey.getStateValuesKey().toString().c_str(),
598                         aggregatedValueToString(interval.aggregate).c_str());
599             }
600         }
601     }
602 }
603 
604 template <typename AggregatedValue, typename DimExtras>
hasReachedGuardRailLimit() const605 bool ValueMetricProducer<AggregatedValue, DimExtras>::hasReachedGuardRailLimit() const {
606     return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
607 }
608 
609 template <typename AggregatedValue, typename DimExtras>
hitGuardRailLocked(const MetricDimensionKey & newKey) const610 bool ValueMetricProducer<AggregatedValue, DimExtras>::hitGuardRailLocked(
611         const MetricDimensionKey& newKey) const {
612     // ===========GuardRail==============
613     // 1. Report the tuple count if the tuple count > soft limit
614     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
615         return false;
616     }
617     if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
618         size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
619         StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
620         // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
621         if (hasReachedGuardRailLimit()) {
622             ALOGE("ValueMetricProducer %lld dropping data for dimension key %s",
623                   (long long)mMetricId, newKey.toString().c_str());
624             StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
625             return true;
626         }
627     }
628 
629     return false;
630 }
631 
632 template <typename AggregatedValue, typename DimExtras>
onMatchedLogEventInternalLocked(const size_t matcherIndex,const MetricDimensionKey & eventKey,const ConditionKey & conditionKey,bool condition,const LogEvent & event,const map<int,HashableDimensionKey> & statePrimaryKeys)633 void ValueMetricProducer<AggregatedValue, DimExtras>::onMatchedLogEventInternalLocked(
634         const size_t matcherIndex, const MetricDimensionKey& eventKey,
635         const ConditionKey& conditionKey, bool condition, const LogEvent& event,
636         const map<int, HashableDimensionKey>& statePrimaryKeys) {
637     // Skip this event if a state change occurred for a different primary key.
638     auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
639     // Check that both the atom id and the primary key are equal.
640     if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
641         VLOG("ValueMetric skip event with primary key %s because state change primary key "
642              "is %s",
643              it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
644         return;
645     }
646 
647     const int64_t eventTimeNs = event.GetElapsedTimestampNs();
648     if (isEventLateLocked(eventTimeNs)) {
649         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
650              (long long)mCurrentBucketStartTimeNs);
651         return;
652     }
653 
654     const auto whatKey = eventKey.getDimensionKeyInWhat();
655     mMatchedMetricDimensionKeys.insert(whatKey);
656 
657     if (!isPulled()) {
658         // Only flushing for pushed because for pulled metrics, we need to do a pull first.
659         flushIfNeededLocked(eventTimeNs);
660     }
661 
662     if (canSkipLogEventLocked(eventKey, condition, eventTimeNs, statePrimaryKeys)) {
663         return;
664     }
665 
666     if (hitGuardRailLocked(eventKey)) {
667         return;
668     }
669 
670     const auto& returnVal = mDimInfos.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
671     DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
672     const HashableDimensionKey& oldStateKey = dimensionsInWhatInfo.currentState;
673     CurrentBucket& currentBucket = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
674 
675     // Ensure we turn on the condition timer in the case where dimensions
676     // were missing on a previous pull due to a state change.
677     const auto stateKey = eventKey.getStateValuesKey();
678     const bool stateChange = oldStateKey != stateKey || !dimensionsInWhatInfo.hasCurrentState;
679 
680     // We need to get the intervals stored with the previous state key so we can
681     // close these value intervals.
682     vector<Interval>& intervals = currentBucket.intervals;
683     if (intervals.size() < mFieldMatchers.size()) {
684         VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
685         intervals.resize(mFieldMatchers.size());
686     }
687 
688     dimensionsInWhatInfo.hasCurrentState = true;
689     dimensionsInWhatInfo.currentState = stateKey;
690 
691     dimensionsInWhatInfo.seenNewData |= aggregateFields(eventTimeNs, eventKey, event, intervals,
692                                                         dimensionsInWhatInfo.dimExtras);
693 
694     // State change.
695     if (!mSlicedStateAtoms.empty() && stateChange) {
696         // Turn OFF the condition timer for the previous state key.
697         currentBucket.conditionTimer.onConditionChanged(false, eventTimeNs);
698 
699         // Turn ON the condition timer for the new state key.
700         mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
701                 .conditionTimer.onConditionChanged(true, eventTimeNs);
702     }
703 }
704 
705 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
706 // if mCondition and mIsActive are true!
707 template <typename AggregatedValue, typename DimExtras>
flushIfNeededLocked(const int64_t & eventTimeNs)708 void ValueMetricProducer<AggregatedValue, DimExtras>::flushIfNeededLocked(
709         const int64_t& eventTimeNs) {
710     const int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
711     if (eventTimeNs < currentBucketEndTimeNs) {
712         VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
713              (long long)(currentBucketEndTimeNs));
714         return;
715     }
716     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
717     int64_t nextBucketStartTimeNs =
718             currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
719     flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
720 }
721 
722 template <typename AggregatedValue, typename DimExtras>
calcBucketsForwardCount(const int64_t eventTimeNs) const723 int64_t ValueMetricProducer<AggregatedValue, DimExtras>::calcBucketsForwardCount(
724         const int64_t eventTimeNs) const {
725     int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
726     if (eventTimeNs < currentBucketEndTimeNs) {
727         return 0;
728     }
729     return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
730 }
731 
732 template <typename AggregatedValue, typename DimExtras>
flushCurrentBucketLocked(const int64_t & eventTimeNs,const int64_t & nextBucketStartTimeNs)733 void ValueMetricProducer<AggregatedValue, DimExtras>::flushCurrentBucketLocked(
734         const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) {
735     if (mCondition == ConditionState::kUnknown) {
736         StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
737         invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
738     }
739 
740     VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
741          (int)mCurrentSlicedBucket.size());
742 
743     closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
744     initNextSlicedBucket(nextBucketStartTimeNs);
745 
746     // Update the condition timer again, in case we skipped buckets.
747     mConditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
748 
749     // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
750     // by state. Otherwise, the "global" condition timer will be used.
751     if (!mSlicedStateAtoms.empty()) {
752         for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
753             currentBucket.conditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
754         }
755     }
756     mCurrentBucketNum += calcBucketsForwardCount(eventTimeNs);
757 }
758 
759 template <typename AggregatedValue, typename DimExtras>
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)760 void ValueMetricProducer<AggregatedValue, DimExtras>::closeCurrentBucket(
761         const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs) {
762     const int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
763     int64_t bucketEndTimeNs = fullBucketEndTimeNs;
764     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
765 
766     if (multipleBucketsSkipped(numBucketsForward)) {
767         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
768         StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
769         // Something went wrong. Maybe the device was sleeping for a long time. It is better
770         // to mark the current bucket as invalid. The last pull might have been successful though.
771         invalidateCurrentBucket(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
772 
773         // End the bucket at the next bucket start time so the entire interval is skipped.
774         bucketEndTimeNs = nextBucketStartTimeNs;
775     } else if (eventTimeNs < fullBucketEndTimeNs) {
776         bucketEndTimeNs = eventTimeNs;
777     }
778 
779     // Close the current bucket
780     const auto [globalConditionDurationNs, globalConditionCorrectionNs] =
781             mConditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
782 
783     bool isBucketLargeEnough = bucketEndTimeNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
784     if (!isBucketLargeEnough) {
785         skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
786     }
787     if (!mCurrentBucketIsSkipped) {
788         bool bucketHasData = false;
789         // The current bucket is large enough to keep.
790         for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
791             PastBucket<AggregatedValue> bucket =
792                     buildPartialBucket(bucketEndTimeNs, currentBucket.intervals);
793             if (bucket.aggIndex.empty()) {
794                 continue;
795             }
796             bucketHasData = true;
797             if (!mSlicedStateAtoms.empty()) {
798                 const auto [conditionDurationNs, conditionCorrectionNs] =
799                         currentBucket.conditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
800                 bucket.mConditionTrueNs = conditionDurationNs;
801                 bucket.mConditionCorrectionNs = conditionCorrectionNs;
802             } else {
803                 bucket.mConditionTrueNs = globalConditionDurationNs;
804                 bucket.mConditionCorrectionNs = globalConditionCorrectionNs;
805             }
806 
807             auto& bucketList = mPastBuckets[metricDimensionKey];
808             bucketList.push_back(std::move(bucket));
809         }
810         if (!bucketHasData) {
811             skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
812         }
813     }
814 
815     if (mCurrentBucketIsSkipped) {
816         mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
817         mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTimeNs;
818         mSkippedBuckets.push_back(mCurrentSkippedBucket);
819     }
820 
821     // This means that the current bucket was not flushed before a forced bucket split.
822     // This can happen if an app update or a dump report with includeCurrentPartialBucket is
823     // requested before we get a chance to flush the bucket due to receiving new data, either from
824     // the statsd socket or the StatsPullerManager.
825     if (bucketEndTimeNs < nextBucketStartTimeNs) {
826         SkippedBucket bucketInGap;
827         bucketInGap.bucketStartTimeNs = bucketEndTimeNs;
828         bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
829         bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
830         mSkippedBuckets.emplace_back(bucketInGap);
831     }
832 }
833 
834 template <typename AggregatedValue, typename DimExtras>
initNextSlicedBucket(int64_t nextBucketStartTimeNs)835 void ValueMetricProducer<AggregatedValue, DimExtras>::initNextSlicedBucket(
836         int64_t nextBucketStartTimeNs) {
837     StatsdStats::getInstance().noteBucketCount(mMetricId);
838     if (mSlicedStateAtoms.empty()) {
839         mCurrentSlicedBucket.clear();
840     } else {
841         for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
842             bool obsolete = true;
843             for (auto& interval : it->second.intervals) {
844                 interval.sampleSize = 0;
845             }
846 
847             // When slicing by state, only delete the MetricDimensionKey when the
848             // state key in the MetricDimensionKey is not the current state key.
849             const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
850             const auto& currentDimInfoItr = mDimInfos.find(dimensionInWhatKey);
851 
852             if ((currentDimInfoItr != mDimInfos.end()) &&
853                 (it->first.getStateValuesKey() == currentDimInfoItr->second.currentState)) {
854                 obsolete = false;
855             }
856             if (obsolete) {
857                 it = mCurrentSlicedBucket.erase(it);
858             } else {
859                 it++;
860             }
861         }
862     }
863     for (auto it = mDimInfos.begin(); it != mDimInfos.end();) {
864         if (!it->second.seenNewData) {
865             it = mDimInfos.erase(it);
866         } else {
867             it->second.seenNewData = false;
868             it++;
869         }
870     }
871 
872     mCurrentBucketIsSkipped = false;
873     mCurrentSkippedBucket.reset();
874 
875     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
876     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
877          (long long)mCurrentBucketStartTimeNs);
878 }
879 
880 // Explicit template instantiations
881 template class ValueMetricProducer<Value, vector<optional<Value>>>;
882 template class ValueMetricProducer<unique_ptr<KllQuantile>, Empty>;
883 
884 }  // namespace statsd
885 }  // namespace os
886 }  // namespace android
887