1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/metrics/GaugeMetricProducer.h"
16
17 #include <gmock/gmock.h>
18 #include <gtest/gtest.h>
19 #include <math.h>
20 #include <stdio.h>
21
22 #include <vector>
23
24 #include "logd/LogEvent.h"
25 #include "metrics_test_helper.h"
26 #include "src/matchers/SimpleAtomMatchingTracker.h"
27 #include "src/metrics/MetricProducer.h"
28 #include "src/stats_log_util.h"
29 #include "stats_event.h"
30 #include "tests/statsd_test_util.h"
31
32 using namespace testing;
33 using android::sp;
34 using std::set;
35 using std::unordered_map;
36 using std::vector;
37 using std::make_shared;
38
39 #ifdef __ANDROID__
40
41 namespace android {
42 namespace os {
43 namespace statsd {
44
45 namespace {
46
47 const ConfigKey kConfigKey(0, 12345);
48 const int tagId = 1;
49 const int64_t metricId = 123;
50 const uint64_t protoHash = 0x123456789;
51 const int logEventMatcherIndex = 0;
52 const int64_t bucketStartTimeNs = 10 * NS_PER_SEC;
53 const int64_t bucketSizeNs = TimeUnitToBucketSizeInMillis(ONE_MINUTE) * 1000000LL;
54 const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
55 const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
56 const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
57 const int64_t partialBucketSplitTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC;
58
makeLogEvent(int32_t atomId,int64_t timestampNs,int32_t value1,string str1,int32_t value2)59 shared_ptr<LogEvent> makeLogEvent(int32_t atomId, int64_t timestampNs, int32_t value1, string str1,
60 int32_t value2) {
61 AStatsEvent* statsEvent = AStatsEvent_obtain();
62 AStatsEvent_setAtomId(statsEvent, atomId);
63 AStatsEvent_overwriteTimestamp(statsEvent, timestampNs);
64
65 AStatsEvent_writeInt32(statsEvent, value1);
66 AStatsEvent_writeString(statsEvent, str1.c_str());
67 AStatsEvent_writeInt32(statsEvent, value2);
68
69 shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0);
70 parseStatsEventToLogEvent(statsEvent, logEvent.get());
71 return logEvent;
72 }
73 } // anonymous namespace
74
75 // Setup for parameterized tests.
76 class GaugeMetricProducerTest_PartialBucket : public TestWithParam<BucketSplitEvent> {};
77
78 INSTANTIATE_TEST_SUITE_P(GaugeMetricProducerTest_PartialBucket,
79 GaugeMetricProducerTest_PartialBucket,
80 testing::Values(APP_UPGRADE, BOOT_COMPLETE));
81
82 /*
83 * Tests that the first bucket works correctly
84 */
TEST(GaugeMetricProducerTest,TestFirstBucket)85 TEST(GaugeMetricProducerTest, TestFirstBucket) {
86 GaugeMetric metric;
87 metric.set_id(metricId);
88 metric.set_bucket(ONE_MINUTE);
89 metric.mutable_gauge_fields_filter()->set_include_all(false);
90 auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
91 gaugeFieldMatcher->set_field(tagId);
92 gaugeFieldMatcher->add_child()->set_field(1);
93 gaugeFieldMatcher->add_child()->set_field(3);
94
95 sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
96
97 sp<EventMatcherWizard> eventMatcherWizard =
98 createEventMatcherWizard(tagId, logEventMatcherIndex);
99
100 sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
101
102 // statsd started long ago.
103 // The metric starts in the middle of the bucket
104 GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
105 wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
106 -1, -1, tagId, 5, 600 * NS_PER_SEC + NS_PER_SEC / 2,
107 pullerManager);
108 gaugeProducer.prepareFirstBucket();
109
110 EXPECT_EQ(600500000000, gaugeProducer.mCurrentBucketStartTimeNs);
111 EXPECT_EQ(10, gaugeProducer.mCurrentBucketNum);
112 EXPECT_EQ(660000000005, gaugeProducer.getCurrentBucketEndTimeNs());
113 }
114
// Pulled gauge metric without a condition. Each delivery of pulled data stamped in a later
// bucket must close the previous bucket and move its gauge values into mPastBuckets.
TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_max_pull_delay_sec(INT_MAX);
    auto* fieldsFilter = metric.mutable_gauge_fields_filter();
    fieldsFilter->set_include_all(false);
    // Keep only fields 1 and 3 of the atom (the two int32 values written by makeLogEvent).
    auto* gaugeFields = fieldsFilter->mutable_fields();
    gaugeFields->set_field(tagId);
    gaugeFields->add_child()->set_field(1);
    gaugeFields->add_child()->set_field(3);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // Exactly one pull is expected, at the bucket start; it yields the values (3, 11).
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs, _))
            .WillOnce(Invoke([](int atomTag, const ConfigKey&, const int64_t pullTimeNs,
                                vector<std::shared_ptr<LogEvent>>* out) {
                EXPECT_EQ(pullTimeNs, bucketStartTimeNs);
                out->clear();
                out->push_back(makeLogEvent(atomTag, pullTimeNs + 10, 3, "some value", 11));
                return true;
            }));

    GaugeMetricProducer producer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {}, wizard,
                                 protoHash, logEventMatcherIndex, eventMatcherWizard, tagId, -1,
                                 tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
    producer.prepareFirstBucket();

    // --- Data stamped in the second bucket. ---
    vector<shared_ptr<LogEvent>> pulled;
    pulled.push_back(makeLogEvent(tagId, bucket2StartTimeNs + 1, 10, "some value", 11));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs);

    // The current bucket now holds (10, 11).
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    auto current = producer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
    EXPECT_EQ(INT, current->mValue.getType());
    EXPECT_EQ(10, current->mValue.int_value);
    ++current;
    EXPECT_EQ(11, current->mValue.int_value);
    // The closed first bucket holds the value 3 supplied by the mocked Pull.
    ASSERT_EQ(1UL, producer.mPastBuckets.size());
    EXPECT_EQ(3, producer.mPastBuckets.begin()
                         ->second.back()
                         .mAggregatedAtoms.begin()
                         ->first.getAtomFieldValues()
                         .getValues()
                         .begin()
                         ->mValue.int_value);

    // --- Data stamped in the third bucket. ---
    pulled.clear();
    pulled.push_back(makeLogEvent(tagId, bucket3StartTimeNs + 10, 24, "some value", 25));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucket3StartTimeNs);

    // The current bucket now holds (24, 25).
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    current = producer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
    EXPECT_EQ(INT, current->mValue.getType());
    EXPECT_EQ(24, current->mValue.int_value);
    ++current;
    EXPECT_EQ(INT, current->mValue.getType());
    EXPECT_EQ(25, current->mValue.int_value);
    // One dimension, two closed buckets; the newest closed bucket holds (10, 11).
    ASSERT_EQ(1UL, producer.mPastBuckets.size());
    ASSERT_EQ(2UL, producer.mPastBuckets.begin()->second.size());
    auto past = producer.mPastBuckets.begin()
                        ->second.back()
                        .mAggregatedAtoms.begin()
                        ->first.getAtomFieldValues()
                        .getValues()
                        .begin();
    EXPECT_EQ(INT, past->mValue.getType());
    EXPECT_EQ(10L, past->mValue.int_value);
    ++past;
    EXPECT_EQ(INT, past->mValue.getType());
    EXPECT_EQ(11L, past->mValue.int_value);

    // --- Explicit flush at the start of the fourth bucket. ---
    producer.flushIfNeededLocked(bucket4StartTimeNs);
    ASSERT_EQ(0UL, producer.mCurrentSlicedBucket->size());
    // One dimension, three closed buckets; the newest closed bucket holds (24, 25).
    ASSERT_EQ(1UL, producer.mPastBuckets.size());
    ASSERT_EQ(3UL, producer.mPastBuckets.begin()->second.size());
    past = producer.mPastBuckets.begin()
                   ->second.back()
                   .mAggregatedAtoms.begin()
                   ->first.getAtomFieldValues()
                   .getValues()
                   .begin();
    EXPECT_EQ(INT, past->mValue.getType());
    EXPECT_EQ(24L, past->mValue.int_value);
    ++past;
    EXPECT_EQ(INT, past->mValue.getType());
    EXPECT_EQ(25L, past->mValue.int_value);
}
211
// Pushed gauge metric with an anomaly tracker attached. A bucket-split event (app upgrade or
// boot complete, chosen by the test parameter) must close the current bucket as a partial
// bucket without advancing the bucket number, and partial buckets must not be forwarded to the
// anomaly tracker; only once the full bucket ends does the tracker see a value.
TEST_P(GaugeMetricProducerTest_PartialBucket, TestPushedEvents) {
    sp<AlarmMonitor> alarmMonitor;
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.mutable_gauge_fields_filter()->set_include_all(true);
    metric.set_split_bucket_for_app_upgrade(true);

    Alert alert;
    alert.set_id(101);
    alert.set_metric_id(metricId);
    alert.set_trigger_if_sum_gt(25);
    alert.set_num_buckets(100);
    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
    // StrictMock with no expectations: a pushed metric must never touch the puller manager.
    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
                                      wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
                                      -1 /* -1 means no pulling */, -1, tagId, bucketStartTimeNs,
                                      bucketStartTimeNs, pullerManager);
    gaugeProducer.prepareFirstBucket();

    sp<AnomalyTracker> anomalyTracker =
            gaugeProducer.addAnomalyTracker(alert, alarmMonitor, UPDATE_NEW, bucketStartTimeNs);
    // Fatal on purpose: the tracker is dereferenced below, so a null tracker must stop the
    // test here instead of crashing the whole test binary.
    ASSERT_TRUE(anomalyTracker != nullptr);

    // First event lands in the (still full-length) first bucket.
    LogEvent event1(/*uid=*/0, /*pid=*/0);
    CreateTwoValueLogEvent(&event1, tagId, bucketStartTimeNs + 10, 1, 10);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
    EXPECT_EQ(1UL, (*gaugeProducer.mCurrentSlicedBucket).count(DEFAULT_METRIC_DIMENSION_KEY));

    // Split the bucket with the event under test.
    switch (GetParam()) {
        case APP_UPGRADE:
            gaugeProducer.notifyAppUpgrade(partialBucketSplitTimeNs);
            break;
        case BOOT_COMPLETE:
            gaugeProducer.onStatsdInitCompleted(partialBucketSplitTimeNs);
            break;
    }
    // The data moved into a past bucket spanning [bucketStartTimeNs, partialBucketSplitTimeNs];
    // the bucket number is unchanged.
    EXPECT_EQ(0UL, (*gaugeProducer.mCurrentSlicedBucket).count(DEFAULT_METRIC_DIMENSION_KEY));
    ASSERT_EQ(1UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ(bucketStartTimeNs,
              gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
    EXPECT_EQ(partialBucketSplitTimeNs,
              gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketEndNs);
    EXPECT_EQ(0L, gaugeProducer.mCurrentBucketNum);
    EXPECT_EQ(partialBucketSplitTimeNs, gaugeProducer.mCurrentBucketStartTimeNs);
    // Partial buckets are not sent to anomaly tracker.
    EXPECT_EQ(0, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));

    // Create an event in the same partial bucket.
    LogEvent event2(/*uid=*/0, /*pid=*/0);
    CreateTwoValueLogEvent(&event2, tagId, bucketStartTimeNs + 59 * NS_PER_SEC, 1, 10);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
    EXPECT_EQ(0L, gaugeProducer.mCurrentBucketNum);
    ASSERT_EQ(1UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ(bucketStartTimeNs,
              gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
    EXPECT_EQ(partialBucketSplitTimeNs,
              gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketEndNs);
    EXPECT_EQ((int64_t)partialBucketSplitTimeNs, gaugeProducer.mCurrentBucketStartTimeNs);
    // Partial buckets are not sent to anomaly tracker.
    EXPECT_EQ(0, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));

    // Next event should trigger creation of new bucket and send previous full bucket to anomaly
    // tracker.
    LogEvent event3(/*uid=*/0, /*pid=*/0);
    CreateTwoValueLogEvent(&event3, tagId, bucketStartTimeNs + 65 * NS_PER_SEC, 1, 10);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event3);
    EXPECT_EQ(1L, gaugeProducer.mCurrentBucketNum);
    ASSERT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ((int64_t)bucketStartTimeNs + bucketSizeNs, gaugeProducer.mCurrentBucketStartTimeNs);
    EXPECT_EQ(1, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));

    // Next event should trigger creation of new bucket.
    LogEvent event4(/*uid=*/0, /*pid=*/0);
    CreateTwoValueLogEvent(&event4, tagId, bucketStartTimeNs + 125 * NS_PER_SEC, 1, 10);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event4);
    EXPECT_EQ(2L, gaugeProducer.mCurrentBucketNum);
    ASSERT_EQ(3UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ(2, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));
}
297
// Pulled gauge metric with bucket splitting enabled. The bucket-split event selected by the test
// parameter must close the first bucket at the split time and populate the new partial bucket
// with the data returned by the pull issued at that split time.
TEST_P(GaugeMetricProducerTest_PartialBucket, TestPulled) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_max_pull_delay_sec(INT_MAX);
    metric.set_split_bucket_for_app_upgrade(true);
    auto* gaugeFields = metric.mutable_gauge_fields_filter()->mutable_fields();
    gaugeFields->set_field(tagId);
    gaugeFields->add_child()->set_field(2);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // Two pulls are expected: the first one fails, the second one (at the split time) yields 2.
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
            .WillOnce(Return(false))
            .WillOnce(Invoke([](int atomTag, const ConfigKey&, const int64_t pullTimeNs,
                                vector<std::shared_ptr<LogEvent>>* out) {
                EXPECT_EQ(pullTimeNs, partialBucketSplitTimeNs);
                out->clear();
                out->push_back(CreateRepeatedValueLogEvent(atomTag, pullTimeNs, 2));
                return true;
            }));

    GaugeMetricProducer producer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {}, wizard,
                                 protoHash, logEventMatcherIndex, eventMatcherWizard, tagId, -1,
                                 tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
    producer.prepareFirstBucket();

    // Deliver the value 1 for the first bucket.
    vector<shared_ptr<LogEvent>> pulled;
    pulled.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 1));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucketStartTimeNs);
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(1, producer.mCurrentSlicedBucket->begin()
                         ->second.front()
                         .mFields->begin()
                         ->mValue.int_value);

    // Split the bucket with the event under test.
    const auto splitEvent = GetParam();
    if (splitEvent == APP_UPGRADE) {
        producer.notifyAppUpgrade(partialBucketSplitTimeNs);
    } else if (splitEvent == BOOT_COMPLETE) {
        producer.onStatsdInitCompleted(partialBucketSplitTimeNs);
    }

    // The first bucket was closed as [bucketStartTimeNs, partialBucketSplitTimeNs] without
    // advancing the bucket number.
    ASSERT_EQ(1UL, producer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ(bucketStartTimeNs,
              producer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
    EXPECT_EQ(partialBucketSplitTimeNs,
              producer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketEndNs);
    EXPECT_EQ(0L, producer.mCurrentBucketNum);
    EXPECT_EQ(partialBucketSplitTimeNs, producer.mCurrentBucketStartTimeNs);
    // The new partial bucket holds the value from the pull at the split time.
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(2, producer.mCurrentSlicedBucket->begin()
                         ->second.front()
                         .mFields->begin()
                         ->mValue.int_value);

    // Data stamped in the next full bucket closes the partial bucket.
    pulled.clear();
    pulled.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + bucketSizeNs + 1, 3));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS,
                          bucketStartTimeNs + bucketSizeNs);
    ASSERT_EQ(2UL, producer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(3, producer.mCurrentSlicedBucket->begin()
                         ->second.front()
                         .mFields->begin()
                         ->mValue.int_value);
}
373
// With split_bucket_for_app_upgrade disabled, an app upgrade must leave the current bucket
// untouched: nothing is flushed, the bucket start is unchanged and the gauge value is retained.
TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_max_pull_delay_sec(INT_MAX);
    metric.set_split_bucket_for_app_upgrade(false);
    auto* gaugeFields = metric.mutable_gauge_fields_filter()->mutable_fields();
    gaugeFields->set_field(tagId);
    gaugeFields->add_child()->set_field(2);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // Only the pull at the bucket start is allowed (and it fails); the StrictMock rejects any
    // additional pull, e.g. one triggered by the app upgrade.
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs, _))
            .WillOnce(Return(false));

    GaugeMetricProducer producer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {}, wizard,
                                 protoHash, logEventMatcherIndex, eventMatcherWizard, tagId, -1,
                                 tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
    producer.prepareFirstBucket();

    // Deliver the value 1 for the first bucket.
    vector<shared_ptr<LogEvent>> pulled;
    pulled.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 1));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucketStartTimeNs);
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(1, producer.mCurrentSlicedBucket->begin()
                         ->second.front()
                         .mFields->begin()
                         ->mValue.int_value);

    // The upgrade notification must be a no-op for bucket bookkeeping.
    producer.notifyAppUpgrade(partialBucketSplitTimeNs);
    ASSERT_EQ(0UL, producer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ(0L, producer.mCurrentBucketNum);
    EXPECT_EQ(bucketStartTimeNs, producer.mCurrentBucketStartTimeNs);
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(1, producer.mCurrentSlicedBucket->begin()
                         ->second.front()
                         .mFields->begin()
                         ->mValue.int_value);
}
420
// Pulled gauge metric gated by a condition. The condition turning true must trigger a pull at
// the change time; later pulled data and flushes must move the values into past buckets.
TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_max_pull_delay_sec(INT_MAX);
    auto* gaugeFields = metric.mutable_gauge_fields_filter()->mutable_fields();
    gaugeFields->set_field(tagId);
    gaugeFields->add_child()->set_field(2);
    metric.set_condition(StringToId("SCREEN_ON"));

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    const int64_t conditionChangeNs = bucketStartTimeNs + 8;

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // The only pull allowed is the one at the condition change; it yields the value 100.
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, conditionChangeNs, _))
            .WillOnce(Invoke([](int atomTag, const ConfigKey&, const int64_t pullTimeNs,
                                vector<std::shared_ptr<LogEvent>>* out) {
                out->clear();
                out->push_back(CreateRepeatedValueLogEvent(atomTag, pullTimeNs + 10, 100));
                return true;
            }));

    // The condition starts out as unknown.
    GaugeMetricProducer producer(kConfigKey, metric, 0 /*condition index*/,
                                 {ConditionState::kUnknown}, wizard, protoHash,
                                 logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
                                 bucketStartTimeNs, bucketStartTimeNs, pullerManager);
    producer.prepareFirstBucket();

    // Condition becomes true: the pulled value 100 lands in the current bucket.
    producer.onConditionChanged(true, conditionChangeNs);
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(100, producer.mCurrentSlicedBucket->begin()
                           ->second.front()
                           .mFields->begin()
                           ->mValue.int_value);
    ASSERT_EQ(0UL, producer.mPastBuckets.size());

    // Data stamped in the second bucket closes the first bucket.
    vector<shared_ptr<LogEvent>> pulled;
    pulled.push_back(CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1, 110));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs);

    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    EXPECT_EQ(110, producer.mCurrentSlicedBucket->begin()
                           ->second.front()
                           .mFields->begin()
                           ->mValue.int_value);
    ASSERT_EQ(1UL, producer.mPastBuckets.size());

    EXPECT_EQ(100, producer.mPastBuckets.begin()
                           ->second.back()
                           .mAggregatedAtoms.begin()
                           ->first.getAtomFieldValues()
                           .getValues()
                           .begin()
                           ->mValue.int_value);

    // Condition turns false, then a flush in the third bucket closes the second bucket (110).
    producer.onConditionChanged(false, bucket2StartTimeNs + 10);
    producer.flushIfNeededLocked(bucket3StartTimeNs + 10);
    ASSERT_EQ(1UL, producer.mPastBuckets.size());
    ASSERT_EQ(2UL, producer.mPastBuckets.begin()->second.size());
    EXPECT_EQ(110L, producer.mPastBuckets.begin()
                            ->second.back()
                            .mAggregatedAtoms.begin()
                            ->first.getAtomFieldValues()
                            .getValues()
                            .begin()
                            ->mValue.int_value);
}
495
// Pulled gauge metric with a sliced condition and a dimension on field 1 of the atom. A sliced
// condition change must trigger a pull, and the pulled atom must be keyed by its first field.
TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition) {
    const int conditionTag = 65;
    GaugeMetric metric;
    metric.set_id(1111111);
    metric.set_bucket(ONE_MINUTE);
    metric.mutable_gauge_fields_filter()->set_include_all(true);
    metric.set_condition(StringToId("APP_DIED"));
    metric.set_max_pull_delay_sec(INT_MAX);
    auto* whatDimensions = metric.mutable_dimensions_in_what();
    whatDimensions->set_field(tagId);
    whatDimensions->add_child()->set_field(1);

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
    // Every condition query reports kTrue.
    EXPECT_CALL(*wizard, query(_, _, _))
            .WillRepeatedly(
                    Invoke([](const int conditionIndex, const ConditionKey& conditionParameters,
                              const bool isPartialLink) {
                        // NOTE(review): the key assembled here is never read; only the
                        // returned state affects the test.
                        int pos[] = {1, 0, 0};
                        Field conditionField(conditionTag, pos, 0);
                        HashableDimensionKey unusedKey;
                        unusedKey.mutableValues()->emplace_back(conditionField,
                                                                Value((int32_t)1000000));

                        return ConditionState::kTrue;
                    }));

    const int64_t sliceConditionChangeNs = bucketStartTimeNs + 8;

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // The only pull allowed is the one at the sliced condition change; it yields (1000, 100).
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, sliceConditionChangeNs, _))
            .WillOnce(Invoke([](int atomTag, const ConfigKey&, const int64_t pullTimeNs,
                                vector<std::shared_ptr<LogEvent>>* out) {
                out->clear();
                out->push_back(CreateTwoValueLogEvent(atomTag, pullTimeNs + 10, 1000, 100));
                return true;
            }));

    GaugeMetricProducer producer(kConfigKey, metric, 0 /*condition index*/,
                                 {ConditionState::kUnknown}, wizard, protoHash,
                                 logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
                                 bucketStartTimeNs, bucketStartTimeNs, pullerManager);
    producer.prepareFirstBucket();

    producer.onSlicedConditionMayChange(true, sliceConditionChangeNs);

    // One dimension key, whose single "what" value is the atom's first field (1000).
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    const auto& key = producer.mCurrentSlicedBucket->begin()->first;
    ASSERT_EQ(1UL, key.getDimensionKeyInWhat().getValues().size());
    EXPECT_EQ(1000, key.getDimensionKeyInWhat().getValues()[0].mValue.int_value);

    ASSERT_EQ(0UL, producer.mPastBuckets.size());

    // Data stamped in the second bucket closes the first bucket.
    vector<shared_ptr<LogEvent>> pulled;
    pulled.push_back(CreateTwoValueLogEvent(tagId, bucket2StartTimeNs + 1, 1000, 110));
    producer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs);

    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->size());
    ASSERT_EQ(1UL, producer.mPastBuckets.size());
}
560
// Pulled gauge metric with an alert that triggers when the sum exceeds 25 (2-bucket alert,
// 60s refractory period). Checks when the refractory period is set and that it is not moved
// by a later pull that arrives while the refractory period is still running.
TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
    sp<AlarmMonitor> alarmMonitor;
    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // The only pull allowed is the one at the bucket start, and it fails; all data in this
    // test is delivered through onDataPulled().
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs, _))
            .WillOnce(Return(false));

    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_max_pull_delay_sec(INT_MAX);
    auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
    gaugeFieldMatcher->set_field(tagId);
    gaugeFieldMatcher->add_child()->set_field(2);

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
                                      wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
                                      tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                      pullerManager);
    gaugeProducer.prepareFirstBucket();

    Alert alert;
    alert.set_id(101);
    alert.set_metric_id(metricId);
    alert.set_trigger_if_sum_gt(25);
    alert.set_num_buckets(2);
    const int32_t refPeriodSec = 60;
    alert.set_refractory_period_secs(refPeriodSec);
    sp<AnomalyTracker> anomalyTracker =
            gaugeProducer.addAnomalyTracker(alert, alarmMonitor, UPDATE_NEW, bucketStartTimeNs);
    // Fatal on purpose: the tracker is dereferenced by the checks below.
    ASSERT_TRUE(anomalyTracker != nullptr);

    // First bucket: value 13. No refractory period has been set yet.
    vector<shared_ptr<LogEvent>> allData;
    allData.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 13));
    gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucketStartTimeNs);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
                           ->second.front()
                           .mFields->begin()
                           ->mValue.int_value);
    EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY), 0U);

    // Second bucket: value 15. The refractory period now ends refPeriodSec after event2's
    // timestamp, rounded up to whole seconds.
    std::shared_ptr<LogEvent> event2 =
            CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + bucketSizeNs + 20, 15);
    const auto expectedRefractoryEndSec =
            std::ceil(1.0 * event2->GetElapsedTimestampNs() / NS_PER_SEC) + refPeriodSec;

    allData.clear();
    allData.push_back(event2);
    gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
                               bucketStartTimeNs + bucketSizeNs);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
                           ->second.front()
                           .mFields->begin()
                           ->mValue.int_value);
    EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY),
              expectedRefractoryEndSec);

    // Third bucket: value 26. The pull time passed in is the start of the third bucket, the
    // bucket the event is stamped in. The refractory period end must be unchanged.
    allData.clear();
    allData.push_back(
            CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 2 * bucketSizeNs + 10, 26));
    gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
                               bucketStartTimeNs + 2 * bucketSizeNs);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
                           ->second.front()
                           .mFields->begin()
                           ->mValue.int_value);
    EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY),
              expectedRefractoryEndSec);

    // This event does not have the gauge field. Thus the current bucket value is 0.
    allData.clear();
    allData.push_back(CreateNoValuesLogEvent(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10));
    gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
                               bucketStartTimeNs + 3 * bucketSizeNs);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
}
646
// FIRST_N_SAMPLES gauge metric whose pulls are driven by a trigger atom. Each trigger event
// must cause one pull at the trigger's timestamp, and the samples collected in a bucket must
// be aggregated into the past bucket once a trigger arrives in the next bucket.
TEST(GaugeMetricProducerTest, TestPullOnTrigger) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
    metric.set_max_pull_delay_sec(INT_MAX);
    auto* fieldsFilter = metric.mutable_gauge_fields_filter();
    fieldsFilter->set_include_all(false);
    auto* gaugeFields = fieldsFilter->mutable_fields();
    gaugeFields->set_field(tagId);
    gaugeFields->add_child()->set_field(1);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    // Three pulls are expected, one per trigger event below: the first two yield 4 and 5, the
    // third succeeds without providing any data.
    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
            .WillOnce(Invoke([](int atomTag, const ConfigKey&, const int64_t pullTimeNs,
                                vector<std::shared_ptr<LogEvent>>* out) {
                EXPECT_EQ(pullTimeNs, bucketStartTimeNs + 10);
                out->clear();
                out->push_back(CreateRepeatedValueLogEvent(atomTag, pullTimeNs, 4));
                return true;
            }))
            .WillOnce(Invoke([](int atomTag, const ConfigKey&, const int64_t pullTimeNs,
                                vector<std::shared_ptr<LogEvent>>* out) {
                EXPECT_EQ(pullTimeNs, bucketStartTimeNs + 20);
                out->clear();
                out->push_back(CreateRepeatedValueLogEvent(atomTag, pullTimeNs, 5));
                return true;
            }))
            .WillOnce(Return(true));

    const int triggerId = 5;
    GaugeMetricProducer producer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {}, wizard,
                                 protoHash, logEventMatcherIndex, eventMatcherWizard, tagId,
                                 triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                 pullerManager);
    producer.prepareFirstBucket();

    // Nothing is collected until a trigger arrives.
    ASSERT_EQ(0UL, producer.mCurrentSlicedBucket->size());

    // Two triggers inside the first bucket add one sample each.
    LogEvent trigger(/*uid=*/0, /*pid=*/0);
    CreateNoValuesLogEvent(&trigger, triggerId, bucketStartTimeNs + 10);
    producer.onMatchedLogEvent(1 /*log matcher index*/, trigger);
    ASSERT_EQ(1UL, producer.mCurrentSlicedBucket->begin()->second.size());
    trigger.setElapsedTimestampNs(bucketStartTimeNs + 20);
    producer.onMatchedLogEvent(1 /*log matcher index*/, trigger);
    ASSERT_EQ(2UL, producer.mCurrentSlicedBucket->begin()->second.size());
    // A trigger in the second bucket closes the first bucket.
    trigger.setElapsedTimestampNs(bucket2StartTimeNs + 1);
    producer.onMatchedLogEvent(1 /*log matcher index*/, trigger);

    // The closed bucket holds exactly the two samples, in unspecified order.
    ASSERT_EQ(1UL, producer.mPastBuckets.size());
    ASSERT_EQ(2UL, producer.mPastBuckets.begin()->second.back().mAggregatedAtoms.size());
    vector<int> atomValues;
    for (const auto& aggregated : producer.mPastBuckets.begin()->second.back().mAggregatedAtoms) {
        atomValues.emplace_back(
                aggregated.first.getAtomFieldValues().getValues().begin()->mValue.int_value);
    }
    EXPECT_THAT(atomValues, UnorderedElementsAre(4, 5));
}
709
/*
 * FIRST_N_SAMPLES pulled gauge without a trigger atom: the mocked pull hands back
 * four atoms (values 4..7) while max_num_gauge_atoms_per_bucket is 3, and the test
 * expects only three atoms (4, 5, 6) in the current bucket and in the flushed bucket.
 */
