1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
17 #include <cstring>
18 #include "tensorflow/core/lib/core/blocking_counter.h"
19 #include "tensorflow/core/lib/core/status_test_util.h"
20 #include "tensorflow/core/platform/cloud/now_seconds_env.h"
21 #include "tensorflow/core/platform/env.h"
22 #include "tensorflow/core/platform/notification.h"
23 #include "tensorflow/core/platform/test.h"
24
25 namespace tensorflow {
26 namespace {
27
ReadCache(RamFileBlockCache * cache,const string & filename,size_t offset,size_t n,std::vector<char> * out)28 Status ReadCache(RamFileBlockCache* cache, const string& filename,
29 size_t offset, size_t n, std::vector<char>* out) {
30 out->clear();
31 out->resize(n, 0);
32 size_t bytes_transferred = 0;
33 Status status =
34 cache->Read(filename, offset, n, out->data(), &bytes_transferred);
35 EXPECT_LE(bytes_transferred, n);
36 out->resize(bytes_transferred, n);
37 return status;
38 }
39
// IsCacheEnabled() should be true only when both the block size (first
// constructor argument) and the byte budget (second argument) are non-zero.
TEST(RamFileBlockCacheTest, IsCacheEnabled) {
  // No reads are issued in this test, so the fetcher has nothing to do.
  auto noop_fetcher = [](const string& filename, size_t offset, size_t n,
                         char* buffer, size_t* bytes_transferred) {
    return Status::OK();
  };
  RamFileBlockCache both_zero(0, 0, 0, noop_fetcher);
  RamFileBlockCache zero_max_bytes(16, 0, 0, noop_fetcher);
  RamFileBlockCache zero_block_size(0, 32, 0, noop_fetcher);
  RamFileBlockCache both_nonzero(16, 32, 0, noop_fetcher);

  EXPECT_FALSE(both_zero.IsCacheEnabled());
  EXPECT_FALSE(zero_max_bytes.IsCacheEnabled());
  EXPECT_FALSE(zero_block_size.IsCacheEnabled());
  EXPECT_TRUE(both_nonzero.IsCacheEnabled());
}
56
// A matching file signature keeps cached blocks usable; a changed signature
// is reported as a mismatch and the next read goes back to the fetcher.
TEST(RamFileBlockCacheTest, ValidateAndUpdateFileSignature) {
  int fetch_count = 0;
  // Counts invocations and fills the whole requested range with 'x'.
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  RamFileBlockCache cache(16, 32, 0, fetcher);
  string filename = "file";
  std::vector<char> out;

  // Initial read with signature 123: the block has to be fetched.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(filename, 123));
  TF_EXPECT_OK(ReadCache(&cache, filename, 0, 16, &out));
  EXPECT_EQ(fetch_count, 1);

  // Same signature again: served from the cache, no additional fetch.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(filename, 123));
  TF_EXPECT_OK(ReadCache(&cache, filename, 0, 16, &out));
  EXPECT_EQ(fetch_count, 1);

  // Different signature: validation fails and the block is fetched again.
  EXPECT_FALSE(cache.ValidateAndUpdateFileSignature(filename, 321));
  TF_EXPECT_OK(ReadCache(&cache, filename, 0, 16, &out));
  EXPECT_EQ(fetch_count, 2);
}
85
// With a zero block size and/or a zero byte budget the cache forwards every
// read to the fetcher with the caller's filename, offset and length unchanged.
TEST(RamFileBlockCacheTest, PassThrough) {
  const string want_filename = "foo/bar";
  const size_t want_offset = 42;
  const size_t want_n = 1024;
  int fetch_count = 0;
  // Verifies that the request arrives unmodified, then fills it with 'x'.
  auto fetcher = [&fetch_count, want_filename, want_offset, want_n](
                     const string& got_filename, size_t got_offset,
                     size_t got_n, char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(got_filename, want_filename);
    EXPECT_EQ(got_offset, want_offset);
    EXPECT_EQ(got_n, want_n);
    ++fetch_count;
    memset(buffer, 'x', got_n);
    *bytes_transferred = got_n;
    return Status::OK();
  };
  // Each of these configurations has block_size, max_bytes, or both set to
  // zero, which makes the cache a pass-through.
  RamFileBlockCache zero_max_bytes(1, 0, 0, fetcher);
  RamFileBlockCache zero_block_size(0, 1, 0, fetcher);
  RamFileBlockCache both_zero(0, 0, 0, fetcher);
  std::vector<char> out;

  TF_EXPECT_OK(
      ReadCache(&zero_max_bytes, want_filename, want_offset, want_n, &out));
  EXPECT_EQ(fetch_count, 1);

  TF_EXPECT_OK(
      ReadCache(&zero_block_size, want_filename, want_offset, want_n, &out));
  EXPECT_EQ(fetch_count, 2);

  TF_EXPECT_OK(
      ReadCache(&both_zero, want_filename, want_offset, want_n, &out));
  EXPECT_EQ(fetch_count, 3);
}
114
// Checks that reads at arbitrary offsets and lengths return the right bytes
// when the cache only has room for a single small block.
TEST(RamFileBlockCacheTest, BlockAlignment) {
  // Initialize a 256-byte buffer. This is the file underlying the reads we'll
  // do in this test; byte i holds i converted to char.
  const size_t size = 256;
  std::vector<char> buf;
  buf.reserve(size);
  // The index is a size_t to match `size`, and the conversion to char is
  // spelled out instead of relying on an implicit narrowing from int.
  for (size_t i = 0; i < size; ++i) {
    buf.push_back(static_cast<char>(i));
  }
  // The fetcher just fetches slices of the buffer, reporting a short (or
  // empty) transfer when the request runs past the end of the buffer.
  auto fetcher = [&buf](const string& filename, size_t offset, size_t n,
                        char* buffer, size_t* bytes_transferred) {
    if (offset < buf.size()) {
      size_t bytes_to_copy = std::min<size_t>(buf.size() - offset, n);
      memcpy(buffer, buf.data() + offset, bytes_to_copy);
      *bytes_transferred = bytes_to_copy;
    } else {
      *bytes_transferred = 0;
    }
    return Status::OK();
  };
  for (size_t block_size = 2; block_size <= 4; block_size++) {
    // Make a cache of N-byte block size (1 block) and verify that reads of
    // varying offsets and lengths return correct data.
    RamFileBlockCache cache(block_size, block_size, 0, fetcher);
    for (size_t offset = 0; offset < 10; offset++) {
      // Lengths range from two bytes below to two bytes above the block size.
      for (size_t n = block_size - 2; n <= block_size + 2; n++) {
        std::vector<char> got;
        TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got));
        // Verify the size of the read.
        if (offset + n <= size) {
          // Expect a full read.
          EXPECT_EQ(got.size(), n) << "block size = " << block_size
                                   << ", offset = " << offset << ", n = " << n;
        } else {
          // Expect a partial read.
          EXPECT_EQ(got.size(), size - offset)
              << "block size = " << block_size << ", offset = " << offset
              << ", n = " << n;
        }
        // Verify the contents of the read against the same slice of `buf`.
        std::vector<char>::const_iterator begin = buf.begin() + offset;
        std::vector<char>::const_iterator end =
            offset + n > buf.size() ? buf.end() : begin + n;
        std::vector<char> want(begin, end);
        EXPECT_EQ(got, want) << "block size = " << block_size
                             << ", offset = " << offset << ", n = " << n;
      }
    }
  }
}
165
// Once every block fits in the cache, re-reading the same blocks must not
// trigger any additional fetches.
TEST(RamFileBlockCacheTest, CacheHits) {
  const size_t block_size = 16;
  // Offsets that have already been fetched.
  std::set<size_t> calls;
  // Expects block-sized, block-aligned requests and fails the test if the
  // same offset is ever requested twice.
  auto fetcher = [&calls, block_size](const string& filename, size_t offset,
                                      size_t n, char* buffer,
                                      size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset;
    calls.insert(offset);
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 256;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  // No pre-sizing of `out` is needed: ReadCache() clears and resizes it on
  // every call.
  std::vector<char> out;
  // The cache has space for `block_count` blocks. The first pass should fill
  // the cache, and the second pass should be all cache hits. The fetcher
  // checks that it is called once and only once for each offset (to fetch the
  // corresponding block).
  for (int pass = 0; pass < 2; ++pass) {
    // The block index uses the same unsigned type as `block_count`.
    for (uint32 block = 0; block < block_count; ++block) {
      TF_EXPECT_OK(
          ReadCache(&cache, "", block_size * block, block_size, &out));
    }
  }
}
194
// Exercises reads at and beyond the end of a 24-byte file served through a
// cache whose block size is 16.
TEST(RamFileBlockCacheTest, OutOfRange) {
  const size_t block_size = 16;
  const size_t file_size = 24;
  // Set by the fetcher when the corresponding block is requested.
  bool fetched_first = false;
  bool fetched_second = false;
  auto fetcher = [block_size, file_size, &fetched_first, &fetched_second](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    // Offsets other than the two blocks of the file transfer nothing.
    size_t produced = 0;
    if (offset == 0) {
      // First block: a full 16 bytes.
      produced = n;
      memset(buffer, 'x', n);
      fetched_first = true;
    } else if (offset == block_size) {
      // Second block: only the remaining 8 bytes of the file.
      produced = file_size - block_size;
      memset(buffer, 'x', produced);
      fetched_second = true;
    }
    *bytes_transferred = produced;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  std::vector<char> out;

  // The first 16 bytes are read without trouble.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out));
  EXPECT_TRUE(fetched_first);
  EXPECT_EQ(out.size(), block_size);

  // A read at file_size + 4 = 28 is aligned down to offset 16, so the second
  // block is fetched, but the result is OutOfRange because offset 28 lies
  // past the end of the 24-byte file.
  const Status past_eof = ReadCache(&cache, "", file_size + 4, 4, &out);
  EXPECT_EQ(past_eof.code(), error::OUT_OF_RANGE);
  EXPECT_TRUE(fetched_second);

  // Asking for the whole second block now yields its 8 bytes from the cache,
  // without another fetch.
  fetched_second = false;
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
  EXPECT_FALSE(fetched_second);
  EXPECT_EQ(out.size(), file_size - block_size);
}
239
// Detects interrupted reads that leave a partially filled block where a
// complete block was expected.
TEST(RamFileBlockCacheTest, Inconsistent) {
  const size_t block_size = 16;
  // Reports success for every offset but only ever delivers a single byte.
  auto one_byte_fetcher = [block_size](const string& filename, size_t offset,
                                       size_t n, char* buffer,
                                       size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_GE(n, 1);
    memset(buffer, 'x', 1);
    *bytes_transferred = 1;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, 2 * block_size, 0, one_byte_fetcher);
  std::vector<char> out;

  // Reading the second block succeeds and produces exactly one byte.
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
  EXPECT_EQ(out.size(), 1);

  // Reading the first block afterwards must fail with INTERNAL, because a
  // partial block has already been cached at a later position.
  const Status first_block_status = ReadCache(&cache, "", 0, block_size, &out);
  EXPECT_EQ(first_block_status.code(), error::INTERNAL);
}
264
// Verifies least-recently-used eviction in a two-block cache by scripting the
// exact sequence of fetches the cache is allowed to make.
TEST(RamFileBlockCacheTest, LRU) {
  const size_t block_size = 16;
  // Offsets the fetcher is expected to be asked for, in order. Each fetch
  // consumes the front element; a fetch with nothing queued is a failure.
  std::list<size_t> expected_fetches;
  auto fetcher = [&expected_fetches, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_FALSE(expected_fetches.empty()) << "at offset = " << offset;
    if (!expected_fetches.empty()) {
      EXPECT_EQ(offset, expected_fetches.front());
      expected_fetches.pop_front();
    }
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 2;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  std::vector<char> out;

  // Offsets of the three blocks touched by this test.
  const size_t block0 = 0;
  const size_t block1 = block_size;
  const size_t block2 = 2 * block_size;

  // Block 0: a miss (consumes the queued offset) followed by a hit.
  expected_fetches.push_back(block0);
  TF_EXPECT_OK(ReadCache(&cache, "", block0, 1, &out));
  TF_EXPECT_OK(ReadCache(&cache, "", block0, 1, &out));

  // Block 1: miss, then hit.
  expected_fetches.push_back(block1);
  TF_EXPECT_OK(ReadCache(&cache, "", block1, 1, &out));
  TF_EXPECT_OK(ReadCache(&cache, "", block1, 1, &out));

  // Block 2: miss, then hit. The miss evicts the LRU element.
  expected_fetches.push_back(block2);
  TF_EXPECT_OK(ReadCache(&cache, "", block2, 1, &out));
  TF_EXPECT_OK(ReadCache(&cache, "", block2, 1, &out));

  // The LRU element was block 0, so reading it again is a miss.
  expected_fetches.push_back(block0);
  TF_EXPECT_OK(ReadCache(&cache, "", block0, 1, &out));

  // Block 2 is still cached; this hit should refresh its position in the LRU
  // list so that the next read does not evict it.
  TF_EXPECT_OK(ReadCache(&cache, "", block2, 1, &out));

  // Block 1 was evicted. Fetching it again also evicts the LRU element, which
  // is block 0.
  expected_fetches.push_back(block1);
  TF_EXPECT_OK(ReadCache(&cache, "", block1, 1, &out));

  // Block 0 was evicted once more.
  expected_fetches.push_back(block0);
  TF_EXPECT_OK(ReadCache(&cache, "", block0, 1, &out));
}
313
// Checks block expiry against a fake clock: with max staleness 2 a block is
// refetched once it is more than 2 seconds old, and with max staleness 0 the
// cached block keeps being used no matter how far the clock advances.
TEST(RamFileBlockCacheTest, MaxStaleness) {
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
  std::vector<char> out;

  // Part 1: max staleness of 2 seconds.
  RamFileBlockCache stale_after_two(8, 16, 2 /* max staleness */, fetcher,
                                    env.get());
  // The initial read loads the block.
  TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &out));
  EXPECT_EQ(fetch_count, 1);
  // Step the clock one second at a time, repeating the read after each step.
  // The fetch count should go up every 3 seconds, i.e. each time the
  // staleness exceeds 2.
  for (int step = 1; step <= 10; ++step) {
    env->SetNowSeconds(step + 1);
    TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &out));
    EXPECT_EQ(fetch_count, 1 + step / 3);
  }

  // Part 2: max staleness of 0, starting over with a reset counter and clock.
  fetch_count = 0;
  env->SetNowSeconds(0);
  RamFileBlockCache never_stale(8, 16, 0 /* max staleness */, fetcher,
                                env.get());
  // The initial read loads the block.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &out));
  EXPECT_EQ(fetch_count, 1);
  // Jump the clock far ahead; the read must still be served by the cached
  // block.
  env->SetNowSeconds(365 * 24 * 60 * 60);  // ~1 year, just for fun.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &out));
  EXPECT_EQ(fetch_count, 1);
}
353
// RemoveFile() must drop every cached block of the named file while leaving
// the blocks of other files in place.
TEST(RamFileBlockCacheTest, RemoveFile) {
  int fetch_count = 0;
  // Fills the request with a letter derived from the filename ('a', 'b', or
  // 'x' for anything else). The block at offset 0 is lower case; every later
  // block is upper case.
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    char fill = 'x';
    if (filename == "a") {
      fill = 'a';
    } else if (filename == "b") {
      fill = 'b';
    }
    if (offset > 0) {
      fill = toupper(fill);
    }
    memset(buffer, fill, n);
    *bytes_transferred = n;
    return Status::OK();
  };
  // Four 8-byte blocks fit in this cache; two files with two blocks each are
  // read, `n` bytes at a time.
  const size_t n = 3;
  RamFileBlockCache cache(8, 32, 0, fetcher);
  std::vector<char> out;
  // Expected contents for the first (lower case) and second (upper case)
  // block of each file.
  const std::vector<char> a_first(n, 'a');
  const std::vector<char> a_second(n, 'A');
  const std::vector<char> b_first(n, 'b');
  const std::vector<char> b_second(n, 'B');

  // Populate the cache: every read is a miss and costs one fetch.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
  EXPECT_EQ(out, a_first);
  EXPECT_EQ(fetch_count, 1);
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
  EXPECT_EQ(out, a_second);
  EXPECT_EQ(fetch_count, 2);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
  EXPECT_EQ(out, b_first);
  EXPECT_EQ(fetch_count, 3);
  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
  EXPECT_EQ(out, b_second);
  EXPECT_EQ(fetch_count, 4);

  // All four blocks are cached now, so re-reading them fetches nothing.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
  EXPECT_EQ(out, a_first);
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
  EXPECT_EQ(out, a_second);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
  EXPECT_EQ(out, b_first);
  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
  EXPECT_EQ(out, b_second);
  EXPECT_EQ(fetch_count, 4);

  // Drop the blocks belonging to "a".
  cache.RemoveFile("a");

  // The two blocks of "b" are still served from the cache.
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
  EXPECT_EQ(out, b_first);
  TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
  EXPECT_EQ(out, b_second);
  EXPECT_EQ(fetch_count, 4);

  // The blocks of "a" are gone, so each of them has to be fetched again.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
  EXPECT_EQ(out, a_first);
  EXPECT_EQ(fetch_count, 5);
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
  EXPECT_EQ(out, a_second);
  EXPECT_EQ(fetch_count, 6);
}
415
// Verifies that stale files are removed from the cache in the background,
// driven by a fake clock, and that a file is removed as a whole once its
// oldest block is stale.
TEST(RamFileBlockCacheTest, Prune) {
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  std::vector<char> out;
  // The fake environment starts out at the current real timestamp.
  std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
  const uint64 now = Env::Default()->NowSeconds();
  env->SetNowSeconds(now);
  RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());

  // Polls the cache every 0.1 seconds of wall time until its size is no
  // longer `unpruned_size`, giving up once 3 seconds have elapsed. Always
  // sleeps at least once before looking at the cache.
  auto wait_for_pruner = [&cache](size_t unpruned_size) {
    const uint64 wait_start = Env::Default()->NowSeconds();
    do {
      Env::Default()->SleepForMicroseconds(100000);
    } while (cache.CacheSize() == unpruned_size &&
             Env::Default()->NowSeconds() - wait_start < 3);
  };

  // Load three blocks. The first block of "a" is read at timestamp `now`.
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
  // A block of a different file, "b", is read at timestamp `now` + 1.
  env->SetNowSeconds(now + 1);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
  // A second block of "a" is read, also at `now` + 1. Once the first block of
  // "a" expires this block should be removed too, since it belongs to the
  // same file.
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));

  // All three blocks are cached, so repeating the reads fetches nothing.
  EXPECT_EQ(cache.CacheSize(), 24);
  EXPECT_EQ(fetch_count, 3);
  TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
  TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
  EXPECT_EQ(fetch_count, 3);

  // Move the fake clock to `now` + 2. The pruning thread periodically
  // compares env->NowSeconds() with the oldest block's timestamp to decide
  // whether to evict files. At this timestamp "a" is stale through its first
  // block while "b" is not stale yet, so when the pruning thread wakes up (in
  // one second of wall time) it should remove "a" and leave "b" alone.
  env->SetNowSeconds(now + 2);
  wait_for_pruner(24);
  // Exactly one block should remain, and it should be the first block of "b".
  EXPECT_EQ(cache.CacheSize(), 8);
  TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
  EXPECT_EQ(fetch_count, 3);

  // At `now` + 3 "b" becomes stale as well; wait for the pruner to remove it.
  env->SetNowSeconds(now + 3);
  wait_for_pruner(8);
  // Nothing should be left in the cache.
  EXPECT_EQ(cache.CacheSize(), 0);
}
474
// Reads of distinct blocks must be able to reach the fetcher concurrently.
TEST(RamFileBlockCacheTest, ParallelReads) {
  const int callers = 4;
  BlockingCounter counter(callers);
  // Each invocation decrements the counter and then waits on it for up to 10
  // seconds. If the wait succeeds, i.e. `callers` invocations are in flight
  // at once, the request is filled with 'x'. Otherwise an error is returned,
  // which is easier to debug than letting the whole test time out.
  auto fetcher = [&counter](const string& filename, size_t offset, size_t n,
                            char* buffer, size_t* bytes_transferred) {
    counter.DecrementCount();
    const bool all_arrived = counter.WaitFor(std::chrono::seconds(10));
    if (!all_arrived) {
      return errors::FailedPrecondition("desired concurrency not reached");
    }
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const int block_size = 8;
  RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
  // One thread per caller, each reading its own block of file "a".
  std::vector<std::unique_ptr<Thread>> threads;
  for (int caller = 0; caller < callers; ++caller) {
    threads.emplace_back(Env::Default()->StartThread(
        {}, "caller", [&cache, caller, block_size]() {
          const std::vector<char> want(block_size, 'x');
          std::vector<char> got;
          TF_EXPECT_OK(
              ReadCache(&cache, "a", caller * block_size, block_size, &got));
          EXPECT_EQ(got, want);
        }));
  }
  // Destroying `threads` blocks until every thread can be joined, which
  // happens once their reads finish: either all of them were executing
  // concurrently, or 10 seconds have passed.
}
509
// Concurrent reads to the same file blocks should be de-duplicated: two
// threads reading different halves of one block must cause a single fetch.
TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) {
  const size_t block_size = 16;
  int num_requests = 0;
  // Signalled by the fetcher once a fetch is under way.
  Notification notification;
  // `notification` is captured by reference so the fetcher signals the same
  // object the main thread waits on.
  auto fetcher = [&num_requests, &notification, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset, 0);
    num_requests++;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    notification.Notify();
    // Keep the fetch in flight so the other thread can issue its read while
    // this one is still pending.
    Env::Default()->SleepForMicroseconds(100000);  // 0.1 secs
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  // Fork off a thread that reads the first half of the block.
  std::unique_ptr<Thread> concurrent(
      Env::Default()->StartThread({}, "concurrent", [&cache, block_size] {
        std::vector<char> out;
        TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size / 2, &out));
        EXPECT_EQ(out.size(), block_size / 2);
      }));
  // Wait until the fetch has started, then read the second half of the same
  // block from this thread.
  notification.WaitForNotification();
  std::vector<char> out;
  TF_EXPECT_OK(ReadCache(&cache, "", block_size / 2, block_size / 2, &out));
  EXPECT_EQ(out.size(), block_size / 2);

  // Both reads must have been satisfied by a single fetch.
  EXPECT_EQ(1, num_requests);
}
543
// After Flush() a previously cached block has to be fetched again.
TEST(RamFileBlockCacheTest, Flush) {
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  RamFileBlockCache cache(16, 32, 0, fetcher);
  std::vector<char> out;

  // Two reads of the same block cost a single fetch.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
  EXPECT_EQ(fetch_count, 1);

  // Once the cache is flushed, the same read goes back to the fetcher.
  cache.Flush();
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &out));
  EXPECT_EQ(fetch_count, 2);
}
562
563 } // namespace
564 } // namespace tensorflow
565