1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <gtest/gtest_prod.h> 20 21 #include <optional> 22 23 #include "FieldValue.h" 24 #include "HashableDimensionKey.h" 25 #include "MetricProducer.h" 26 #include "anomaly/AnomalyTracker.h" 27 #include "condition/ConditionTracker.h" 28 #include "external/PullDataReceiver.h" 29 #include "external/StatsPullerManager.h" 30 #include "matchers/EventMatcherWizard.h" 31 #include "src/statsd_config.pb.h" 32 #include "stats_log_util.h" 33 #include "stats_util.h" 34 35 namespace android { 36 namespace os { 37 namespace statsd { 38 39 template <typename AggregatedValue> 40 struct PastBucket { 41 int64_t mBucketStartNs; 42 int64_t mBucketEndNs; 43 std::vector<int> aggIndex; 44 std::vector<AggregatedValue> aggregates; 45 std::vector<int> sampleSizes; 46 47 /** 48 * If the metric has no condition, then this field is just wasted. 49 * When we tune statsd memory usage in the future, this is a candidate to optimize. 50 */ 51 int64_t mConditionTrueNs; 52 53 /** 54 * The semantic is the value which needs to be applied to mConditionTrueNs for correction 55 * to be performed prior normalization calculation on the user (read server) side. Applied only 56 * to ValueMetrics with pulled atoms. 57 */ 58 int64_t mConditionCorrectionNs; 59 }; 60 61 // Aggregates values within buckets. 62 // 63 // There are different events that might complete a bucket 64 // - a condition change 65 // - an app upgrade 66 // - an alarm set to the end of the bucket 67 template <typename AggregatedValue, typename DimExtras> 68 class ValueMetricProducer : public MetricProducer, public virtual PullDataReceiver { 69 public: 70 struct PullOptions { 71 const int pullAtomId; 72 const sp<StatsPullerManager>& pullerManager; 73 }; 74 75 struct BucketOptions { 76 const int64_t timeBaseNs; 77 const int64_t startTimeNs; 78 const int64_t bucketSizeNs; 79 const int64_t minBucketSizeNs; 80 const optional<int64_t> conditionCorrectionThresholdNs; 81 const optional<bool> splitBucketForAppUpgrade; 82 }; 83 84 struct WhatOptions { 85 const bool containsAnyPositionInDimensionsInWhat; 86 const bool shouldUseNestedDimensions; 87 const int whatMatcherIndex; 88 const sp<EventMatcherWizard>& matcherWizard; 89 const FieldMatcher& dimensionsInWhat; 90 const vector<Matcher>& fieldMatchers; 91 }; 92 93 struct ConditionOptions { 94 const int conditionIndex; 95 const ConditionLinks& conditionLinks; 96 const vector<ConditionState>& initialConditionCache; 97 const sp<ConditionWizard>& conditionWizard; 98 }; 99 100 struct StateOptions { 101 const StateLinks& stateLinks; 102 const vector<int>& slicedStateAtoms; 103 const unordered_map<int, unordered_map<int, int64_t>>& stateGroupMap; 104 }; 105 106 struct ActivationOptions { 107 const std::unordered_map<int, std::shared_ptr<Activation>>& eventActivationMap; 108 const std::unordered_map<int, std::vector<std::shared_ptr<Activation>>>& 109 eventDeactivationMap; 110 }; 111 112 struct GuardrailOptions { 113 const size_t dimensionSoftLimit; 114 const size_t dimensionHardLimit; 115 }; 116 117 virtual ~ValueMetricProducer(); 118 119 // Process data pulled on bucket boundary. onDataPulled(const std::vector<std::shared_ptr<LogEvent>> & data,PullResult pullResult,int64_t originalPullTimeNs)120 virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 121 PullResult pullResult, int64_t originalPullTimeNs) override { 122 } 123 124 // Determine if metric needs to pull isPullNeeded()125 virtual bool isPullNeeded() const override { 126 return false; 127 } 128 129 // ValueMetric needs special logic if it's a pulled atom. 130 void onStatsdInitCompleted(const int64_t& eventTimeNs) override; 131 132 void onStateChanged(int64_t eventTimeNs, int32_t atomId, const HashableDimensionKey& primaryKey, 133 const FieldValue& oldState, const FieldValue& newState) override; 134 135 protected: 136 ValueMetricProducer(const int64_t& metricId, const ConfigKey& key, const uint64_t protoHash, 137 const PullOptions& pullOptions, const BucketOptions& bucketOptions, 138 const WhatOptions& whatOptions, const ConditionOptions& conditionOptions, 139 const StateOptions& stateOptions, 140 const ActivationOptions& activationOptions, 141 const GuardrailOptions& guardrailOptions); 142 143 void onMatchedLogEventInternalLocked( 144 const size_t matcherIndex, const MetricDimensionKey& eventKey, 145 const ConditionKey& conditionKey, bool condition, const LogEvent& event, 146 const std::map<int, HashableDimensionKey>& statePrimaryKeys) override; 147 148 // Determine whether or not a LogEvent can be skipped. 149 virtual inline bool canSkipLogEventLocked( 150 const MetricDimensionKey& eventKey, bool condition, int64_t eventTimeNs, 151 const std::map<int, HashableDimensionKey>& statePrimaryKeys) const = 0; 152 153 void notifyAppUpgradeInternalLocked(const int64_t eventTimeNs) override; 154 155 void onDumpReportLocked(const int64_t dumpTimeNs, const bool includeCurrentPartialBucket, 156 const bool eraseData, const DumpLatency dumpLatency, 157 std::set<string>* strSet, 158 android::util::ProtoOutputStream* protoOutput) override; 159 160 struct DumpProtoFields { 161 const int metricTypeFieldId; 162 const int bucketNumFieldId; 163 const int startBucketMsFieldId; 164 const int endBucketMsFieldId; 165 const int conditionTrueNsFieldId; 166 const optional<int> conditionCorrectionNsFieldId; 167 }; 168 169 virtual DumpProtoFields getDumpProtoFields() const = 0; 170 171 void clearPastBucketsLocked(const int64_t dumpTimeNs) override; 172 173 // ValueMetricProducer internal interface to handle active state change. 174 void onActiveStateChangedLocked(const int64_t eventTimeNs, const bool isActive) override; 175 onActiveStateChangedInternalLocked(const int64_t eventTimeNs,const bool isActive)176 virtual void onActiveStateChangedInternalLocked(const int64_t eventTimeNs, 177 const bool isActive) { 178 } 179 180 // ValueMetricProducer internal interface to handle condition change. 181 void onConditionChangedLocked(const bool condition, const int64_t eventTimeNs) override; 182 183 // Only called when mIsActive, the event is NOT too late, and after pulling. onConditionChangedInternalLocked(const ConditionState oldCondition,const ConditionState newCondition,const int64_t eventTimeNs)184 virtual void onConditionChangedInternalLocked(const ConditionState oldCondition, 185 const ConditionState newCondition, 186 const int64_t eventTimeNs) { 187 } 188 189 // Internal interface to handle sliced condition change. 190 void onSlicedConditionMayChangeLocked(bool overallCondition, const int64_t eventTime) override; 191 192 void dumpStatesLocked(FILE* out, bool verbose) const override; 193 194 virtual std::string aggregatedValueToString(const AggregatedValue& aggregate) const = 0; 195 196 // For pulled metrics, this method should only be called if a pull has been done. Else we will 197 // not have complete data for the bucket. 198 void flushIfNeededLocked(const int64_t& eventTime) override; 199 200 // For pulled metrics, this method should only be called if a pulled has been done. Else we will 201 // not have complete data for the bucket. 202 void flushCurrentBucketLocked(const int64_t& eventTimeNs, 203 const int64_t& nextBucketStartTimeNs) override; 204 205 void dropDataLocked(const int64_t dropTimeNs) override; 206 207 // Calculate how many buckets are present between the current bucket and eventTimeNs. 208 int64_t calcBucketsForwardCount(const int64_t eventTimeNs) const; 209 210 // Mark the data as invalid. 211 virtual void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); 212 213 // Skips the current bucket without notifying StatsdStats of the skipped bucket. 214 // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that 215 // causes the bucket to be invalidated will not notify StatsdStats. 216 void skipCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); 217 218 optional<InvalidConfigReason> onConfigUpdatedLocked( 219 const StatsdConfig& config, const int configIndex, const int metricIndex, 220 const std::vector<sp<AtomMatchingTracker>>& allAtomMatchingTrackers, 221 const std::unordered_map<int64_t, int>& oldAtomMatchingTrackerMap, 222 const std::unordered_map<int64_t, int>& newAtomMatchingTrackerMap, 223 const sp<EventMatcherWizard>& matcherWizard, 224 const std::vector<sp<ConditionTracker>>& allConditionTrackers, 225 const std::unordered_map<int64_t, int>& conditionTrackerMap, 226 const sp<ConditionWizard>& wizard, 227 const std::unordered_map<int64_t, int>& metricToActivationMap, 228 std::unordered_map<int, std::vector<int>>& trackerToMetricMap, 229 std::unordered_map<int, std::vector<int>>& conditionToMetricMap, 230 std::unordered_map<int, std::vector<int>>& activationAtomTrackerToMetricMap, 231 std::unordered_map<int, std::vector<int>>& deactivationAtomTrackerToMetricMap, 232 std::vector<int>& metricsWithActivation) override; 233 234 virtual optional<int64_t> getConditionIdForMetric(const StatsdConfig& config, 235 const int configIndex) const = 0; 236 237 virtual int64_t getWhatAtomMatcherIdForMetric(const StatsdConfig& config, 238 const int configIndex) const = 0; 239 240 virtual ConditionLinks getConditionLinksForMetric(const StatsdConfig& config, 241 const int configIndex) const = 0; 242 243 int mWhatMatcherIndex; 244 245 sp<EventMatcherWizard> mEventMatcherWizard; 246 247 const sp<StatsPullerManager> mPullerManager; 248 249 // Value fields for matching. 250 const std::vector<Matcher> mFieldMatchers; 251 252 // Value fields for matching. 253 std::set<HashableDimensionKey> mMatchedMetricDimensionKeys; 254 255 // Holds the atom id, primary key pair from a state change. 256 // Only used for pulled metrics. 257 // TODO(b/185796114): can be passed as function arguments instead. 258 pair<int32_t, HashableDimensionKey> mStateChangePrimaryKey; 259 260 // Atom Id for pulled data. -1 if this is not pulled. 261 const int mPullAtomId; 262 263 // Tracks the value information of one value field. 264 struct Interval { 265 // Index in multi value aggregation. 266 int aggIndex; 267 268 // Current aggregation, depending on the aggregation type. 269 AggregatedValue aggregate; 270 271 // Number of samples collected. 272 int sampleSize = 0; 273 hasValueInterval274 inline bool hasValue() const { 275 return sampleSize > 0; 276 } 277 }; 278 279 // Internal state of an ongoing aggregation bucket. 280 struct CurrentBucket { 281 // If the `MetricDimensionKey` state key is the current state key, then 282 // the condition timer will be updated later (e.g. condition/state/active 283 // state change) with the correct condition and time. CurrentBucketCurrentBucket284 CurrentBucket() : intervals(), conditionTimer(ConditionTimer(false, 0)) { 285 } 286 // Value information for each value field of the metric. 287 std::vector<Interval> intervals; 288 // Tracks how long the condition is true. 289 ConditionTimer conditionTimer; 290 }; 291 292 // Tracks the internal state in the ongoing aggregation bucket for each DimensionsInWhat 293 // key and StateValuesKey pair. 294 std::unordered_map<MetricDimensionKey, CurrentBucket> mCurrentSlicedBucket; 295 296 // State key and any extra information for a specific DimensionsInWhat key. 297 struct DimensionsInWhatInfo { DimensionsInWhatInfoDimensionsInWhatInfo298 DimensionsInWhatInfo(const HashableDimensionKey& stateKey) 299 : dimExtras(), currentState(stateKey), hasCurrentState(false) { 300 } 301 302 DimExtras dimExtras; 303 304 // Whether new data is seen in the bucket. 305 // TODO, this could be per base in the dim extras. 306 bool seenNewData = false; 307 308 // Last seen state value(s). 309 HashableDimensionKey currentState; 310 // Whether this dimensions in what key has a current state key. 311 bool hasCurrentState; 312 }; 313 314 // Tracks current state key and other information for each DimensionsInWhat key. 315 std::unordered_map<HashableDimensionKey, DimensionsInWhatInfo> mDimInfos; 316 317 // Save the past buckets and we can clear when the StatsLogReport is dumped. 318 std::unordered_map<MetricDimensionKey, std::vector<PastBucket<AggregatedValue>>> mPastBuckets; 319 320 const int64_t mMinBucketSizeNs; 321 322 // Util function to check whether the specified dimension hits the guardrail. 323 bool hitGuardRailLocked(const MetricDimensionKey& newKey); 324 325 bool hasReachedGuardRailLimit() const; 326 pullAndMatchEventsLocked(const int64_t timestampNs)327 virtual void pullAndMatchEventsLocked(const int64_t timestampNs) { 328 } 329 330 virtual bool multipleBucketsSkipped(const int64_t numBucketsForward) const = 0; 331 332 virtual PastBucket<AggregatedValue> buildPartialBucket(int64_t bucketEndTime, 333 std::vector<Interval>& intervals) = 0; 334 335 virtual void closeCurrentBucket(const int64_t eventTimeNs, const int64_t nextBucketStartTimeNs); 336 337 virtual void initNextSlicedBucket(int64_t nextBucketStartTimeNs); 338 339 // Updates the condition timers in the current sliced bucket when there is a 340 // condition change or an active state change. 341 void updateCurrentSlicedBucketConditionTimers(bool newCondition, int64_t eventTimeNs); 342 343 virtual void writePastBucketAggregateToProto(const int aggIndex, 344 const AggregatedValue& aggregate, 345 const int sampleSize, 346 ProtoOutputStream* const protoOutput) const = 0; 347 348 static const size_t kBucketSize = sizeof(PastBucket<AggregatedValue>{}); 349 350 const size_t mDimensionSoftLimit; 351 352 const size_t mDimensionHardLimit; 353 354 // This is to track whether or not the bucket is skipped for any of the reasons listed in 355 // BucketDropReason, many of which make the bucket potentially invalid. 356 bool mCurrentBucketIsSkipped; 357 358 /** Stores condition correction threshold from the ValueMetric configuration */ 359 optional<int64_t> mConditionCorrectionThresholdNs; 360 isEventLateLocked(const int64_t eventTimeNs)361 inline bool isEventLateLocked(const int64_t eventTimeNs) const { 362 return eventTimeNs < mCurrentBucketStartTimeNs; 363 } 364 365 // Returns true if any of the intervals have seen new data. 366 // This should return true unless there is an error parsing the value fields from the event. 367 virtual bool aggregateFields(const int64_t eventTimeNs, const MetricDimensionKey& eventKey, 368 const LogEvent& event, std::vector<Interval>& intervals, 369 DimExtras& dimExtras) = 0; 370 371 // If this is a pulled metric isPulled()372 inline bool isPulled() const { 373 return mPullAtomId != -1; 374 } 375 376 private: 377 }; // ValueMetricProducer 378 379 } // namespace statsd 380 } // namespace os 381 } // namespace android 382