TEST(GaugeMetricProducerTest, TestPullNWithoutTrigger) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
    metric.set_max_pull_delay_sec(INT_MAX);
    metric.set_max_num_gauge_atoms_per_bucket(3);
    metric.mutable_gauge_fields_filter()->mutable_fields()->set_field(tagId);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    // The single expected pull happens at bucket start and returns one atom more
    // than the configured per-bucket maximum.
    const auto pullFourAtoms = [](int atomTag, const ConfigKey&, const int64_t eventTimeNs,
                                  vector<std::shared_ptr<LogEvent>>* data) {
        EXPECT_EQ(eventTimeNs, bucketStartTimeNs);
        data->clear();
        for (int value = 4; value <= 7; ++value) {
            data->push_back(CreateRepeatedValueLogEvent(atomTag, eventTimeNs, value));
        }
        return true;
    };

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _)).WillOnce(Invoke(pullFourAtoms));

    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
                                      wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
                                      tagId, /*triggerId=*/-1, tagId, bucketStartTimeNs,
                                      bucketStartTimeNs, pullerManager);

    // Nothing is stored until the first bucket has been prepared.
    EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
    gaugeProducer.prepareFirstBucket();
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    EXPECT_EQ(3UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());

    // Deliver pulled data stamped in the second bucket.
    vector<std::shared_ptr<LogEvent>> secondBucketData;
    secondBucketData.push_back(CreateNoValuesLogEvent(tagId, bucket2StartTimeNs + 10));
    gaugeProducer.onDataPulled(secondBucketData, PullResult::PULL_RESULT_SUCCESS,
                               bucket2StartTimeNs + 30);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());

    // The flushed bucket must contain exactly the first three pulled values.
    ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
    const auto& flushedAtoms = gaugeProducer.mPastBuckets.begin()->second.back().mAggregatedAtoms;
    ASSERT_EQ(3UL, flushedAtoms.size());
    vector<int> atomValues;
    for (const auto& atomEntry : flushedAtoms) {
        atomValues.emplace_back(
                atomEntry.first.getAtomFieldValues().getValues().begin()->mValue.int_value);
    }
    EXPECT_THAT(atomValues, UnorderedElementsAre(4, 5, 6));
}
767
/*
 * Triggered pull gauge sliced on field 1 with include_all gauge fields. Three
 * triggers pull two-value atoms (3,4), (4,5) and (4,6); a fourth trigger in the next
 * bucket flushes them. The test expects the first event value to show up as the
 * dimension key (3 and 4) and the second event value (4, then 5 and 6) to be the
 * first stored atom field value.
 */
