• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/cloud/ram_file_block_cache.h"
17 #include <cstring>
18 #include "tensorflow/core/lib/core/blocking_counter.h"
19 #include "tensorflow/core/lib/core/status_test_util.h"
20 #include "tensorflow/core/platform/cloud/now_seconds_env.h"
21 #include "tensorflow/core/platform/env.h"
22 #include "tensorflow/core/platform/notification.h"
23 #include "tensorflow/core/platform/test.h"
24 
25 namespace tensorflow {
26 namespace {
27 
ReadCache(RamFileBlockCache * cache,const string & filename,size_t offset,size_t n,std::vector<char> * out)28 Status ReadCache(RamFileBlockCache* cache, const string& filename,
29                  size_t offset, size_t n, std::vector<char>* out) {
30   out->clear();
31   out->resize(n, 0);
32   size_t bytes_transferred = 0;
33   Status status =
34       cache->Read(filename, offset, n, out->data(), &bytes_transferred);
35   EXPECT_LE(bytes_transferred, n);
36   out->resize(bytes_transferred, n);
37   return status;
38 }
39 
// Checks that IsCacheEnabled() is true only for the cache whose first two
// constructor arguments are both non-zero.
TEST(RamFileBlockCacheTest, IsCacheEnabled) {
  // A fetcher that reports success without touching any of its arguments.
  auto noop_fetcher = [](const string& filename, size_t offset, size_t n,
                         char* buffer, size_t* bytes_transferred) {
    return Status::OK();
  };

  // Both leading arguments zero.
  RamFileBlockCache both_zero(0, 0, 0, noop_fetcher);
  EXPECT_FALSE(both_zero.IsCacheEnabled());

  // Only the first argument non-zero.
  RamFileBlockCache first_only(16, 0, 0, noop_fetcher);
  EXPECT_FALSE(first_only.IsCacheEnabled());

  // Only the second argument non-zero.
  RamFileBlockCache second_only(0, 32, 0, noop_fetcher);
  EXPECT_FALSE(second_only.IsCacheEnabled());

  // Both leading arguments non-zero: the only enabled configuration.
  RamFileBlockCache both_set(16, 32, 0, noop_fetcher);
  EXPECT_TRUE(both_set.IsCacheEnabled());
}
56 
// Checks that ValidateAndUpdateFileSignature() returns true on the first call
// and when the signature is repeated, returns false when the signature
// changes, and that a changed signature sends the next read to the fetcher.
TEST(RamFileBlockCacheTest, ValidateAndUpdateFileSignature) {
  int fetch_count = 0;
  // Counts invocations and fills the whole requested range with 'x'.
  auto fetcher = [&fetch_count](const string& filename, size_t offset,
                                size_t n, char* buffer,
                                size_t* bytes_transferred) {
    ++fetch_count;
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  RamFileBlockCache cache(16, 32, 0, fetcher);
  string file = "file";
  std::vector<char> data;

  // Initial signature: accepted, and the read has to hit the fetcher.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(file, 123));
  TF_EXPECT_OK(ReadCache(&cache, file, 0, 16, &data));
  EXPECT_EQ(fetch_count, 1);

  // Same signature again: the read is served without another fetch.
  EXPECT_TRUE(cache.ValidateAndUpdateFileSignature(file, 123));
  TF_EXPECT_OK(ReadCache(&cache, file, 0, 16, &data));
  EXPECT_EQ(fetch_count, 1);

  // Different signature: reported as a mismatch, and the read fetches again.
  EXPECT_FALSE(cache.ValidateAndUpdateFileSignature(file, 321));
  TF_EXPECT_OK(ReadCache(&cache, file, 0, 16, &data));
  EXPECT_EQ(fetch_count, 2);
}
85 
// Checks that a cache built with a zero block_size and/or a zero max_bytes
// forwards every read to the fetcher with the caller's exact filename, offset
// and length.
TEST(RamFileBlockCacheTest, PassThrough) {
  const string want_filename = "foo/bar";
  const size_t want_offset = 42;
  const size_t want_n = 1024;
  int calls = 0;
  // Verifies that the request reaches the fetcher unmodified, then fills the
  // requested range with 'x'.
  auto fetcher = [&calls, want_filename, want_offset, want_n](
                     const string& got_filename, size_t got_offset,
                     size_t got_n, char* dest, size_t* transferred) {
    EXPECT_EQ(got_filename, want_filename);
    EXPECT_EQ(got_offset, want_offset);
    EXPECT_EQ(got_n, want_n);
    ++calls;
    memset(dest, 'x', got_n);
    *transferred = got_n;
    return Status::OK();
  };
  // If block_size, max_bytes, or both are zero, the cache is a pass-through.
  RamFileBlockCache cache1(1, 0, 0, fetcher);
  RamFileBlockCache cache2(0, 1, 0, fetcher);
  RamFileBlockCache cache3(0, 0, 0, fetcher);
  RamFileBlockCache* const pass_through_caches[] = {&cache1, &cache2, &cache3};

  std::vector<char> out;
  int expected_calls = 0;
  // Each read through each cache must produce exactly one more fetcher call.
  for (RamFileBlockCache* cache : pass_through_caches) {
    ++expected_calls;
    TF_EXPECT_OK(ReadCache(cache, want_filename, want_offset, want_n, &out));
    EXPECT_EQ(calls, expected_calls);
  }
}
114 
// Checks that reads at offsets and lengths that are not aligned to the block
// size return the same bytes as the underlying "file", for block sizes 2..4.
TEST(RamFileBlockCacheTest, BlockAlignment) {
  // Initialize a 256-byte buffer.  This is the file underlying the reads we'll
  // do in this test.  Byte i holds the value i (converted to char), so every
  // position in the file is distinguishable.
  const size_t size = 256;
  std::vector<char> buf;
  buf.reserve(size);
  // Use an unsigned index to match `size`, and make the narrowing to char
  // explicit.
  for (size_t i = 0; i < size; i++) {
    buf.push_back(static_cast<char>(i));
  }
  // The fetcher just fetches slices of the buffer; requests starting at or
  // past the end of the buffer transfer zero bytes.
  auto fetcher = [&buf](const string& filename, size_t offset, size_t n,
                        char* buffer, size_t* bytes_transferred) {
    if (offset < buf.size()) {
      size_t bytes_to_copy = std::min<size_t>(buf.size() - offset, n);
      memcpy(buffer, buf.data() + offset, bytes_to_copy);
      *bytes_transferred = bytes_to_copy;
    } else {
      *bytes_transferred = 0;
    }
    return Status::OK();
  };
  for (size_t block_size = 2; block_size <= 4; block_size++) {
    // Make a cache of N-byte block size (1 block) and verify that reads of
    // varying offsets and lengths return correct data.
    RamFileBlockCache cache(block_size, block_size, 0, fetcher);
    for (size_t offset = 0; offset < 10; offset++) {
      // Lengths range from two bytes shorter to two bytes longer than a block.
      for (size_t n = block_size - 2; n <= block_size + 2; n++) {
        std::vector<char> got;
        TF_EXPECT_OK(ReadCache(&cache, "", offset, n, &got));
        // Verify the size of the read.
        // NOTE: with offset < 10 and n <= 6, offset + n never exceeds `size`,
        // so only the full-read branch below is exercised by these loops.
        if (offset + n <= size) {
          // Expect a full read.
          EXPECT_EQ(got.size(), n) << "block size = " << block_size
                                   << ", offset = " << offset << ", n = " << n;
        } else {
          // Expect a partial read.
          EXPECT_EQ(got.size(), size - offset)
              << "block size = " << block_size << ", offset = " << offset
              << ", n = " << n;
        }
        // Verify the contents of the read against the same slice of `buf`.
        std::vector<char>::const_iterator begin = buf.begin() + offset;
        std::vector<char>::const_iterator end =
            offset + n > buf.size() ? buf.end() : begin + n;
        std::vector<char> want(begin, end);
        EXPECT_EQ(got, want) << "block size = " << block_size
                             << ", offset = " << offset << ", n = " << n;
      }
    }
  }
}
165 
// Checks that once every block fits in the cache, a second pass over the same
// blocks triggers no further fetches.
TEST(RamFileBlockCacheTest, CacheHits) {
  const size_t block_size = 16;
  // Offsets the fetcher has already been asked for.
  std::set<size_t> calls;
  auto fetcher = [&calls, block_size](const string& filename, size_t offset,
                                      size_t n, char* buffer,
                                      size_t* bytes_transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    // Any offset seen twice means the cache re-fetched a block it should have
    // held.
    EXPECT_EQ(calls.find(offset), calls.end()) << "at offset " << offset;
    calls.insert(offset);
    memset(buffer, 'x', n);
    *bytes_transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 256;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  // ReadCache() clears and resizes `out` itself, so no pre-sizing is needed.
  std::vector<char> out;
  // The cache has space for `block_count` blocks. The loop with i = 0 should
  // fill the cache, and the loop with i = 1 should be all cache hits. The
  // fetcher checks that it is called once and only once for each offset (to
  // fetch the corresponding block).
  for (int i = 0; i < 2; i++) {
    // The index type matches `block_count` to avoid a signed/unsigned compare.
    for (uint32 j = 0; j < block_count; j++) {
      TF_EXPECT_OK(ReadCache(&cache, "", block_size * j, block_size, &out));
    }
  }
}
194 
// Exercises reads of a 24-byte file through a cache with 16-byte blocks:
// a full first block, a read past end-of-file, and a short second block.
TEST(RamFileBlockCacheTest, OutOfRange) {
  const size_t block_size = 16;
  const size_t file_size = 24;
  // Set by the fetcher when the corresponding block is requested.
  bool fetched_first = false;
  bool fetched_second = false;
  auto fetcher = [block_size, file_size, &fetched_first, &fetched_second](
                     const string& filename, size_t offset, size_t n,
                     char* dest, size_t* transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    // Offsets other than the two blocks of the file transfer nothing.
    size_t copied = 0;
    if (offset == 0) {
      // First block: a full 16 bytes.
      copied = n;
      memset(dest, 'x', n);
      fetched_first = true;
    } else if (offset == block_size) {
      // Second block: only the remaining 8 bytes of the file.
      copied = file_size - block_size;
      memset(dest, 'x', copied);
      fetched_second = true;
    }
    *transferred = copied;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);
  std::vector<char> data;

  // A read of the first 16 bytes succeeds and returns a whole block.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size, &data));
  EXPECT_TRUE(fetched_first);
  EXPECT_EQ(data.size(), block_size);

  // A read at file_size + 4 = 28 is aligned down to offset 16, so the second
  // block gets fetched, but the status is OutOfRange because the requested
  // offset lies beyond the end of the 24-byte file.
  Status past_eof = ReadCache(&cache, "", file_size + 4, 4, &data);
  EXPECT_EQ(past_eof.code(), error::OUT_OF_RANGE);
  EXPECT_TRUE(fetched_second);

  // Reading the whole second block now yields its 8 bytes from the cache,
  // without the fetcher being asked for that block again.
  fetched_second = false;
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &data));
  EXPECT_FALSE(fetched_second);
  EXPECT_EQ(data.size(), file_size - block_size);
}
239 
// Checks that the cache detects interrupted reads, i.e. a partially filled
// block in a position where a complete block was expected.
TEST(RamFileBlockCacheTest, Inconsistent) {
  const size_t block_size = 16;
  // Reports success for every offset but only ever delivers a single byte.
  auto one_byte_fetcher = [block_size](const string& filename, size_t offset,
                                       size_t n, char* dest,
                                       size_t* transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset % block_size, 0);
    EXPECT_GE(n, 1);
    memset(dest, 'x', 1);
    *transferred = 1;
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, 2 * block_size, 0, one_byte_fetcher);
  std::vector<char> data;

  // Reading the second block first is accepted and yields the single byte.
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, block_size, &data));
  EXPECT_EQ(data.size(), 1);

  // Reading the first block afterwards must fail with INTERNAL: a partial
  // block has already been cached at a later position in the file.
  Status first_block_status = ReadCache(&cache, "", 0, block_size, &data);
  EXPECT_EQ(first_block_status.code(), error::INTERNAL);
}
264 
// Checks least-recently-used eviction in a two-block cache by scripting the
// exact sequence of offsets the fetcher is allowed to see.
TEST(RamFileBlockCacheTest, LRU) {
  const size_t block_size = 16;
  // Queue of offsets the fetcher is expected to be asked for next. A cache
  // miss consumes the front entry; a cache hit must not reach the fetcher.
  std::list<size_t> expected_fetches;
  auto fetcher = [&expected_fetches, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* dest, size_t* transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_FALSE(expected_fetches.empty()) << "at offset = " << offset;
    if (!expected_fetches.empty()) {
      EXPECT_EQ(offset, expected_fetches.front());
      expected_fetches.pop_front();
    }
    memset(dest, 'x', n);
    *transferred = n;
    return Status::OK();
  };
  const uint32 block_count = 2;
  RamFileBlockCache cache(block_size, block_count * block_size, 0, fetcher);
  std::vector<char> data;

  // Block 0: miss (consumes the queued offset), then hit (queue untouched).
  expected_fetches.push_back(0);
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));

  // Block 1: miss, then hit. The cache is now full.
  expected_fetches.push_back(block_size);
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &data));

  // Block 2: miss, then hit. The miss evicts the LRU block.
  expected_fetches.push_back(2 * block_size);
  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &data));

  // The evicted block was block 0, so reading it is a miss again.
  expected_fetches.push_back(0);
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));

  // Block 2 is still cached; this hit refreshes its LRU position so that the
  // next miss does not evict it.
  TF_EXPECT_OK(ReadCache(&cache, "", 2 * block_size, 1, &data));

  // Block 1 was evicted earlier. Fetching it again pushes out the current LRU
  // block, which is block 0.
  expected_fetches.push_back(block_size);
  TF_EXPECT_OK(ReadCache(&cache, "", block_size, 1, &data));

  // Block 0 has been evicted once more, so this read is a miss.
  expected_fetches.push_back(0);
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 1, &data));
}
313 
// Checks block expiry under a fake clock: with max staleness 2 a block is
// re-fetched every third second, and with max staleness 0 it never expires.
TEST(RamFileBlockCacheTest, MaxStaleness) {
  int fetches = 0;
  auto fetcher = [&fetches](const string& filename, size_t offset, size_t n,
                            char* dest, size_t* transferred) {
    ++fetches;
    memset(dest, 'x', n);
    *transferred = n;
    return Status::OK();
  };
  std::unique_ptr<NowSecondsEnv> fake_env(new NowSecondsEnv);
  std::vector<char> data;

  // Part 1: max staleness of 2 seconds.
  RamFileBlockCache stale_after_two(8, 16, 2 /* max staleness */, fetcher,
                                    fake_env.get());
  // The initial read loads the block.
  TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &data));
  EXPECT_EQ(fetches, 1);
  // Step the clock forward one second per iteration and repeat the read. The
  // fetch count should go up once every 3 seconds, i.e. each time the block's
  // staleness exceeds 2.
  for (int step = 1; step <= 10; ++step) {
    fake_env->SetNowSeconds(step + 1);
    TF_EXPECT_OK(ReadCache(&stale_after_two, "", 0, 1, &data));
    EXPECT_EQ(fetches, 1 + step / 3);
  }

  // Part 2: max staleness of 0, starting from a fresh counter and clock.
  fetches = 0;
  fake_env->SetNowSeconds(0);
  RamFileBlockCache never_stale(8, 16, 0 /* max staleness */, fetcher,
                                fake_env.get());
  // The initial read loads the block.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &data));
  EXPECT_EQ(fetches, 1);
  // Even after a jump of roughly one year the cached block is still used.
  fake_env->SetNowSeconds(365 * 24 * 60 * 60);  // ~1 year, just for fun.
  TF_EXPECT_OK(ReadCache(&never_stale, "", 0, 1, &data));
  EXPECT_EQ(fetches, 1);
}
353 
TEST(RamFileBlockCacheTest,RemoveFile)354 TEST(RamFileBlockCacheTest, RemoveFile) {
355   int calls = 0;
356   auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
357                           char* buffer, size_t* bytes_transferred) {
358     calls++;
359     char c = (filename == "a") ? 'a' : (filename == "b") ? 'b' : 'x';
360     if (offset > 0) {
361       // The first block is lower case and all subsequent blocks are upper case.
362       c = toupper(c);
363     }
364     memset(buffer, c, n);
365     *bytes_transferred = n;
366     return Status::OK();
367   };
368   // This cache has space for 4 blocks; we'll read from two files.
369   const size_t n = 3;
370   RamFileBlockCache cache(8, 32, 0, fetcher);
371   std::vector<char> out;
372   std::vector<char> a(n, 'a');
373   std::vector<char> b(n, 'b');
374   std::vector<char> A(n, 'A');
375   std::vector<char> B(n, 'B');
376   // Fill the cache.
377   TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
378   EXPECT_EQ(out, a);
379   EXPECT_EQ(calls, 1);
380   TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
381   EXPECT_EQ(out, A);
382   EXPECT_EQ(calls, 2);
383   TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
384   EXPECT_EQ(out, b);
385   EXPECT_EQ(calls, 3);
386   TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
387   EXPECT_EQ(out, B);
388   EXPECT_EQ(calls, 4);
389   // All four blocks should be in the cache now.
390   TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
391   EXPECT_EQ(out, a);
392   TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
393   EXPECT_EQ(out, A);
394   TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
395   EXPECT_EQ(out, b);
396   TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
397   EXPECT_EQ(out, B);
398   EXPECT_EQ(calls, 4);
399   // Remove the blocks from "a".
400   cache.RemoveFile("a");
401   // Both blocks from "b" should still be there.
402   TF_EXPECT_OK(ReadCache(&cache, "b", 0, n, &out));
403   EXPECT_EQ(out, b);
404   TF_EXPECT_OK(ReadCache(&cache, "b", 8, n, &out));
405   EXPECT_EQ(out, B);
406   EXPECT_EQ(calls, 4);
407   // The blocks from "a" should not be there.
408   TF_EXPECT_OK(ReadCache(&cache, "a", 0, n, &out));
409   EXPECT_EQ(out, a);
410   EXPECT_EQ(calls, 5);
411   TF_EXPECT_OK(ReadCache(&cache, "a", 8, n, &out));
412   EXPECT_EQ(out, A);
413   EXPECT_EQ(calls, 6);
414 }
415 
TEST(RamFileBlockCacheTest,Prune)416 TEST(RamFileBlockCacheTest, Prune) {
417   int calls = 0;
418   auto fetcher = [&calls](const string& filename, size_t offset, size_t n,
419                           char* buffer, size_t* bytes_transferred) {
420     calls++;
421     memset(buffer, 'x', n);
422     *bytes_transferred = n;
423     return Status::OK();
424   };
425   std::vector<char> out;
426   // Our fake environment is initialized with the current timestamp.
427   std::unique_ptr<NowSecondsEnv> env(new NowSecondsEnv);
428   uint64 now = Env::Default()->NowSeconds();
429   env->SetNowSeconds(now);
430   RamFileBlockCache cache(8, 32, 1 /* max staleness */, fetcher, env.get());
431   // Read three blocks into the cache, and advance the timestamp by one second
432   // with each read. Start with a block of "a" at the current timestamp `now`.
433   TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
434   // Now load a block of a different file "b" at timestamp `now` + 1
435   env->SetNowSeconds(now + 1);
436   TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
437   // Now load a different block of file "a" at timestamp `now` + 1. When the
438   // first block of "a" expires, this block should also be removed because it
439   // also belongs to file "a".
440   TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
441   // Ensure that all blocks are in the cache (i.e. reads are cache hits).
442   EXPECT_EQ(cache.CacheSize(), 24);
443   EXPECT_EQ(calls, 3);
444   TF_EXPECT_OK(ReadCache(&cache, "a", 0, 1, &out));
445   TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
446   TF_EXPECT_OK(ReadCache(&cache, "a", 8, 1, &out));
447   EXPECT_EQ(calls, 3);
448   // Advance the fake timestamp so that "a" becomes stale via its first block.
449   env->SetNowSeconds(now + 2);
450   // The pruning thread periodically compares env->NowSeconds() with the oldest
451   // block's timestamp to see if it should evict any files. At the current fake
452   // timestamp of `now` + 2, file "a" is stale because its first block is stale,
453   // but file "b" is not stale yet. Thus, once the pruning thread wakes up (in
454   // one second of wall time), it should remove "a" and leave "b" alone.
455   uint64 start = Env::Default()->NowSeconds();
456   do {
457     Env::Default()->SleepForMicroseconds(100000);
458   } while (cache.CacheSize() == 24 && Env::Default()->NowSeconds() - start < 3);
459   // There should be one block left in the cache, and it should be the first
460   // block of "b".
461   EXPECT_EQ(cache.CacheSize(), 8);
462   TF_EXPECT_OK(ReadCache(&cache, "b", 0, 1, &out));
463   EXPECT_EQ(calls, 3);
464   // Advance the fake time to `now` + 3, at which point "b" becomes stale.
465   env->SetNowSeconds(now + 3);
466   // Wait for the pruner to remove "b".
467   start = Env::Default()->NowSeconds();
468   do {
469     Env::Default()->SleepForMicroseconds(100000);
470   } while (cache.CacheSize() == 8 && Env::Default()->NowSeconds() - start < 3);
471   // The cache should now be empty.
472   EXPECT_EQ(cache.CacheSize(), 0);
473 }
474 
// Checks that reads of different blocks can be inside the fetcher at the same
// time, by making the fetcher block until all callers have arrived.
TEST(RamFileBlockCacheTest, ParallelReads) {
  const int callers = 4;
  BlockingCounter arrivals(callers);
  // Each call registers its arrival and then waits for up to 10 seconds for
  // the other callers. If all `callers` calls overlap, every one of them
  // succeeds; otherwise the call returns an error instead of hanging, because
  // a failed status is easier to debug than a test timeout.
  auto fetcher = [&arrivals](const string& filename, size_t offset, size_t n,
                             char* dest, size_t* transferred) {
    arrivals.DecrementCount();
    const bool all_arrived = arrivals.WaitFor(std::chrono::seconds(10));
    if (all_arrived) {
      memset(dest, 'x', n);
      *transferred = n;
      return Status::OK();
    }
    return errors::FailedPrecondition("desired concurrency not reached");
  };
  const int block_size = 8;
  RamFileBlockCache cache(block_size, 2 * callers * block_size, 0, fetcher);
  // Declared after `cache` so that the threads are destroyed first.
  std::vector<std::unique_ptr<Thread>> readers;
  for (int caller = 0; caller < callers; ++caller) {
    // Every thread reads its own block of file "a" and expects it to be 'x's.
    readers.emplace_back(Env::Default()->StartThread(
        {}, "caller", [&cache, caller, block_size]() {
          std::vector<char> got;
          TF_EXPECT_OK(
              ReadCache(&cache, "a", caller * block_size, block_size, &got));
          std::vector<char> want(block_size, 'x');
          EXPECT_EQ(got, want);
        }));
  }
  // Destroying `readers` blocks until each thread can be joined, which happens
  // once its read finishes: either all fetches ran concurrently, or the
  // 10-second wait expired.
}
509 
// Checks that two overlapping reads of the same block result in a single
// fetcher call.
TEST(RamFileBlockCacheTest, CoalesceConcurrentReads) {
  const size_t block_size = 16;
  int fetch_count = 0;
  // Signalled by the fetcher once the first fetch is under way.
  Notification fetch_started;
  auto fetcher = [&fetch_count, &fetch_started, block_size](
                     const string& filename, size_t offset, size_t n,
                     char* dest, size_t* transferred) {
    EXPECT_EQ(n, block_size);
    EXPECT_EQ(offset, 0);
    ++fetch_count;
    memset(dest, 'x', n);
    *transferred = n;
    fetch_started.Notify();
    // Stay inside the fetch for a while so that the other thread has time to
    // issue its read of the same block.
    Env::Default()->SleepForMicroseconds(100000);  // 0.1 secs
    return Status::OK();
  };
  RamFileBlockCache cache(block_size, block_size, 0, fetcher);

  // Background thread: reads the first half of the block.
  std::unique_ptr<Thread> background(
      Env::Default()->StartThread({}, "concurrent", [&cache, block_size] {
        std::vector<char> first_half;
        TF_EXPECT_OK(ReadCache(&cache, "", 0, block_size / 2, &first_half));
        EXPECT_EQ(first_half.size(), block_size / 2);
      }));

  // Once the fetch has started, read the second half of the same block from
  // this thread.
  fetch_started.WaitForNotification();
  std::vector<char> second_half;
  TF_EXPECT_OK(
      ReadCache(&cache, "", block_size / 2, block_size / 2, &second_half));
  EXPECT_EQ(second_half.size(), block_size / 2);

  // Both reads must have been served by one fetch.
  EXPECT_EQ(1, fetch_count);
}
543 
// Checks that Flush() empties the cache: a block that was served from the
// cache before the flush has to be fetched again afterwards.
TEST(RamFileBlockCacheTest, Flush) {
  int fetches = 0;
  auto fetcher = [&fetches](const string& filename, size_t offset, size_t n,
                            char* dest, size_t* transferred) {
    ++fetches;
    memset(dest, 'x', n);
    *transferred = n;
    return Status::OK();
  };
  RamFileBlockCache cache(16, 32, 0, fetcher);
  std::vector<char> data;

  // Two reads of the same block: one miss followed by one hit.
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &data));
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &data));
  EXPECT_EQ(fetches, 1);

  // After flushing, the same read is a miss again.
  cache.Flush();
  TF_EXPECT_OK(ReadCache(&cache, "", 0, 16, &data));
  EXPECT_EQ(fetches, 2);
}
562 
563 }  // namespace
564 }  // namespace tensorflow
565