1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
17
18 #include <cstring>
19
20 #include "tensorflow/core/lib/core/status_test_util.h"
21 #include "tensorflow/core/platform/blocking_counter.h"
22 #include "tensorflow/core/platform/cloud/now_seconds_env.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/notification.h"
25 #include "tensorflow/core/platform/test.h"
26
27 namespace tensorflow {
28 namespace {
29
ReadCache(RamFileBlockCache * cache,const string & filename,size_t offset,size_t n,std::vector<char> * out)30 Status ReadCache(RamFileBlockCache* cache, const string& filename,
31 size_t offset, size_t n, std::vector<char>* out) {
32 out->clear();
33 out->resize(n, 0);
34 size_t bytes_transferred = 0;
35 Status status =
36 cache->Read(filename, offset, n, out->data(), &bytes_transferred);
37 EXPECT_LE(bytes_transferred, n);
38 out->resize(bytes_transferred, n);
39 return status;
40 }
41
// Checks IsCacheEnabled() for each zero/non-zero combination of the first two
// constructor arguments (block size and max bytes). Only the cache where both
// are non-zero is expected to report itself as enabled.
TEST(RamFileBlockCacheTest, IsCacheEnabled) {
  // The fetcher's behavior is irrelevant to this test; it reports success
  // without touching any of its arguments.
  auto noop_fetcher = [](const string& /*filename*/, size_t /*offset*/,
                         size_t /*n*/, char* /*buffer*/,
                         size_t* /*bytes_transferred*/) {
    return Status::OK();
  };
  RamFileBlockCache all_zero(0, 0, 0, noop_fetcher);
  RamFileBlockCache zero_max_bytes(16, 0, 0, noop_fetcher);
  RamFileBlockCache zero_block_size(0, 32, 0, noop_fetcher);
  RamFileBlockCache fully_sized(16, 32, 0, noop_fetcher);

  EXPECT_FALSE(all_zero.IsCacheEnabled());
  EXPECT_FALSE(zero_max_bytes.IsCacheEnabled());
  EXPECT_FALSE(zero_block_size.IsCacheEnabled());
  EXPECT_TRUE(fully_sized.IsCacheEnabled());
}
58
// Exercises ValidateAndUpdateFileSignature(): while the signature passed for a
// file stays the same the cached block keeps being served, and once a
// different signature is passed the next read goes back to the fetcher.
TEST(RamFileBlockCacheTest, ValidateAndUpdateFileSignature) {
  // Counts how many times the cache had to ask the fetcher for data.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const string path = "file";
  RamFileBlockCache cache(16, 32, 0, fetcher);
  std::vector<char> data;

  // Initial read with signature 123: the block has to be fetched.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(path, 123));
  TF_EXPECT_OK(ReadCache(&cache, path, 0, 16, &data));
  EXPECT_EQ(fetch_count, 1);

  // Same signature again: the read is a cache hit, so no new fetch.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(path, 123));
  TF_EXPECT_OK(ReadCache(&cache, path, 0, 16, &data));
  EXPECT_EQ(fetch_count, 1);

  // A different signature (321) fails validation, and the following read
  // triggers a second fetch.
  EXPECT_FALSE(cache.ValidateAndUpdateFileSignature(path, 321));
  TF_EXPECT_OK(ReadCache(&cache, path, 0, 16, &data));
  EXPECT_EQ(fetch_count, 2);
}
87
// Verifies that a cache which cannot hold the requested range hands the read
// to the fetcher with exactly the filename, offset, and length it was given.
TEST(RamFileBlockCacheTest, PassThrough) {
  const string want_filename = "foo/bar";
  const size_t want_offset = 42;
  const size_t want_n = 1024;
  // Number of times the fetcher has been invoked.
  int fetch_count = 0;
  auto fetcher = [&fetch_count, want_filename, want_offset, want_n](
                     const string& got_filename, size_t got_offset,
                     size_t got_n, char* buffer, size_t* bytes_transferred) {
    // A pass-through read must reach the fetcher unmodified (no block
    // alignment of the offset, no rounding of the length).
    EXPECT_EQ(got_filename, want_filename);
    EXPECT_EQ(got_offset, want_offset);
    EXPECT_EQ(got_n, want_n);
    ++fetch_count;
    memset(buffer, 'x', got_n);
    *bytes_transferred = got_n;
    return Status::OK();
  };
  // The cache acts as a pass-through when block_size is zero, when max_bytes
  // is zero, when both are zero, or when want_n exceeds max_bytes.
  RamFileBlockCache zero_max_bytes(1, 0, 0, fetcher);
  RamFileBlockCache zero_block_size(0, 1, 0, fetcher);
  RamFileBlockCache all_zero(0, 0, 0, fetcher);
  RamFileBlockCache too_small(1000, 1000, 0, fetcher);
  std::vector<char> data;

  // Each read below must result in exactly one additional fetcher call.
  TF_EXPECT_OK(
      ReadCache(&zero_max_bytes, want_filename, want_offset, want_n, &data));
  EXPECT_EQ(fetch_count, 1);

  TF_EXPECT_OK(
      ReadCache(&zero_block_size, want_filename, want_offset, want_n, &data));
  EXPECT_EQ(fetch_count, 2);

  TF_EXPECT_OK(
      ReadCache(&all_zero, want_filename, want_offset, want_n, &data));
  EXPECT_EQ(fetch_count, 3);

  TF_EXPECT_OK(
      ReadCache(&too_small, want_filename, want_offset, want_n, &data));
  EXPECT_EQ(fetch_count, 4);
}
120
// Verifies that reads whose offset and length are not multiples of the block
// size still return exactly the requested bytes of the underlying data.
TEST(RamFileBlockCacheTest, BlockAlignment) {
  // Initialize a 256-byte buffer. This is the file underlying the reads we'll
  // do in this test; byte i holds the value i (converted to char), so every
  // position is distinguishable.
  const size_t size = 256;
  std::vector<char> buf;
  buf.reserve(size);
  // The index is a size_t so that the loop bound comparison is not a
  // signed/unsigned mix, and the narrowing to char is spelled out.
  for (size_t i = 0; i < size; i++) {
    buf.push_back(static_cast<char>(i));
  }
  // The fetcher just fetches slices of the buffer; requests starting at or
  // past the end of the buffer transfer zero bytes.
  auto fetcher = [&buf](const string& filename, size_t offset, size_t n,
                        char* buffer, size_t* bytes_transferred) {
    if (offset < buf.size()) {
      size_t bytes_to_copy = std::min<size_t>(buf.size() - offset, n);
      memcpy(buffer, buf.data() + offset, bytes_to_copy);
      *bytes_transferred = bytes_to_copy;
    } else {
      *bytes_transferred = 0;
    }
    return Status::OK();
  };
  for (size_t block_size = 2; block_size <= 4; block_size++) {
    // Make a cache of N-byte block size (1 block) and verify that reads of
    // varying offsets and lengths return correct data.
    RamFileBlockCache cache(block_size, block_size, 0, fetcher);
    for (size_t offset = 0; offset < 10; offset++) {
      // block_size is at least 2, so `block_size - 2` cannot wrap around.
      for (size_t n = block_size - 2; n <= block_size + 2; n++) {
        std::vector<char> got;
        TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got));
        // Verify the size of the read.
        if (offset + n <= size) {
          // Expect a full read.
          EXPECT_EQ(got.size(), n) << "block size = " << block_size
                                   << ", offset = " << offset << ", n = " << n;
        } else {
          // Expect a partial read.
          EXPECT_EQ(got.size(), size - offset)
              << "block size = " << block_size << ", offset = " << offset
              << ", n = " << n;
        }
        // Verify the contents of the read against the same slice of `buf`,
        // clamped to the end of the buffer.
        std::vector<char>::const_iterator begin = buf.begin() + offset;
        std::vector<char>::const_iterator end =
            offset + n > buf.size() ? buf.end() : begin + n;
        std::vector<char> want(begin, end);
        EXPECT_EQ(got, want) << "block size = " << block_size
                             << ", offset = " << offset << ", n = " << n;
      }
    }
  }
}
171
// Verifies that blocks already held by the cache are served without calling
// the fetcher a second time.
TEST(RamFileBlockCacheTest, CacheHits) {
  const size_t block_size = 16;
  // Offsets the fetcher has been asked for so far.
  std::set<size_t> calls;
  auto fetcher = [&calls, block_size](const string& filename, size_t offset,
                                      size_t n, char* buffer,
                                      size_t* bytes_transferred) {
    // Every fetch must be one whole, block-aligned block that has not been
    // requested before.
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset;
    calls.insert(offset);
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 256;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  // No pre-sizing needed: ReadCache resets `out` before every read.
  std::vector<char> out;
  // The cache has space for `block_count` blocks. The first pass should fill
  // the cache, and the second pass should be all cache hits. The fetcher
  // rejects a repeated offset, and the final check below confirms that every
  // block was fetched, so together each block is fetched once and only once.
  for (int pass = 0; pass < 2; pass++) {
    // `block` has the same unsigned type as `block_count` to avoid a
    // signed/unsigned comparison.
    for (uint32 block = 0; block < block_count; block++) {
      TF_EXPECT_OK(
          ReadCache(&cache, "", block_size * block, block_size, &out));
    }
  }
  EXPECT_EQ(calls.size(), block_count);
}
200
// Exercises reads at and beyond the end of a file whose size (24 bytes) is
// not a multiple of the block size (16 bytes).
TEST(RamFileBlockCacheTest, OutOfRange) {
  const size_t block_size = 16;
  const size_t file_size = 24;
  // Set by the fetcher when the corresponding block is requested.
  bool fetched_first = false;
  bool fetched_second = false;
  auto fetcher = [block_size, file_size, &fetched_first, &fetched_second](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    // Any offset other than the two blocks of the file transfers nothing.
    size_t transferred = 0;
    if (offset == 0) {
      // Block 0 is a complete block of 16 bytes.
      transferred = n;
      memset(buffer, 'x', transferred);
      fetched_first = true;
    } else if (offset == block_size) {
      // Block 1 holds only the 8 bytes that remain in the file.
      transferred = file_size - block_size;
      memset(buffer, 'x', transferred);
      fetched_second = true;
    }
    *bytes_transferred = transferred;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  std::vector<char> data;

  // A read of the first full block succeeds and returns all 16 bytes.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &data));
  EXPECT_TRUE(fetched_first);
  EXPECT_EQ(data.size(), block_size);

  // A read at file_size + 4 = 28 is aligned down to offset 16, so the second
  // block gets fetched, but the result is OutOfRange because offset 28 lies
  // past the end of the 24-byte file.
  Status past_end = ReadCache(&cache, "", file_size + 4, 4, &data);
  EXPECT_EQ(past_end.code(), error::OUT_OF_RANGE);
  EXPECT_TRUE(fetched_second);

  // Asking for the whole second block now yields its 8 bytes straight from
  // the cache: the fetcher must not be called again.
  fetched_second = false;
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &data));
  EXPECT_FALSE(fetched_second);
  EXPECT_EQ(data.size(), file_size - block_size);
}
245
// Checks that the cache reports an INTERNAL error when a block is only
// partially filled although a later block of the same file is already cached
// (the situation an interrupted read would leave behind).
TEST(RamFileBlockCacheTest, Inconsistent) {
  const size_t block_size = 16;
  // Whatever block is requested, this fetcher claims success but delivers
  // just a single byte.
  auto one_byte_fetcher = [block_size](const string& filename, size_t offset,
                                       size_t n, char* buffer,
                                       size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_GE(n, 1);
    buffer[0] = 'x';
    *bytes_transferred = 1;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, 2 * block_size, 0, one_byte_fetcher);
  std::vector<char> data;

  // Reading the second block first is fine: OK status, one byte of data.
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &data));
  EXPECT_EQ(data.size(), 1);

  // Reading the first block afterwards must fail with INTERNAL, because a
  // partial block has already been cached at a later position.
  Status first_block_status = ReadCache(&cache, "", 0, block_size, &data);
  EXPECT_EQ(first_block_status.code(), error::INTERNAL);
}
270
// Verifies least-recently-used eviction in a two-block cache by scripting, in
// order, the offsets the fetcher is allowed to be asked for.
TEST(RamFileBlockCacheTest, LRU) {
  const size_t block_size = 16;
  // Queue of offsets the fetcher expects next. A cache miss consumes the
  // front entry; a cache hit must leave the queue untouched.
  std::list<size_t> pending_fetches;
  auto fetcher = [&pending_fetches, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    // Being called with nothing queued means the cache missed where a hit
    // was expected.
    EXPECT_FALSE(pending_fetches.empty()) << "at offset = " << offset;
    if (!pending_fetches.empty()) {
      EXPECT_EQ(offset, pending_fetches.front());
      pending_fetches.pop_front();
    }
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 2;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  std::vector<char> data;

  // Block 0: the first read misses (consumes the queued offset), the second
  // read hits.
  pending_fetches.push_back(0);
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));

  // Block 1: miss, then hit.
  pending_fetches.push_back(block_size);
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &data));

  // Block 2: miss, then hit. The cache is full, so loading it evicts the
  // least recently used block.
  pending_fetches.push_back(2 * block_size);
  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &data));

  // The evicted block was the one at offset 0, so reading it misses.
  pending_fetches.push_back(0);
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));

  // Block 2 is still cached. This hit also refreshes its LRU position so the
  // next miss does not evict it.
  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &data));

  // Block 1 was evicted earlier. Fetching it again pushes out the current
  // LRU block, which is the one at offset 0.
  pending_fetches.push_back(block_size);
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &data));

  // Block 0 is therefore gone once more and has to be fetched.
  pending_fetches.push_back(0);
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));
}
319
// Verifies the max-staleness setting using a fake clock: with a non-zero
// staleness a block is re-fetched once it is too old, and with a staleness of
// zero a cached block is reused no matter how much time has passed.
TEST(RamFileBlockCacheTest, MaxStaleness) {
  // Number of times the fetcher has been invoked.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  std::vector<char> data;
  // Fake environment whose notion of "now" is driven by the test.
  std::unique_ptr<NowSecondsEnv> fake_env(new NowSecondsEnv);

  // Part 1: max staleness of 2 seconds.
  RamFileBlockCache stale_after_two(8, 16, 2 /* max staleness */, fetcher,
                                    fake_env.get());
  // The very first read has to load the block.
  TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &data));
  EXPECT_EQ(fetch_count, 1);
  // Step the clock forward one second per iteration and repeat the read. A
  // new fetch is expected every 3 seconds, i.e. each time the staleness
  // exceeds 2.
  for (int step = 1; step <= 10; ++step) {
    fake_env->SetNowSeconds(step + 1);
    TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &data));
    EXPECT_EQ(fetch_count, 1 + step / 3);
  }

  // Part 2: max staleness of 0, starting from a fresh count and clock.
  fetch_count = 0;
  fake_env->SetNowSeconds(0);
  RamFileBlockCache never_stale(8, 16, 0 /* max staleness */, fetcher,
                                fake_env.get());
  // Again the first read loads the block.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &data));
  EXPECT_EQ(fetch_count, 1);
  // Even after jumping the clock far ahead, the read must still be served
  // from the cached block.
  fake_env->SetNowSeconds(365 * 24 * 60 * 60);  // ~1 year, just for fun.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &data));
  EXPECT_EQ(fetch_count, 1);
}
359
// Verifies that RemoveFile() drops the cached blocks of the named file and
// leaves the blocks of other files in place.
TEST(RamFileBlockCacheTest, RemoveFile) {
  // Number of times the fetcher has been invoked.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    // The fill byte identifies the file: 'a' for "a", 'b' for "b", and 'x'
    // for anything else.
    char fill = 'x';
    if (filename == "a") {
      fill = 'a';
    } else if (filename == "b") {
      fill = 'b';
    }
    // Only the first block is lower case; every later block is upper case.
    if (offset > 0) {
      fill = toupper(fill);
    }
    memset(buffer, fill, n);
    *bytes_transferred = n;
    return Status::OK();
  };
  // The cache can hold 4 blocks of 8 bytes; two blocks each of two files are
  // read through it.
  const size_t n = 3;
  RamFileBlockCache cache(8, 32, 0, fetcher);
  std::vector<char> data;
  // Expected contents of an n-byte read from each of the four blocks.
  const std::vector<char> a_first(n, 'a');
  const std::vector<char> b_first(n, 'b');
  const std::vector<char> a_second(n, 'A');
  const std::vector<char> b_second(n, 'B');

  // Populate the cache; every read is a miss and costs one fetch.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &data));
  EXPECT_EQ(data, a_first);
  EXPECT_EQ(fetch_count, 1);
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &data));
  EXPECT_EQ(data, a_second);
  EXPECT_EQ(fetch_count, 2);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &data));
  EXPECT_EQ(data, b_first);
  EXPECT_EQ(fetch_count, 3);
  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &data));
  EXPECT_EQ(data, b_second);
  EXPECT_EQ(fetch_count, 4);

  // With all four blocks cached, re-reading them must not fetch anything.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &data));
  EXPECT_EQ(data, a_first);
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &data));
  EXPECT_EQ(data, a_second);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &data));
  EXPECT_EQ(data, b_first);
  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &data));
  EXPECT_EQ(data, b_second);
  EXPECT_EQ(fetch_count, 4);

  // Drop everything cached for "a".
  cache.RemoveFile("a");

  // The two blocks of "b" are unaffected and still served from the cache.
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &data));
  EXPECT_EQ(data, b_first);
  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &data));
  EXPECT_EQ(data, b_second);
  EXPECT_EQ(fetch_count, 4);

  // The two blocks of "a" are gone, so each read costs a fresh fetch.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &data));
  EXPECT_EQ(data, a_first);
  EXPECT_EQ(fetch_count, 5);
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &data));
  EXPECT_EQ(data, a_second);
  EXPECT_EQ(fetch_count, 6);
}
421
// Verifies that stale files are pruned from the cache in the background: a
// file is removed as a whole once its oldest block is stale, while files that
// are not yet stale are kept.
TEST(RamFileBlockCacheTest, Prune) {
  // Number of times the fetcher has been invoked.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  std::vector<char> data;
  // The fake environment starts out at the current real timestamp.
  std::unique_ptr<NowSecondsEnv> fake_env(new NowSecondsEnv);
  const uint64 now = Env::Default()->NowSeconds();
  fake_env->SetNowSeconds(now);
  RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher,
                          fake_env.get());

  // Polls the cache every 0.1 seconds of wall time until its size is no
  // longer `unchanged_size`, giving up once about 3 seconds have elapsed.
  auto wait_while_cache_size_is = [&cache](size_t unchanged_size) {
    const uint64 wait_start = Env::Default()->NowSeconds();
    do {
      Env::Default()->SleepForMicroseconds(100000);
    } while (cache.CacheSize() == unchanged_size &&
             Env::Default()->NowSeconds() - wait_start < 3);
  };

  // Load three blocks. The first block of "a" is read at fake time `now`.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &data));
  // A block of another file, "b", is read at fake time `now` + 1.
  fake_env->SetNowSeconds(now + 1);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &data));
  // A second block of "a" is also read at `now` + 1. Because it belongs to
  // "a", it should be removed together with the first block of "a" when that
  // one expires.
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &data));

  // All three blocks are cached: repeating the reads causes no fetches.
  EXPECT_EQ(cache.CacheSize(), 24);
  EXPECT_EQ(fetch_count, 3);
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &data));
  EXPECT_EQ(fetch_count, 3);

  // Move the fake clock to `now` + 2. The pruning thread periodically
  // compares env->NowSeconds() with the oldest block's timestamp to decide
  // whether to evict files. At this fake time "a" is stale (through its first
  // block) while "b" is not, so once the pruning thread wakes up (in one
  // second of wall time) it should remove "a" and leave "b" alone.
  fake_env->SetNowSeconds(now + 2);
  wait_while_cache_size_is(24);
  // Exactly one block should remain, namely the first block of "b".
  EXPECT_EQ(cache.CacheSize(), 8);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &data));
  EXPECT_EQ(fetch_count, 3);

  // At fake time `now` + 3 "b" becomes stale as well; wait for the pruner to
  // remove it.
  fake_env->SetNowSeconds(now + 3);
  wait_while_cache_size_is(8);
  // Nothing should be left in the cache.
  EXPECT_EQ(cache.CacheSize(), 0);
}
480
// Verifies that reads of different blocks can be in the fetcher at the same
// time, i.e. that the cache does not serialize independent fetches.
TEST(RamFileBlockCacheTest, ParallelReads) {
  // The fetcher blocks until `callers` invocations are in flight at once, at
  // which point all of them succeed. If that level of concurrency is not
  // reached within 10 seconds, the waiting invocations fail instead.
  const int callers = 4;
  BlockingCounter in_flight(callers);
  auto fetcher = [&in_flight](const string& filename, size_t offset, size_t n,
                              char* buffer, size_t* bytes_transferred) {
    in_flight.DecrementCount();
    const bool all_arrived = in_flight.WaitFor(std::chrono::seconds(10));
    if (!all_arrived) {
      // Failing explicitly is easier to debug than letting the test time out.
      return errors::FailedPrecondition("desired concurrency not reached");
    }
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const int block_size = 8;
  RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
  // Declared after `cache` so that the threads are destroyed first.
  std::vector<std::unique_ptr<Thread>> readers;
  for (int reader = 0; reader < callers; ++reader) {
    // Each thread reads its own block of file "a".
    readers.emplace_back(Env::Default()->StartThread(
        {}, "caller", [&cache, reader, block_size]() {
          std::vector<char> data;
          TF_EXPECT_OK(ReadCache(&cache, "a", reader * block_size, block_size,
                                 &data));
          const std::vector<char> expected(block_size, 'x');
          EXPECT_EQ(data, expected);
        }));
  }
  // Destroying `readers` blocks until the threads can be joined, which is
  // once their reads finish: either all of them were executing concurrently,
  // or the 10 seconds have passed.
}
515
// Verifies that concurrent reads touching the same file block are
// de-duplicated: two threads read different halves of one block and the
// fetcher must be invoked only once.
TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) {
  const size_t block_size = 16;
  // Number of times the fetcher has been invoked.
  int num_requests = 0;
  // Signalled by the fetcher once the first fetch is under way.
  Notification notification;
  // `notification` is captured by reference so that the fetcher signals the
  // same object the main thread waits on below.
  auto fetcher = [&num_requests, &notification, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset, 0);
    num_requests++;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    notification.Notify();
    // Stay inside the fetch long enough for the other thread to issue its
    // read while this one is still in progress.
    Env::Default()->SleepForMicroseconds(100000);  // 0.1 secs
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  // Fork off a thread that reads the first half of the block.
  std::unique_ptr<Thread> concurrent(
      Env::Default()->StartThread({}, "concurrent", [&cache, block_size] {
        std::vector<char> out;
        TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size / 2, &out));
        EXPECT_EQ(out.size(), block_size / 2);
      }));
  // Wait until the fetch triggered by the other thread has started, then read
  // the second half of the same block from this thread.
  notification.WaitForNotification();
  std::vector<char> out;
  TF_EXPECT_OK(ReadCache(&cache, "", block_size / 2, block_size / 2, &out));
  EXPECT_EQ(out.size(), block_size / 2);

  // Both reads must have been satisfied by the single fetch.
  EXPECT_EQ(1, num_requests);
}
549
// Verifies that Flush() discards cached blocks, so that a block read before
// the flush has to be fetched again afterwards.
TEST(RamFileBlockCacheTest, Flush) {
  // Number of times the fetcher has been invoked.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  RamFileBlockCache cache(16, 32, 0, fetcher);
  std::vector<char> data;

  // Two identical reads: the second one is a cache hit, so only one fetch.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &data));
  EXPECT_EQ(fetch_count, 1);

  // After flushing, the same read has to go to the fetcher again.
  cache.Flush();
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &data));
  EXPECT_EQ(fetch_count, 2);
}
568
569 } // namespace
570 } // namespace tensorflow
571