TEST(GaugeMetricProducerTest, TestRemoveDimensionInOutput) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
    metric.mutable_gauge_fields_filter()->set_include_all(true);
    metric.set_max_pull_delay_sec(INT_MAX);
    // Field 1 of the atom is the slicing dimension.
    auto* whatDimensions = metric.mutable_dimensions_in_what();
    whatDimensions->set_field(tagId);
    whatDimensions->add_child()->set_field(1);

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    // One pull response per trigger fired inside the first bucket.
    const auto firstPull = [](int atomTag, const ConfigKey&, const int64_t eventTimeNs,
                              vector<std::shared_ptr<LogEvent>>* data) {
        EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 3);
        data->clear();
        data->push_back(CreateTwoValueLogEvent(atomTag, eventTimeNs, 3, 4));
        return true;
    };
    const auto secondPull = [](int atomTag, const ConfigKey&, const int64_t eventTimeNs,
                               vector<std::shared_ptr<LogEvent>>* data) {
        EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10);
        data->clear();
        data->push_back(CreateTwoValueLogEvent(atomTag, eventTimeNs, 4, 5));
        return true;
    };
    const auto thirdPull = [](int atomTag, const ConfigKey&, const int64_t eventTimeNs,
                              vector<std::shared_ptr<LogEvent>>* data) {
        EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 20);
        data->clear();
        data->push_back(CreateTwoValueLogEvent(atomTag, eventTimeNs, 4, 6));
        return true;
    };

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
            .WillOnce(Invoke(firstPull))
            .WillOnce(Invoke(secondPull))
            .WillOnce(Invoke(thirdPull))
            .WillOnce(Return(true));

    int triggerId = 5;
    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
                                      wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
                                      tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                      pullerManager);
    gaugeProducer.prepareFirstBucket();

    LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
    // Re-stamps the shared trigger event and feeds it to the producer again.
    const auto fireTriggerAt = [&](int64_t elapsedTimestampNs) {
        triggerEvent.setElapsedTimestampNs(elapsedTimestampNs);
        gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
    };

    CreateNoValuesLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 3);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());

    fireTriggerAt(bucketStartTimeNs + 10);
    ASSERT_EQ(2UL, gaugeProducer.mCurrentSlicedBucket->size());
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());

    fireTriggerAt(bucketStartTimeNs + 20);
    ASSERT_EQ(2UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());

    // This trigger lands in the second bucket and flushes the first one.
    fireTriggerAt(bucket2StartTimeNs + 1);

    ASSERT_EQ(2UL, gaugeProducer.mPastBuckets.size());
    auto bucketIt = gaugeProducer.mPastBuckets.begin();

    // First past-bucket entry: dimension key 3 holding a single atom.
    const auto& firstDimensionAtoms = bucketIt->second.back().mAggregatedAtoms;
    ASSERT_EQ(1UL, firstDimensionAtoms.size());
    EXPECT_EQ(3, bucketIt->first.getDimensionKeyInWhat().getValues().begin()->mValue.int_value);
    EXPECT_EQ(4, firstDimensionAtoms.begin()
                         ->first.getAtomFieldValues()
                         .getValues()
                         .begin()
                         ->mValue.int_value);

    // Second past-bucket entry: dimension key 4 holding two atoms.
    ++bucketIt;
    const auto& secondDimensionAtoms = bucketIt->second.back().mAggregatedAtoms;
    ASSERT_EQ(2UL, secondDimensionAtoms.size());
    EXPECT_EQ(4, bucketIt->first.getDimensionKeyInWhat().getValues().begin()->mValue.int_value);
    vector<int> atomValues;
    for (const auto& atomEntry : secondDimensionAtoms) {
        atomValues.emplace_back(
                atomEntry.first.getAtomFieldValues().getValues().begin()->mValue.int_value);
    }
    EXPECT_THAT(atomValues, UnorderedElementsAre(5, 6));
}
853
/*
 * Test that the BUCKET_TOO_SMALL drop reason is logged when a flushed bucket's size
 * is smaller than the "min_bucket_size_nanos" specified in the metric config.
 */
