1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define STATSD_DEBUG false // STOPSHIP if true
18 #include "Log.h"
19
20 #include "ValueMetricProducer.h"
21
22 #include <kll.h>
23 #include <limits.h>
24 #include <stdlib.h>
25
26 #include "FieldValue.h"
27 #include "HashableDimensionKey.h"
28 #include "guardrail/StatsdStats.h"
29 #include "metrics/parsing_utils/metrics_manager_util.h"
30 #include "stats_log_util.h"
31 #include "stats_util.h"
32
33 using android::util::FIELD_COUNT_REPEATED;
34 using android::util::FIELD_TYPE_BOOL;
35 using android::util::FIELD_TYPE_INT32;
36 using android::util::FIELD_TYPE_INT64;
37 using android::util::FIELD_TYPE_MESSAGE;
38 using android::util::ProtoOutputStream;
39 using dist_proc::aggregation::KllQuantile;
40 using std::optional;
41 using std::shared_ptr;
42 using std::unique_ptr;
43 using std::unordered_map;
44 using std::vector;
45
46 namespace android {
47 namespace os {
48 namespace statsd {
49
// Field numbers inside the StatsLogReport message.
constexpr int FIELD_ID_ID = 1;
constexpr int FIELD_ID_TIME_BASE = 9;
constexpr int FIELD_ID_BUCKET_SIZE = 10;
constexpr int FIELD_ID_DIMENSION_PATH_IN_WHAT = 11;
constexpr int FIELD_ID_IS_ACTIVE = 14;

// Field numbers inside the per-type *MetricDataWrapper messages.
constexpr int FIELD_ID_DATA = 1;
constexpr int FIELD_ID_SKIPPED = 2;

// Field numbers inside a SkippedBuckets entry.
constexpr int FIELD_ID_SKIPPED_START_MILLIS = 3;
constexpr int FIELD_ID_SKIPPED_END_MILLIS = 4;
constexpr int FIELD_ID_SKIPPED_DROP_EVENT = 5;

// Field numbers inside a DumpEvent (bucket drop event) message.
constexpr int FIELD_ID_BUCKET_DROP_REASON = 1;
constexpr int FIELD_ID_DROP_TIME = 2;

// Field numbers inside the per-type *MetricData messages.
constexpr int FIELD_ID_DIMENSION_IN_WHAT = 1;
constexpr int FIELD_ID_BUCKET_INFO = 3;
constexpr int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
constexpr int FIELD_ID_SLICE_BY_STATE = 6;
71
/**
 * Builds a ValueMetricProducer from the option bundles assembled by the metric parser.
 *
 * Apart from copying options into members, the constructor:
 *  - translates the dimensions-in-what, condition-link and state-link field matchers,
 *  - positions mCurrentBucketNum relative to bucketOptions.startTimeNs,
 *  - registers this producer as a pull receiver when the metric is pulled,
 *  - starts the (possibly partial) first bucket at bucketOptions.startTimeNs and turns the
 *    condition timer on only if the metric is active and its condition is kTrue.
 */
template <typename AggregatedValue, typename DimExtras>
ValueMetricProducer<AggregatedValue, DimExtras>::ValueMetricProducer(
        const int64_t& metricId, const ConfigKey& key, const uint64_t protoHash,
        const PullOptions& pullOptions, const BucketOptions& bucketOptions,
        const WhatOptions& whatOptions, const ConditionOptions& conditionOptions,
        const StateOptions& stateOptions, const ActivationOptions& activationOptions,
        const GuardrailOptions& guardrailOptions)
    : MetricProducer(metricId, key, bucketOptions.timeBaseNs, conditionOptions.conditionIndex,
                     conditionOptions.initialConditionCache, conditionOptions.conditionWizard,
                     protoHash, activationOptions.eventActivationMap,
                     activationOptions.eventDeactivationMap, stateOptions.slicedStateAtoms,
                     stateOptions.stateGroupMap, bucketOptions.splitBucketForAppUpgrade),
      mWhatMatcherIndex(whatOptions.whatMatcherIndex),
      mEventMatcherWizard(whatOptions.matcherWizard),
      mPullerManager(pullOptions.pullerManager),
      mFieldMatchers(whatOptions.fieldMatchers),
      mPullAtomId(pullOptions.pullAtomId),
      mMinBucketSizeNs(bucketOptions.minBucketSizeNs),
      mDimensionSoftLimit(guardrailOptions.dimensionSoftLimit),
      mDimensionHardLimit(guardrailOptions.dimensionHardLimit),
      mCurrentBucketIsSkipped(false),
      // Condition timer starts OFF here; it is switched on at the end of the constructor,
      // once the first bucket's start time is known.
      mConditionTimer(false, bucketOptions.timeBaseNs),
      mConditionCorrectionThresholdNs(bucketOptions.conditionCorrectionThresholdNs) {
    // TODO(b/185722221): inject directly via initializer list in MetricProducer.
    // Must be assigned before calcBucketsForwardCount() below, which divides by it.
    mBucketSizeNs = bucketOptions.bucketSizeNs;

    // TODO(b/185770171): inject dimensionsInWhat related fields via constructor.
    // Only translate when a dimensions matcher is present (presumably field() is 0 when the
    // config has no dimensions_in_what — confirm).
    if (whatOptions.dimensionsInWhat.field() > 0) {
        translateFieldMatcher(whatOptions.dimensionsInWhat, &mDimensionsInWhat);
    }
    mContainANYPositionInDimensionsInWhat = whatOptions.containsAnyPositionInDimensionsInWhat;
    mShouldUseNestedDimensions = whatOptions.shouldUseNestedDimensions;

    // Translate each metric<->condition link into matcher form.
    if (conditionOptions.conditionLinks.size() > 0) {
        for (const auto& link : conditionOptions.conditionLinks) {
            Metric2Condition mc;
            mc.conditionId = link.condition();
            translateFieldMatcher(link.fields_in_what(), &mc.metricFields);
            translateFieldMatcher(link.fields_in_condition(), &mc.conditionFields);
            mMetric2ConditionLinks.push_back(mc);
        }

        // TODO(b/185770739): use !mMetric2ConditionLinks.empty() instead
        mConditionSliced = true;
    }

    // Translate each metric<->state link into matcher form.
    for (const auto& stateLink : stateOptions.stateLinks) {
        Metric2State ms;
        ms.stateAtomId = stateLink.state_atom_id();
        translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
        translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
        mMetric2StateLinks.push_back(ms);
    }

    // Advance the bucket number to the bucket that contains startTimeNs.
    const int64_t numBucketsForward = calcBucketsForwardCount(bucketOptions.startTimeNs);
    mCurrentBucketNum = numBucketsForward;

    // NOTE(review): mCurrentBucketNum was just advanced, so this presumably finds startTimeNs
    // inside the current bucket and returns early — confirm whether it is still needed.
    flushIfNeededLocked(bucketOptions.startTimeNs);

    // Pulled metrics register with the puller manager, passing the current bucket's end time
    // and the bucket size (presumably the first pull time and the pull interval).
    if (isPulled()) {
        mPullerManager->RegisterReceiver(mPullAtomId, mConfigKey, this, getCurrentBucketEndTimeNs(),
                                         mBucketSizeNs);
    }

    // Only do this for partial buckets like first bucket. All other buckets should use
    // flushIfNeeded to adjust start and end to bucket boundaries.
    // Adjust start for partial bucket
    mCurrentBucketStartTimeNs = bucketOptions.startTimeNs;
    mConditionTimer.newBucketStart(mCurrentBucketStartTimeNs, mCurrentBucketStartTimeNs);

    // Now that activations are processed, start the condition timer if needed.
    // The timer tracks the AND of active state and a kTrue condition.
    mConditionTimer.onConditionChanged(mIsActive && mCondition == ConditionState::kTrue,
                                       mCurrentBucketStartTimeNs);
}
147
148 template <typename AggregatedValue, typename DimExtras>
~ValueMetricProducer()149 ValueMetricProducer<AggregatedValue, DimExtras>::~ValueMetricProducer() {
150 VLOG("~ValueMetricProducer() called");
151 if (isPulled()) {
152 mPullerManager->UnRegisterReceiver(mPullAtomId, mConfigKey, this);
153 }
154 }
155
156 template <typename AggregatedValue, typename DimExtras>
onStatsdInitCompleted(const int64_t & eventTimeNs)157 void ValueMetricProducer<AggregatedValue, DimExtras>::onStatsdInitCompleted(
158 const int64_t& eventTimeNs) {
159 lock_guard<mutex> lock(mMutex);
160
161 // TODO(b/188837487): Add mIsActive check
162
163 if (isPulled() && mCondition == ConditionState::kTrue) {
164 pullAndMatchEventsLocked(eventTimeNs);
165 }
166 flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
167 }
168
169 template <typename AggregatedValue, typename DimExtras>
notifyAppUpgradeInternalLocked(const int64_t eventTimeNs)170 void ValueMetricProducer<AggregatedValue, DimExtras>::notifyAppUpgradeInternalLocked(
171 const int64_t eventTimeNs) {
172 // TODO(b/188837487): Add mIsActive check
173 if (isPulled() && mCondition == ConditionState::kTrue) {
174 pullAndMatchEventsLocked(eventTimeNs);
175 }
176 flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
177 }
178
179 template <typename AggregatedValue, typename DimExtras>
onConfigUpdatedLocked(const StatsdConfig & config,const int configIndex,const int metricIndex,const vector<sp<AtomMatchingTracker>> & allAtomMatchingTrackers,const unordered_map<int64_t,int> & oldAtomMatchingTrackerMap,const unordered_map<int64_t,int> & newAtomMatchingTrackerMap,const sp<EventMatcherWizard> & matcherWizard,const vector<sp<ConditionTracker>> & allConditionTrackers,const unordered_map<int64_t,int> & conditionTrackerMap,const sp<ConditionWizard> & wizard,const unordered_map<int64_t,int> & metricToActivationMap,unordered_map<int,vector<int>> & trackerToMetricMap,unordered_map<int,vector<int>> & conditionToMetricMap,unordered_map<int,vector<int>> & activationAtomTrackerToMetricMap,unordered_map<int,vector<int>> & deactivationAtomTrackerToMetricMap,vector<int> & metricsWithActivation)180 bool ValueMetricProducer<AggregatedValue, DimExtras>::onConfigUpdatedLocked(
181 const StatsdConfig& config, const int configIndex, const int metricIndex,
182 const vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers,
183 const unordered_map<int64_t, int>& oldAtomMatchingTrackerMap,
184 const unordered_map<int64_t, int>& newAtomMatchingTrackerMap,
185 const sp<EventMatcherWizard>& matcherWizard,
186 const vector<sp<ConditionTracker>>& allConditionTrackers,
187 const unordered_map<int64_t, int>& conditionTrackerMap, const sp<ConditionWizard>& wizard,
188 const unordered_map<int64_t, int>& metricToActivationMap,
189 unordered_map<int, vector<int>>& trackerToMetricMap,
190 unordered_map<int, vector<int>>& conditionToMetricMap,
191 unordered_map<int, vector<int>>& activationAtomTrackerToMetricMap,
192 unordered_map<int, vector<int>>& deactivationAtomTrackerToMetricMap,
193 vector<int>& metricsWithActivation) {
194 if (!MetricProducer::onConfigUpdatedLocked(
195 config, configIndex, metricIndex, allAtomMatchingTrackers,
196 oldAtomMatchingTrackerMap, newAtomMatchingTrackerMap, matcherWizard,
197 allConditionTrackers, conditionTrackerMap, wizard, metricToActivationMap,
198 trackerToMetricMap, conditionToMetricMap, activationAtomTrackerToMetricMap,
199 deactivationAtomTrackerToMetricMap, metricsWithActivation)) {
200 return false;
201 }
202
203 // Update appropriate indices: mWhatMatcherIndex, mConditionIndex and MetricsManager maps.
204 const int64_t atomMatcherId = getWhatAtomMatcherIdForMetric(config, configIndex);
205 if (!handleMetricWithAtomMatchingTrackers(atomMatcherId, metricIndex, /*enforceOneAtom=*/false,
206 allAtomMatchingTrackers, newAtomMatchingTrackerMap,
207 trackerToMetricMap, mWhatMatcherIndex)) {
208 return false;
209 }
210
211 const optional<int64_t>& conditionIdOpt = getConditionIdForMetric(config, configIndex);
212 const ConditionLinks& conditionLinks = getConditionLinksForMetric(config, configIndex);
213 if (conditionIdOpt.has_value() &&
214 !handleMetricWithConditions(conditionIdOpt.value(), metricIndex, conditionTrackerMap,
215 conditionLinks, allConditionTrackers, mConditionTrackerIndex,
216 conditionToMetricMap)) {
217 return false;
218 }
219
220 sp<EventMatcherWizard> tmpEventWizard = mEventMatcherWizard;
221 mEventMatcherWizard = matcherWizard;
222 return true;
223 }
224
225 template <typename AggregatedValue, typename DimExtras>
onStateChanged(int64_t eventTimeNs,int32_t atomId,const HashableDimensionKey & primaryKey,const FieldValue & oldState,const FieldValue & newState)226 void ValueMetricProducer<AggregatedValue, DimExtras>::onStateChanged(
227 int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey,
228 const FieldValue& oldState, const FieldValue& newState) {
229 // TODO(b/189353769): Acquire lock.
230 VLOG("ValueMetricProducer %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
231 (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
232 oldState.mValue.int_value, newState.mValue.int_value);
233
234 FieldValue oldStateCopy = oldState;
235 FieldValue newStateCopy = newState;
236 mapStateValue(atomId, &oldStateCopy);
237 mapStateValue(atomId, &newStateCopy);
238
239 // If old and new states are in the same StateGroup, then we do not need to
240 // pull for this state change.
241 if (oldStateCopy == newStateCopy) {
242 return;
243 }
244
245 // If condition is not true or metric is not active, we do not need to pull
246 // for this state change.
247 if (mCondition != ConditionState::kTrue || !mIsActive) {
248 return;
249 }
250
251 if (isEventLateLocked(eventTimeNs)) {
252 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
253 (long long)mCurrentBucketStartTimeNs);
254 invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
255 return;
256 }
257
258 if (isPulled()) {
259 mStateChangePrimaryKey.first = atomId;
260 mStateChangePrimaryKey.second = primaryKey;
261 // TODO(b/185796114): pass mStateChangePrimaryKey as an argument to
262 // pullAndMatchEventsLocked
263 pullAndMatchEventsLocked(eventTimeNs);
264 mStateChangePrimaryKey.first = 0;
265 mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
266 }
267 flushIfNeededLocked(eventTimeNs);
268 }
269
270 template <typename AggregatedValue, typename DimExtras>
onSlicedConditionMayChangeLocked(bool overallCondition,const int64_t eventTime)271 void ValueMetricProducer<AggregatedValue, DimExtras>::onSlicedConditionMayChangeLocked(
272 bool overallCondition, const int64_t eventTime) {
273 VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
274 }
275
276 template <typename AggregatedValue, typename DimExtras>
dropDataLocked(const int64_t dropTimeNs)277 void ValueMetricProducer<AggregatedValue, DimExtras>::dropDataLocked(const int64_t dropTimeNs) {
278 StatsdStats::getInstance().noteBucketDropped(mMetricId);
279
280 // The current partial bucket is not flushed and does not require a pull,
281 // so the data is still valid.
282 flushIfNeededLocked(dropTimeNs);
283 clearPastBucketsLocked(dropTimeNs);
284 }
285
286 template <typename AggregatedValue, typename DimExtras>
clearPastBucketsLocked(const int64_t dumpTimeNs)287 void ValueMetricProducer<AggregatedValue, DimExtras>::clearPastBucketsLocked(
288 const int64_t dumpTimeNs) {
289 mPastBuckets.clear();
290 mSkippedBuckets.clear();
291 }
292
/**
 * Serializes this metric's report into protoOutput.
 *
 * @param dumpTimeNs                  time of the dump request.
 * @param includeCurrentPartialBucket if true, the in-progress bucket is first closed at
 *                                    dumpTimeNs so it appears in the report. For a pulled
 *                                    metric whose condition is kTrue, it is pulled first or
 *                                    invalidated first, depending on dumpLatency.
 * @param eraseData                   if true, past and skipped buckets are cleared after
 *                                    being written.
 * @param dumpLatency                 FAST: invalidate the current bucket instead of pulling.
 *                                    NO_TIME_CONSTRAINTS: pull before flushing.
 * @param strSet                      forwarded to the dimension writers (presumably collects
 *                                    strings reported out-of-line — confirm).
 * @param protoOutput                 destination stream.
 *
 * The metric id and active flag are always written. Everything else is written only when
 * at least one past or skipped bucket exists.
 */
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onDumpReportLocked(
        const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, const bool eraseData,
        const DumpLatency dumpLatency, set<string>* strSet, ProtoOutputStream* protoOutput) {
    VLOG("metric %lld dump report now...", (long long)mMetricId);

    // TODO(b/188837487): Add mIsActive check

    if (includeCurrentPartialBucket) {
        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
        // current bucket will have incomplete data and the next will have the wrong snapshot to do
        // a diff against. If the condition is false, we are fine since the base data is reset and
        // we are not tracking anything.
        if (isPulled() && mCondition == ConditionState::kTrue) {
            switch (dumpLatency) {
                case FAST:
                    // Too slow to pull now: report the partial bucket as skipped instead.
                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                    break;
                case NO_TIME_CONSTRAINTS:
                    pullAndMatchEventsLocked(dumpTimeNs);
                    break;
            }
        }
        // Close the partial bucket at dumpTimeNs; the next bucket starts at the same instant.
        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
    }

    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
    protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());

    // No data to report (and nothing for eraseData to clear).
    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
        return;
    }
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_TIME_BASE, (long long)mTimeBaseNs);
    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_SIZE, (long long)mBucketSizeNs);
    // Fills the dimension path if not slicing by a primitive repeated field or position ALL.
    if (!mShouldUseNestedDimensions) {
        if (!mDimensionsInWhat.empty()) {
            uint64_t dimenPathToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_PATH_IN_WHAT);
            writeDimensionPathToProto(mDimensionsInWhat, protoOutput);
            protoOutput->end(dimenPathToken);
        }
    }

    // Field ids differ per concrete metric type (value vs. KLL, etc.); the subclass supplies them.
    const auto& [metricTypeFieldId, bucketNumFieldId, startBucketMsFieldId, endBucketMsFieldId,
                 conditionTrueNsFieldId,
                 conditionCorrectionNsFieldId] = getDumpProtoFields();

    uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | metricTypeFieldId);

    // One SkippedBuckets entry per skipped bucket, with its recorded drop events.
    for (const auto& skippedBucket : mSkippedBuckets) {
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
        for (const auto& dropEvent : skippedBucket.dropEvents) {
            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                         FIELD_ID_SKIPPED_DROP_EVENT);
            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
            protoOutput->end(dropEventToken);
        }
        protoOutput->end(wrapperToken);
    }

    // One data entry per (dimensions-in-what, state) key, containing all its past buckets.
    for (const auto& [metricDimensionKey, buckets] : mPastBuckets) {
        VLOG("  dimension key %s", metricDimensionKey.toString().c_str());
        uint64_t wrapperToken =
                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);

        // First fill dimension.
        if (mShouldUseNestedDimensions) {
            uint64_t dimensionToken =
                    protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION_IN_WHAT);
            writeDimensionToProto(metricDimensionKey.getDimensionKeyInWhat(), strSet, protoOutput);
            protoOutput->end(dimensionToken);
        } else {
            writeDimensionLeafNodesToProto(metricDimensionKey.getDimensionKeyInWhat(),
                                           FIELD_ID_DIMENSION_LEAF_IN_WHAT, strSet, protoOutput);
        }

        // Then fill slice_by_state.
        for (auto state : metricDimensionKey.getStateValuesKey().getValues()) {
            uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
                                                     FIELD_ID_SLICE_BY_STATE);
            writeStateToProto(state, protoOutput);
            protoOutput->end(stateToken);
        }

        // Then fill bucket_info (*BucketInfo).
        for (const auto& bucket : buckets) {
            uint64_t bucketInfoToken = protoOutput->start(
                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);

            // Partial buckets (length != mBucketSizeNs) carry explicit start/end millis;
            // full-size buckets are identified by bucket number only.
            if (bucket.mBucketEndNs - bucket.mBucketStartNs != mBucketSizeNs) {
                protoOutput->write(FIELD_TYPE_INT64 | startBucketMsFieldId,
                                   (long long)NanoToMillis(bucket.mBucketStartNs));
                protoOutput->write(FIELD_TYPE_INT64 | endBucketMsFieldId,
                                   (long long)NanoToMillis(bucket.mBucketEndNs));
            } else {
                protoOutput->write(FIELD_TYPE_INT64 | bucketNumFieldId,
                                   (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
            }
            // We only write the condition timer value if the metric has a
            // condition and/or is sliced by state.
            // If the metric is sliced by state, the condition timer value is
            // also sliced by state to reflect time spent in that state.
            if (mConditionTrackerIndex >= 0 || !mSlicedStateAtoms.empty()) {
                protoOutput->write(FIELD_TYPE_INT64 | conditionTrueNsFieldId,
                                   (long long)bucket.mConditionTrueNs);
            }

            // conditionCorrectionNsFieldId is optional: metric types without the field skip this.
            if (conditionCorrectionNsFieldId) {
                // We write the condition correction value when below conditions are true:
                // - if metric is pulled
                // - if it is enabled by metric configuration via dedicated field,
                //   see condition_correction_threshold_nanos
                // - if the abs(value) >= condition_correction_threshold_nanos

                if (isPulled() && mConditionCorrectionThresholdNs &&
                    (abs(bucket.mConditionCorrectionNs) >= mConditionCorrectionThresholdNs)) {
                    protoOutput->write(FIELD_TYPE_INT64 | conditionCorrectionNsFieldId.value(),
                                       (long long)bucket.mConditionCorrectionNs);
                }
            }

            // aggIndex[i] identifies which configured value field aggregates[i] belongs to.
            for (int i = 0; i < (int)bucket.aggIndex.size(); i++) {
                VLOG("\t bucket [%lld - %lld]", (long long)bucket.mBucketStartNs,
                     (long long)bucket.mBucketEndNs);
                writePastBucketAggregateToProto(bucket.aggIndex[i], bucket.aggregates[i],
                                                protoOutput);
            }
            protoOutput->end(bucketInfoToken);
        }
        protoOutput->end(wrapperToken);
    }
    protoOutput->end(protoToken);

    VLOG("metric %lld done with dump report...", (long long)mMetricId);
    if (eraseData) {
        mPastBuckets.clear();
        mSkippedBuckets.clear();
    }
}
440
441 template <typename AggregatedValue, typename DimExtras>
invalidateCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)442 void ValueMetricProducer<AggregatedValue, DimExtras>::invalidateCurrentBucket(
443 const int64_t dropTimeNs, const BucketDropReason reason) {
444 if (!mCurrentBucketIsSkipped) {
445 // Only report to StatsdStats once per invalid bucket.
446 StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
447 }
448
449 skipCurrentBucket(dropTimeNs, reason);
450 }
451
452 template <typename AggregatedValue, typename DimExtras>
skipCurrentBucket(const int64_t dropTimeNs,const BucketDropReason reason)453 void ValueMetricProducer<AggregatedValue, DimExtras>::skipCurrentBucket(
454 const int64_t dropTimeNs, const BucketDropReason reason) {
455 if (!maxDropEventsReached()) {
456 mCurrentSkippedBucket.dropEvents.push_back(buildDropEvent(dropTimeNs, reason));
457 }
458 mCurrentBucketIsSkipped = true;
459 }
460
// Handle active state change. Active state change is treated like a condition change:
// - drop bucket if active state change event arrives too late
// - if condition is true, pull data on active state changes
// - ConditionTimer tracks changes based on AND of condition and active state.
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onActiveStateChangedLocked(
        const int64_t eventTimeNs) {
    const bool eventLate = isEventLateLocked(eventTimeNs);
    if (eventLate) {
        // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
    }

    // Call parent method once we've verified the validity of current bucket.
    // NOTE(review): this runs before the pull below. If the parent updates mIsActive
    // (presumably it does), pullAndMatchEventsLocked() already sees the new active state —
    // confirm this ordering is intended.
    MetricProducer::onActiveStateChangedLocked(eventTimeNs);

    // Condition not true: return without pulling, flushing, or touching the condition timers.
    if (ConditionState::kTrue != mCondition) {
        return;
    }

    // Pull on active state changes.
    // A late event invalidated the bucket above, so skip the pull and internal handling.
    if (!eventLate) {
        if (isPulled()) {
            pullAndMatchEventsLocked(eventTimeNs);
        }

        onActiveStateChangedInternalLocked(eventTimeNs);
    }

    flushIfNeededLocked(eventTimeNs);

    // Let condition timer know of new active state.
    // The condition is known to be kTrue here, so the timers follow mIsActive alone.
    mConditionTimer.onConditionChanged(mIsActive, eventTimeNs);

    updateCurrentSlicedBucketConditionTimers(mIsActive, eventTimeNs);
}
498
/**
 * Handles a change of this metric's (unsliced) condition at eventTimeNs.
 *
 * A late event sets the condition to kUnknown and invalidates the current bucket.
 * Otherwise:
 *  - a transition out of kUnknown invalidates the bucket;
 *  - pulled metrics pull when either the old or new condition is kTrue;
 *  - the condition is updated;
 *  - the bucket is flushed if needed;
 *  - the condition timers are told the new condition.
 * An inactive metric only records the new condition.
 */
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onConditionChangedLocked(
        const bool condition, const int64_t eventTimeNs) {
    const bool eventLate = isEventLateLocked(eventTimeNs);

    // A late condition change leaves the true condition for the current bucket unknowable.
    const ConditionState newCondition = eventLate   ? ConditionState::kUnknown
                                        : condition ? ConditionState::kTrue
                                                    : ConditionState::kFalse;
    const ConditionState oldCondition = mCondition;

    if (!mIsActive) {
        mCondition = newCondition;
        return;
    }

    // If the event arrived late, mark the bucket as invalid and skip the event.
    if (eventLate) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
        StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
        mCondition = newCondition;
        // NOTE(review): newCondition is always kUnknown here and is passed to bool parameters.
        // If ConditionState is an unscoped enum with kUnknown != 0 (e.g. -1), this converts to
        // `true` and turns the condition timers ON. Confirm that is intended; compare with
        // `newCondition == ConditionState::kTrue`.
        mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
        updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
        return;
    }

    // If the previous condition was unknown, mark the bucket as invalid
    // because the bucket will contain partial data. For example, the condition
    // change might happen close to the end of the bucket and we might miss a
    // lot of data.
    // We still want to pull to set the base for diffed metrics.
    if (oldCondition == ConditionState::kUnknown) {
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
    }

    // Pull and match for the following condition change cases:
    // unknown/false -> true - condition changed
    // true -> false - condition changed
    // true -> true - old condition was true so we can flush the bucket at the
    // end if needed.
    //
    // We don’t need to pull for unknown -> false or false -> false.
    //
    // onConditionChangedLocked might happen on bucket boundaries if this is
    // called before #onDataPulled.
    if (isPulled() &&
        (newCondition == ConditionState::kTrue || oldCondition == ConditionState::kTrue)) {
        pullAndMatchEventsLocked(eventTimeNs);
    }

    onConditionChangedInternalLocked(oldCondition, newCondition, eventTimeNs);

    // Update condition state after pulling.
    mCondition = newCondition;

    flushIfNeededLocked(eventTimeNs);

    // Not late, so newCondition is kTrue or kFalse here.
    mConditionTimer.onConditionChanged(newCondition, eventTimeNs);
    updateCurrentSlicedBucketConditionTimers(newCondition, eventTimeNs);
}
561
562 template <typename AggregatedValue, typename DimExtras>
updateCurrentSlicedBucketConditionTimers(bool newCondition,int64_t eventTimeNs)563 void ValueMetricProducer<AggregatedValue, DimExtras>::updateCurrentSlicedBucketConditionTimers(
564 bool newCondition, int64_t eventTimeNs) {
565 if (mSlicedStateAtoms.empty()) {
566 return;
567 }
568
569 // Utilize the current state key of each DimensionsInWhat key to determine
570 // which condition timers to update.
571 //
572 // Assumes that the MetricDimensionKey exists in `mCurrentSlicedBucket`.
573 for (const auto& [dimensionInWhatKey, dimensionInWhatInfo] : mDimInfos) {
574 // If the new condition is true, turn ON the condition timer only if
575 // the DimensionInWhat key was present in the data.
576 mCurrentSlicedBucket[MetricDimensionKey(dimensionInWhatKey,
577 dimensionInWhatInfo.currentState)]
578 .conditionTimer.onConditionChanged(
579 newCondition && dimensionInWhatInfo.hasCurrentState, eventTimeNs);
580 }
581 }
582
583 template <typename AggregatedValue, typename DimExtras>
dumpStatesLocked(FILE * out,bool verbose) const584 void ValueMetricProducer<AggregatedValue, DimExtras>::dumpStatesLocked(FILE* out,
585 bool verbose) const {
586 if (mCurrentSlicedBucket.size() == 0) {
587 return;
588 }
589
590 fprintf(out, "ValueMetricProducer %lld dimension size %lu\n", (long long)mMetricId,
591 (unsigned long)mCurrentSlicedBucket.size());
592 if (verbose) {
593 for (const auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
594 for (const Interval& interval : currentBucket.intervals) {
595 fprintf(out, "\t(what)%s\t(states)%s (aggregate)%s\n",
596 metricDimensionKey.getDimensionKeyInWhat().toString().c_str(),
597 metricDimensionKey.getStateValuesKey().toString().c_str(),
598 aggregatedValueToString(interval.aggregate).c_str());
599 }
600 }
601 }
602 }
603
604 template <typename AggregatedValue, typename DimExtras>
hasReachedGuardRailLimit() const605 bool ValueMetricProducer<AggregatedValue, DimExtras>::hasReachedGuardRailLimit() const {
606 return mCurrentSlicedBucket.size() >= mDimensionHardLimit;
607 }
608
609 template <typename AggregatedValue, typename DimExtras>
hitGuardRailLocked(const MetricDimensionKey & newKey) const610 bool ValueMetricProducer<AggregatedValue, DimExtras>::hitGuardRailLocked(
611 const MetricDimensionKey& newKey) const {
612 // ===========GuardRail==============
613 // 1. Report the tuple count if the tuple count > soft limit
614 if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
615 return false;
616 }
617 if (mCurrentSlicedBucket.size() > mDimensionSoftLimit - 1) {
618 size_t newTupleCount = mCurrentSlicedBucket.size() + 1;
619 StatsdStats::getInstance().noteMetricDimensionSize(mConfigKey, mMetricId, newTupleCount);
620 // 2. Don't add more tuples, we are above the allowed threshold. Drop the data.
621 if (hasReachedGuardRailLimit()) {
622 ALOGE("ValueMetricProducer %lld dropping data for dimension key %s",
623 (long long)mMetricId, newKey.toString().c_str());
624 StatsdStats::getInstance().noteHardDimensionLimitReached(mMetricId);
625 return true;
626 }
627 }
628
629 return false;
630 }
631
/**
 * Aggregates one matched log event into the current bucket.
 *
 * The event is ignored when any of the following holds:
 *  - a state-change-triggered pull is in progress and the event carries a different primary
 *    key for that state atom;
 *  - isEventLateLocked() reports it as late;
 *  - canSkipLogEventLocked() says it can be skipped;
 *  - adding its key would hit the dimension guardrail.
 *
 * Otherwise the event's value fields are aggregated into the intervals of the bucket keyed
 * by (what key, previously recorded state). If the metric is sliced by state and the state
 * changed, condition time moves from the previous state's timer to the new state's timer.
 *
 * @param matcherIndex     not used in this body.
 * @param eventKey         (dimensions-in-what, state values) key of the event.
 * @param conditionKey     not used in this body.
 * @param condition        forwarded to canSkipLogEventLocked().
 * @param event            the matched event; its elapsed timestamp is the event time.
 * @param statePrimaryKeys primary keys of the event per state atom id.
 */
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::onMatchedLogEventInternalLocked(
        const size_t matcherIndex, const MetricDimensionKey& eventKey,
        const ConditionKey& conditionKey, bool condition, const LogEvent& event,
        const map<int, HashableDimensionKey>& statePrimaryKeys) {
    // Skip this event if a state change occurred for a different primary key.
    // When no state-change pull is in progress, mStateChangePrimaryKey.first is 0 (reset in
    // onStateChanged), and presumably no state atom has id 0, so the lookup misses.
    auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
    // Check that both the atom id and the primary key are equal.
    if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
        VLOG("ValueMetric skip event with primary key %s because state change primary key "
             "is %s",
             it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
        return;
    }

    const int64_t eventTimeNs = event.GetElapsedTimestampNs();
    if (isEventLateLocked(eventTimeNs)) {
        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
             (long long)mCurrentBucketStartTimeNs);
        return;
    }

    const auto whatKey = eventKey.getDimensionKeyInWhat();
    mMatchedMetricDimensionKeys.insert(whatKey);

    if (!isPulled()) {
        // Only flushing for pushed because for pulled metrics, we need to do a pull first.
        flushIfNeededLocked(eventTimeNs);
    }

    if (canSkipLogEventLocked(eventKey, condition, eventTimeNs, statePrimaryKeys)) {
        return;
    }

    if (hitGuardRailLocked(eventKey)) {
        return;
    }

    // First sighting of whatKey starts it in the "unknown" state; emplace leaves an existing
    // entry untouched.
    const auto& returnVal = mDimInfos.emplace(whatKey, DimensionsInWhatInfo(getUnknownStateKey()));
    DimensionsInWhatInfo& dimensionsInWhatInfo = returnVal.first->second;
    // NOTE(review): oldStateKey is a reference to dimensionsInWhatInfo.currentState. After
    // currentState is overwritten below, it refers to the NEW state. It is only read before
    // that point; keep it that way, or copy it by value.
    const HashableDimensionKey& oldStateKey = dimensionsInWhatInfo.currentState;
    CurrentBucket& currentBucket = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];

    // Ensure we turn on the condition timer in the case where dimensions
    // were missing on a previous pull due to a state change.
    const auto stateKey = eventKey.getStateValuesKey();
    const bool stateChange = oldStateKey != stateKey || !dimensionsInWhatInfo.hasCurrentState;

    // We need to get the intervals stored with the previous state key so we can
    // close these value intervals.
    vector<Interval>& intervals = currentBucket.intervals;
    if (intervals.size() < mFieldMatchers.size()) {
        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
        intervals.resize(mFieldMatchers.size());
    }

    dimensionsInWhatInfo.hasCurrentState = true;
    dimensionsInWhatInfo.currentState = stateKey;

    // aggregateFields() returns whether it consumed new data; it is OR-ed into seenNewData.
    dimensionsInWhatInfo.seenNewData |= aggregateFields(eventTimeNs, eventKey, event, intervals,
                                                        dimensionsInWhatInfo.dimExtras);

    // State change.
    if (!mSlicedStateAtoms.empty() && stateChange) {
        // Turn OFF the condition timer for the previous state key.
        currentBucket.conditionTimer.onConditionChanged(false, eventTimeNs);

        // Turn ON the condition timer for the new state key.
        mCurrentSlicedBucket[MetricDimensionKey(whatKey, stateKey)]
                .conditionTimer.onConditionChanged(true, eventTimeNs);
    }
}
704
705 // For pulled metrics, we always need to make sure we do a pull before flushing the bucket
706 // if mCondition and mIsActive are true!
707 template <typename AggregatedValue, typename DimExtras>
flushIfNeededLocked(const int64_t & eventTimeNs)708 void ValueMetricProducer<AggregatedValue, DimExtras>::flushIfNeededLocked(
709 const int64_t& eventTimeNs) {
710 const int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
711 if (eventTimeNs < currentBucketEndTimeNs) {
712 VLOG("eventTime is %lld, less than current bucket end time %lld", (long long)eventTimeNs,
713 (long long)(currentBucketEndTimeNs));
714 return;
715 }
716 int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
717 int64_t nextBucketStartTimeNs =
718 currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
719 flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
720 }
721
722 template <typename AggregatedValue, typename DimExtras>
calcBucketsForwardCount(const int64_t eventTimeNs) const723 int64_t ValueMetricProducer<AggregatedValue, DimExtras>::calcBucketsForwardCount(
724 const int64_t eventTimeNs) const {
725 int64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
726 if (eventTimeNs < currentBucketEndTimeNs) {
727 return 0;
728 }
729 return 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
730 }
731
732 template <typename AggregatedValue, typename DimExtras>
flushCurrentBucketLocked(const int64_t & eventTimeNs,const int64_t & nextBucketStartTimeNs)733 void ValueMetricProducer<AggregatedValue, DimExtras>::flushCurrentBucketLocked(
734 const int64_t& eventTimeNs, const int64_t& nextBucketStartTimeNs) {
735 if (mCondition == ConditionState::kUnknown) {
736 StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
737 invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
738 }
739
740 VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
741 (int)mCurrentSlicedBucket.size());
742
743 closeCurrentBucket(eventTimeNs, nextBucketStartTimeNs);
744 initNextSlicedBucket(nextBucketStartTimeNs);
745
746 // Update the condition timer again, in case we skipped buckets.
747 mConditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
748
749 // NOTE: Update the condition timers in `mCurrentSlicedBucket` only when slicing
750 // by state. Otherwise, the "global" condition timer will be used.
751 if (!mSlicedStateAtoms.empty()) {
752 for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
753 currentBucket.conditionTimer.newBucketStart(eventTimeNs, nextBucketStartTimeNs);
754 }
755 }
756 mCurrentBucketNum += calcBucketsForwardCount(eventTimeNs);
757 }
758
// Finalizes the current bucket without starting the next one.
//
// Works out the bucket's effective end time and rolls the global condition timer to that end.
// It then either appends one PastBucket per dimension slice with data to mPastBuckets, or, if
// the bucket ends up marked skipped, appends it to mSkippedBuckets. If nextBucketStartTimeNs lies
// after the effective end, the gap between them is also recorded as a skipped bucket carrying a
// NO_DATA drop event.
//
// This function does not reset mCurrentSlicedBucket, mCurrentBucketIsSkipped or
// mCurrentBucketStartTimeNs. In this file, flushCurrentBucketLocked calls initNextSlicedBucket
// right after it for that.
//
// eventTimeNs: time of the event/pull/split that triggered the close. It is also the timestamp
//     used for any drop events recorded here.
// nextBucketStartTimeNs: start time of the bucket that will follow this one.
template <typename AggregatedValue, typename DimExtras>
void ValueMetricProducer<AggregatedValue, DimExtras>::closeCurrentBucket(
        const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs) {
    const int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTimeNs = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);

    // Choose the effective end of this bucket. It defaults to the full bucket end.
    if (multipleBucketsSkipped(numBucketsForward)) {
        VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
        StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
        // Something went wrong. Maybe the device was sleeping for a long time. It is better
        // to mark the current bucket as invalid. The last pull might have been successful though.
        invalidateCurrentBucket(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);

        // End the bucket at the next bucket start time so the entire interval is skipped.
        bucketEndTimeNs = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        // Forced split before the natural bucket end (partial bucket): end at the event time.
        bucketEndTimeNs = eventTimeNs;
    }

    // Close the current bucket
    // Roll the global condition timer. When not slicing by state, its reading is reported for
    // every slice below.
    const auto [globalConditionDurationNs, globalConditionCorrectionNs] =
            mConditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);

    // Buckets shorter than mMinBucketSizeNs are dropped with reason BUCKET_TOO_SMALL.
    bool isBucketLargeEnough = bucketEndTimeNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
    if (!isBucketLargeEnough) {
        skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
    }
    // NOTE(review): assumes invalidateCurrentBucket/skipCurrentBucket are what set
    // mCurrentBucketIsSkipped; confirm against their definitions.
    if (!mCurrentBucketIsSkipped) {
        bool bucketHasData = false;
        // The current bucket is large enough to keep.
        for (auto& [metricDimensionKey, currentBucket] : mCurrentSlicedBucket) {
            // Slices whose built bucket has an empty aggIndex are not reported.
            PastBucket<AggregatedValue> bucket =
                    buildPartialBucket(bucketEndTimeNs, currentBucket.intervals);
            if (bucket.aggIndex.empty()) {
                continue;
            }
            bucketHasData = true;
            if (!mSlicedStateAtoms.empty()) {
                // Slicing by state: report this slice's own condition timer.
                // NOTE(review): slices skipped by the `continue` above do not have their
                // per-slice timer rolled in this function.
                const auto [conditionDurationNs, conditionCorrectionNs] =
                        currentBucket.conditionTimer.newBucketStart(eventTimeNs, bucketEndTimeNs);
                bucket.mConditionTrueNs = conditionDurationNs;
                bucket.mConditionCorrectionNs = conditionCorrectionNs;
            } else {
                bucket.mConditionTrueNs = globalConditionDurationNs;
                bucket.mConditionCorrectionNs = globalConditionCorrectionNs;
            }

            auto& bucketList = mPastBuckets[metricDimensionKey];
            bucketList.push_back(std::move(bucket));
        }
        // If no slice produced data, the whole bucket is reported as skipped (NO_DATA).
        if (!bucketHasData) {
            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
        }
    }

    // Record the skipped bucket, spanning mCurrentBucketStartTimeNs to bucketEndTimeNs.
    if (mCurrentBucketIsSkipped) {
        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTimeNs;
        mSkippedBuckets.push_back(mCurrentSkippedBucket);
    }

    // This means that the current bucket was not flushed before a forced bucket split.
    // This can happen if an app update or a dump report with includeCurrentPartialBucket is
    // requested before we get a chance to flush the bucket due to receiving new data, either from
    // the statsd socket or the StatsPullerManager.
    if (bucketEndTimeNs < nextBucketStartTimeNs) {
        SkippedBucket bucketInGap;
        bucketInGap.bucketStartTimeNs = bucketEndTimeNs;
        bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
        bucketInGap.dropEvents.emplace_back(buildDropEvent(eventTimeNs, BucketDropReason::NO_DATA));
        mSkippedBuckets.emplace_back(bucketInGap);
    }
}
833
834 template <typename AggregatedValue, typename DimExtras>
initNextSlicedBucket(int64_t nextBucketStartTimeNs)835 void ValueMetricProducer<AggregatedValue, DimExtras>::initNextSlicedBucket(
836 int64_t nextBucketStartTimeNs) {
837 StatsdStats::getInstance().noteBucketCount(mMetricId);
838 if (mSlicedStateAtoms.empty()) {
839 mCurrentSlicedBucket.clear();
840 } else {
841 for (auto it = mCurrentSlicedBucket.begin(); it != mCurrentSlicedBucket.end();) {
842 bool obsolete = true;
843 for (auto& interval : it->second.intervals) {
844 interval.sampleSize = 0;
845 }
846
847 // When slicing by state, only delete the MetricDimensionKey when the
848 // state key in the MetricDimensionKey is not the current state key.
849 const HashableDimensionKey& dimensionInWhatKey = it->first.getDimensionKeyInWhat();
850 const auto& currentDimInfoItr = mDimInfos.find(dimensionInWhatKey);
851
852 if ((currentDimInfoItr != mDimInfos.end()) &&
853 (it->first.getStateValuesKey() == currentDimInfoItr->second.currentState)) {
854 obsolete = false;
855 }
856 if (obsolete) {
857 it = mCurrentSlicedBucket.erase(it);
858 } else {
859 it++;
860 }
861 }
862 }
863 for (auto it = mDimInfos.begin(); it != mDimInfos.end();) {
864 if (!it->second.seenNewData) {
865 it = mDimInfos.erase(it);
866 } else {
867 it->second.seenNewData = false;
868 it++;
869 }
870 }
871
872 mCurrentBucketIsSkipped = false;
873 mCurrentSkippedBucket.reset();
874
875 mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
876 VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
877 (long long)mCurrentBucketStartTimeNs);
878 }
879
880 // Explicit template instantiations
881 template class ValueMetricProducer<Value, vector<optional<Value>>>;
882 template class ValueMetricProducer<unique_ptr<KllQuantile>, Empty>;
883
884 } // namespace statsd
885 } // namespace os
886 } // namespace android
887