• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/platform/file_system_helper.h"
17 
18 #include <deque>
19 #include <string>
20 #include <vector>
21 
22 #include "tensorflow/core/platform/cpu_info.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/errors.h"
25 #include "tensorflow/core/platform/file_system.h"
26 #include "tensorflow/core/platform/mutex.h"
27 #include "tensorflow/core/platform/path.h"
28 #include "tensorflow/core/platform/platform.h"
29 #include "tensorflow/core/platform/status.h"
30 #include "tensorflow/core/platform/str_util.h"
31 #include "tensorflow/core/platform/threadpool.h"
32 
33 namespace tensorflow {
34 namespace internal {
35 
36 namespace {
37 
38 const int kNumThreads = port::NumSchedulableCPUs();
39 
40 // Run a function in parallel using a ThreadPool, but skip the ThreadPool
41 // on the iOS platform due to its problems with more than a few threads.
ForEach(int first,int last,const std::function<void (int)> & f)42 void ForEach(int first, int last, const std::function<void(int)>& f) {
43 #if TARGET_OS_IPHONE
44   for (int i = first; i < last; i++) {
45     f(i);
46   }
47 #else
48   int num_threads = std::min(kNumThreads, last - first);
49   thread::ThreadPool threads(Env::Default(), "ForEach", num_threads);
50   for (int i = first; i < last; i++) {
51     threads.Schedule([f, i] { f(i); });
52   }
53 #endif
54 }
55 
// The set of characters that introduce globbing syntax: `*`, `?`, `[` and
// the backslash escape.
static const char kGlobbingChars[] = "*?[\\";

// Reports whether `pattern` contains at least one character from
// `kGlobbingChars`, i.e. whether it needs glob expansion at all.
static inline bool IsGlobbingPattern(const std::string& pattern) {
  const std::string::size_type first_special =
      pattern.find_first_of(kGlobbingChars);
  return first_special != std::string::npos;
}
62 
63 // Make sure that the first entry in `dirs` during glob expansion does not
64 // contain a glob pattern. This is to prevent a corner-case bug where
65 // `<pattern>` would be treated differently than `./<pattern>`.
PatchPattern(const std::string & pattern)66 static std::string PatchPattern(const std::string& pattern) {
67   const std::string fixed_prefix =
68       pattern.substr(0, pattern.find_first_of(kGlobbingChars));
69 
70   // Patching is needed when there is no directory part in `prefix`
71   if (io::Dirname(fixed_prefix).empty()) {
72     return io::JoinPath(".", pattern);
73   }
74 
75   // No patching needed
76   return pattern;
77 }
78 
AllDirectoryPrefixes(const std::string & d)79 static std::vector<std::string> AllDirectoryPrefixes(const std::string& d) {
80   std::vector<std::string> dirs;
81   const std::string patched = PatchPattern(d);
82   StringPiece dir(patched);
83 
84   // If the pattern ends with a `/` (or `\\` on Windows), we need to strip it
85   // otherwise we would have one additional matching step and the result set
86   // would be empty.
87   bool is_directory = d[d.size() - 1] == '/';
88 #ifdef PLATFORM_WINDOWS
89   is_directory = is_directory || (d[d.size() - 1] == '\\');
90 #endif
91   if (is_directory) {
92     dir = io::Dirname(dir);
93   }
94 
95   while (!dir.empty()) {
96     dirs.emplace_back(dir);
97     StringPiece new_dir(io::Dirname(dir));
98     // io::Dirname("/") returns "/" so we need to break the loop.
99     // On Windows, io::Dirname("C:\\") would return "C:\\", so we check for
100     // identity of the result instead of checking for dir[0] == `/`.
101     if (dir == new_dir) break;
102     dir = new_dir;
103   }
104 
105   // Order the array from parent to ancestor (reverse order).
106   std::reverse(dirs.begin(), dirs.end());
107 
108   return dirs;
109 }
110 
GetFirstGlobbingEntry(const std::vector<std::string> & dirs)111 static inline int GetFirstGlobbingEntry(const std::vector<std::string>& dirs) {
112   int i = 0;
113   for (const auto& d : dirs) {
114     if (IsGlobbingPattern(d)) {
115       break;
116     }
117     i++;
118   }
119   return i;
120 }
121 
122 }  // namespace
123 
GetMatchingPaths(FileSystem * fs,Env * env,const string & pattern,std::vector<string> * results)124 Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern,
125                         std::vector<string>* results) {
126   // Check that `fs`, `env` and `results` are non-null.
127   if (fs == nullptr || env == nullptr || results == nullptr) {
128     return Status(tensorflow::error::INVALID_ARGUMENT,
129                   "Filesystem calls GetMatchingPaths with nullptr arguments");
130   }
131 
132   // By design, we don't match anything on empty pattern
133   results->clear();
134   if (pattern.empty()) {
135     return OkStatus();
136   }
137 
138   // The pattern can contain globbing characters at multiple levels, e.g.:
139   //
140   //   foo/ba?/baz/f*r
141   //
142   // To match the full pattern, we must match every prefix subpattern and then
143   // operate on the children for each match. Thus, we separate all subpatterns
144   // in the `dirs` vector below.
145   std::vector<std::string> dirs = AllDirectoryPrefixes(pattern);
146 
147   // We can have patterns that have several parents where no globbing is being
148   // done, for example, `foo/bar/baz/*`. We don't need to expand the directories
149   // which don't contain the globbing characters.
150   int matching_index = GetFirstGlobbingEntry(dirs);
151 
152   // If we don't have globbing characters in the pattern then it specifies a
153   // path in the filesystem. We add it to the result set if it exists.
154   if (matching_index == dirs.size()) {
155     if (fs->FileExists(pattern).ok()) {
156       results->emplace_back(pattern);
157     }
158     return OkStatus();
159   }
160 
161   // To expand the globbing, we do a BFS from `dirs[matching_index-1]`.
162   // At every step, we work on a pair `{dir, ix}` such that `dir` is a real
163   // directory, `ix < dirs.size() - 1` and `dirs[ix+1]` is a globbing pattern.
164   // To expand the pattern, we select from all the children of `dir` only those
165   // that match against `dirs[ix+1]`.
166   // If there are more entries in `dirs` after `dirs[ix+1]` this mean we have
167   // more patterns to match. So, we add to the queue only those children that
168   // are also directories, paired with `ix+1`.
169   // If there are no more entries in `dirs`, we return all children as part of
170   // the answer.
171   // Since we can get into a combinatorial explosion issue (e.g., pattern
172   // `/*/*/*`), we process the queue in parallel. Each parallel processing takes
173   // elements from `expand_queue` and adds them to `next_expand_queue`, after
174   // which we swap these two queues (similar to double buffering algorithms).
175   // PRECONDITION: `IsGlobbingPattern(dirs[0]) == false`
176   // PRECONDITION: `matching_index > 0`
177   // INVARIANT: If `{d, ix}` is in queue, then `d` and `dirs[ix]` are at the
178   //            same level in the filesystem tree.
179   // INVARIANT: If `{d, _}` is in queue, then `IsGlobbingPattern(d) == false`.
180   // INVARIANT: If `{d, _}` is in queue, then `d` is a real directory.
181   // INVARIANT: If `{_, ix}` is in queue, then `ix < dirs.size() - 1`.
182   // INVARIANT: If `{_, ix}` is in queue, `IsGlobbingPattern(dirs[ix + 1])`.
183   std::deque<std::pair<string, int>> expand_queue;
184   std::deque<std::pair<string, int>> next_expand_queue;
185   expand_queue.emplace_back(dirs[matching_index - 1], matching_index - 1);
186 
187   // Adding to `result` or `new_expand_queue` need to be protected by mutexes
188   // since there are multiple threads writing to these.
189   mutex result_mutex;
190   mutex queue_mutex;
191 
192   while (!expand_queue.empty()) {
193     next_expand_queue.clear();
194 
195     // The work item for every item in `expand_queue`.
196     // pattern, we process them in parallel.
197     auto handle_level = [&fs, &results, &dirs, &expand_queue,
198                          &next_expand_queue, &result_mutex,
199                          &queue_mutex](int i) {
200       // See invariants above, all of these are valid accesses.
201       const auto& queue_item = expand_queue.at(i);
202       const std::string& parent = queue_item.first;
203       const int index = queue_item.second + 1;
204       const std::string& match_pattern = dirs[index];
205 
206       // Get all children of `parent`. If this fails, return early.
207       std::vector<std::string> children;
208       Status s = fs->GetChildren(parent, &children);
209       if (s.code() == tensorflow::error::PERMISSION_DENIED) {
210         return;
211       }
212 
213       // Also return early if we don't have any children
214       if (children.empty()) {
215         return;
216       }
217 
218       // Since we can get extremely many children here and on some filesystems
219       // `IsDirectory` is expensive, we process the children in parallel.
220       // We also check that children match the pattern in parallel, for speedup.
221       // We store the status of the match and `IsDirectory` in
222       // `children_status` array, one element for each children.
223       std::vector<Status> children_status(children.size());
224       auto handle_children = [&fs, &match_pattern, &parent, &children,
225                               &children_status](int j) {
226         const std::string path = io::JoinPath(parent, children[j]);
227         if (!fs->Match(path, match_pattern)) {
228           children_status[j] =
229               Status(tensorflow::error::CANCELLED, "Operation not needed");
230         } else {
231           children_status[j] = fs->IsDirectory(path);
232         }
233       };
234       ForEach(0, children.size(), handle_children);
235 
236       // At this point, pairing `children` with `children_status` will tell us
237       // if a children:
238       //   * does not match the pattern
239       //   * matches the pattern and is a directory
240       //   * matches the pattern and is not a directory
241       // We fully ignore the first case.
242       // If we matched the last pattern (`index == dirs.size() - 1`) then all
243       // remaining children get added to the result.
244       // Otherwise, only the directories get added to the next queue.
245       for (size_t j = 0; j < children.size(); j++) {
246         if (children_status[j].code() == tensorflow::error::CANCELLED) {
247           continue;
248         }
249 
250         const std::string path = io::JoinPath(parent, children[j]);
251         if (index == dirs.size() - 1) {
252           mutex_lock l(result_mutex);
253           results->emplace_back(path);
254         } else if (children_status[j].ok()) {
255           mutex_lock l(queue_mutex);
256           next_expand_queue.emplace_back(path, index);
257         }
258       }
259     };
260     ForEach(0, expand_queue.size(), handle_level);
261 
262     // After evaluating one level, swap the "buffers"
263     std::swap(expand_queue, next_expand_queue);
264   }
265 
266   return OkStatus();
267 }
268 
FileExists(Env * env,const string & fname)269 StatusOr<bool> FileExists(Env* env, const string& fname) {
270   Status status = env->FileExists(fname);
271   if (errors::IsNotFound(status)) {
272     return false;
273   }
274   TF_RETURN_IF_ERROR(status);
275   return true;
276 }
277 
278 }  // namespace internal
279 }  // namespace tensorflow
280