• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
17 
18 #include <cstring>
19 
20 #include "tensorflow/core/lib/core/status_test_util.h"
21 #include "tensorflow/core/platform/blocking_counter.h"
22 #include "tensorflow/core/platform/cloud/now_seconds_env.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/notification.h"
25 #include "tensorflow/core/platform/test.h"
26 
27 namespace tensorflow {
28 namespace {
29 
ReadCache(RamFileBlockCache * cache,const string & filename,size_t offset,size_t n,std::vector<char> * out)30 Status ReadCache(RamFileBlockCache* cache, const string& filename,
31                  size_t offset, size_t n, std::vector<char>* out) {
32   out->clear();
33   out->resize(n, 0);
34   size_t bytes_transferred = 0;
35   Status status =
36       cache->Read(filename, offset, n, out->data(), &bytes_transferred);
37   EXPECT_LE(bytes_transferred, n);
38   out->resize(bytes_transferred, n);
39   return status;
40 }
41 
TEST(RamFileBlockCacheTest, IsCacheEnabled) {
  // IsCacheEnabled() should report true only when both of the first two
  // constructor arguments are non-zero. The fetcher is a stub that performs no
  // work.
  auto noop_fetcher = [](const string& filename, size_t offset, size_t n,
                         char* buffer, size_t* bytes_transferred) {
    return Status::OK();
  };
  RamFileBlockCache neither_set(0, 0, 0, noop_fetcher);
  RamFileBlockCache first_only(16, 0, 0, noop_fetcher);
  RamFileBlockCache second_only(0, 32, 0, noop_fetcher);
  RamFileBlockCache both_set(16, 32, 0, noop_fetcher);

  EXPECT_FALSE(neither_set.IsCacheEnabled());
  EXPECT_FALSE(first_only.IsCacheEnabled());
  EXPECT_FALSE(second_only.IsCacheEnabled());
  EXPECT_TRUE(both_set.IsCacheEnabled());
}
58 
TEST(RamFileBlockCacheTest, ValidateAndUpdateFileSignature) {
  // Counts fetcher invocations while the file signature handed to the cache
  // first stays the same and then changes.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  string path = "file";
  RamFileBlockCache cache(16, 32, 0, fetcher);
  std::vector<char> data;
  // Every step of this test re-reads the same 16 bytes at offset 0.
  auto read_block = [&cache, &path, &data]() {
    return ReadCache(&cache, path, 0, 16, &data);
  };

  // Initial read with signature 123: the fetcher is invoked once.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(path, 123));
  TF_EXPECT_OK(read_block());
  EXPECT_EQ(fetch_count, 1);

  // Same signature again: the read is a cache hit, so no new fetch.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(path, 123));
  TF_EXPECT_OK(read_block());
  EXPECT_EQ(fetch_count, 1);

  // A different signature is reported as a mismatch and the following read
  // has to go back to the fetcher.
  EXPECT_FALSE(cache.ValidateAndUpdateFileSignature(path, 321));
  TF_EXPECT_OK(read_block());
  EXPECT_EQ(fetch_count, 2);
}
87 
TEST(RamFileBlockCacheTest, PassThrough) {
  // In pass-through mode the fetcher must receive exactly the arguments the
  // caller gave to the cache; the fetcher below checks this on every call.
  const string expected_filename = "foo/bar";
  const size_t expected_offset = 42;
  const size_t expected_n = 1024;
  int fetch_count = 0;
  auto fetcher = [&fetch_count, expected_filename, expected_offset,
                  expected_n](const string& filename, size_t offset, size_t n,
                              char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(filename, expected_filename);
    EXPECT_EQ(offset, expected_offset);
    EXPECT_EQ(n, expected_n);
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  // If block_size, max_bytes, or both are zero, or the requested size is
  // larger than max_bytes, the cache is a pass-through.
  RamFileBlockCache zero_max_bytes(1, 0, 0, fetcher);
  RamFileBlockCache zero_block_size(0, 1, 0, fetcher);
  RamFileBlockCache all_zero(0, 0, 0, fetcher);
  RamFileBlockCache too_small(1000, 1000, 0, fetcher);
  RamFileBlockCache* const caches[] = {&zero_max_bytes, &zero_block_size,
                                       &all_zero, &too_small};
  std::vector<char> out;
  // Each read must reach the fetcher exactly once, whichever cache serves it.
  int expected_fetches = 0;
  for (RamFileBlockCache* cache : caches) {
    TF_EXPECT_OK(ReadCache(cache, expected_filename, expected_offset,
                           expected_n, &out));
    ++expected_fetches;
    EXPECT_EQ(fetch_count, expected_fetches);
  }
}
120 
TEST(RamFileBlockCacheTest, BlockAlignment) {
  // Initialize a 256-byte buffer.  This is the file underlying the reads we'll
  // do in this test.  Byte i of the file holds the value i.
  const size_t size = 256;
  std::vector<char> buf;
  buf.reserve(size);
  // The index is unsigned to match `size`, and the conversion to `char` is
  // spelled out because values above CHAR_MAX do not fit a signed `char`.
  for (size_t i = 0; i < size; i++) {
    buf.push_back(static_cast<char>(i));
  }
  // The fetcher just fetches slices of the buffer; requests that start at or
  // past the end of the buffer transfer zero bytes.
  auto fetcher = [&buf](const string& filename, size_t offset, size_t n,
                        char* buffer, size_t* bytes_transferred) {
    if (offset < buf.size()) {
      size_t bytes_to_copy = std::min<size_t>(buf.size() - offset, n);
      memcpy(buffer, buf.data() + offset, bytes_to_copy);
      *bytes_transferred = bytes_to_copy;
    } else {
      *bytes_transferred = 0;
    }
    return Status::OK();
  };
  for (size_t block_size = 2; block_size <= 4; block_size++) {
    // Make a cache of N-byte block size (1 block) and verify that reads of
    // varying offsets and lengths return correct data.
    RamFileBlockCache cache(block_size, block_size, 0, fetcher);
    for (size_t offset = 0; offset < 10; offset++) {
      // Read lengths range from two bytes less than a block to two bytes more.
      for (size_t n = block_size - 2; n <= block_size + 2; n++) {
        std::vector<char> got;
        TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got));
        // Verify the size of the read.
        if (offset + n <= size) {
          // Expect a full read.
          EXPECT_EQ(got.size(), n) << "block size = " << block_size
                                   << ", offset = " << offset << ", n = " << n;
        } else {
          // Expect a partial read.
          EXPECT_EQ(got.size(), size - offset)
              << "block size = " << block_size << ", offset = " << offset
              << ", n = " << n;
        }
        // Verify the contents of the read against the same slice of `buf`.
        std::vector<char>::const_iterator begin = buf.begin() + offset;
        std::vector<char>::const_iterator end =
            offset + n > buf.size() ? buf.end() : begin + n;
        std::vector<char> want(begin, end);
        EXPECT_EQ(got, want) << "block size = " << block_size
                             << ", offset = " << offset << ", n = " << n;
      }
    }
  }
}
171 
TEST(RamFileBlockCacheTest, CacheHits) {
  const size_t block_size = 16;
  // Offsets for which the fetcher has already been invoked.
  std::set<size_t> calls;
  auto fetcher = [&calls, block_size](const string& filename, size_t offset,
                                      size_t n, char* buffer,
                                      size_t* bytes_transferred) {
    // Fetches must be whole, block-aligned blocks, and no offset may be
    // fetched twice.
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset;
    calls.insert(offset);
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 256;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  // ReadCache() clears and resizes `out` itself, so it needs no pre-sizing.
  std::vector<char> out;
  // The cache has space for `block_count` blocks. The first pass should fill
  // the cache, and the second pass should be all cache hits. The fetcher
  // checks that it is called at most once for each offset (to fetch the
  // corresponding block).
  for (int pass = 0; pass < 2; pass++) {
    // The index is unsigned so it is not compared against `block_count` (or
    // multiplied with `block_size`) with mixed signedness.
    for (size_t j = 0; j < block_count; j++) {
      TF_EXPECT_OK(ReadCache(&cache, "", block_size * j, block_size, &out));
    }
  }
  // Together with the duplicate check inside the fetcher, this shows every
  // block was fetched exactly once.
  EXPECT_EQ(calls.size(), block_count);
}
200 
TEST(RamFileBlockCacheTest, OutOfRange) {
  // Exercises reads against a 24-byte file through a cache whose block size is
  // 16, so the file's second block holds only 8 bytes.
  const size_t block_size = 16;
  const size_t file_size = 24;
  bool fetched_first = false;
  bool fetched_second = false;
  auto fetcher = [block_size, file_size, &fetched_first, &fetched_second](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    // Any offset other than the two blocks of the file transfers nothing.
    *bytes_transferred = 0;
    if (offset == 0) {
      // First block: a full 16 bytes.
      memset(buffer, 'x', n);
      *bytes_transferred = n;
      fetched_first = true;
    } else if (offset == block_size) {
      // Second block: only the remaining 8 bytes of the file.
      const size_t tail_bytes = file_size - block_size;
      memset(buffer, 'x', tail_bytes);
      *bytes_transferred = tail_bytes;
      fetched_second = true;
    }
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  std::vector<char> out;

  // The first 16 bytes are readable without any trouble.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &out));
  EXPECT_TRUE(fetched_first);
  EXPECT_EQ(out.size(), block_size);

  // A read at file_size + 4 = 28 is aligned down to offset 16, so it fetches
  // the second block, but it still fails with OutOfRange because offset 28 is
  // past the end of the 24-byte file.
  const Status past_end = ReadCache(&cache, "", file_size + 4, 4, &out);
  EXPECT_EQ(past_end.code(), error::OUT_OF_RANGE);
  EXPECT_TRUE(fetched_second);

  // Asking for the whole second block now yields its 8 bytes straight from the
  // cache, without another fetch.
  fetched_second = false;
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
  EXPECT_FALSE(fetched_second);
  EXPECT_EQ(out.size(), file_size - block_size);
}
245 
TEST(RamFileBlockCacheTest, Inconsistent) {
  // An interrupted read can leave a partially filled block where a complete
  // one was expected; this test checks that the cache detects that situation.
  const size_t block_size = 16;
  // Whatever the offset, this fetcher reports success but delivers one byte.
  auto one_byte_fetcher = [block_size](const string& filename, size_t offset,
                                       size_t n, char* buffer,
                                       size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_GE(n, 1);
    buffer[0] = 'x';
    *bytes_transferred = 1;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, 2 * block_size, 0, one_byte_fetcher);
  std::vector<char> out;

  // Reading the second block succeeds and returns just the single byte.
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &out));
  EXPECT_EQ(out.size(), 1);

  // Reading the first block afterwards must fail with INTERNAL, because a
  // partial block is already cached at a later position.
  EXPECT_EQ(ReadCache(&cache, "", 0, block_size, &out).code(),
            error::INTERNAL);
}
270 
TEST(RamFileBlockCacheTest, LRU) {
  const size_t block_size = 16;
  // Queue of offsets the fetcher is expected to be asked for, in order. A
  // fetch consumes the front entry; a cache hit consumes nothing.
  std::list<size_t> expected_fetches;
  auto fetcher = [&expected_fetches, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_FALSE(expected_fetches.empty()) << "at offset = " << offset;
    if (!expected_fetches.empty()) {
      EXPECT_EQ(offset, expected_fetches.front());
      expected_fetches.pop_front();
    }
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 2;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  std::vector<char> out;
  // All reads in this test fetch a single byte at the start of some block.
  auto read_byte_at = [&cache, &out](size_t offset) {
    return ReadCache(&cache, "", offset, 1, &out);
  };

  // Block 0: a miss (consumes the queued offset), then a hit.
  expected_fetches.push_back(0);
  TF_EXPECT_OK(read_byte_at(0));
  TF_EXPECT_OK(read_byte_at(0));

  // Block 1: a miss, then a hit. The cache is now full.
  expected_fetches.push_back(block_size);
  TF_EXPECT_OK(read_byte_at(block_size));
  TF_EXPECT_OK(read_byte_at(block_size));

  // Block 2: a miss, then a hit. Loading it evicts the least recently used
  // block.
  expected_fetches.push_back(2 * block_size);
  TF_EXPECT_OK(read_byte_at(2 * block_size));
  TF_EXPECT_OK(read_byte_at(2 * block_size));

  // The evicted block was the one at offset 0, so this read is a miss.
  expected_fetches.push_back(0);
  TF_EXPECT_OK(read_byte_at(0));

  // Block 2 is still cached; touching it refreshes its LRU position so the
  // next miss does not evict it.
  TF_EXPECT_OK(read_byte_at(2 * block_size));

  // Block 1 was evicted earlier. Fetching it again pushes out the current LRU
  // block, which is the one at offset 0.
  expected_fetches.push_back(block_size);
  TF_EXPECT_OK(read_byte_at(block_size));

  // Block 0 was evicted once more, so it has to be fetched again.
  expected_fetches.push_back(0);
  TF_EXPECT_OK(read_byte_at(0));
}
319 
TEST(RamFileBlockCacheTest, MaxStaleness) {
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  std::vector<char> out;
  // Fake environment whose notion of "now" the test sets explicitly.
  std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);

  // Part 1: a cache with a max staleness of 2 seconds.
  RamFileBlockCache stale_after_two(8, 16, 2 /* max staleness */, fetcher,
                                    env.get());
  // The initial read loads the block.
  TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &out));
  EXPECT_EQ(fetch_count, 1);
  // Step the clock forward one second per iteration and repeat the read. The
  // fetch count should go up every 3 seconds, i.e. each time the staleness
  // exceeds 2.
  for (int step = 1; step <= 10; ++step) {
    env->SetNowSeconds(step + 1);
    TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &out));
    EXPECT_EQ(fetch_count, 1 + step / 3);
  }

  // Part 2: a cache with a max staleness of 0, with the counter and the clock
  // both reset.
  fetch_count = 0;
  env->SetNowSeconds(0);
  RamFileBlockCache never_stale(8, 16, 0 /* max staleness */, fetcher,
                                env.get());
  // The initial read loads the block.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &out));
  EXPECT_EQ(fetch_count, 1);
  // Even after a huge jump of the clock, the read is still served from the
  // cached block.
  env->SetNowSeconds(365 * 24 * 60 * 60);  // ~1 year, just for fun.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &out));
  EXPECT_EQ(fetch_count, 1);
}
359 
TEST(RamFileBlockCacheTest,RemoveFile)360 TEST(RamFileBlockCacheTest, RemoveFile) {
361   int calls = 0;
362   auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
363                           char* buffer, size_t* bytes_transferred) {
364     calls++;
365     char c = (filename == "a") ? 'a' : (filename == "b") ? 'b' : 'x';
366     if (offset > 0) {
367       // The first block is lower case and all subsequent blocks are upper case.
368       c = toupper(c);
369     }
370     memset(buffer, c, n);
371     *bytes_transferred = n;
372     return Status::OK();
373   };
374   // This cache has space for 4 blocks; we'll read from two files.
375   const size_t n = 3;
376   RamFileBlockCache cache(8, 32, 0, fetcher);
377   std::vector<char> out;
378   std::vector<char> a(n, 'a');
379   std::vector<char> b(n, 'b');
380   std::vector<char> A(n, 'A');
381   std::vector<char> B(n, 'B');
382   // Fill the cache.
383   TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
384   EXPECT_EQ(out, a);
385   EXPECT_EQ(calls, 1);
386   TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
387   EXPECT_EQ(out, A);
388   EXPECT_EQ(calls, 2);
389   TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
390   EXPECT_EQ(out, b);
391   EXPECT_EQ(calls, 3);
392   TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
393   EXPECT_EQ(out, B);
394   EXPECT_EQ(calls, 4);
395   // All four blocks should be in the cache now.
396   TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
397   EXPECT_EQ(out, a);
398   TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
399   EXPECT_EQ(out, A);
400   TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
401   EXPECT_EQ(out, b);
402   TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
403   EXPECT_EQ(out, B);
404   EXPECT_EQ(calls, 4);
405   // Remove the blocks from "a".
406   cache.RemoveFile("a");
407   // Both blocks from "b" should still be there.
408   TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
409   EXPECT_EQ(out, b);
410   TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
411   EXPECT_EQ(out, B);
412   EXPECT_EQ(calls, 4);
413   // The blocks from "a" should not be there.
414   TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
415   EXPECT_EQ(out, a);
416   EXPECT_EQ(calls, 5);
417   TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
418   EXPECT_EQ(out, A);
419   EXPECT_EQ(calls, 6);
420 }
421 
TEST(RamFileBlockCacheTest,Prune)422 TEST(RamFileBlockCacheTest, Prune) {
423   int calls = 0;
424   auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
425                           char* buffer, size_t* bytes_transferred) {
426     calls++;
427     memset(buffer, 'x', n);
428     *bytes_transferred = n;
429     return Status::OK();
430   };
431   std::vector<char> out;
432   // Our fake environment is initialized with the current timestamp.
433   std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
434   uint64 now = Env::Default()->NowSeconds();
435   env->SetNowSeconds(now);
436   RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());
437   // Read three blocks into the cache, and advance the timestamp by one second
438   // with each read. Start with a block of "a" at the current timestamp `now`.
439   TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
440   // Now load a block of a different file "b" at timestamp `now` + 1
441   env->SetNowSeconds(now + 1);
442   TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
443   // Now load a different block of file "a" at timestamp `now` + 1. When the
444   // first block of "a" expires, this block should also be removed because it
445   // also belongs to file "a".
446   TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
447   // Ensure that all blocks are in the cache (i.e. reads are cache hits).
448   EXPECT_EQ(cache.CacheSize(), 24);
449   EXPECT_EQ(calls, 3);
450   TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
451   TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
452   TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
453   EXPECT_EQ(calls, 3);
454   // Advance the fake timestamp so that "a" becomes stale via its first block.
455   env->SetNowSeconds(now + 2);
456   // The pruning thread periodically compares env->NowSeconds() with the oldest
457   // block's timestamp to see if it should evict any files. At the current fake
458   // timestamp of `now` + 2, file "a" is stale because its first block is stale,
459   // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in
460   // one second of wall time), it should remove "a" and leave "b" alone.
461   uint64 start = Env::Default()->NowSeconds();
462   do {
463     Env::Default()->SleepForMicroseconds(100000);
464   } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3);
465   // There should be one block left in the cache, and it should be the first
466   // block of "b".
467   EXPECT_EQ(cache.CacheSize(), 8);
468   TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
469   EXPECT_EQ(calls, 3);
470   // Advance the fake time to `now` + 3, at which point "b" becomes stale.
471   env->SetNowSeconds(now + 3);
472   // Wait for the pruner to remove "b".
473   start = Env::Default()->NowSeconds();
474   do {
475     Env::Default()->SleepForMicroseconds(100000);
476   } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3);
477   // The cache should now be empty.
478   EXPECT_EQ(cache.CacheSize(), 0);
479 }
480 
TEST(RamFileBlockCacheTest, ParallelReads) {
  // The fetcher holds every caller until `callers` threads are inside it at
  // the same time, and then lets all of them succeed. If that level of
  // concurrency is not reached within 10 seconds it returns an error instead.
  const int callers = 4;
  BlockingCounter pending(callers);
  auto fetcher = [&pending](const string& filename, size_t offset, size_t n,
                            char* buffer, size_t* bytes_transferred) {
    pending.DecrementCount();
    const bool all_arrived = pending.WaitFor(std::chrono::seconds(10));
    if (!all_arrived) {
      // Failing here is easier to debug than letting the whole test time out.
      return errors::FailedPrecondition("desired concurrency not reached");
    }
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const int block_size = 8;
  RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
  std::vector<std::unique_ptr<Thread>> threads;
  // Each thread reads a different block of file "a" and expects it to be
  // filled with 'x'.
  for (int index = 0; index < callers; ++index) {
    auto read_own_block = [&cache, index, block_size]() {
      std::vector<char> out;
      TF_EXPECT_OK(
          ReadCache(&cache, "a", index * block_size, block_size, &out));
      std::vector<char> x(block_size, 'x');
      EXPECT_EQ(out, x);
    };
    threads.emplace_back(
        Env::Default()->StartThread({}, "caller", read_own_block));
  }
  // Destroying `threads` blocks until every thread can be joined, which
  // happens when their reads finish: either once all of them are executing
  // concurrently, or after 10 seconds have passed.
}
515 
TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) {
  // Two concurrent reads that touch the same file block should result in a
  // single fetch of that block.
  const size_t block_size = 16;
  const size_t half_block = block_size / 2;
  int fetch_count = 0;
  Notification fetch_started;
  auto fetcher = [&fetch_count, &fetch_started, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* buffer, size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset, 0);
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    fetch_started.Notify();
    // Stay inside the fetch for a moment so the other thread can issue its
    // read while this one is still in flight.
    Env::Default()->SleepForMicroseconds(100000);  // 0.1 secs
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  // A second thread reads the first half of the block.
  std::unique_ptr<Thread> concurrent(
      Env::Default()->StartThread({}, "concurrent", [&cache, half_block] {
        std::vector<char> first_half;
        TF_EXPECT_OK(ReadCache(&cache, "", 0, half_block, &first_half));
        EXPECT_EQ(first_half.size(), half_block);
      }));
  // Once the fetch is known to be running, this thread reads the second half
  // of the same block.
  fetch_started.WaitForNotification();
  std::vector<char> second_half;
  TF_EXPECT_OK(ReadCache(&cache, "", half_block, half_block, &second_half));
  EXPECT_EQ(second_half.size(), half_block);

  EXPECT_EQ(1, fetch_count);
}
549 
TEST(RamFileBlockCacheTest, Flush) {
  // After Flush() a previously cached block has to be fetched again.
  int fetch_count = 0;
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  RamFileBlockCache cache(16, 32, 0, fetcher);
  std::vector<char> out;
  // Every read in this test asks for the same 16 bytes at offset 0.
  auto read_block = [&cache, &out]() {
    return ReadCache(&cache, "", 0, 16, &out);
  };

  // Two reads in a row: one fetch, then a cache hit.
  TF_EXPECT_OK(read_block());
  TF_EXPECT_OK(read_block());
  EXPECT_EQ(fetch_count, 1);

  // Flushing empties the cache, so the next read fetches once more.
  cache.Flush();
  TF_EXPECT_OK(read_block());
  EXPECT_EQ(fetch_count, 2);
}
568 
569 }  // namespace
570 }  // namespace tensorflow
571