• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/metrics/GaugeMetricProducer.h"
16 
17 #include <gmock/gmock.h>
18 #include <gtest/gtest.h>
19 #include <math.h>
20 #include <stdio.h>
21 
22 #include <vector>
23 
24 #include "logd/LogEvent.h"
25 #include "metrics_test_helper.h"
26 #include "src/matchers/SimpleAtomMatchingTracker.h"
27 #include "src/metrics/MetricProducer.h"
28 #include "src/stats_log_util.h"
29 #include "stats_event.h"
30 #include "tests/statsd_test_util.h"
31 
32 using namespace testing;
33 using android::sp;
34 using std::set;
35 using std::unordered_map;
36 using std::vector;
37 using std::make_shared;
38 
39 #ifdef __ANDROID__
40 
41 namespace android {
42 namespace os {
43 namespace statsd {
44 
45 namespace {
46 
47 const ConfigKey kConfigKey(0, 12345);
48 const int tagId = 1;
49 const int64_t metricId = 123;
50 const uint64_t protoHash = 0x123456789;
51 const int logEventMatcherIndex = 0;
52 const int64_t bucketStartTimeNs = 10 * NS_PER_SEC;
53 const int64_t bucketSizeNs = TimeUnitToBucketSizeInMillis(ONE_MINUTE) * 1000000LL;
54 const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
55 const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
56 const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
57 const int64_t partialBucketSplitTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC;
58 
makeLogEvent(int32_t atomId,int64_t timestampNs,int32_t value1,string str1,int32_t value2)59 shared_ptr<LogEvent> makeLogEvent(int32_t atomId, int64_t timestampNs, int32_t value1, string str1,
60                                   int32_t value2) {
61     AStatsEvent* statsEvent = AStatsEvent_obtain();
62     AStatsEvent_setAtomId(statsEvent, atomId);
63     AStatsEvent_overwriteTimestamp(statsEvent, timestampNs);
64 
65     AStatsEvent_writeInt32(statsEvent, value1);
66     AStatsEvent_writeString(statsEvent, str1.c_str());
67     AStatsEvent_writeInt32(statsEvent, value2);
68 
69     shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0);
70     parseStatsEventToLogEvent(statsEvent, logEvent.get());
71     return logEvent;
72 }
73 }  // anonymous namespace
74 
75 // Setup for parameterized tests.
76 class GaugeMetricProducerTest_PartialBucket : public TestWithParam<BucketSplitEvent> {};
77 
78 INSTANTIATE_TEST_SUITE_P(GaugeMetricProducerTest_PartialBucket,
79                          GaugeMetricProducerTest_PartialBucket,
80                          testing::Values(APP_UPGRADE, BOOT_COMPLETE));
81 
82 /*
83  * Tests that the first bucket works correctly
84  */
TEST(GaugeMetricProducerTest,TestFirstBucket)85 TEST(GaugeMetricProducerTest, TestFirstBucket) {
86     GaugeMetric metric;
87     metric.set_id(metricId);
88     metric.set_bucket(ONE_MINUTE);
89     metric.mutable_gauge_fields_filter()->set_include_all(false);
90     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
91     gaugeFieldMatcher->set_field(tagId);
92     gaugeFieldMatcher->add_child()->set_field(1);
93     gaugeFieldMatcher->add_child()->set_field(3);
94 
95     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
96 
97     sp<EventMatcherWizard> eventMatcherWizard =
98             createEventMatcherWizard(tagId, logEventMatcherIndex);
99 
100     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
101 
102     // statsd started long ago.
103     // The metric starts in the middle of the bucket
104     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
105                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
106                                       -1, -1, tagId, 5, 600 * NS_PER_SEC + NS_PER_SEC / 2,
107                                       pullerManager);
108     gaugeProducer.prepareFirstBucket();
109 
110     EXPECT_EQ(600500000000, gaugeProducer.mCurrentBucketStartTimeNs);
111     EXPECT_EQ(10, gaugeProducer.mCurrentBucketNum);
112     EXPECT_EQ(660000000005, gaugeProducer.getCurrentBucketEndTimeNs());
113 }
114 
TEST(GaugeMetricProducerTest,TestPulledEventsNoCondition)115 TEST(GaugeMetricProducerTest, TestPulledEventsNoCondition) {
116     GaugeMetric metric;
117     metric.set_id(metricId);
118     metric.set_bucket(ONE_MINUTE);
119     metric.mutable_gauge_fields_filter()->set_include_all(false);
120     metric.set_max_pull_delay_sec(INT_MAX);
121     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
122     gaugeFieldMatcher->set_field(tagId);
123     gaugeFieldMatcher->add_child()->set_field(1);
124     gaugeFieldMatcher->add_child()->set_field(3);
125 
126     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
127 
128     sp<EventMatcherWizard> eventMatcherWizard =
129             createEventMatcherWizard(tagId, logEventMatcherIndex);
130 
131     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
132     EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
133     EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
134     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs, _))
135             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
136                                 vector<std::shared_ptr<LogEvent>>* data) {
137                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs);
138                 data->clear();
139                 data->push_back(makeLogEvent(tagId, eventTimeNs + 10, 3, "some value", 11));
140                 return true;
141             }));
142 
143     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
144                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
145                                       tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
146                                       pullerManager);
147     gaugeProducer.prepareFirstBucket();
148 
149     vector<shared_ptr<LogEvent>> allData;
150     allData.clear();
151     allData.push_back(makeLogEvent(tagId, bucket2StartTimeNs + 1, 10, "some value", 11));
152 
153     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs);
154     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
155     auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
156     EXPECT_EQ(INT, it->mValue.getType());
157     EXPECT_EQ(10, it->mValue.int_value);
158     it++;
159     EXPECT_EQ(11, it->mValue.int_value);
160     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
161     EXPECT_EQ(3, gaugeProducer.mPastBuckets.begin()
162                          ->second.back()
163                          .mAggregatedAtoms.begin()
164                          ->first.getAtomFieldValues()
165                          .getValues()
166                          .begin()
167                          ->mValue.int_value);
168 
169     allData.clear();
170     allData.push_back(makeLogEvent(tagId, bucket3StartTimeNs + 10, 24, "some value", 25));
171     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucket3StartTimeNs);
172     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
173     it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
174     EXPECT_EQ(INT, it->mValue.getType());
175     EXPECT_EQ(24, it->mValue.int_value);
176     it++;
177     EXPECT_EQ(INT, it->mValue.getType());
178     EXPECT_EQ(25, it->mValue.int_value);
179     // One dimension.
180     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
181     ASSERT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size());
182     auto it2 = gaugeProducer.mPastBuckets.begin()
183                        ->second.back()
184                        .mAggregatedAtoms.begin()
185                        ->first.getAtomFieldValues()
186                        .getValues()
187                        .begin();
188     EXPECT_EQ(INT, it2->mValue.getType());
189     EXPECT_EQ(10L, it2->mValue.int_value);
190     it2++;
191     EXPECT_EQ(INT, it2->mValue.getType());
192     EXPECT_EQ(11L, it2->mValue.int_value);
193 
194     gaugeProducer.flushIfNeededLocked(bucket4StartTimeNs);
195     ASSERT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
196     // One dimension.
197     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
198     ASSERT_EQ(3UL, gaugeProducer.mPastBuckets.begin()->second.size());
199     it2 = gaugeProducer.mPastBuckets.begin()
200                   ->second.back()
201                   .mAggregatedAtoms.begin()
202                   ->first.getAtomFieldValues()
203                   .getValues()
204                   .begin();
205     EXPECT_EQ(INT, it2->mValue.getType());
206     EXPECT_EQ(24L, it2->mValue.int_value);
207     it2++;
208     EXPECT_EQ(INT, it2->mValue.getType());
209     EXPECT_EQ(25L, it2->mValue.int_value);
210 }
211 
TEST_P(GaugeMetricProducerTest_PartialBucket,TestPushedEvents)212 TEST_P(GaugeMetricProducerTest_PartialBucket, TestPushedEvents) {
213     sp<AlarmMonitor> alarmMonitor;
214     GaugeMetric metric;
215     metric.set_id(metricId);
216     metric.set_bucket(ONE_MINUTE);
217     metric.mutable_gauge_fields_filter()->set_include_all(true);
218     metric.set_split_bucket_for_app_upgrade(true);
219 
220     Alert alert;
221     alert.set_id(101);
222     alert.set_metric_id(metricId);
223     alert.set_trigger_if_sum_gt(25);
224     alert.set_num_buckets(100);
225     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
226     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
227 
228     sp<EventMatcherWizard> eventMatcherWizard =
229             createEventMatcherWizard(tagId, logEventMatcherIndex);
230 
231     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
232                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
233                                       -1 /* -1 means no pulling */, -1, tagId, bucketStartTimeNs,
234                                       bucketStartTimeNs, pullerManager);
235     gaugeProducer.prepareFirstBucket();
236 
237     sp<AnomalyTracker> anomalyTracker =
238             gaugeProducer.addAnomalyTracker(alert, alarmMonitor, UPDATE_NEW, bucketStartTimeNs);
239     EXPECT_TRUE(anomalyTracker != nullptr);
240 
241     LogEvent event1(/*uid=*/0, /*pid=*/0);
242     CreateTwoValueLogEvent(&event1, tagId, bucketStartTimeNs + 10, 1, 10);
243     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
244     EXPECT_EQ(1UL, (*gaugeProducer.mCurrentSlicedBucket).count(DEFAULT_METRIC_DIMENSION_KEY));
245 
246     switch (GetParam()) {
247         case APP_UPGRADE:
248             gaugeProducer.notifyAppUpgrade(partialBucketSplitTimeNs);
249             break;
250         case BOOT_COMPLETE:
251             gaugeProducer.onStatsdInitCompleted(partialBucketSplitTimeNs);
252             break;
253     }
254     EXPECT_EQ(0UL, (*gaugeProducer.mCurrentSlicedBucket).count(DEFAULT_METRIC_DIMENSION_KEY));
255     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
256     EXPECT_EQ(bucketStartTimeNs,
257               gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
258     EXPECT_EQ(partialBucketSplitTimeNs,
259               gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketEndNs);
260     EXPECT_EQ(0L, gaugeProducer.mCurrentBucketNum);
261     EXPECT_EQ(partialBucketSplitTimeNs, gaugeProducer.mCurrentBucketStartTimeNs);
262     // Partial buckets are not sent to anomaly tracker.
263     EXPECT_EQ(0, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));
264 
265     // Create an event in the same partial bucket.
266     LogEvent event2(/*uid=*/0, /*pid=*/0);
267     CreateTwoValueLogEvent(&event2, tagId, bucketStartTimeNs + 59 * NS_PER_SEC, 1, 10);
268     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
269     EXPECT_EQ(0L, gaugeProducer.mCurrentBucketNum);
270     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
271     EXPECT_EQ(bucketStartTimeNs,
272               gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
273     EXPECT_EQ(partialBucketSplitTimeNs,
274               gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketEndNs);
275     EXPECT_EQ((int64_t)partialBucketSplitTimeNs, gaugeProducer.mCurrentBucketStartTimeNs);
276     // Partial buckets are not sent to anomaly tracker.
277     EXPECT_EQ(0, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));
278 
279     // Next event should trigger creation of new bucket and send previous full bucket to anomaly
280     // tracker.
281     LogEvent event3(/*uid=*/0, /*pid=*/0);
282     CreateTwoValueLogEvent(&event3, tagId, bucketStartTimeNs + 65 * NS_PER_SEC, 1, 10);
283     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event3);
284     EXPECT_EQ(1L, gaugeProducer.mCurrentBucketNum);
285     ASSERT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
286     EXPECT_EQ((int64_t)bucketStartTimeNs + bucketSizeNs, gaugeProducer.mCurrentBucketStartTimeNs);
287     EXPECT_EQ(1, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));
288 
289     // Next event should trigger creation of new bucket.
290     LogEvent event4(/*uid=*/0, /*pid=*/0);
291     CreateTwoValueLogEvent(&event4, tagId, bucketStartTimeNs + 125 * NS_PER_SEC, 1, 10);
292     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, event4);
293     EXPECT_EQ(2L, gaugeProducer.mCurrentBucketNum);
294     ASSERT_EQ(3UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
295     EXPECT_EQ(2, anomalyTracker->getSumOverPastBuckets(DEFAULT_METRIC_DIMENSION_KEY));
296 }
297 
TEST_P(GaugeMetricProducerTest_PartialBucket,TestPulled)298 TEST_P(GaugeMetricProducerTest_PartialBucket, TestPulled) {
299     GaugeMetric metric;
300     metric.set_id(metricId);
301     metric.set_bucket(ONE_MINUTE);
302     metric.set_max_pull_delay_sec(INT_MAX);
303     metric.set_split_bucket_for_app_upgrade(true);
304     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
305     gaugeFieldMatcher->set_field(tagId);
306     gaugeFieldMatcher->add_child()->set_field(2);
307 
308     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
309 
310     sp<EventMatcherWizard> eventMatcherWizard =
311             createEventMatcherWizard(tagId, logEventMatcherIndex);
312 
313     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
314     EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
315     EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
316     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
317             .WillOnce(Return(false))
318             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
319                                 vector<std::shared_ptr<LogEvent>>* data) {
320                 EXPECT_EQ(eventTimeNs, partialBucketSplitTimeNs);
321                 data->clear();
322                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 2));
323                 return true;
324             }));
325 
326     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
327                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
328                                       tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
329                                       pullerManager);
330     gaugeProducer.prepareFirstBucket();
331 
332     vector<shared_ptr<LogEvent>> allData;
333     allData.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 1));
334     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucketStartTimeNs);
335     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
336     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
337                          ->second.front()
338                          .mFields->begin()
339                          ->mValue.int_value);
340 
341     switch (GetParam()) {
342         case APP_UPGRADE:
343             gaugeProducer.notifyAppUpgrade(partialBucketSplitTimeNs);
344             break;
345         case BOOT_COMPLETE:
346             gaugeProducer.onStatsdInitCompleted(partialBucketSplitTimeNs);
347             break;
348     }
349     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
350     EXPECT_EQ(bucketStartTimeNs,
351               gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
352     EXPECT_EQ(partialBucketSplitTimeNs,
353               gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketEndNs);
354     EXPECT_EQ(0L, gaugeProducer.mCurrentBucketNum);
355     EXPECT_EQ(partialBucketSplitTimeNs, gaugeProducer.mCurrentBucketStartTimeNs);
356     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
357     EXPECT_EQ(2, gaugeProducer.mCurrentSlicedBucket->begin()
358                          ->second.front()
359                          .mFields->begin()
360                          ->mValue.int_value);
361 
362     allData.clear();
363     allData.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + bucketSizeNs + 1, 3));
364     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
365                                bucketStartTimeNs + bucketSizeNs);
366     ASSERT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
367     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
368     EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin()
369                          ->second.front()
370                          .mFields->begin()
371                          ->mValue.int_value);
372 }
373 
/*
 * With split_bucket_for_app_upgrade set to false, notifyAppUpgrade() must leave the bucket in
 * progress alone: nothing is moved to mPastBuckets, the bucket start and number are unchanged and
 * the current sample (value 1) is still there. Only one Pull() is allowed by the strict mock, so
 * the upgrade must not trigger another one.
 */
