1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/client/opstats/pds_backed_opstats_db.h"
18
19 #include <filesystem>
20 #include <functional>
21 #include <string>
22 #include <thread> // NOLINT(build/c++11)
23 #include <utility>
24
25 #include "google/protobuf/util/time_util.h"
26 #include "gmock/gmock.h"
27 #include "gtest/gtest.h"
28 #include "absl/status/statusor.h"
29 #include "fcp/client/test_helpers.h"
30 #include "fcp/protos/opstats.pb.h"
31 #include "fcp/testing/testing.h"
32
33 namespace fcp {
34 namespace client {
35 namespace opstats {
36 namespace {
37
38 using ::google::protobuf::util::TimeUtil;
39 using ::testing::Ge;
40 using ::testing::Gt;
41
42 const absl::Duration ttl = absl::Hours(24);
43 const absl::Time benchmark_time = absl::Now();
44 const int64_t benchmark_time_sec = absl::ToUnixSeconds(absl::Now());
45 const int64_t size_limit = 1 * 1024 * 1024;
46
47 class BasePdsBackedOpStatsDbTest {
48 protected:
SetUpBaseDir()49 void SetUpBaseDir() { base_dir_ = testing::TempDir(); }
50
TearDownBaseDir()51 void TearDownBaseDir() {
52 std::filesystem::remove(std::filesystem::path(base_dir_) /
53 PdsBackedOpStatsDb::kParentDir /
54 PdsBackedOpStatsDb::kDbFileName);
55 }
56
CreateEvent(OperationalStats::Event::EventKind kind,int64_t time_sec)57 static OperationalStats_Event CreateEvent(
58 OperationalStats::Event::EventKind kind, int64_t time_sec) {
59 OperationalStats_Event event;
60 event.set_event_type(kind);
61 *event.mutable_timestamp() = TimeUtil::SecondsToTimestamp(time_sec);
62 return event;
63 }
64
CreateOperationalStatsWithSingleEvent(OperationalStats::Event::EventKind kind,int64_t time_sec)65 static OperationalStats CreateOperationalStatsWithSingleEvent(
66 OperationalStats::Event::EventKind kind, int64_t time_sec) {
67 OperationalStats op_stats;
68 op_stats.mutable_events()->Add(CreateEvent(kind, time_sec));
69 return op_stats;
70 }
71
72 std::string base_dir_;
73 testing::StrictMock<MockLogManager> log_manager_;
74 absl::Mutex mu_;
75 };
76
77 class PdsBackedOpStatsDbTest : public BasePdsBackedOpStatsDbTest,
78 public testing::Test {
SetUp()79 void SetUp() override { SetUpBaseDir(); }
80
TearDown()81 void TearDown() override { TearDownBaseDir(); }
82 };
83
// Create() on a base directory under /proc must log
// OPSTATS_PARENT_DIR_CREATION_FAILED and return INTERNAL.
TEST_F(PdsBackedOpStatsDbTest, FailToCreateParentDirectory) {
  EXPECT_CALL(log_manager_,
              LogDiag(ProdDiagCode::OPSTATS_PARENT_DIR_CREATION_FAILED));
  auto creation_result =
      PdsBackedOpStatsDb::Create("/proc/0", ttl, log_manager_, size_limit);
  ASSERT_THAT(creation_result, IsCode(INTERNAL));
}
91
// Create() with a relative base directory must log OPSTATS_INVALID_FILE_PATH
// and return INVALID_ARGUMENT.
TEST_F(PdsBackedOpStatsDbTest, InvalidRelativePath) {
  EXPECT_CALL(log_manager_, LogDiag(ProdDiagCode::OPSTATS_INVALID_FILE_PATH));
  auto creation_result = PdsBackedOpStatsDb::Create("relative/opstats", ttl,
                                                    log_manager_, size_limit);
  ASSERT_THAT(creation_result, IsCode(INVALID_ARGUMENT));
}
98
// A single entry written through Transform() must be returned by Read().
TEST_F(PdsBackedOpStatsDbTest, AddOpStats) {
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_THAT(db_or, IsOk());
  OpStatsDb& db = **db_or;

  OperationalStats entry = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, benchmark_time_sec);
  auto append_entry = [entry](OpStatsSequence& sequence) {
    *sequence.add_opstats() = entry;
  };

  // The commit is expected to log a positive db size and one entry.
  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Gt(0)));
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1));
  ASSERT_OK(db.Transform(append_entry));

  absl::StatusOr<OpStatsSequence> stored = db.Read();
  ASSERT_THAT(stored, IsOk());
  // The timestamp must be present, but its value is not compared; clear it
  // before matching against the expected sequence.
  ASSERT_TRUE(stored->has_earliest_trustworthy_time());
  stored->clear_earliest_trustworthy_time();

  OpStatsSequence expected;
  *expected.add_opstats() = entry;
  EXPECT_THAT(*stored, EqualsProto(expected));
}
126
// A second Transform() must be able to modify the entry written by an earlier
// Transform(), and Read() must return the combined result.
TEST_F(PdsBackedOpStatsDbTest, MutateOpStats) {
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_THAT(db_or, IsOk());
  OpStatsDb& db = **db_or;

  // Each of the two commits is expected to log a positive db size and a single
  // entry.
  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Gt(0)))
      .Times(2);
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1))
      .Times(2);

  // First commit: one entry with a CHECKIN_STARTED event.
  auto write_started = [](OpStatsSequence& sequence) {
    *sequence.add_opstats() = CreateOperationalStatsWithSingleEvent(
        OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
        benchmark_time_sec);
  };
  ASSERT_OK(db.Transform(write_started));

  // Second commit: append a CHECKIN_ACCEPTED event to that same entry.
  auto append_accepted = [](OpStatsSequence& sequence) {
    *sequence.mutable_opstats(0)->add_events() =
        CreateEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED,
                    benchmark_time_sec);
  };
  ASSERT_OK(db.Transform(append_accepted));

  absl::StatusOr<OpStatsSequence> stored = db.Read();
  ASSERT_THAT(stored, IsOk());
  ASSERT_TRUE(stored->has_earliest_trustworthy_time());
  stored->clear_earliest_trustworthy_time();

  OpStatsSequence expected;
  OperationalStats* expected_entry = expected.add_opstats();
  *expected_entry->add_events() = CreateEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, benchmark_time_sec);
  *expected_entry->add_events() =
      CreateEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED,
                  benchmark_time_sec);
  EXPECT_THAT(*stored, EqualsProto(expected));
}
168
// An entry whose first event is older than the ttl but whose latest event is
// within it must still be present after a follow-up commit.
TEST_F(PdsBackedOpStatsDbTest, LastUpdateTimeIsCorrectlyUsed) {
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_THAT(db_or, IsOk());
  OpStatsDb& db = **db_or;

  // The ttl is 24 hours: the first event is 48 hours old, the last one 12.
  const int64_t first_event_sec =
      absl::ToUnixSeconds(benchmark_time - absl::Hours(48));
  const int64_t last_event_sec =
      absl::ToUnixSeconds(benchmark_time - absl::Hours(12));
  OperationalStats entry;
  *entry.add_events() = CreateEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, first_event_sec);
  *entry.add_events() = CreateEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED, last_event_sec);

  // Both commits are expected to log a positive db size and one entry.
  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Gt(0)))
      .Times(2);
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1))
      .Times(2);

  auto write_entry = [entry](OpStatsSequence& sequence) {
    *sequence.add_opstats() = entry;
  };
  ASSERT_OK(db.Transform(write_entry));

  // A second, no-op commit triggers the ttl cleanup.
  auto no_op = [](OpStatsSequence& /*sequence*/) {};
  ASSERT_OK(db.Transform(no_op));

  absl::StatusOr<OpStatsSequence> stored = db.Read();
  ASSERT_THAT(stored, IsOk());
  ASSERT_TRUE(stored->has_earliest_trustworthy_time());
  stored->clear_earliest_trustworthy_time();

  OpStatsSequence expected;
  *expected.add_opstats() = entry;
  EXPECT_THAT(*stored, EqualsProto(expected));
}
207
// An entry that carries no events must be gone after a follow-up commit.
TEST_F(PdsBackedOpStatsDbTest, NoEventsOpStatsGotRemoved) {
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_THAT(db_or, IsOk());
  OpStatsDb& db = **db_or;

  // Only the population name is set; the entry has no events at all.
  OperationalStats eventless_entry;
  eventless_entry.set_population_name("population");

  // The db size is logged on both commits (it is allowed to be zero); the
  // number of entries is expected once with 1 and once with 0.
  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Ge(0)))
      .Times(2);
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1));
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/0));

  auto write_entry = [eventless_entry](OpStatsSequence& sequence) {
    *sequence.add_opstats() = eventless_entry;
  };
  ASSERT_OK(db.Transform(write_entry));

  // A second, no-op commit triggers the ttl cleanup.
  auto no_op = [](OpStatsSequence& /*sequence*/) {};
  ASSERT_OK(db.Transform(no_op));

  absl::StatusOr<OpStatsSequence> stored = db.Read();
  ASSERT_THAT(stored, IsOk());
  ASSERT_TRUE(stored->has_earliest_trustworthy_time());
  stored->clear_earliest_trustworthy_time();
  // With the timestamp cleared, nothing else may remain in the sequence.
  EXPECT_THAT(*stored, EqualsProto(OpStatsSequence::default_instance()));
}
242
// Two threads creating a database on the same base directory: one creation
// must succeed, the other must return INTERNAL, and
// OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED must be logged.
TEST_F(PdsBackedOpStatsDbTest, TwoInstanceOnTwoThreadsAccessSameFile) {
  EXPECT_CALL(log_manager_,
              LogDiag(ProdDiagCode::OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED));

  std::vector<absl::StatusOr<std::unique_ptr<OpStatsDb>>> outcomes;
  // Creation and recording of the result both happen while holding mu_.
  auto create_db = [this, &outcomes]() {
    absl::WriterMutexLock lock(&mu_);
    outcomes.push_back(
        PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit));
  };

  std::thread worker_a(create_db);
  std::thread worker_b(create_db);
  worker_a.join();
  worker_b.join();

  std::set<absl::StatusCode> observed_codes;
  for (const auto& outcome : outcomes) {
    observed_codes.insert(outcome.status().code());
  }
  const std::set<absl::StatusCode> expected_codes{absl::StatusCode::kOk,
                                                  absl::StatusCode::kInternal};
  ASSERT_EQ(observed_codes, expected_codes);
}
264
// Two threads creating databases under different base directories
// (<base_dir_>/1 and <base_dir_>/2) must both succeed.
TEST_F(PdsBackedOpStatsDbTest, TwoInstanceOnTwoThreadsAccessDifferentFile) {
  std::vector<absl::StatusOr<std::unique_ptr<OpStatsDb>>> results;
  // Creation and recording of the result both happen while holding mu_.
  std::function<void(const std::string&)> init =
      [&](const std::string& thread_id) {
        absl::WriterMutexLock lock(&mu_);
        results.push_back(PdsBackedOpStatsDb::Create(
            absl::StrCat(base_dir_, "/", thread_id), ttl, log_manager_,
            size_limit));
      };
  std::thread first_thread(init, "1");
  std::thread second_thread(init, "2");
  first_thread.join();
  second_thread.join();
  // Guard against the loop below passing vacuously: each thread must have
  // recorded exactly one result.
  ASSERT_EQ(results.size(), size_t{2});
  for (const auto& result : results) {
    ASSERT_OK(result.status());
  }
}
281
// When the stored sequence has no earliest_trustworthy_time, the next commit
// on a freshly created database must fill it in; the expected value is the
// timestamp of the events written first (benchmark_time_sec).
TEST_F(PdsBackedOpStatsDbTest, BackfillEarliestTrustWorthyTime) {
  OperationalStats entry_one = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED,
      benchmark_time_sec);
  OperationalStats entry_two = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED,
      benchmark_time_sec);

  // Phase 1: write two entries, then strip the timestamp from the stored
  // sequence. The database instance goes away at the end of this scope.
  {
    absl::StatusOr<std::unique_ptr<OpStatsDb>> first_db_or =
        PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
    ASSERT_OK(first_db_or);
    OpStatsDb& first_db = **first_db_or;

    // Both commits are expected to log a positive db size and two entries.
    EXPECT_CALL(
        log_manager_,
        LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                           /*execution_index=*/0, /*epoch_index=*/0,
                           engine::DataSourceType::DATASET, /*value=*/Gt(0)))
        .Times(2);
    EXPECT_CALL(log_manager_, LogToLongHistogram(
                                  HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                  /*execution_index=*/0, /*epoch_index=*/0,
                                  engine::DataSourceType::DATASET, /*value=*/2))
        .Times(2);

    auto write_two = [entry_one, entry_two](OpStatsSequence& sequence) {
      *sequence.add_opstats() = entry_one;
      *sequence.add_opstats() = entry_two;
    };
    ASSERT_OK(first_db.Transform(write_two));

    auto strip_timestamp = [](OpStatsSequence& sequence) {
      sequence.clear_earliest_trustworthy_time();
    };
    ASSERT_OK(first_db.Transform(strip_timestamp));
  }

  // Phase 2: create the database again and commit a third, later entry.
  absl::StatusOr<std::unique_ptr<OpStatsDb>> second_db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_OK(second_db_or);
  OpStatsDb& second_db = **second_db_or;

  OperationalStats entry_three = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
      benchmark_time_sec + 10);

  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Gt(0)));
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/3));

  auto write_third = [entry_three](OpStatsSequence& sequence) {
    *sequence.add_opstats() = entry_three;
  };
  ASSERT_OK(second_db.Transform(write_third));

  absl::StatusOr<OpStatsSequence> stored = second_db.Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  *expected.add_opstats() = entry_one;
  *expected.add_opstats() = entry_two;
  *expected.add_opstats() = entry_three;
  *expected.mutable_earliest_trustworthy_time() =
      TimeUtil::SecondsToTimestamp(benchmark_time_sec);
  EXPECT_THAT(*stored, EqualsProto(expected));
}
344
// Reading a freshly created database must yield no entries and an
// earliest_trustworthy_time that lies within the window in which Create() ran.
TEST_F(PdsBackedOpStatsDbTest, ReadEmpty) {
  // Bracket the Create() call with two clock samples.
  const ::google::protobuf::Timestamp window_start = TimeUtil::GetCurrentTime();
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  const ::google::protobuf::Timestamp window_end = TimeUtil::GetCurrentTime();
  ASSERT_THAT(db_or, IsOk());

  absl::StatusOr<OpStatsSequence> stored = (*db_or)->Read();
  ASSERT_THAT(stored, IsOk());
  EXPECT_TRUE(stored->opstats().empty());
  EXPECT_TRUE(stored->earliest_trustworthy_time() >= window_start);
  EXPECT_TRUE(stored->earliest_trustworthy_time() <= window_end);
}
359
// An entry older than the ttl must be dropped by a follow-up commit, while an
// entry younger than the ttl must be kept.
TEST_F(PdsBackedOpStatsDbTest, RemoveOpstatsDueToTtl) {
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_THAT(db_or, IsOk());
  OpStatsDb& db = **db_or;

  // The ttl is 24 hours: one entry is 25 hours old, the other 23 hours old.
  OperationalStats expired_entry = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
      absl::ToUnixSeconds(benchmark_time - absl::Hours(25)));
  OperationalStats fresh_entry = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
      absl::ToUnixSeconds(benchmark_time - absl::Hours(23)));

  // The db size is logged on both commits; the number of entries is expected
  // once with 2 and once with 1.
  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Gt(0)))
      .Times(2);
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/2));
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1));

  auto write_both = [expired_entry, fresh_entry](OpStatsSequence& sequence) {
    *sequence.add_opstats() = expired_entry;
    *sequence.add_opstats() = fresh_entry;
  };
  ASSERT_OK(db.Transform(write_both));

  // A second, no-op commit triggers the ttl cleanup.
  auto no_op = [](OpStatsSequence& /*sequence*/) {};
  ASSERT_OK(db.Transform(no_op));

  absl::StatusOr<OpStatsSequence> stored = db.Read();
  ASSERT_THAT(stored, IsOk());
  ASSERT_EQ(stored->opstats_size(), 1);
  ASSERT_THAT(stored->opstats(0), EqualsProto(fresh_entry));

  // The ttl is 24 hours, so the timestamp should be the time at which the db
  // was purged minus 24 hours: no later than the kept entry (benchmark time -
  // 23 hours) and no earlier than benchmark time - 24 hours.
  const google::protobuf::Timestamp earliest_allowed =
      TimeUtil::SecondsToTimestamp(
          absl::ToUnixSeconds(benchmark_time - absl::Hours(24)));
  const google::protobuf::Timestamp latest_allowed =
      TimeUtil::SecondsToTimestamp(
          absl::ToUnixSeconds(benchmark_time - absl::Hours(23)));
  EXPECT_TRUE(stored->earliest_trustworthy_time() >= earliest_allowed);
  EXPECT_TRUE(stored->earliest_trustworthy_time() <= latest_allowed);
}
408
// A database file overwritten with non-proto bytes: the first Read() must log
// OPSTATS_READ_FAILED and return INTERNAL; the next Read() must succeed with
// an empty sequence whose earliest_trustworthy_time falls within the window
// of the failed read.
TEST_F(PdsBackedOpStatsDbTest, CorruptedFile) {
  // Step 1: create the database and commit one valid entry. The instance goes
  // away at the end of this scope.
  {
    // Check the StatusOr instead of calling .value() on it, so a creation
    // failure is reported as a test failure rather than crashing the binary.
    absl::StatusOr<std::unique_ptr<OpStatsDb>> db =
        PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
    ASSERT_OK(db);
    auto func = [](OpStatsSequence& data) {
      *data.add_opstats() = CreateOperationalStatsWithSingleEvent(
          OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
          benchmark_time_sec);
    };
    EXPECT_CALL(
        log_manager_,
        LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                           /*execution_index=*/0, /*epoch_index=*/0,
                           engine::DataSourceType::DATASET, /*value=*/Gt(0)));
    EXPECT_CALL(
        log_manager_,
        LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                           /*execution_index=*/0, /*epoch_index=*/0,
                           engine::DataSourceType::DATASET, /*value=*/1));
    ASSERT_OK((*db)->Transform(func));
  }

  // Step 2: overwrite the database file with bytes that are not a proto.
  {
    std::filesystem::path db_path(base_dir_);
    db_path /= PdsBackedOpStatsDb::kParentDir;
    db_path /= PdsBackedOpStatsDb::kDbFileName;
    protostore::FileStorage file_storage;
    // OpenForWrite's return type is not declared in this file, so its result
    // is still unwrapped directly.
    std::unique_ptr<protostore::OutputStream> ostream =
        file_storage.OpenForWrite(db_path).value();
    ASSERT_OK(ostream->Append("not a proto"));
    ASSERT_OK(ostream->Close());
  }

  // Step 3: create the database again on top of the corrupted file.
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, size_limit);
  ASSERT_OK(db);
  EXPECT_CALL(log_manager_, LogDiag(ProdDiagCode::OPSTATS_READ_FAILED));
  ::google::protobuf::Timestamp before_read_time = TimeUtil::GetCurrentTime();
  ASSERT_THAT((*db)->Read(), IsCode(INTERNAL));
  ::google::protobuf::Timestamp after_read_time = TimeUtil::GetCurrentTime();

  // Second read should succeed, and return empty data.
  absl::StatusOr<OpStatsSequence> data = (*db)->Read();
  ASSERT_THAT(data, IsOk());
  EXPECT_TRUE(data->opstats().empty());
  EXPECT_TRUE(data->earliest_trustworthy_time() >= before_read_time);
  EXPECT_TRUE(data->earliest_trustworthy_time() <= after_read_time);
}
458
// Committing a second entry that pushes the database over its size limit must
// prune the older entry and log the pruning histograms.
TEST_F(PdsBackedOpStatsDbTest, OpStatsRemovedDueToSizeLimit) {
  // The size limit is 30 bytes. Per the sizing notes kept with this test, an
  // OperationalStats with a single event takes 12 bytes (14 bytes once wrapped
  // inside an OpStatsSequence), and the remaining room accommodates the
  // earliest_trustworthy_time timestamp. One entry fits; two do not.
  const int64_t max_size_bytes = 30;
  absl::StatusOr<std::unique_ptr<OpStatsDb>> db_or =
      PdsBackedOpStatsDb::Create(base_dir_, ttl, log_manager_, max_size_bytes);
  ASSERT_THAT(db_or, IsOk());
  std::unique_ptr<OpStatsDb> db = std::move(*db_or);

  // Both commits are expected to log a positive db size and one entry.
  EXPECT_CALL(
      log_manager_,
      LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
                         /*execution_index=*/0, /*epoch_index=*/0,
                         engine::DataSourceType::DATASET, /*value=*/Gt(0)))
      .Times(2);
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1))
      .Times(2);

  OperationalStats older_entry = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED, benchmark_time_sec);
  auto write_older = [older_entry](OpStatsSequence& sequence) {
    *sequence.add_opstats() = older_entry;
  };
  ASSERT_OK(db->Transform(write_older));

  // Adding the second entry pushes the database size over the limit, so one
  // pruned entry and its tenure are expected to be logged.
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(HistogramCounters::OPSTATS_NUM_PRUNED_ENTRIES,
                                 /*execution_index=*/0, /*epoch_index=*/0,
                                 engine::DataSourceType::DATASET, /*value=*/1));
  EXPECT_CALL(log_manager_,
              LogToLongHistogram(
                  HistogramCounters::OPSTATS_OLDEST_PRUNED_ENTRY_TENURE_HOURS,
                  /*execution_index=*/0, /*epoch_index=*/0,
                  engine::DataSourceType::DATASET, /*value=*/Ge(0)));

  OperationalStats newer_entry = CreateOperationalStatsWithSingleEvent(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED,
      benchmark_time_sec + 5);
  auto write_newer = [newer_entry](OpStatsSequence& sequence) {
    *sequence.add_opstats() = newer_entry;
  };
  ASSERT_OK(db->Transform(write_newer));

  // Only the newer entry may remain, and earliest_trustworthy_time must equal
  // that entry's timestamp.
  absl::StatusOr<OpStatsSequence> stored = db->Read();
  ASSERT_THAT(stored, IsOk());

  OpStatsSequence expected;
  *expected.mutable_earliest_trustworthy_time() =
      TimeUtil::SecondsToTimestamp(benchmark_time_sec + 5);
  *expected.add_opstats() = newer_entry;
  EXPECT_THAT(*stored, EqualsProto(expected));
}
515
516 } // anonymous namespace
517 } // namespace opstats
518 } // namespace client
519 } // namespace fcp
520