TEST(GaugeMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(FIVE_MINUTES);
    metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
    metric.set_min_bucket_size_nanos(10000000000);  // 10 seconds

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    // The only pull expected is the one caused by the trigger at bucket start + 3 ns.
    const auto pullSingleAtom = [](int atomTag, const ConfigKey&, const int64_t eventTimeNs,
                                   vector<std::shared_ptr<LogEvent>>* data) {
        data->clear();
        data->push_back(CreateRepeatedValueLogEvent(atomTag, eventTimeNs, 10));
        return true;
    };
    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs + 3, _))
            .WillOnce(Invoke(pullSingleAtom));

    int triggerId = 5;
    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
                                      wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
                                      tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                      pullerManager);
    gaugeProducer.prepareFirstBucket();

    LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
    CreateNoValuesLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 3);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);

    // Dump 9 ms after bucket start, which is shorter than the 10 s minimum bucket size.
    const int64_t dumpTimeNs = bucketStartTimeNs + 9000000;
    ProtoOutputStream output;
    std::set<string> strSet;
    gaugeProducer.onDumpReport(dumpTimeNs, true /* include recent buckets */, true,
                               FAST /* dump_latency */, &strSet, &output);

    // The report must carry no data, only one skipped bucket.
    const StatsLogReport report = outputStreamToProto(&output);
    EXPECT_TRUE(report.has_gauge_metrics());
    ASSERT_EQ(0, report.gauge_metrics().data_size());
    ASSERT_EQ(1, report.gauge_metrics().skipped_size());

    const auto& skippedBucket = report.gauge_metrics().skipped(0);
    EXPECT_EQ(NanoToMillis(bucketStartTimeNs), skippedBucket.start_bucket_elapsed_millis());
    EXPECT_EQ(NanoToMillis(dumpTimeNs), skippedBucket.end_bucket_elapsed_millis());
    ASSERT_EQ(1, skippedBucket.drop_event_size());

    // The single drop event is stamped with the dump time and BUCKET_TOO_SMALL.
    const auto& dropEvent = skippedBucket.drop_event(0);
    EXPECT_EQ(BucketDropReason::BUCKET_TOO_SMALL, dropEvent.drop_reason());
    EXPECT_EQ(NanoToMillis(dumpTimeNs), dropEvent.drop_time_millis());
}
912
/*
 * Triggered pull gauge with dimensional sampling on field 1 (the uid), shard count 2
 * and shard offset 5. Each of the two triggers pulls atoms for uids 1001, 1002 and
 * 1003; the dump report is expected to contain data for uids 1001 and 1003 only.
 */
