• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fcp/client/opstats/pds_backed_opstats_db.h"
18 
19 #include <filesystem>
20 #include <functional>
21 #include <string>
22 #include <thread>  // NOLINT(build/c++11)
23 #include <utility>
24 
25 #include "google/protobuf/util/time_util.h"
26 #include "gmock/gmock.h"
27 #include "gtest/gtest.h"
28 #include "absl/status/statusor.h"
29 #include "fcp/client/test_helpers.h"
30 #include "fcp/protos/opstats.pb.h"
31 #include "fcp/testing/testing.h"
32 
33 namespace fcp {
34 namespace client {
35 namespace opstats {
36 namespace {
37 
38 using ::google::protobuf::util::TimeUtil;
39 using ::testing::Ge;
40 using ::testing::Gt;
41 
42 const absl::Duration ttl = absl::Hours(24);
43 const absl::Time benchmark_time = absl::Now();
44 const int64_t benchmark_time_sec = absl::ToUnixSeconds(absl::Now());
45 const int64_t size_limit = 1 * 1024 * 1024;
46 
47 class BasePdsBackedOpStatsDbTest {
48  protected:
SetUpBaseDir()49   void SetUpBaseDir() { base_dir_ = testing::TempDir(); }
50 
TearDownBaseDir()51   void TearDownBaseDir() {
52     std::filesystem::remove(std::filesystem::path(base_dir_) /
53                             PdsBackedOpStatsDb::kParentDir /
54                             PdsBackedOpStatsDb::kDbFileName);
55   }
56 
CreateEvent(OperationalStats::Event::EventKind kind,int64_t time_sec)57   static OperationalStats_Event CreateEvent(
58       OperationalStats::Event::EventKind kind, int64_t time_sec) {
59     OperationalStats_Event event;
60     event.set_event_type(kind);
61     *event.mutable_timestamp() = TimeUtil::SecondsToTimestamp(time_sec);
62     return event;
63   }
64 
CreateOperationalStatsWithSingleEvent(OperationalStats::Event::EventKind kind,int64_t time_sec)65   static OperationalStats CreateOperationalStatsWithSingleEvent(
66       OperationalStats::Event::EventKind kind, int64_t time_sec) {
67     OperationalStats op_stats;
68     op_stats.mutable_events()->Add(CreateEvent(kind, time_sec));
69     return op_stats;
70   }
71 
72   std::string base_dir_;
73   testing::StrictMock<MockLogManager> log_manager_;
74   absl::Mutex mu_;
75 };
76 
77 class PdsBackedOpStatsDbTest : public BasePdsBackedOpStatsDbTest,
78                                public testing::Test {
SetUp()79   void SetUp() override { SetUpBaseDir(); }
80 
TearDown()81   void TearDown() override { TearDownBaseDir(); }
82 };
83 
TEST_F(PdsBackedOpStatsDbTest,FailToCreateParentDirectory)84 TEST_F(PdsBackedOpStatsDbTest, FailToCreateParentDirectory) {
85   EXPECT_CALL(log_manager_,
86               LogDiag(ProdDiagCode::OPSTATS_PARENT_DIR_CREATION_FAILED));
87   ASSERT_THAT(
88       PdsBackedOpStatsDb::Create("/proc/0", ttl, log_manager_, size_limit),
89       IsCode(INTERNAL));
90 }
91 
TEST_F(PdsBackedOpStatsDbTest,InvalidRelativePath)92 TEST_F(PdsBackedOpStatsDbTest, InvalidRelativePath) {
93   EXPECT_CALL(log_manager_, LogDiag(ProdDiagCode::OPSTATS_INVALID_FILE_PATH));
94   ASSERT_THAT(PdsBackedOpStatsDb::Create("relative/opstats", ttl, log_manager_,
95                                          size_limit),
96               IsCode(INVALID_ARGUMENT));
97 }
98 
TEST_F(PdsBackedOpStatsDbTest,AddOpStats)99 TEST_F(PdsBackedOpStatsDbTest, AddOpStats) {
100   auto db =
101       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
102   ASSERT_THAT(db, IsOk());
103   OperationalStats op_stats = CreateOperationalStatsWithSingleEvent(
104       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, benchmark_time_sec);
105   auto func = [op_stats](OpStatsSequence& data) {
106     *data.add_opstats() = op_stats;
107   };
108   EXPECT_CALL(
109       log_manager_,
110       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
111                          /*execution_index=*/0, /*epoch_index=*/0,
112                          engine::DataSourceType::DATASET, /*value=*/Gt(0)));
113   EXPECT_CALL(log_manager_,
114               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
115                                  /*execution_index=*/0, /*epoch_index=*/0,
116                                  engine::DataSourceType::DATASET, /*value=*/1));
117   ASSERT_OK((*db)->Transform(func));
118   OpStatsSequence expected;
119   *expected.add_opstats() = op_stats;
120   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
121   ASSERT_THAT(data, IsOk());
122   ASSERT_TRUE(data->has_earliest_trustworthy_time());
123   data->clear_earliest_trustworthy_time();
124   EXPECT_THAT(*data, EqualsProto(expected));
125 }
126 
TEST_F(PdsBackedOpStatsDbTest,MutateOpStats)127 TEST_F(PdsBackedOpStatsDbTest, MutateOpStats) {
128   auto db =
129       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
130   ASSERT_THAT(db, IsOk());
131   auto initialCommit = [](OpStatsSequence& data) {
132     *data.add_opstats() = CreateOperationalStatsWithSingleEvent(
133         OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
134         benchmark_time_sec);
135   };
136   EXPECT_CALL(
137       log_manager_,
138       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
139                          /*execution_index=*/0, /*epoch_index=*/0,
140                          engine::DataSourceType::DATASET, /*value=*/Gt(0)))
141       .Times(2);
142   EXPECT_CALL(log_manager_,
143               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
144                                  /*execution_index=*/0, /*epoch_index=*/0,
145                                  engine::DataSourceType::DATASET, /*value=*/1))
146       .Times(2);
147   ASSERT_OK((*db)->Transform(initialCommit));
148   auto mutate = [](OpStatsSequence& data) {
149     data.mutable_opstats(0)->mutable_events()->Add(
150         CreateEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED,
151                     benchmark_time_sec));
152   };
153   ASSERT_OK((*db)->Transform(mutate));
154   OperationalStats expected_op_stats;
155   expected_op_stats.mutable_events()->Add(CreateEvent(
156       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, benchmark_time_sec));
157   expected_op_stats.mutable_events()->Add(
158       CreateEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED,
159                   benchmark_time_sec));
160   OpStatsSequence expected;
161   *expected.add_opstats() = expected_op_stats;
162   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
163   ASSERT_THAT(data, IsOk());
164   ASSERT_TRUE(data->has_earliest_trustworthy_time());
165   data->clear_earliest_trustworthy_time();
166   EXPECT_THAT(*data, EqualsProto(expected));
167 }
168 
TEST_F(PdsBackedOpStatsDbTest,LastUpdateTimeIsCorrectlyUsed)169 TEST_F(PdsBackedOpStatsDbTest, LastUpdateTimeIsCorrectlyUsed) {
170   auto db =
171       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
172   ASSERT_THAT(db, IsOk());
173   OperationalStats op_stats;
174   op_stats.mutable_events()->Add(
175       CreateEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
176                   absl::ToUnixSeconds(benchmark_time - absl::Hours(48))));
177   op_stats.mutable_events()->Add(
178       CreateEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED,
179                   absl::ToUnixSeconds(benchmark_time - absl::Hours(12))));
180   auto initialCommit = [op_stats](OpStatsSequence& data) {
181     *data.add_opstats() = op_stats;
182   };
183   EXPECT_CALL(
184       log_manager_,
185       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
186                          /*execution_index=*/0, /*epoch_index=*/0,
187                          engine::DataSourceType::DATASET, /*value=*/Gt(0)))
188       .Times(2);
189   EXPECT_CALL(log_manager_,
190               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
191                                  /*execution_index=*/0, /*epoch_index=*/0,
192                                  engine::DataSourceType::DATASET, /*value=*/1))
193       .Times(2);
194   ASSERT_OK((*db)->Transform(initialCommit));
195 
196   // We do a second unity commit to trigger the ttl cleanup.
197   auto unityCommit = [](OpStatsSequence& data) {};
198   ASSERT_OK((*db)->Transform(unityCommit));
199   OpStatsSequence expected;
200   *expected.add_opstats() = op_stats;
201   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
202   ASSERT_THAT(data, IsOk());
203   ASSERT_TRUE(data->has_earliest_trustworthy_time());
204   data->clear_earliest_trustworthy_time();
205   EXPECT_THAT(*data, EqualsProto(expected));
206 }
207 
TEST_F(PdsBackedOpStatsDbTest,NoEventsOpStatsGotRemoved)208 TEST_F(PdsBackedOpStatsDbTest, NoEventsOpStatsGotRemoved) {
209   auto db =
210       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
211   ASSERT_THAT(db, IsOk());
212   OperationalStats op_stats;
213   op_stats.set_population_name("population");
214   auto initialCommit = [op_stats](OpStatsSequence& data) {
215     *data.add_opstats() = op_stats;
216   };
217   EXPECT_CALL(
218       log_manager_,
219       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
220                          /*execution_index=*/0, /*epoch_index=*/0,
221                          engine::DataSourceType::DATASET, /*value=*/Ge(0)))
222       .Times(2);
223   EXPECT_CALL(log_manager_,
224               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
225                                  /*execution_index=*/0, /*epoch_index=*/0,
226                                  engine::DataSourceType::DATASET, /*value=*/1));
227   EXPECT_CALL(log_manager_,
228               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
229                                  /*execution_index=*/0, /*epoch_index=*/0,
230                                  engine::DataSourceType::DATASET, /*value=*/0));
231   ASSERT_OK((*db)->Transform(initialCommit));
232 
233   // We do a second unity commit to trigger the ttl cleanup.
234   auto unityCommit = [](OpStatsSequence& data) {};
235   ASSERT_OK((*db)->Transform(unityCommit));
236   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
237   ASSERT_THAT(data, IsOk());
238   ASSERT_TRUE(data->has_earliest_trustworthy_time());
239   data->clear_earliest_trustworthy_time();
240   EXPECT_THAT(*data, EqualsProto(OpStatsSequence::default_instance()));
241 }
242 
TEST_F(PdsBackedOpStatsDbTest,TwoInstanceOnTwoThreadsAccessSameFile)243 TEST_F(PdsBackedOpStatsDbTest, TwoInstanceOnTwoThreadsAccessSameFile) {
244   EXPECT_CALL(log_manager_,
245               LogDiag(ProdDiagCode::OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED));
246   std::vector<absl::StatusOr<std::unique_ptr<OpStatsDb>>> results;
247   std::function<void()> init = [&]() {
248     absl::WriterMutexLock lock(&mu_);
249     results.push_back(
250         PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit));
251   };
252   std::thread first_thread(init);
253   std::thread second_thread(init);
254   first_thread.join();
255   second_thread.join();
256   std::set<absl::StatusCode> expected{absl::StatusCode::kOk,
257                                       absl::StatusCode::kInternal};
258   std::set<absl::StatusCode> status_codes;
259   for (const auto& result : results) {
260     status_codes.insert(result.status().code());
261   }
262   ASSERT_EQ(status_codes, expected);
263 }
264 
TEST_F(PdsBackedOpStatsDbTest,TwoInstanceOnTwoThreadsAccessDifferentFile)265 TEST_F(PdsBackedOpStatsDbTest, TwoInstanceOnTwoThreadsAccessDifferentFile) {
266   std::vector<absl::StatusOr<std::unique_ptr<OpStatsDb>>> results;
267   std::function<void(std::string)> init = [&](std::string thread_id) {
268     absl::WriterMutexLock lock(&mu_);
269     results.push_back(
270         PdsBackedOpStatsDb::Create(absl::StrCat(base_dir_, "/", thread_id), ttl,
271                                    log_manager_, size_limit));
272   };
273   std::thread first_thread(init, "1");
274   std::thread second_thread(init, "2");
275   first_thread.join();
276   second_thread.join();
277   for (const auto& result : results) {
278     ASSERT_OK(result.status());
279   }
280 }
281 
TEST_F(PdsBackedOpStatsDbTest,BackfillEarliestTrustWorthyTime)282 TEST_F(PdsBackedOpStatsDbTest, BackfillEarliestTrustWorthyTime) {
283   OperationalStats first_op_stats = CreateOperationalStatsWithSingleEvent(
284       OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED,
285       benchmark_time_sec);
286   OperationalStats second_op_stats = CreateOperationalStatsWithSingleEvent(
287       OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED,
288       benchmark_time_sec);
289   {
290     absl::StatusOr<std::unique_ptr<OpStatsDb>> db =
291         PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
292     ASSERT_OK(db);
293     auto add = [first_op_stats, second_op_stats](OpStatsSequence& data) {
294       *data.add_opstats() = first_op_stats;
295       *data.add_opstats() = second_op_stats;
296     };
297     auto remove_earliest_trustworthy_time = [](OpStatsSequence& data) {
298       data.clear_earliest_trustworthy_time();
299     };
300     EXPECT_CALL(
301         log_manager_,
302         LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
303                            /*execution_index=*/0, /*epoch_index=*/0,
304                            engine::DataSourceType::DATASET, /*value=*/Gt(0)))
305         .Times(2);
306     EXPECT_CALL(log_manager_, LogToLongHistogram(
307                                   HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
308                                   /*execution_index=*/0, /*epoch_index=*/0,
309                                   engine::DataSourceType::DATASET, /*value=*/2))
310         .Times(2);
311     ASSERT_OK((*db)->Transform(add));
312     ASSERT_OK((*db)->Transform(remove_earliest_trustworthy_time));
313   }
314 
315   absl::StatusOr<std::unique_ptr<OpStatsDb>> db =
316       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
317   ASSERT_OK(db);
318   OperationalStats third_op_stats = CreateOperationalStatsWithSingleEvent(
319       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
320       benchmark_time_sec + 10);
321   auto add_another = [third_op_stats](OpStatsSequence& data) {
322     *data.add_opstats() = third_op_stats;
323   };
324   EXPECT_CALL(
325       log_manager_,
326       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
327                          /*execution_index=*/0, /*epoch_index=*/0,
328                          engine::DataSourceType::DATASET, /*value=*/Gt(0)));
329   EXPECT_CALL(log_manager_,
330               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
331                                  /*execution_index=*/0, /*epoch_index=*/0,
332                                  engine::DataSourceType::DATASET, /*value=*/3));
333   ASSERT_OK((*db)->Transform(add_another));
334   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
335   ASSERT_OK(data);
336   OpStatsSequence expected;
337   *expected.mutable_earliest_trustworthy_time() =
338       TimeUtil::SecondsToTimestamp(benchmark_time_sec);
339   *expected.add_opstats() = first_op_stats;
340   *expected.add_opstats() = second_op_stats;
341   *expected.add_opstats() = third_op_stats;
342   EXPECT_THAT((*data), EqualsProto(expected));
343 }
344 
TEST_F(PdsBackedOpStatsDbTest,ReadEmpty)345 TEST_F(PdsBackedOpStatsDbTest, ReadEmpty) {
346   ::google::protobuf::Timestamp before_creation_time =
347       TimeUtil::GetCurrentTime();
348   auto db =
349       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
350   ::google::protobuf::Timestamp after_creation_time =
351       TimeUtil::GetCurrentTime();
352   ASSERT_THAT(db, IsOk());
353   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
354   ASSERT_THAT(data, IsOk());
355   EXPECT_TRUE(data->opstats().empty());
356   EXPECT_TRUE(data->earliest_trustworthy_time() >= before_creation_time);
357   EXPECT_TRUE(data->earliest_trustworthy_time() <= after_creation_time);
358 }
359 
TEST_F(PdsBackedOpStatsDbTest,RemoveOpstatsDueToTtl)360 TEST_F(PdsBackedOpStatsDbTest, RemoveOpstatsDueToTtl) {
361   auto db =
362       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
363   ASSERT_THAT(db, IsOk());
364   OperationalStats op_stats_remove = CreateOperationalStatsWithSingleEvent(
365       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
366       absl::ToUnixSeconds(benchmark_time - absl::Hours(25)));
367   OperationalStats op_stats_keep = CreateOperationalStatsWithSingleEvent(
368       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
369       absl::ToUnixSeconds(benchmark_time - absl::Hours(23)));
370   auto initialCommit = [op_stats_remove, op_stats_keep](OpStatsSequence& data) {
371     *data.add_opstats() = op_stats_remove;
372     *data.add_opstats() = op_stats_keep;
373   };
374   EXPECT_CALL(
375       log_manager_,
376       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
377                          /*execution_index=*/0, /*epoch_index=*/0,
378                          engine::DataSourceType::DATASET, /*value=*/Gt(0)))
379       .Times(2);
380   EXPECT_CALL(log_manager_,
381               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
382                                  /*execution_index=*/0, /*epoch_index=*/0,
383                                  engine::DataSourceType::DATASET, /*value=*/2));
384   EXPECT_CALL(log_manager_,
385               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
386                                  /*execution_index=*/0, /*epoch_index=*/0,
387                                  engine::DataSourceType::DATASET, /*value=*/1));
388   ASSERT_OK((*db)->Transform(initialCommit));
389 
390   // We do a second unity commit to trigger the ttl cleanup.
391   auto unityCommit = [](OpStatsSequence& data) {};
392   ASSERT_OK((*db)->Transform(unityCommit));
393 
394   absl::StatusOr<OpStatsSequence> data = (*db)->Read();
395   ASSERT_THAT(data, IsOk());
396   ASSERT_EQ(data->opstats().size(), 1);
397   ASSERT_THAT(data->opstats()[0], EqualsProto(op_stats_keep));
398   // The TTL is 24 hours, the timestamp should be set to the time when the db
399   // got purged - 24 hours.  It should be smaller than the kept
400   // OperationalStats, but larger than benchmark time - 24 hours.
401   google::protobuf::Timestamp lower_bound = TimeUtil::SecondsToTimestamp(
402       absl::ToUnixSeconds(benchmark_time - absl::Hours(24)));
403   google::protobuf::Timestamp upper_bound = TimeUtil::SecondsToTimestamp(
404       absl::ToUnixSeconds(benchmark_time - absl::Hours(23)));
405   EXPECT_TRUE(data->earliest_trustworthy_time() >= lower_bound);
406   EXPECT_TRUE(data->earliest_trustworthy_time() <= upper_bound);
407 }
408 
TEST_F(PdsBackedOpStatsDbTest,CorruptedFile)409 TEST_F(PdsBackedOpStatsDbTest, CorruptedFile) {
410   {
411     std::unique_ptr<OpStatsDb> db =
412         PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit)
413             .value();
414     auto func = [](OpStatsSequence& data) {
415       *data.add_opstats() = CreateOperationalStatsWithSingleEvent(
416           OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
417           benchmark_time_sec);
418     };
419     EXPECT_CALL(
420         log_manager_,
421         LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
422                            /*execution_index=*/0, /*epoch_index=*/0,
423                            engine::DataSourceType::DATASET, /*value=*/Gt(0)));
424     EXPECT_CALL(
425         log_manager_,
426         LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
427                            /*execution_index=*/0, /*epoch_index=*/0,
428                            engine::DataSourceType::DATASET, /*value=*/1));
429     ASSERT_OK(db->Transform(func));
430   }
431 
432   {
433     std::filesystem::path db_path(base_dir_);
434     db_path /= PdsBackedOpStatsDb::kParentDir;
435     db_path /= PdsBackedOpStatsDb::kDbFileName;
436     protostore::FileStorage file_storage;
437     std::unique_ptr<protostore::OutputStream> ostream =
438         file_storage.OpenForWrite(db_path).value();
439     ASSERT_OK(ostream->Append("not a proto"));
440     ASSERT_OK(ostream->Close());
441   }
442 
443   std::unique_ptr<OpStatsDb> db =
444       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit)
445           .value();
446   EXPECT_CALL(log_manager_, LogDiag(ProdDiagCode::OPSTATS_READ_FAILED));
447   ::google::protobuf::Timestamp before_read_time = TimeUtil::GetCurrentTime();
448   ASSERT_THAT(db->Read(), IsCode(INTERNAL));
449   ::google::protobuf::Timestamp after_read_time = TimeUtil::GetCurrentTime();
450 
451   // Second read should succeed, and return empty data.
452   absl::StatusOr<OpStatsSequence> data = db->Read();
453   ASSERT_THAT(data, IsOk());
454   EXPECT_TRUE(data->opstats().empty());
455   EXPECT_TRUE(data->earliest_trustworthy_time() >= before_read_time);
456   EXPECT_TRUE(data->earliest_trustworthy_time() <= after_read_time);
457 }
458 
TEST_F(PdsBackedOpStatsDbTest,OpStatsRemovedDueToSizeLimit)459 TEST_F(PdsBackedOpStatsDbTest, OpStatsRemovedDueToSizeLimit) {
460   // Set size limit to 18, which allow a single OperationalStats with a single
461   // event (12 bytes for OperationalStats, 14 bytes when it is wrapped inside
462   // an OpStatsSequence). If record_earliest_trustworthy_time is true, we'll
463   // increase the size limit to 30 bytes to accommodate the timestamp.
464   int64_t max_size_bytes = 30;
465   absl::StatusOr<std::unique_ptr<OpStatsDb>> db_status =
466       PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, max_size_bytes);
467   ASSERT_THAT(db_status, IsOk());
468   std::unique_ptr<OpStatsDb> db = std::move(db_status.value());
469   EXPECT_CALL(
470       log_manager_,
471       LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
472                          /*execution_index=*/0, /*epoch_index=*/0,
473                          engine::DataSourceType::DATASET, /*value=*/Gt(0)))
474       .Times(2);
475   EXPECT_CALL(log_manager_,
476               LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
477                                  /*execution_index=*/0, /*epoch_index=*/0,
478                                  engine::DataSourceType::DATASET, /*value=*/1))
479       .Times(2);
480   OperationalStats op_stats = CreateOperationalStatsWithSingleEvent(
481       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, benchmark_time_sec);
482   auto initial_commit = [op_stats](OpStatsSequence& data) {
483     *data.add_opstats() = op_stats;
484   };
485   ASSERT_OK(db->Transform(initial_commit));
486 
487   // Add the second event, which will pushes the database size over the limit.
488   EXPECT_CALL(log_manager_,
489               LogToLongHistogram(HistogramCounters::OPSTATS_NUM_PRUNED_ENTRIES,
490                                  /*execution_index=*/0, /*epoch_index=*/0,
491                                  engine::DataSourceType::DATASET, /*value=*/1));
492   EXPECT_CALL(log_manager_,
493               LogToLongHistogram(
494                   HistogramCounters::OPSTATS_OLDEST_PRUNED_ENTRY_TENURE_HOURS,
495                   /*execution_index=*/0, /*epoch_index=*/0,
496                   engine::DataSourceType::DATASET, /*value=*/Ge(0)));
497   OperationalStats another_op_stats = CreateOperationalStatsWithSingleEvent(
498       OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
499       benchmark_time_sec + 5);
500   auto add = [another_op_stats](OpStatsSequence& data) {
501     *data.add_opstats() = another_op_stats;
502   };
503   ASSERT_OK(db->Transform(add));
504 
505   // Verify the first event doesn't exist in the database.
506   OpStatsSequence expected;
507   *expected.add_opstats() = another_op_stats;
508   *expected.mutable_earliest_trustworthy_time() =
509       TimeUtil::SecondsToTimestamp(benchmark_time_sec + 5);
510 
511   absl::StatusOr<OpStatsSequence> data = db->Read();
512   ASSERT_THAT(data, IsOk());
513   EXPECT_THAT(*data, EqualsProto(expected));
514 }
515 
516 }  // anonymous namespace
517 }  // namespace opstats
518 }  // namespace client
519 }  // namespace fcp
520