1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "ValueMetricProducer.h"
21
22 #include <kll.h>
23 #include <limits.h>
24 #include <stdlib.h>
25
26 #include "FieldValue.h"
27 #include "HashableDimensionKey.h"
28 #include "guardrail/StatsdStats.h"
29 #include "metrics/parsing_utils/metrics_manager_util.h"
30 #include "stats_log_util.h"
31 #include "stats_util.h"
32
33 using android::util::FIELD_COUNT_REPEATED;
34 using android::util::FIELD_TYPE_BOOL;
35 using android::util::FIELD_TYPE_INT32;
36 using android::util::FIELD_TYPE_INT64;
37 using android::util::FIELD_TYPE_MESSAGE;
38 using android::util::ProtoOutputStream;
39 using dist_proc::aggregation::KllQuantile;
40 using std::optional;
41 using std::shared_ptr;
42 using std::unique_ptr;
43 using std::unordered_map;
44 using std::vector;
45
46 namespace android {
47 namespace os {
48 namespace statsd {
49
// Proto field numbers written by onDumpReportLocked(), grouped by the message they belong to.

// StatsLogReport
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_TIME_BASE = 9;
constexpr int FIELD_ID_BUCKET_SIZE = 10;
constexpr int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
constexpr int FIELD_ID_IS_ACTIVE = 14;
// *MetricDataWrapper
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;
// SkippedBuckets
constexpr int FIELD_ID_SKIPPED_START_MILLIS = 3;
constexpr int FIELD_ID_SKIPPED_END_MILLIS = 4;
constexpr int FIELD_ID_SKIPPED_DROP_EVENT = 5;
// DropEvent (one message per drop event nested inside a skipped bucket)
constexpr int FIELD_ID_BUCKET_DROP_REASON = 1;
constexpr int FIELD_ID_DROP_TIME = 2;
// *MetricData
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_BUCKET_INFO = 3;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
constexpr int FIELD_ID_SLICE_BY_STATE = 6;
71
72 template <typename AggregatedValue, typename DimExtras>
ValueMetricProducer(const int64_t & metricId,const ConfigKey & key,const uint64_t protoHash,const PullOptions & pullOptions,const BucketOptions & bucketOptions,const WhatOptions & whatOptions,const ConditionOptions & conditionOptions,const StateOptions & stateOptions,const ActivationOptions & activationOptions,const GuardrailOptions & guardrailOptions)73 ValueMetricProducer<AggregatedValue, DimExtras>::ValueMetricProducer(
74 const int64_t& metricId, const ConfigKey& key, const uint64_t protoHash,
75 const PullOptions& pullOptions, const BucketOptions& bucketOptions,
76 const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
77 const StateOptions& stateOptions, const ActivationOptions& activationOptions,
78 const GuardrailOptions& guardrailOptions)
79 : MetricProducer(metricId, key, bucketOptions.timeBaseNs, conditionOptions.conditionIndex,
80 conditionOptions.initialConditionCache, conditionOptions.conditionWizard,
81 protoHash, activationOptions.eventActivationMap,
82 activationOptions.eventDeactivationMap, stateOptions.slicedStateAtoms,
83 stateOptions.stateGroupMap, bucketOptions.splitBucketForAppUpgrade),
84 mWhatMatcherIndex(whatOptions.whatMatcherIndex),
85 mEventMatcherWizard(whatOptions.matcherWizard),
86 mPullerManager(pullOptions.pullerManager),
87 mFieldMatchers(whatOptions.fieldMatchers),
88 mPullAtomId(pullOptions.pullAtomId),
89 mMinBucketSizeNs(bucketOptions.minBucketSizeNs),
90 mDimensionSoftLimit(guardrailOptions.dimensionSoftLimit),
91 mDimensionHardLimit(guardrailOptions.dimensionHardLimit),
92 mCurrentBucketIsSkipped(false),
93 mConditionCorrectionThresholdNs(bucketOptions.conditionCorrectionThresholdNs) {
94 // TODO(b/185722221): inject directly via initializer list in MetricProducer.
95 mBucketSizeNs = bucketOptions.bucketSizeNs;
96
97 // TODO(b/185770171): inject dimensionsInWhat related fields via constructor.
98 if (whatOptions.dimensionsInWhat.field() > 0) {
99 translateFieldMatcher(whatOptions.dimensionsInWhat, &mDimensionsInWhat);
100 }
101 mContainANYPositionInDimensionsInWhat = whatOptions.containsAnyPositionInDimensionsInWhat;
102 mShouldUseNestedDimensions = whatOptions.shouldUseNestedDimensions;
103
104 if (conditionOptions.conditionLinks.size() > 0) {
105 for (const auto& link : conditionOptions.conditionLinks) {
106 Metric2Condition mc;
107 mc.conditionId = link.condition();
108 translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
109 translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
110 mMetric2ConditionLinks.push_back(mc);
111 }
112
113 // TODO(b/185770739): use !mMetric2ConditionLinks.empty() instead
114 mConditionSliced = true;
115 }
116
117 for (const auto& stateLink : stateOptions.stateLinks) {
118 Metric2State ms;
119 ms.stateAtomId = stateLink.state_atom_id();
120 translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
121 translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
122 mMetric2StateLinks.push_back(ms);
123 }
124
125 const int64_t numBucketsForward = calcBucketsForwardCount(bucketOptions.startTimeNs);
126 mCurrentBucketNum = numBucketsForward;
127
128 flushIfNeededLocked(bucketOptions.startTimeNs);
129
130 if (isPulled()) {
131 mPullerManager->RegisterReceiver(mPullAtomId, mConfigKey, this, getCurrentBucketEndTimeNs(),
132 mBucketSizeNs);
133 }
134
135 // Only do this for partial buckets like first bucket. All other buckets should use
136 // flushIfNeeded to adjust start and end to bucket boundaries.
137 // Adjust start for partial bucket
138 mCurrentBucketStartTimeNs = bucketOptions.startTimeNs;
139 mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs, mCurrentBucketStartTimeNs);
140
141 // Now that activations are processed, start the condition timer if needed.
142 mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
143 mCurrentBucketStartTimeNs);
144 }
145
146 template <typename AggregatedValue, typename DimExtras>
~ValueMetricProducer()147 ValueMetricProducer<AggregatedValue, DimExtras>::~ValueMetricProducer() {
148 VLOG("~ValueMetricProducer() called");
149 if (isPulled()) {
150 mPullerManager->UnRegisterReceiver(mPullAtomId, mConfigKey, this);
151 }
152 }
153
154 template <typename AggregatedValue, typename DimExtras>
onStatsdInitCompleted(const int64_t & eventTimeNs)155 void ValueMetricProducer<AggregatedValue, DimExtras>::onStatsdInitCompleted(
156 const int64_t& eventTimeNs) {
157 lock_guard<mutex> lock(mMutex);
158
159 if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
160 pullAndMatchEventsLocked(eventTimeNs);
161 }
162 flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
163 }
164
165 template <typename AggregatedValue, typename DimExtras>
notifyAppUpgradeInternalLocked(const int64_t eventTimeNs)166 void ValueMetricProducer<AggregatedValue, DimExtras>::notifyAppUpgradeInternalLocked(
167 const int64_t eventTimeNs) {
168 if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
169 pullAndMatchEventsLocked(eventTimeNs);
170 }
171 flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
172 }
173
174 template <typename AggregatedValue, typename DimExtras>
175 optional<InvalidConfigReason>
onConfigUpdatedLocked(const StatsdConfig & config,const int configIndex,const int metricIndex,const vector<sp<AtomMatchingTracker>> & allAtomMatchingTrackers,const unordered_map<int64_t,int> & oldAtomMatchingTrackerMap,const unordered_map<int64_t,int> & newAtomMatchingTrackerMap,const sp<EventMatcherWizard> & matcherWizard,const vector<sp<ConditionTracker>> & allConditionTrackers,const unordered_map<int64_t,int> & conditionTrackerMap,const sp<ConditionWizard> & wizard,const unordered_map<int64_t,int> & metricToActivationMap,unordered_map<int,vector<int>> & trackerToMetricMap,unordered_map<int,vector<int>> & conditionToMetricMap,unordered_map<int,vector<int>> & activationAtomTrackerToMetricMap,unordered_map<int,vector<int>> & deactivationAtomTrackerToMetricMap,vector<int> & metricsWithActivation)176 ValueMetricProducer<AggregatedValue, DimExtras>::onConfigUpdatedLocked(
177 const StatsdConfig& config, const int configIndex, const int metricIndex,
178 const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
179 const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
180 const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
181 const sp<EventMatcherWizard>& matcherWizard,
182 const vector<sp<ConditionTracker>>& allConditionTrackers,
183 const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
184 const unordered_map<int64_t, int>& metricToActivationMap,
185 unordered_map<int, vector<int>>& trackerToMetricMap,
186 unordered_map<int, vector<int>>& conditionToMetricMap,
187 unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
188 unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
189 vector<int>& metricsWithActivation) {
190 optional<InvalidConfigReason> invalidConfigReason = MetricProducer::onConfigUpdatedLocked(
191 config, configIndex, metricIndex, allAtomMatchingTrackers, oldAtomMatchingTrackerMap,
192 newAtomMatchingTrackerMap, matcherWizard, allConditionTrackers, conditionTrackerMap,
193 wizard, metricToActivationMap, trackerToMetricMap, conditionToMetricMap,
194 activationAtomTrackerToMetricMap, deactivationAtomTrackerToMetricMap,
195 metricsWithActivation);
196 if (invalidConfigReason.has_value()) {
197 return invalidConfigReason;
198 }
199 // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
200 const int64_t atomMatcherId = getWhatAtomMatcherIdForMetric(config, configIndex);
201 invalidConfigReason = handleMetricWithAtomMatchingTrackers(
202 atomMatcherId, mMetricId, metricIndex, /*enforceOneAtom=*/false,
203 allAtomMatchingTrackers, newAtomMatchingTrackerMap, trackerToMetricMap,
204 mWhatMatcherIndex);
205 if (invalidConfigReason.has_value()) {
206 return invalidConfigReason;
207 }
208 const optional<int64_t>& conditionIdOpt = getConditionIdForMetric(config, configIndex);
209 const ConditionLinks& conditionLinks = getConditionLinksForMetric(config, configIndex);
210 if (conditionIdOpt.has_value()) {
211 invalidConfigReason = handleMetricWithConditions(
212 conditionIdOpt.value(), mMetricId, metricIndex, conditionTrackerMap, conditionLinks,
213 allConditionTrackers, mConditionTrackerIndex, conditionToMetricMap);
214 if (invalidConfigReason.has_value()) {
215 return invalidConfigReason;
216 }
217 }
218 sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
219 mEventMatcherWizard = matcherWizard;
220 return nullopt;
221 }
222
223 template <typename AggregatedValue, typename DimExtras>
onStateChanged(int64_t eventTimeNs,int32_t atomId,const HashableDimensionKey & primaryKey,const FieldValue & oldState,const FieldValue & newState)224 void ValueMetricProducer<AggregatedValue, DimExtras>::onStateChanged(
225 int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
226 const FieldValue& oldState, const FieldValue& newState) {
227 // TODO(b/189353769): Acquire lock.
228 VLOG("ValueMetricProducer %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
229 (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
230 oldState.mValue.int_value, newState.mValue.int_value);
231
232 FieldValue oldStateCopy = oldState;
233 FieldValue newStateCopy = newState;
234 mapStateValue(atomId, &oldStateCopy);
235 mapStateValue(atomId, &newStateCopy);
236
237 // If old and new states are in the same StateGroup, then we do not need to
238 // pull for this state change.
239 if (oldStateCopy == newStateCopy) {
240 return;
241 }
242
243 // If condition is not true or metric is not active, we do not need to pull
244 // for this state change.
245 if (mCondition != ConditionState::kTrue || !mIsActive) {
246 return;
247 }
248
249 if (isEventLateLocked(eventTimeNs)) {
250 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
251 (long long)mCurrentBucketStartTimeNs);
252 invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
253 return;
254 }
255
256 if (isPulled()) {
257 mStateChangePrimaryKey.first = atomId;
258 mStateChangePrimaryKey.second = primaryKey;
259 // TODO(b/185796114): pass mStateChangePrimaryKey as an argument to
260 // pullAndMatchEventsLocked
261 pullAndMatchEventsLocked(eventTimeNs);
262 mStateChangePrimaryKey.first = 0;
263 mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
264 }
265 flushIfNeededLocked(eventTimeNs);
266 }
267
268 template <typename AggregatedValue, typename DimExtras>
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTime)269 void ValueMetricProducer<AggregatedValue, DimExtras>::onSlicedConditionMayChangeLocked(
270 bool overallCondition, const int64_t eventTime) {
271 VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
272 }
273
274 template <typename AggregatedValue, typename DimExtras>
dropDataLocked(const int64_t dropTimeNs)275 void ValueMetricProducer<AggregatedValue, DimExtras>::dropDataLocked(const int64_t dropTimeNs) {
276 StatsdStats::getInstance().noteBucketDropped(mMetricId);
277
278 // The current partial bucket is not flushed and does not require a pull,
279 // so the data is still valid.
280 flushIfNeededLocked(dropTimeNs);
281 clearPastBucketsLocked(dropTimeNs);
282 }
283
284 template <typename AggregatedValue, typename DimExtras>
clearPastBucketsLocked(const int64_t dumpTimeNs)285 void ValueMetricProducer<AggregatedValue, DimExtras>::clearPastBucketsLocked(
286 const int64_t dumpTimeNs) {
287 mPastBuckets.clear();
288 mSkippedBuckets.clear();
289 }
290
291 template <typename AggregatedValue, typename DimExtras>
onDumpReportLocked(const int64_t dumpTimeNs,const bool includeCurrentPartialBucket,const bool eraseData,const DumpLatency dumpLatency,set<string> * strSet,ProtoOutputStream * protoOutput)292 void ValueMetricProducer<AggregatedValue, DimExtras>::onDumpReportLocked(
293 const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, const bool eraseData,
294 const DumpLatency dumpLatency, set<string>* strSet, ProtoOutputStream* protoOutput) {
295 VLOG("metric %lld dump report now...", (long long)mMetricId);
296
297 // Pulled metrics need to pull before flushing, which is why they do not call flushIfNeeded.
298 // TODO: b/249823426 see if we can pull and call flushIfneeded for pulled value metrics.
299 if (!isPulled()) {
300 flushIfNeededLocked(dumpTimeNs);
301 }
302 if (includeCurrentPartialBucket) {
303 // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
304 // current bucket will have incomplete data and the next will have the wrong snapshot to do
305 // a diff against. If the condition is false, we are fine since the base data is reset and
306 // we are not tracking anything.
307 if (isPulled() && mCondition == ConditionState::kTrue && mIsActive) {
308 switch (dumpLatency) {
309 case FAST:
310 invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
311 break;
312 case NO_TIME_CONSTRAINTS:
313 pullAndMatchEventsLocked(dumpTimeNs);
314 break;
315 }
316 }
317 flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
318 }
319
320 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
321 protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
322
323 if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
324 return;
325 }
326 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
327 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
328 // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
329 if (!mShouldUseNestedDimensions) {
330 if (!mDimensionsInWhat.empty()) {
331 uint64_t dimenPathToken =
332 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
333 writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
334 protoOutput->end(dimenPathToken);
335 }
336 }
337
338 const auto& [metricTypeFieldId, bucketNumFieldId, startBucketMsFieldId, endBucketMsFieldId,
339 conditionTrueNsFieldId,
340 conditionCorrectionNsFieldId] = getDumpProtoFields();
341
342 uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | metricTypeFieldId);
343
344 for (const auto& skippedBucket : mSkippedBuckets) {
345 uint64_t wrapperToken =
346 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
347 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
348 (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
349 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
350 (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
351 for (const auto& dropEvent : skippedBucket.dropEvents) {
352 uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
353 FIELD_ID_SKIPPED_DROP_EVENT);
354 protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
355 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
356 (long long)(NanoToMillis(dropEvent.dropTimeNs)));
357 protoOutput->end(dropEventToken);
358 }
359 protoOutput->end(wrapperToken);
360 }
361
362 for (const auto& [metricDimensionKey, buckets] : mPastBuckets) {
363 VLOG(" dimension key %s", metricDimensionKey.toString().c_str());
364 uint64_t wrapperToken =
365 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
366
367 // First fill dimension.
368 if (mShouldUseNestedDimensions) {
369 uint64_t dimensionToken =
370 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
371 writeDimensionToProto(metricDimensionKey.getDimensionKeyInWhat(), strSet, protoOutput);
372 protoOutput->end(dimensionToken);
373 } else {
374 writeDimensionLeafNodesToProto(metricDimensionKey.getDimensionKeyInWhat(),
375 FIELD_ID_DIMENSION_LEAF_IN_WHAT, strSet, protoOutput);
376 }
377
378 // Then fill slice_by_state.
379 for (auto state : metricDimensionKey.getStateValuesKey().getValues()) {
380 uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
381 FIELD_ID_SLICE_BY_STATE);
382 writeStateToProto(state, protoOutput);
383 protoOutput->end(stateToken);
384 }
385
386 // Then fill bucket_info (*BucketInfo).
387 for (const auto& bucket : buckets) {
388 uint64_t bucketInfoToken = protoOutput->start(
389 FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
390
391 if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
392 protoOutput->write(FIELD_TYPE_INT64 | startBucketMsFieldId,
393 (long long)NanoToMillis(bucket.mBucketStartNs));
394 protoOutput->write(FIELD_TYPE_INT64 | endBucketMsFieldId,
395 (long long)NanoToMillis(bucket.mBucketEndNs));
396 } else {
397 protoOutput->write(FIELD_TYPE_INT64 | bucketNumFieldId,
398 (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
399 }
400 // We only write the condition timer value if the metric has a
401 // condition and/or is sliced by state.
402 // If the metric is sliced by state, the condition timer value is
403 // also sliced by state to reflect time spent in that state.
404 if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
405 protoOutput->write(FIELD_TYPE_INT64 | conditionTrueNsFieldId,
406 (long long)bucket.mConditionTrueNs);
407 }
408
409 if (conditionCorrectionNsFieldId) {
410 // We write the condition correction value when below conditions are true:
411 // - if metric is pulled
412 // - if it is enabled by metric configuration via dedicated field,
413 // see condition_correction_threshold_nanos
414 // - if the abs(value) >= condition_correction_threshold_nanos
415
416 if (isPulled() && mConditionCorrectionThresholdNs &&
417 (abs(bucket.mConditionCorrectionNs) >= mConditionCorrectionThresholdNs)) {
418 protoOutput->write(FIELD_TYPE_INT64 | conditionCorrectionNsFieldId.value(),
419 (long long)bucket.mConditionCorrectionNs);
420 }
421 }
422
423 for (int i = 0; i < (int)bucket.aggIndex.size(); i++) {
424 VLOG("\t bucket [%lld - %lld]", (long long)bucket.mBucketStartNs,
425 (long long)bucket.mBucketEndNs);
426 int sampleSize = !bucket.sampleSizes.empty() ? bucket.sampleSizes[i] : 0;
427 writePastBucketAggregateToProto(bucket.aggIndex[i], bucket.aggregates[i],
428 sampleSize, protoOutput);
429 }
430 protoOutput->end(bucketInfoToken);
431 }
432 protoOutput->end(wrapperToken);
433 }
434 protoOutput->end(protoToken);
435
436 VLOG("metric %lld done with dump report...", (long long)mMetricId);
437 if (eraseData) {
438 mPastBuckets.clear();
439 mSkippedBuckets.clear();
440 }
441 }
442
443 template <typename AggregatedValue, typename DimExtras>
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)444 void ValueMetricProducer<AggregatedValue, DimExtras>::invalidateCurrentBucket(
445 const int64_t dropTimeNs, const BucketDropReason reason) {
446 if (!mCurrentBucketIsSkipped) {
447 // Only report to StatsdStats once per invalid bucket.
448 StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
449 }
450
451 skipCurrentBucket(dropTimeNs, reason);
452 }
453
454 template <typename AggregatedValue, typename DimExtras>
skipCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)455 void ValueMetricProducer<AggregatedValue, DimExtras>::skipCurrentBucket(
456 const int64_t dropTimeNs, const BucketDropReason reason) {
457 if (!mIsActive) {
458 // Don't keep track of skipped buckets if metric is not active.
459 return;
460 }
461
462 if (!maxDropEventsReached()) {
463 mCurrentSkippedBucket.dropEvents.push_back(buildDropEvent(dropTimeNs, reason));
464 }
465 mCurrentBucketIsSkipped = true;
466 }
467
468 // Handle active state change. Active state change is *mostly* treated like a condition change:
469 // - drop bucket if active state change event arrives too late
470 // - if condition is true, pull data on active state changes
471 // - ConditionTimer tracks changes based on AND of condition and active state.
472 template <typename AggregatedValue, typename DimExtras>
onActiveStateChangedLocked(const int64_t eventTimeNs,const bool isActive)473 void ValueMetricProducer<AggregatedValue, DimExtras>::onActiveStateChangedLocked(
474 const int64_t eventTimeNs, const bool isActive) {
475 const bool eventLate = isEventLateLocked(eventTimeNs);
476 if (eventLate) {
477 // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
478 StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
479 invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
480 }
481
482 if (ConditionState::kTrue != mCondition) {
483 // Call parent method before early return.
484 MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);
485 return;
486 }
487
488 // Pull on active state changes.
489 if (!eventLate) {
490 if (isPulled()) {
491 pullAndMatchEventsLocked(eventTimeNs);
492 }
493
494 onActiveStateChangedInternalLocked(eventTimeNs, isActive);
495 }
496
497 // Once any pulls are processed, call through to parent method which might flush the current
498 // bucket.
499 MetricProducer::onActiveStateChangedLocked(eventTimeNs, isActive);
500
501 // Let condition timer know of new active state.
502 mConditionTimer.onConditionChanged(isActive, eventTimeNs);
503
504 updateCurrentSlicedBucketConditionTimers(isActive, eventTimeNs);
505 }
506
507 template <typename AggregatedValue, typename DimExtras>
onConditionChangedLocked(const bool condition,const int64_t eventTimeNs)508 void ValueMetricProducer<AggregatedValue, DimExtras>::onConditionChangedLocked(
509 const bool condition, const int64_t eventTimeNs) {
510 const bool eventLate = isEventLateLocked(eventTimeNs);
511
512 const ConditionState newCondition = eventLate ? ConditionState::kUnknown
513 : condition ? ConditionState::kTrue
514 : ConditionState::kFalse;
515 const ConditionState oldCondition = mCondition;
516
517 if (!mIsActive) {
518 mCondition = newCondition;
519 return;
520 }
521
522 // If the event arrived late, mark the bucket as invalid and skip the event.
523 if (eventLate) {
524 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
525 (long long)mCurrentBucketStartTimeNs);
526 StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
527 StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
528 invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
529 mCondition = newCondition;
530 mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
531 updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
532 return;
533 }
534
535 // If the previous condition was unknown, mark the bucket as invalid
536 // because the bucket will contain partial data. For example, the condition
537 // change might happen close to the end of the bucket and we might miss a
538 // lot of data.
539 // We still want to pull to set the base for diffed metrics.
540 if (oldCondition == ConditionState::kUnknown) {
541 invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
542 }
543
544 // Pull and match for the following condition change cases:
545 // unknown/false -> true - condition changed
546 // true -> false - condition changed
547 // true -> true - old condition was true so we can flush the bucket at the
548 // end if needed.
549 //
550 // We don’t need to pull for unknown -> false or false -> false.
551 //
552 // onConditionChangedLocked might happen on bucket boundaries if this is
553 // called before #onDataPulled.
554 if (isPulled() &&
555 (newCondition == ConditionState::kTrue || oldCondition == ConditionState::kTrue)) {
556 pullAndMatchEventsLocked(eventTimeNs);
557 }
558
559 onConditionChangedInternalLocked(oldCondition, newCondition, eventTimeNs);
560
561 // Update condition state after pulling.
562 mCondition = newCondition;
563
564 flushIfNeededLocked(eventTimeNs);
565
566 mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
567 updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
568 }
569
570 template <typename AggregatedValue, typename DimExtras>
updateCurrentSlicedBucketConditionTimers(bool newCondition,int64_t eventTimeNs)571 void ValueMetricProducer<AggregatedValue, DimExtras>::updateCurrentSlicedBucketConditionTimers(
572 bool newCondition, int64_t eventTimeNs) {
573 if (mSlicedStateAtoms.empty()) {
574 return;
575 }
576
577 // Utilize the current state key of each DimensionsInWhat key to determine
578 // which condition timers to update.
579 //
580 // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
581 for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mDimInfos) {
582 // If the new condition is true, turn ON the condition timer only if
583 // the DimensionInWhat key was present in the data.
584 mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
585 dimensionInWhatInfo.currentState)]
586 .conditionTimer.onConditionChanged(
587 newCondition && dimensionInWhatInfo.hasCurrentState, eventTimeNs);
588 }
589 }
590
591 template <typename AggregatedValue, typename DimExtras>
dumpStatesLocked(FILE * out,bool verbose) const592 void ValueMetricProducer<AggregatedValue, DimExtras>::dumpStatesLocked(FILE* out,
593 bool verbose) const {
594 if (mCurrentSlicedBucket.size() == 0) {
595 return;
596 }
597
598 fprintf(out, "ValueMetricProducer %lld dimension size %lu\n", (long long)mMetricId,
599 (unsigned long)mCurrentSlicedBucket.size());
600 if (verbose) {
601 for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
602 for (const Interval& interval : currentBucket.intervals) {
603 fprintf(out, "\t(what)%s\t(states)%s (aggregate)%s\n",
604 metricDimensionKey.getDimensionKeyInWhat().toString().c_str(),
605 metricDimensionKey.getStateValuesKey().toString().c_str(),
606 aggregatedValueToString(interval.aggregate).c_str());
607 }
608 }
609 }
610 }
611
612 template <typename AggregatedValue, typename DimExtras>
hasReachedGuardRailLimit() const613 bool ValueMetricProducer<AggregatedValue, DimExtras>::hasReachedGuardRailLimit() const {
614 return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
615 }
616
617 template <typename AggregatedValue, typename DimExtras>
hitGuardRailLocked(const MetricDimensionKey & newKey)618 bool ValueMetricProducer<AggregatedValue, DimExtras>::hitGuardRailLocked(
619 const MetricDimensionKey& newKey) {
620 // ===========GuardRail==============
621 // 1. Report the tuple count if the tuple count > soft limit
622 if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
623 return false;
624 }
625 if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
626 size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
627 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
628 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
629 if (hasReachedGuardRailLimit()) {
630 if (!mHasHitGuardrail) {
631 ALOGE("ValueMetricProducer %lld dropping data for dimension key %s",
632 (long long)mMetricId, newKey.toString().c_str());
633 mHasHitGuardrail = true;
634 }
635 StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
636 return true;
637 }
638 }
639
640 return false;
641 }
642
643 template <typename AggregatedValue, typename DimExtras>
onMatchedLogEventInternalLocked(const size_t matcherIndex,const MetricDimensionKey & eventKey,const ConditionKey & conditionKey,bool condition,const LogEvent & event,const map<int,HashableDimensionKey> & statePrimaryKeys)644 void ValueMetricProducer<AggregatedValue, DimExtras>::onMatchedLogEventInternalLocked(
645 const size_t matcherIndex, const MetricDimensionKey& eventKey,
646 const ConditionKey& conditionKey, bool condition, const LogEvent& event,
647 const map<int, HashableDimensionKey>& statePrimaryKeys) {
648 // Skip this event if a state change occurred for a different primary key.
649 auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
650 // Check that both the atom id and the primary key are equal.
651 if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
652 VLOG("ValueMetric skip event with primary key %s because state change primary key "
653 "is %s",
654 it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
655 return;
656 }
657
658 const int64_t eventTimeNs = event.GetElapsedTimestampNs();
659 if (isEventLateLocked(eventTimeNs)) {
660 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
661 (long long)mCurrentBucketStartTimeNs);
662 return;
663 }
664
665 const auto whatKey = eventKey.getDimensionKeyInWhat();
666 mMatchedMetricDimensionKeys.insert(whatKey);
667
668 if (!isPulled()) {
669 // Only flushing for pushed because for pulled metrics, we need to do a pull first.
670 flushIfNeededLocked(eventTimeNs);
671 }
672
673 if (canSkipLogEventLocked(eventKey, condition, eventTimeNs, statePrimaryKeys)) {
674 return;
675 }
676
677 if (hitGuardRailLocked(eventKey)) {
678 return;
679 }
680
681 const auto& returnVal = mDimInfos.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
682 DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
683 const HashableDimensionKey& oldStateKey = dimensionsInWhatInfo.currentState;
684 CurrentBucket& currentBucket = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
685
686 // Ensure we turn on the condition timer in the case where dimensions
687 // were missing on a previous pull due to a state change.
688 const auto stateKey = eventKey.getStateValuesKey();
689 const bool stateChange = oldStateKey != stateKey || !dimensionsInWhatInfo.hasCurrentState;
690
691 // We need to get the intervals stored with the previous state key so we can
692 // close these value intervals.
693 vector<Interval>& intervals = currentBucket.intervals;
694 if (intervals.size() < mFieldMatchers.size()) {
695 VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
696 intervals.resize(mFieldMatchers.size());
697 }
698
699 dimensionsInWhatInfo.hasCurrentState = true;
700 dimensionsInWhatInfo.currentState = stateKey;
701
702 dimensionsInWhatInfo.seenNewData |= aggregateFields(eventTimeNs, eventKey, event, intervals,
703 dimensionsInWhatInfo.dimExtras);
704
705 // State change.
706 if (!mSlicedStateAtoms.empty() && stateChange) {
707 // Turn OFF the condition timer for the previous state key.
708 currentBucket.conditionTimer.onConditionChanged(false, eventTimeNs);
709
710 // Turn ON the condition timer for the new state key.
711 mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
712 .conditionTimer.onConditionChanged(true, eventTimeNs);
713 }
714 }
715
716 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
717 // if mCondition and mIsActive are true!
718 template <typename AggregatedValue, typename DimExtras>
flushIfNeededLocked(const int64_t & eventTimeNs)719 void ValueMetricProducer<AggregatedValue, DimExtras>::flushIfNeededLocked(
720 const int64_t& eventTimeNs) {
721 const int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
722 if (eventTimeNs < currentBucketEndTimeNs) {
723 VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
724 (long long)(currentBucketEndTimeNs));
725 return;
726 }
727 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
728 int64_t nextBucketStartTimeNs =
729 currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
730 flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
731 }
732
733 template <typename AggregatedValue, typename DimExtras>
calcBucketsForwardCount(const int64_t eventTimeNs) const734 int64_t ValueMetricProducer<AggregatedValue, DimExtras>::calcBucketsForwardCount(
735 const int64_t eventTimeNs) const {
736 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
737 if (eventTimeNs < currentBucketEndTimeNs) {
738 return 0;
739 }
740 return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
741 }
742
743 template <typename AggregatedValue, typename DimExtras>
flushCurrentBucketLocked(const int64_t & eventTimeNs,const int64_t & nextBucketStartTimeNs)744 void ValueMetricProducer<AggregatedValue, DimExtras>::flushCurrentBucketLocked(
745 const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) {
746 if (mCondition == ConditionState::kUnknown) {
747 StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
748 invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
749 }
750
751 VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
752 (int)mCurrentSlicedBucket.size());
753
754 closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
755 initNextSlicedBucket(nextBucketStartTimeNs);
756
757 // Update the condition timer again, in case we skipped buckets.
758 mConditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
759
760 // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
761 // by state. Otherwise, the "global" condition timer will be used.
762 if (!mSlicedStateAtoms.empty()) {
763 for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
764 currentBucket.conditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
765 }
766 }
767 mCurrentBucketNum += calcBucketsForwardCount(eventTimeNs);
768 }
769
770 template <typename AggregatedValue, typename DimExtras>
closeCurrentBucket(const int64_t eventTimeNs,const int64_t nextBucketStartTimeNs)771 void ValueMetricProducer<AggregatedValue, DimExtras>::closeCurrentBucket(
772 const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs) {
773 const int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
774 int64_t bucketEndTimeNs = fullBucketEndTimeNs;
775 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
776
777 if (multipleBucketsSkipped(numBucketsForward)) {
778 VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
779 StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
780 // Something went wrong. Maybe the device was sleeping for a long time. It is better
781 // to mark the current bucket as invalid. The last pull might have been successful though.
782 invalidateCurrentBucket(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
783
784 // End the bucket at the next bucket start time so the entire interval is skipped.
785 bucketEndTimeNs = nextBucketStartTimeNs;
786 } else if (eventTimeNs < fullBucketEndTimeNs) {
787 bucketEndTimeNs = eventTimeNs;
788 }
789
790 // Close the current bucket
791 const auto [globalConditionDurationNs, globalConditionCorrectionNs] =
792 mConditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
793
794 bool isBucketLargeEnough = bucketEndTimeNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
795 if (!isBucketLargeEnough) {
796 skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
797 }
798 if (!mCurrentBucketIsSkipped) {
799 bool bucketHasData = false;
800 // The current bucket is large enough to keep.
801 for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
802 PastBucket<AggregatedValue> bucket =
803 buildPartialBucket(bucketEndTimeNs, currentBucket.intervals);
804 if (bucket.aggIndex.empty()) {
805 continue;
806 }
807 bucketHasData = true;
808 if (!mSlicedStateAtoms.empty()) {
809 const auto [conditionDurationNs, conditionCorrectionNs] =
810 currentBucket.conditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
811 bucket.mConditionTrueNs = conditionDurationNs;
812 bucket.mConditionCorrectionNs = conditionCorrectionNs;
813 } else {
814 bucket.mConditionTrueNs = globalConditionDurationNs;
815 bucket.mConditionCorrectionNs = globalConditionCorrectionNs;
816 }
817
818 auto& bucketList = mPastBuckets[metricDimensionKey];
819 bucketList.push_back(std::move(bucket));
820 }
821 if (!bucketHasData) {
822 skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
823 }
824 }
825
826 if (mCurrentBucketIsSkipped) {
827 mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
828 mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTimeNs;
829 mSkippedBuckets.push_back(mCurrentSkippedBucket);
830 }
831
832 // This means that the current bucket was not flushed before a forced bucket split.
833 // This can happen if an app update or a dump report with includeCurrentPartialBucket is
834 // requested before we get a chance to flush the bucket due to receiving new data, either from
835 // the statsd socket or the StatsPullerManager.
836 if (bucketEndTimeNs < nextBucketStartTimeNs) {
837 SkippedBucket bucketInGap;
838 bucketInGap.bucketStartTimeNs = bucketEndTimeNs;
839 bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
840 bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
841 mSkippedBuckets.emplace_back(bucketInGap);
842 }
843 }
844
845 template <typename AggregatedValue, typename DimExtras>
initNextSlicedBucket(int64_t nextBucketStartTimeNs)846 void ValueMetricProducer<AggregatedValue, DimExtras>::initNextSlicedBucket(
847 int64_t nextBucketStartTimeNs) {
848 StatsdStats::getInstance().noteBucketCount(mMetricId);
849 if (mSlicedStateAtoms.empty()) {
850 mCurrentSlicedBucket.clear();
851 } else {
852 for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
853 bool obsolete = true;
854 for (auto& interval : it->second.intervals) {
855 interval.sampleSize = 0;
856 }
857
858 // When slicing by state, only delete the MetricDimensionKey when the
859 // state key in the MetricDimensionKey is not the current state key.
860 const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
861 const auto& currentDimInfoItr = mDimInfos.find(dimensionInWhatKey);
862
863 if ((currentDimInfoItr != mDimInfos.end()) &&
864 (it->first.getStateValuesKey() == currentDimInfoItr->second.currentState)) {
865 obsolete = false;
866 }
867 if (obsolete) {
868 it = mCurrentSlicedBucket.erase(it);
869 } else {
870 it++;
871 }
872 }
873 }
874 for (auto it = mDimInfos.begin(); it != mDimInfos.end();) {
875 if (!it->second.seenNewData) {
876 it = mDimInfos.erase(it);
877 } else {
878 it->second.seenNewData = false;
879 it++;
880 }
881 }
882
883 mCurrentBucketIsSkipped = false;
884 mCurrentSkippedBucket.reset();
885
886 mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
887 // Reset mHasHitGuardrail boolean since bucket was reset
888 mHasHitGuardrail = false;
889 VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
890 (long long)mCurrentBucketStartTimeNs);
891 }
892
// Explicit template instantiations.
// The ValueMetricProducer member templates are defined out-of-line in this file, so the two
// supported specializations are instantiated here:
//   - AggregatedValue = Value,                   DimExtras = vector<optional<Value>>
//   - AggregatedValue = unique_ptr<KllQuantile>, DimExtras = Empty
// NOTE(review): presumably other translation units link against exactly these two
// instantiations - confirm before adding a new AggregatedValue/DimExtras combination.
template class ValueMetricProducer<Value, vector<optional<Value>>>;
template class ValueMetricProducer<unique_ptr<KllQuantile>, Empty>;
896
897 } // namespace statsd
898 } // namespace os
899 } // namespace android
900