TEST(GaugeMetricProducerTest, TestPullDimensionalSampling) {
    ShardOffsetProvider::getInstance().setShardOffset(5);

    StatsdConfig config;
    config.add_allowed_log_source("AID_ROOT");  // LogEvent defaults to UID of root.

    const int triggerId = 5;
    const int shardCount = 2;
    GaugeMetric sampledGaugeMetric = createGaugeMetric(
            "GaugePullSampled", metricId, GaugeMetric::FIRST_N_SAMPLES, nullopt, triggerId);
    sampledGaugeMetric.set_max_pull_delay_sec(INT_MAX);
    *sampledGaugeMetric.mutable_dimensions_in_what() = CreateDimensions(tagId, {1});
    auto* samplingConfig = sampledGaugeMetric.mutable_dimensional_sampling_info();
    *samplingConfig->mutable_sampled_what_field() = CreateDimensions(tagId, {1});
    samplingConfig->set_shard_count(shardCount);
    *config.add_gauge_metric() = sampledGaugeMetric;

    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    // Pull responses for the triggers at bucket start + 10 ns and + 20 ns.
    const auto firstPull = [](int atomTag, const ConfigKey&, const int64_t /*eventTimeNs*/,
                              vector<std::shared_ptr<LogEvent>>* data) {
        data->clear();
        data->push_back(makeUidLogEvent(atomTag, bucketStartTimeNs + 10, 1001, 5, 10));
        data->push_back(makeUidLogEvent(atomTag, bucketStartTimeNs + 10, 1002, 10, 10));
        data->push_back(makeUidLogEvent(atomTag, bucketStartTimeNs + 10, 1003, 15, 10));
        return true;
    };
    const auto secondPull = [](int atomTag, const ConfigKey&, const int64_t /*eventTimeNs*/,
                               vector<std::shared_ptr<LogEvent>>* data) {
        data->clear();
        data->push_back(makeUidLogEvent(atomTag, bucketStartTimeNs + 20, 1001, 6, 10));
        data->push_back(makeUidLogEvent(atomTag, bucketStartTimeNs + 20, 1002, 12, 10));
        data->push_back(makeUidLogEvent(atomTag, bucketStartTimeNs + 20, 1003, 18, 10));
        return true;
    };

    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
            .WillOnce(Invoke(firstPull))
            .WillOnce(Invoke(secondPull));

    GaugeMetricProducer gaugeProducer(kConfigKey, sampledGaugeMetric,
                                      -1 /*-1 meaning no condition*/, {}, wizard, protoHash,
                                      logEventMatcherIndex, eventMatcherWizard, tagId, triggerId,
                                      tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
    SamplingInfo samplingInfo;
    samplingInfo.shardCount = shardCount;
    translateFieldMatcher(sampledGaugeMetric.dimensional_sampling_info().sampled_what_field(),
                          &samplingInfo.sampledWhatFields);
    gaugeProducer.setSamplingInfo(samplingInfo);
    gaugeProducer.prepareFirstBucket();

    // Fire the trigger twice, 10 ns apart, reusing the same event object.
    LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
    CreateRepeatedValueLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 10, 5);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);

    triggerEvent.setElapsedTimestampNs(bucketStartTimeNs + 20);
    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);

    // Check dump report.
    const int64_t dumpReportTimeNs = bucketStartTimeNs + 10000000000;
    ProtoOutputStream output;
    std::set<string> strSet;
    gaugeProducer.onDumpReport(dumpReportTimeNs, true /* include current buckets */, true,
                               NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);

    StatsLogReport report = outputStreamToProto(&output);
    backfillDimensionPath(&report);
    backfillStartEndTimestamp(&report);
    backfillAggregatedAtoms(&report);

    EXPECT_TRUE(report.has_gauge_metrics());
    StatsLogReport::GaugeMetricDataWrapper gaugeMetrics;
    sortMetricDataByDimensionsValue(report.gauge_metrics(), &gaugeMetrics);
    ASSERT_EQ(2, gaugeMetrics.data_size());
    EXPECT_EQ(0, report.gauge_metrics().skipped_size());

    // Only Uid 1 and 3 are logged. (odd hash value) + (offset of 5) % (shard count of 2) = 0
    const int expectedUids[] = {1001, 1003};
    for (int index = 0; index < 2; ++index) {
        const GaugeMetricData& data = gaugeMetrics.data(index);
        ValidateUidDimension(data.dimensions_in_what(), tagId, expectedUids[index]);
        ValidateGaugeBucketTimes(data.bucket_info(0), bucketStartTimeNs, dumpReportTimeNs,
                                 {bucketStartTimeNs + 10, bucketStartTimeNs + 20});
    }
}
1002
1003 } // namespace statsd
1004 } // namespace os
1005 } // namespace android
1006 #else
1007 GTEST_LOG_(INFO) << "This test does nothing.\n";
1008 #endif
1009