TEST(GaugeMetricProducerTest, TestPulledWithAppUpgradeDisabled) {
    GaugeMetric metric;
    metric.set_id(metricId);
    metric.set_bucket(ONE_MINUTE);
    metric.set_split_bucket_for_app_upgrade(false);
    metric.set_max_pull_delay_sec(INT_MAX);
    auto* selectedFields = metric.mutable_gauge_fields_filter()->mutable_fields();
    selectedFields->set_field(tagId);
    selectedFields->add_child()->set_field(2);

    sp<MockConditionWizard> wizard(new NaggyMock<MockConditionWizard>());

    sp<EventMatcherWizard> eventMatcherWizard =
            createEventMatcherWizard(tagId, logEventMatcherIndex);

    sp<MockStatsPullerManager> pullerManager(new StrictMock<MockStatsPullerManager>());
    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
    // The single pull at bucketStartTimeNs reports failure.
    EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs, _))
            .WillOnce(Return(false));

    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
                                      wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
                                      tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
                                      pullerManager);
    gaugeProducer.prepareFirstBucket();

    // --- Sample delivered for the first bucket. ---
    vector<shared_ptr<LogEvent>> pulled;
    pulled.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 1));
    gaugeProducer.onDataPulled(pulled, PullResult::PULL_RESULT_SUCCESS, bucketStartTimeNs);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    {
        const auto& sample = gaugeProducer.mCurrentSlicedBucket->begin()->second.front();
        EXPECT_EQ(1, sample.mFields->begin()->mValue.int_value);
    }

    // --- App upgrade: expected to be a no-op for this metric. ---
    gaugeProducer.notifyAppUpgrade(partialBucketSplitTimeNs);
    ASSERT_EQ(0UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
    EXPECT_EQ(0L, gaugeProducer.mCurrentBucketNum);
    EXPECT_EQ(bucketStartTimeNs, gaugeProducer.mCurrentBucketStartTimeNs);
    ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
    {
        const auto& sample = gaugeProducer.mCurrentSlicedBucket->begin()->second.front();
        EXPECT_EQ(1, sample.mFields->begin()->mValue.int_value);
    }
}
420 
TEST(GaugeMetricProducerTest,TestPulledEventsWithCondition)421 TEST(GaugeMetricProducerTest, TestPulledEventsWithCondition) {
422     GaugeMetric metric;
423     metric.set_id(metricId);
424     metric.set_bucket(ONE_MINUTE);
425     metric.set_max_pull_delay_sec(INT_MAX);
426     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
427     gaugeFieldMatcher->set_field(tagId);
428     gaugeFieldMatcher->add_child()->set_field(2);
429     metric.set_condition(StringToId("SCREEN_ON"));
430 
431     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
432 
433     sp<EventMatcherWizard> eventMatcherWizard =
434             createEventMatcherWizard(tagId, logEventMatcherIndex);
435 
436     int64_t conditionChangeNs = bucketStartTimeNs + 8;
437 
438     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
439     EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
440     EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
441     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, conditionChangeNs, _))
442             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
443                                 vector<std::shared_ptr<LogEvent>>* data) {
444                 data->clear();
445                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs + 10, 100));
446                 return true;
447             }));
448 
449     GaugeMetricProducer gaugeProducer(kConfigKey, metric, 0 /*condition index*/,
450                                       {ConditionState::kUnknown}, wizard, protoHash,
451                                       logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
452                                       bucketStartTimeNs, bucketStartTimeNs, pullerManager);
453     gaugeProducer.prepareFirstBucket();
454 
455     gaugeProducer.onConditionChanged(true, conditionChangeNs);
456     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
457     EXPECT_EQ(100, gaugeProducer.mCurrentSlicedBucket->begin()
458                            ->second.front()
459                            .mFields->begin()
460                            ->mValue.int_value);
461     ASSERT_EQ(0UL, gaugeProducer.mPastBuckets.size());
462 
463     vector<shared_ptr<LogEvent>> allData;
464     allData.clear();
465     allData.push_back(CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1, 110));
466     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs);
467 
468     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
469     EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()
470                            ->second.front()
471                            .mFields->begin()
472                            ->mValue.int_value);
473     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
474 
475     EXPECT_EQ(100, gaugeProducer.mPastBuckets.begin()
476                            ->second.back()
477                            .mAggregatedAtoms.begin()
478                            ->first.getAtomFieldValues()
479                            .getValues()
480                            .begin()
481                            ->mValue.int_value);
482 
483     gaugeProducer.onConditionChanged(false, bucket2StartTimeNs + 10);
484     gaugeProducer.flushIfNeededLocked(bucket3StartTimeNs + 10);
485     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
486     ASSERT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size());
487     EXPECT_EQ(110L, gaugeProducer.mPastBuckets.begin()
488                             ->second.back()
489                             .mAggregatedAtoms.begin()
490                             ->first.getAtomFieldValues()
491                             .getValues()
492                             .begin()
493                             ->mValue.int_value);
494 }
495 
TEST(GaugeMetricProducerTest,TestPulledEventsWithSlicedCondition)496 TEST(GaugeMetricProducerTest, TestPulledEventsWithSlicedCondition) {
497     const int conditionTag = 65;
498     GaugeMetric metric;
499     metric.set_id(1111111);
500     metric.set_bucket(ONE_MINUTE);
501     metric.mutable_gauge_fields_filter()->set_include_all(true);
502     metric.set_condition(StringToId("APP_DIED"));
503     metric.set_max_pull_delay_sec(INT_MAX);
504     auto dim = metric.mutable_dimensions_in_what();
505     dim->set_field(tagId);
506     dim->add_child()->set_field(1);
507 
508     sp<EventMatcherWizard> eventMatcherWizard =
509             createEventMatcherWizard(tagId, logEventMatcherIndex);
510 
511     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
512     EXPECT_CALL(*wizard, query(_, _, _))
513             .WillRepeatedly(
514                     Invoke([](const int conditionIndex, const ConditionKey& conditionParameters,
515                               const bool isPartialLink) {
516                         int pos[] = {1, 0, 0};
517                         Field f(conditionTag, pos, 0);
518                         HashableDimensionKey key;
519                         key.mutableValues()->emplace_back(f, Value((int32_t)1000000));
520 
521                         return ConditionState::kTrue;
522                     }));
523 
524     int64_t sliceConditionChangeNs = bucketStartTimeNs + 8;
525 
526     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
527     EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
528     EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
529     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, sliceConditionChangeNs, _))
530             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
531                                 vector<std::shared_ptr<LogEvent>>* data) {
532                 data->clear();
533                 data->push_back(CreateTwoValueLogEvent(tagId, eventTimeNs + 10, 1000, 100));
534                 return true;
535             }));
536 
537     GaugeMetricProducer gaugeProducer(kConfigKey, metric, 0 /*condition index*/,
538                                       {ConditionState::kUnknown}, wizard, protoHash,
539                                       logEventMatcherIndex, eventMatcherWizard, tagId, -1, tagId,
540                                       bucketStartTimeNs, bucketStartTimeNs, pullerManager);
541     gaugeProducer.prepareFirstBucket();
542 
543     gaugeProducer.onSlicedConditionMayChange(true, sliceConditionChangeNs);
544 
545     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
546     const auto& key = gaugeProducer.mCurrentSlicedBucket->begin()->first;
547     ASSERT_EQ(1UL, key.getDimensionKeyInWhat().getValues().size());
548     EXPECT_EQ(1000, key.getDimensionKeyInWhat().getValues()[0].mValue.int_value);
549 
550     ASSERT_EQ(0UL, gaugeProducer.mPastBuckets.size());
551 
552     vector<shared_ptr<LogEvent>> allData;
553     allData.clear();
554     allData.push_back(CreateTwoValueLogEvent(tagId, bucket2StartTimeNs + 1, 1000, 110));
555     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs);
556 
557     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
558     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
559 }
560 
TEST(GaugeMetricProducerTest,TestPulledEventsAnomalyDetection)561 TEST(GaugeMetricProducerTest, TestPulledEventsAnomalyDetection) {
562     sp<AlarmMonitor> alarmMonitor;
563     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
564 
565     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
566     EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
567     EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
568     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs, _))
569             .WillOnce(Return(false));
570 
571     GaugeMetric metric;
572     metric.set_id(metricId);
573     metric.set_bucket(ONE_MINUTE);
574     metric.set_max_pull_delay_sec(INT_MAX);
575     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
576     gaugeFieldMatcher->set_field(tagId);
577     gaugeFieldMatcher->add_child()->set_field(2);
578 
579     sp<EventMatcherWizard> eventMatcherWizard =
580             createEventMatcherWizard(tagId, logEventMatcherIndex);
581 
582     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
583                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
584                                       tagId, -1, tagId, bucketStartTimeNs, bucketStartTimeNs,
585                                       pullerManager);
586     gaugeProducer.prepareFirstBucket();
587 
588     Alert alert;
589     alert.set_id(101);
590     alert.set_metric_id(metricId);
591     alert.set_trigger_if_sum_gt(25);
592     alert.set_num_buckets(2);
593     const int32_t refPeriodSec = 60;
594     alert.set_refractory_period_secs(refPeriodSec);
595     sp<AnomalyTracker> anomalyTracker =
596             gaugeProducer.addAnomalyTracker(alert, alarmMonitor, UPDATE_NEW, bucketStartTimeNs);
597 
598     int tagId = 1;
599     vector<shared_ptr<LogEvent>> allData;
600     allData.clear();
601     allData.push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 1, 13));
602     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucketStartTimeNs);
603     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
604     EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
605                            ->second.front()
606                            .mFields->begin()
607                            ->mValue.int_value);
608     EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY), 0U);
609 
610     std::shared_ptr<LogEvent> event2 =
611             CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + bucketSizeNs + 20, 15);
612 
613     allData.clear();
614     allData.push_back(event2);
615     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
616                                bucketStartTimeNs + bucketSizeNs);
617     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
618     EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
619                            ->second.front()
620                            .mFields->begin()
621                            ->mValue.int_value);
622     EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY),
623               std::ceil(1.0 * event2->GetElapsedTimestampNs() / NS_PER_SEC) + refPeriodSec);
624 
625     allData.clear();
626     allData.push_back(
627             CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 2 * bucketSizeNs + 10, 26));
628     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
629                                bucket2StartTimeNs + 2 * bucketSizeNs);
630     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
631     EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
632                            ->second.front()
633                            .mFields->begin()
634                            ->mValue.int_value);
635     EXPECT_EQ(anomalyTracker->getRefractoryPeriodEndsSec(DEFAULT_METRIC_DIMENSION_KEY),
636               std::ceil(1.0 * event2->GetElapsedTimestampNs() / NS_PER_SEC + refPeriodSec));
637 
638     // This event does not have the gauge field. Thus the current bucket value is 0.
639     allData.clear();
640     allData.push_back(CreateNoValuesLogEvent(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10));
641     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS,
642                                bucketStartTimeNs + 3 * bucketSizeNs);
643     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
644     EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
645 }
646 
TEST(GaugeMetricProducerTest,TestPullOnTrigger)647 TEST(GaugeMetricProducerTest, TestPullOnTrigger) {
648     GaugeMetric metric;
649     metric.set_id(metricId);
650     metric.set_bucket(ONE_MINUTE);
651     metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
652     metric.mutable_gauge_fields_filter()->set_include_all(false);
653     metric.set_max_pull_delay_sec(INT_MAX);
654     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
655     gaugeFieldMatcher->set_field(tagId);
656     gaugeFieldMatcher->add_child()->set_field(1);
657 
658     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
659 
660     sp<EventMatcherWizard> eventMatcherWizard =
661             createEventMatcherWizard(tagId, logEventMatcherIndex);
662 
663     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
664     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
665             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
666                                 vector<std::shared_ptr<LogEvent>>* data) {
667                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10);
668                 data->clear();
669                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 4));
670                 return true;
671             }))
672             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
673                                 vector<std::shared_ptr<LogEvent>>* data) {
674                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 20);
675                 data->clear();
676                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 5));
677                 return true;
678             }))
679             .WillOnce(Return(true));
680 
681     int triggerId = 5;
682     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
683                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
684                                       tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
685                                       pullerManager);
686     gaugeProducer.prepareFirstBucket();
687 
688     ASSERT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
689 
690     LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
691     CreateNoValuesLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 10);
692     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
693     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());
694     triggerEvent.setElapsedTimestampNs(bucketStartTimeNs + 20);
695     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
696     ASSERT_EQ(2UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());
697     triggerEvent.setElapsedTimestampNs(bucket2StartTimeNs + 1);
698     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
699 
700     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
701     ASSERT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mAggregatedAtoms.size());
702     auto it = gaugeProducer.mPastBuckets.begin()->second.back().mAggregatedAtoms.begin();
703     vector<int> atomValues;
704     atomValues.emplace_back(it->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
705     it++;
706     atomValues.emplace_back(it->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
707     EXPECT_THAT(atomValues, UnorderedElementsAre(4, 5));
708 }
709 
TEST(GaugeMetricProducerTest,TestPullNWithoutTrigger)710 TEST(GaugeMetricProducerTest, TestPullNWithoutTrigger) {
711     GaugeMetric metric;
712     metric.set_id(metricId);
713     metric.set_bucket(ONE_MINUTE);
714     metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
715     metric.set_max_pull_delay_sec(INT_MAX);
716     metric.set_max_num_gauge_atoms_per_bucket(3);
717     auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
718     gaugeFieldMatcher->set_field(tagId);
719 
720     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
721 
722     sp<EventMatcherWizard> eventMatcherWizard =
723             createEventMatcherWizard(tagId, logEventMatcherIndex);
724 
725     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
726     EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, kConfigKey, _, _, _)).WillOnce(Return());
727     EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, kConfigKey, _)).WillOnce(Return());
728     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
729             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
730                                 vector<std::shared_ptr<LogEvent>>* data) {
731                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs);
732                 data->clear();
733                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 4));
734                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 5));
735                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 6));
736                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 7));
737                 return true;
738             }));
739 
740     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
741                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
742                                       tagId, /*triggerId=*/-1, tagId, bucketStartTimeNs,
743                                       bucketStartTimeNs, pullerManager);
744 
745     EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
746     gaugeProducer.prepareFirstBucket();
747     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
748     EXPECT_EQ(3UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());
749 
750     vector<std::shared_ptr<LogEvent>> allData;
751     allData.push_back(CreateNoValuesLogEvent(tagId, bucket2StartTimeNs + 10));
752     gaugeProducer.onDataPulled(allData, PullResult::PULL_RESULT_SUCCESS, bucket2StartTimeNs + 30);
753     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
754     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());
755 
756     ASSERT_EQ(1UL, gaugeProducer.mPastBuckets.size());
757     ASSERT_EQ(3UL, gaugeProducer.mPastBuckets.begin()->second.back().mAggregatedAtoms.size());
758     auto it = gaugeProducer.mPastBuckets.begin()->second.back().mAggregatedAtoms.begin();
759     vector<int> atomValues;
760     atomValues.emplace_back(it->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
761     it++;
762     atomValues.emplace_back(it->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
763     it++;
764     atomValues.emplace_back(it->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
765     EXPECT_THAT(atomValues, UnorderedElementsAre(4, 5, 6));
766 }
767 
TEST(GaugeMetricProducerTest,TestRemoveDimensionInOutput)768 TEST(GaugeMetricProducerTest, TestRemoveDimensionInOutput) {
769     GaugeMetric metric;
770     metric.set_id(metricId);
771     metric.set_bucket(ONE_MINUTE);
772     metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
773     metric.mutable_gauge_fields_filter()->set_include_all(true);
774     metric.set_max_pull_delay_sec(INT_MAX);
775     auto dimensionMatcher = metric.mutable_dimensions_in_what();
776     // use field 1 as dimension.
777     dimensionMatcher->set_field(tagId);
778     dimensionMatcher->add_child()->set_field(1);
779 
780     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
781 
782     sp<EventMatcherWizard> eventMatcherWizard =
783             createEventMatcherWizard(tagId, logEventMatcherIndex);
784 
785     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
786     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
787             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
788                                 vector<std::shared_ptr<LogEvent>>* data) {
789                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 3);
790                 data->clear();
791                 data->push_back(CreateTwoValueLogEvent(tagId, eventTimeNs, 3, 4));
792                 return true;
793             }))
794             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
795                                 vector<std::shared_ptr<LogEvent>>* data) {
796                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10);
797                 data->clear();
798                 data->push_back(CreateTwoValueLogEvent(tagId, eventTimeNs, 4, 5));
799                 return true;
800             }))
801             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
802                                 vector<std::shared_ptr<LogEvent>>* data) {
803                 EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 20);
804                 data->clear();
805                 data->push_back(CreateTwoValueLogEvent(tagId, eventTimeNs, 4, 6));
806                 return true;
807             }))
808             .WillOnce(Return(true));
809 
810     int triggerId = 5;
811     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
812                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
813                                       tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
814                                       pullerManager);
815     gaugeProducer.prepareFirstBucket();
816 
817     LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
818     CreateNoValuesLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 3);
819     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
820     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
821     triggerEvent.setElapsedTimestampNs(bucketStartTimeNs + 10);
822     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
823     ASSERT_EQ(2UL, gaugeProducer.mCurrentSlicedBucket->size());
824     ASSERT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());
825     triggerEvent.setElapsedTimestampNs(bucketStartTimeNs + 20);
826     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
827     ASSERT_EQ(2UL, gaugeProducer.mCurrentSlicedBucket->begin()->second.size());
828     triggerEvent.setElapsedTimestampNs(bucket2StartTimeNs + 1);
829     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
830 
831     ASSERT_EQ(2UL, gaugeProducer.mPastBuckets.size());
832     auto bucketIt = gaugeProducer.mPastBuckets.begin();
833     ASSERT_EQ(1UL, bucketIt->second.back().mAggregatedAtoms.size());
834     EXPECT_EQ(3, bucketIt->first.getDimensionKeyInWhat().getValues().begin()->mValue.int_value);
835     EXPECT_EQ(4, bucketIt->second.back()
836                          .mAggregatedAtoms.begin()
837                          ->first.getAtomFieldValues()
838                          .getValues()
839                          .begin()
840                          ->mValue.int_value);
841     bucketIt++;
842     ASSERT_EQ(2UL, bucketIt->second.back().mAggregatedAtoms.size());
843     EXPECT_EQ(4, bucketIt->first.getDimensionKeyInWhat().getValues().begin()->mValue.int_value);
844     auto atomIt = bucketIt->second.back().mAggregatedAtoms.begin();
845     vector<int> atomValues;
846     atomValues.emplace_back(
847             atomIt->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
848     atomIt++;
849     atomValues.emplace_back(
850             atomIt->first.getAtomFieldValues().getValues().begin()->mValue.int_value);
851     EXPECT_THAT(atomValues, UnorderedElementsAre(5, 6));
852 }
853 
854 /*
855  * Test that BUCKET_TOO_SMALL dump reason is logged when a flushed bucket size
856  * is smaller than the "min_bucket_size_nanos" specified in the metric config.
857  */
TEST(GaugeMetricProducerTest_BucketDrop,TestBucketDropWhenBucketTooSmall)858 TEST(GaugeMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) {
859     GaugeMetric metric;
860     metric.set_id(metricId);
861     metric.set_bucket(FIVE_MINUTES);
862     metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
863     metric.set_min_bucket_size_nanos(10000000000);  // 10 seconds
864 
865     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
866 
867     sp<EventMatcherWizard> eventMatcherWizard =
868             createEventMatcherWizard(tagId, logEventMatcherIndex);
869 
870     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
871     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, bucketStartTimeNs + 3, _))
872             // Bucket start.
873             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
874                                 vector<std::shared_ptr<LogEvent>>* data) {
875                 data->clear();
876                 data->push_back(CreateRepeatedValueLogEvent(tagId, eventTimeNs, 10));
877                 return true;
878             }));
879 
880     int triggerId = 5;
881     GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, {},
882                                       wizard, protoHash, logEventMatcherIndex, eventMatcherWizard,
883                                       tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
884                                       pullerManager);
885     gaugeProducer.prepareFirstBucket();
886 
887     LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
888     CreateNoValuesLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 3);
889     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
890 
891     // Check dump report.
892     ProtoOutputStream output;
893     std::set<string> strSet;
894     gaugeProducer.onDumpReport(bucketStartTimeNs + 9000000, true /* include recent buckets */, true,
895                                FAST /* dump_latency */, &strSet, &output);
896 
897     StatsLogReport report = outputStreamToProto(&output);
898     EXPECT_TRUE(report.has_gauge_metrics());
899     ASSERT_EQ(0, report.gauge_metrics().data_size());
900     ASSERT_EQ(1, report.gauge_metrics().skipped_size());
901 
902     EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
903               report.gauge_metrics().skipped(0).start_bucket_elapsed_millis());
904     EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 9000000),
905               report.gauge_metrics().skipped(0).end_bucket_elapsed_millis());
906     ASSERT_EQ(1, report.gauge_metrics().skipped(0).drop_event_size());
907 
908     auto dropEvent = report.gauge_metrics().skipped(0).drop_event(0);
909     EXPECT_EQ(BucketDropReason::BUCKET_TOO_SMALL, dropEvent.drop_reason());
910     EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 9000000), dropEvent.drop_time_millis());
911 }
912 
TEST(GaugeMetricProducerTest,TestPullDimensionalSampling)913 TEST(GaugeMetricProducerTest, TestPullDimensionalSampling) {
914     ShardOffsetProvider::getInstance().setShardOffset(5);
915 
916     StatsdConfig config;
917     config.add_allowed_log_source("AID_ROOT");  // LogEvent defaults to UID of root.
918 
919     int triggerId = 5;
920     int shardCount = 2;
921     GaugeMetric sampledGaugeMetric = createGaugeMetric(
922             "GaugePullSampled", metricId, GaugeMetric::FIRST_N_SAMPLES, nullopt, triggerId);
923     sampledGaugeMetric.set_max_pull_delay_sec(INT_MAX);
924     *sampledGaugeMetric.mutable_dimensions_in_what() = CreateDimensions(tagId, {1});
925     *sampledGaugeMetric.mutable_dimensional_sampling_info()->mutable_sampled_what_field() =
926             CreateDimensions(tagId, {1});
927     sampledGaugeMetric.mutable_dimensional_sampling_info()->set_shard_count(shardCount);
928     *config.add_gauge_metric() = sampledGaugeMetric;
929 
930     sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
931 
932     sp<EventMatcherWizard> eventMatcherWizard =
933             createEventMatcherWizard(tagId, logEventMatcherIndex);
934 
935     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
936 
937     EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _))
938             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
939                                 vector<std::shared_ptr<LogEvent>>* data) {
940                 data->clear();
941                 data->push_back(makeUidLogEvent(tagId, bucketStartTimeNs + 10, 1001, 5, 10));
942                 data->push_back(makeUidLogEvent(tagId, bucketStartTimeNs + 10, 1002, 10, 10));
943                 data->push_back(makeUidLogEvent(tagId, bucketStartTimeNs + 10, 1003, 15, 10));
944                 return true;
945             }))
946             .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
947                                 vector<std::shared_ptr<LogEvent>>* data) {
948                 data->clear();
949                 data->push_back(makeUidLogEvent(tagId, bucketStartTimeNs + 20, 1001, 6, 10));
950                 data->push_back(makeUidLogEvent(tagId, bucketStartTimeNs + 20, 1002, 12, 10));
951                 data->push_back(makeUidLogEvent(tagId, bucketStartTimeNs + 20, 1003, 18, 10));
952                 return true;
953             }));
954 
955     GaugeMetricProducer gaugeProducer(kConfigKey, sampledGaugeMetric,
956                                       -1 /*-1 meaning no condition*/, {}, wizard, protoHash,
957                                       logEventMatcherIndex, eventMatcherWizard, tagId, triggerId,
958                                       tagId, bucketStartTimeNs, bucketStartTimeNs, pullerManager);
959     SamplingInfo samplingInfo;
960     samplingInfo.shardCount = shardCount;
961     translateFieldMatcher(sampledGaugeMetric.dimensional_sampling_info().sampled_what_field(),
962                           &samplingInfo.sampledWhatFields);
963     gaugeProducer.setSamplingInfo(samplingInfo);
964     gaugeProducer.prepareFirstBucket();
965 
966     LogEvent triggerEvent(/*uid=*/0, /*pid=*/0);
967     CreateRepeatedValueLogEvent(&triggerEvent, triggerId, bucketStartTimeNs + 10, 5);
968     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
969 
970     triggerEvent.setElapsedTimestampNs(bucketStartTimeNs + 20);
971     gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, triggerEvent);
972 
973     // Check dump report.
974     ProtoOutputStream output;
975     std::set<string> strSet;
976     int64_t dumpReportTimeNs = bucketStartTimeNs + 10000000000;
977     gaugeProducer.onDumpReport(dumpReportTimeNs, true /* include current buckets */, true,
978                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
979 
980     StatsLogReport report = outputStreamToProto(&output);
981     backfillDimensionPath(&report);
982     backfillStartEndTimestamp(&report);
983     backfillAggregatedAtoms(&report);
984 
985     EXPECT_TRUE(report.has_gauge_metrics());
986     StatsLogReport::GaugeMetricDataWrapper gaugeMetrics;
987     sortMetricDataByDimensionsValue(report.gauge_metrics(), &gaugeMetrics);
988     ASSERT_EQ(2, gaugeMetrics.data_size());
989     EXPECT_EQ(0, report.gauge_metrics().skipped_size());
990 
991     // Only Uid 1 and 3 are logged. (odd hash value) + (offset of 5) % (shard count of 2) = 0
992     GaugeMetricData data = gaugeMetrics.data(0);
993     ValidateUidDimension(data.dimensions_in_what(), tagId, 1001);
994     ValidateGaugeBucketTimes(data.bucket_info(0), bucketStartTimeNs, dumpReportTimeNs,
995                              {bucketStartTimeNs + 10, bucketStartTimeNs + 20});
996 
997     data = gaugeMetrics.data(1);
998     ValidateUidDimension(data.dimensions_in_what(), tagId, 1003);
999     ValidateGaugeBucketTimes(data.bucket_info(0), bucketStartTimeNs, dumpReportTimeNs,
1000                              {bucketStartTimeNs + 10, bucketStartTimeNs + 20});
1001 }
1002 
1003 }  // namespace statsd
1004 }  // namespace os
1005 }  // namespace android
1006 #else
1007 GTEST_LOG_(INFO) << "This test does nothing.\n";
1008 #endif
1009