1 // Copyright (C) 2018 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "common/debug.h"
16 #include "inode2filename/search_directories.h"
17 #include "inode2filename/system_call.h"
18
19 #include <android-base/file.h>
20 #include <android-base/logging.h>
21 #include <android-base/scopeguard.h>
22 #include <android-base/stringprintf.h>
23 #include <android-base/unique_fd.h>
24
25 #include "rxcpp/rx.hpp"
26
27 #include <iostream>
28 #include <stdio.h>
29 #include <fstream>
30 #include <vector>
31 #include <optional>
32
33 #include <signal.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36
37 #include <sys/types.h>
38
39 #ifdef __ANDROID__
40 #include <sys/sysmacros.h>
41 #endif
42
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <poll.h>
46 #include <dirent.h>
47
48 #include <unordered_map>
49
50 namespace rx = rxcpp;
51 using android::base::unique_fd; // NOLINT
52 using android::base::StringPrintf; // NOLINT
53
54 namespace iorap::inode2filename {
55
56 #define DEBUG_INODE_SET 0
57
58 // A multimap of 'ino_t -> List[Inode]' (where the value Inodes have the same ino_t as the key).
59 //
60 // A flat list of Inodes is turned into the above map, then keys can be removed one at a time
61 // until the InodeSet eventually becomes empty.
62 struct InodeSet {
63
64 InodeSet() = default;
65 #if DEBUG_INODE_SET
InodeSetiorap::inode2filename::InodeSet66 InodeSet(const InodeSet& other) {
67 LOG(INFO) << "InodeSet-copyctor";
68 set_ = other.set_;
69 }
70
InodeSetiorap::inode2filename::InodeSet71 InodeSet(InodeSet&& other) {
72 LOG(INFO) << "InodeSet-movector";
73 set_ = std::move(other.set_);
74 }
75
operator =iorap::inode2filename::InodeSet76 InodeSet& operator=(const InodeSet& other) {
77 LOG(INFO) << "InodeSet-opassign-copy";
78 set_ = other.set_;
79 return *this;
80 }
81
operator =iorap::inode2filename::InodeSet82 InodeSet& operator=(InodeSet&& other) {
83 LOG(INFO) << "InodeSet-opassign-move";
84 set_ = std::move(other.set_);
85 return *this;
86 }
87 #else
88 InodeSet(InodeSet&& other) = default;
89 InodeSet& operator=(InodeSet&& other) = default;
90 // Copying InodeSet can be very expensive, refuse to even allow compiling such code.
91 InodeSet(const InodeSet& other) = delete;
92 InodeSet& operator=(const InodeSet& other) = delete;
93 #endif
94
95 struct ValueRange {
beginiorap::inode2filename::InodeSet::ValueRange96 auto/*Iterable<Inode>*/ begin() {
97 return begin_;
98 }
99
endiorap::inode2filename::InodeSet::ValueRange100 auto/*Iterable<Inode>*/ end() {
101 return end_;
102 }
103
emptyiorap::inode2filename::InodeSet::ValueRange104 bool empty() const {
105 return begin_ == end_;
106 }
107
operator booliorap::inode2filename::InodeSet::ValueRange108 explicit operator bool() const {
109 return !empty();
110 }
111
112 std::unordered_multimap<ino_t, Inode>::iterator begin_, end_;
113
114 friend std::ostream& operator<<(std::ostream& os, const ValueRange& s);
115 };
116
117 // Create an observable that emits the remaining inodes in the map.
118 //
119 // Mutation functions must not be called until this observable
120 // has been finished emitting all values (e.g. with on_completed) since that
121 // would cause the underlying iterators to go into an undefined state.
IterateValuesiorap::inode2filename::InodeSet122 auto/*observable<Inode>*/ IterateValues() const {
123 return rxcpp::observable<>::iterate(set_).map( // XX: should we use identity_immediate here?
124 [](const std::pair<const ino_t, Inode>& pair) {
125 return pair.second;
126 }
127 );
128 // TODO: this would be more efficient as a range-v3 view.
129 }
130
Emptyiorap::inode2filename::InodeSet131 constexpr bool Empty() const {
132 return set_.empty();
133 }
134
OfListiorap::inode2filename::InodeSet135 static InodeSet OfList(const std::vector<Inode>& list) {
136 InodeSet new_inode_set;
137 std::unordered_multimap<ino_t, Inode>* map = &new_inode_set.set_;
138
139 for (const Inode& inode : list) {
140 map->insert({inode.inode, inode});
141 }
142
143 return new_inode_set;
144 }
145
146 // Return an optional list of 'Inode' structs whose 'inode' field matches the 'inode' parameter.
147 // Returns an empty range if there was nothing found.
FindInodeListiorap::inode2filename::InodeSet148 ValueRange FindInodeList(ino_t inode) {
149 auto range = set_.equal_range(inode);
150 return ValueRange{range.first, range.second};
151 }
152
153 // Match all fields of an Inode against a 'struct stat' stat_buf.
154 //
155 // The returned Inode (if any) is removed from the InodeSet; it will not be returned by
156 // FindInodeList in future calls.
FindAndRemoveInodeInListiorap::inode2filename::InodeSet157 std::optional<Inode> FindAndRemoveInodeInList(ValueRange inode_list,
158 const struct stat& stat_buf) {
159 LOG(VERBOSE) << "FindAndRemoveInodeInList " << inode_list << ", "
160 << "stat_buf{st_dev=" << stat_buf.st_dev << ",st_ino=" << stat_buf.st_ino << "}";
161
162 auto /*iterator*/ found = std::find_if(inode_list.begin(),
163 inode_list.end(),
164 [&](const std::pair<ino_t, Inode>& pair) {
165 const Inode& inode = pair.second;
166 if (inode.inode != stat_buf.st_ino) {
167 return false;
168 }
169
170 dev_t inode_dev =
171 makedev(static_cast<int>(inode.device_major), static_cast<int>(inode.device_minor));
172
173 // Inodes could be the same across different devices.
174 // Also match the device id.
175 if (inode_dev != stat_buf.st_dev) {
176 LOG(VERBOSE) << "InodeSet:FindAndRemoveInodeInList matched ino: " << inode.inode
177 << " but not device"
178 << ", expected dev: " << stat_buf.st_dev
179 << ", actual dev: " << inode_dev;
180 return false;
181 }
182 return true;
183 });
184
185 if (found != inode_list.end()) {
186 Inode inode = found->second;
187 LOG(VERBOSE) << "InodeSet:FindAndRemoveInodeInList *success* inode+device " << inode;
188 DCHECK(found->second.inode == stat_buf.st_ino);
189 // Erase the inode from the list. This is important.
190 set_.erase(found);
191 return inode;
192 }
193
194 return std::nullopt;
195 }
196
197 // Match all fields of an Inode against another Inode.
198 //
199 // The returned Inode (if any) is removed from the InodeSet; it will not be returned by
200 // FindInodeList in future calls.
FindAndRemoveInodeInListiorap::inode2filename::InodeSet201 std::optional<Inode> FindAndRemoveInodeInList(ValueRange inode_list,
202 const Inode& inode) {
203 LOG(VERBOSE) << "FindAndRemoveInodeInList " << inode_list << ", "
204 << inode << "}";
205
206 auto /*iterator*/ found = std::find_if(inode_list.begin(),
207 inode_list.end(),
208 [&](const std::pair<ino_t, Inode>& pair) {
209 return inode == pair.second;
210 });
211
212 if (found != inode_list.end()) {
213 Inode inode = found->second;
214 LOG(VERBOSE) << "InodeSet:FindAndRemoveInodeInList *success* inode+device " << inode;
215 DCHECK_EQ(found->second, inode);
216 // Erase the inode from the list. This is important.
217 set_.erase(found);
218 return inode;
219 }
220
221 return std::nullopt;
222 }
223
224 // TODO: equality and string operators for testing/logging.
225 private:
226 // Explanation: readdir returns a 'file' -> 'ino_t inode' mapping.
227 //
228 // However inodes can be reused on different partitions (but they have a different device number).
229 // To handle this edge case, and to avoid calling stat whenever the inode definitely doesn't match
230 // store the inodes into a single-key,multi-value container.
231 //
232 // This enables fast scanning of readdir results by matching just the 'inode' portion,
233 // then calling stat only when the inode portion definitely matches to confirm the device.
234
235 // There are no single-key multi-value containers in standard C++, so pretend
236 // we have one by writing this simple facade around an unordered set.
237 //
238 // We expect that the vector size is usually size=1 (or 2 or 3) since the # of devices
239 // is fixed by however many partitions there are on the system, AND the same inode #
240 // would have to be reused across a different file.
241 std::unordered_multimap<ino_t, Inode> set_; // TODO: Rename to map_.
242
243 friend std::ostream& operator<<(std::ostream& os, const InodeSet& s);
244 };
245
operator <<(std::ostream & os,const InodeSet & s)246 std::ostream& operator<<(std::ostream& os, const InodeSet& s) {
247 os << "InodeSet{";
248 for (const auto& kv : s.set_) {
249 // e.g. "123=>(1:2:123)" ... its expected for the 'ino_t' portion to be repeated.
250 os << "" << kv.first << "=>(" << kv.second << "),";
251 }
252 os << "}";
253 return os;
254 }
255
operator <<(std::ostream & os,const InodeSet::ValueRange & v)256 std::ostream& operator<<(std::ostream& os, const InodeSet::ValueRange& v) {
257 // Don't want to make a const and non const version of ValueRange.
258 InodeSet::ValueRange& s = const_cast<InodeSet::ValueRange&>(v);
259
260 os << "InodeSet::ValueRange{";
261 for (const auto& kv : s) {
262 // e.g. "123=>(1:2:123)" ... its expected for the 'ino_t' portion to be repeated.
263 os << "" << kv.first << "=>(" << kv.second << "),";
264 }
265 os << "}";
266 return os;
267 }
268
// Forward declaration only. NOTE(review): no definition or caller of this function is visible
// in this part of the file; it may be unused -- confirm before relying on it.
void search_for_inodes_in(std::vector<Inode>& inode_list, const std::string& dirpath);
270
// Why a DirectoryEntryResult holds an error instead of a DirectoryEntry.
// The values are spelled out explicitly and match the implicit 0..3 numbering.
enum DirectoryEntryErrorCode {
  kInvalid = 0,    // Not a real error code; used to detect bad initialization.
  kOpenDir = 1,    // opendir failed.
  kReadDir = 2,    // readdir failed.
  kDtUnknown = 3,  // d_type was DT_UNKNOWN.
};
277
278 struct DirectoryEntryError {
279 DirectoryEntryErrorCode code;
280 int err_no;
281 std::string filename;
282 };
283
operator <<(std::ostream & os,const DirectoryEntryError & e)284 std::ostream& operator<<(std::ostream& os, const DirectoryEntryError& e) {
285 os << "DirectoryEntryError{"
286 << static_cast<int>(e.code) << "," << e.err_no << "," << e.filename << "}";
287 return os;
288 // TODO: pretty-print code and err-no
289 }
290
291 static common::DebugCounter gDebugDirectoryEntryCounter{};
292 static constexpr bool kDebugDirectoryEntry = false;
293
294 #define DIRECTORY_ENTRY_MOVE_DCHECK() \
295 DCHECK_EQ(other.moved_from_, false) << __PRETTY_FUNCTION__ << "CNT:" << other.debug_counter_;
296 #define DIRECTORY_ENTRY_TRACE_CTOR() \
297 if (kDebugDirectoryEntry) LOG(VERBOSE) << __PRETTY_FUNCTION__ << "@CNT:" << debug_counter_
298
299 struct DirectoryEntry {
300 using ResultT = iorap::expected<DirectoryEntry, DirectoryEntryError>;
301 using ObservableT = rx::observable<ResultT>;
302
303 static constexpr ino_t kInvalidIno = std::numeric_limits<ino_t>::max();
304 static constexpr auto kInvalidFileName = "";
305
306 // Path to file, the prefix is one of the root directories.
307 std::string filename{kInvalidFileName};
308 // Inode number of the file. Not unique across different devices.
309 ino_t d_ino{kInvalidIno};
310 // File type (DT_LNK, DT_REG, DT_DIR, or DT_UNKNOWN)
311 unsigned char d_type{DT_UNKNOWN}; // Note: not seen outside of sentinel roots.
312 // TODO: Consider invariant checks for valid combinations of above fields?
313
314 // Debug-only flags.
315 bool moved_from_{false};
316 size_t debug_counter_{0};
317
318 private:
319 // TODO: remove default constructor?
320 //
321 // SEEMS TO BE USED by std::vector etc. FIX DAT.
DirectoryEntryiorap::inode2filename::DirectoryEntry322 DirectoryEntry() noexcept {
323 debug_counter_ = gDebugDirectoryEntryCounter++;
324 DIRECTORY_ENTRY_TRACE_CTOR();
325 }
326 public:
DirectoryEntryiorap::inode2filename::DirectoryEntry327 DirectoryEntry(std::string filename, ino_t d_ino, unsigned char d_type) noexcept
328 : filename{std::move(filename)},
329 d_ino{d_ino},
330 d_type{d_type} {
331 debug_counter_ = gDebugDirectoryEntryCounter++;
332 DIRECTORY_ENTRY_TRACE_CTOR();
333 }
334
DirectoryEntryiorap::inode2filename::DirectoryEntry335 DirectoryEntry(const DirectoryEntry& other) noexcept {
336 // Do not use member-initialization syntax so that this DCHECK can execute first.
337 DIRECTORY_ENTRY_MOVE_DCHECK();
338
339 filename = other.filename;
340 d_ino = other.d_ino;
341 d_type = other.d_type;
342 children_paths_ = other.children_paths_;
343 children_initialized_ = other.children_initialized_;
344 debug_counter_ = other.debug_counter_;
345 DIRECTORY_ENTRY_TRACE_CTOR();
346 }
347
operator =iorap::inode2filename::DirectoryEntry348 DirectoryEntry& operator=(const DirectoryEntry& other) noexcept {
349 if (this == &other) {
350 return *this;
351 }
352
353 DIRECTORY_ENTRY_MOVE_DCHECK();
354
355 filename = other.filename;
356 d_ino = other.d_ino;
357 d_type = other.d_type;
358 children_paths_ = other.children_paths_;
359 children_initialized_ = other.children_initialized_;
360 debug_counter_ = other.debug_counter_;
361 DIRECTORY_ENTRY_TRACE_CTOR();
362
363 return *this;
364 }
365
operator =iorap::inode2filename::DirectoryEntry366 DirectoryEntry& operator=(DirectoryEntry&& other) noexcept {
367 if (this == &other) {
368 return *this;
369 }
370
371 DIRECTORY_ENTRY_MOVE_DCHECK();
372
373 filename = std::move(other.filename);
374 d_ino = other.d_ino;
375 d_type = other.d_type;
376 children_paths_ = std::move(other.children_paths_);
377 children_initialized_ = other.children_initialized_;
378 debug_counter_ = other.debug_counter_;
379 DIRECTORY_ENTRY_TRACE_CTOR();
380
381 return *this;
382 }
383
DirectoryEntryiorap::inode2filename::DirectoryEntry384 DirectoryEntry(DirectoryEntry&& other) noexcept {
385 DIRECTORY_ENTRY_MOVE_DCHECK();
386 other.moved_from_ = true;
387
388 filename = std::move(other.filename);
389 d_ino = other.d_ino;
390 d_type = other.d_type;
391 children_paths_ = std::move(other.children_paths_);
392 children_initialized_ = other.children_initialized_;
393 debug_counter_ = other.debug_counter_;
394 DIRECTORY_ENTRY_TRACE_CTOR();
395 }
396
397 // Create a sentinel (root of roots) whose children entries are those specified by
398 // children_paths.
CreateSentineliorap::inode2filename::DirectoryEntry399 static DirectoryEntry CreateSentinel(std::vector<std::string> children_paths) {
400 DirectoryEntry e;
401 e.d_type = DT_DIR;
402 ++gDebugDirectoryEntryCounter;
403
404 for (std::string& child_path : children_paths) {
405 // TODO: Should we call Stat on the child path here to reconstitute the ino_t for a root dir?
406 // Otherwise it can look a little strange (i.e. the root dir itself will never match
407 // the searched inode).
408 //
409 // Probably not too big of a problem in practice.
410 DirectoryEntry child_entry{std::move(child_path), kInvalidIno, DT_DIR};
411 ResultT child_entry_as_result{std::move(child_entry)};
412 e.children_paths_.push_back(std::move(child_entry_as_result));
413 }
414
415 e.children_initialized_ = true;
416
417 return e;
418 }
419
420 // Return an observable which emits the direct children only.
421 // The children entries are now read from disk (with readdir) if they weren't read previously.
GetChildrenEntriesiorap::inode2filename::DirectoryEntry422 std::vector<ResultT> GetChildrenEntries(borrowed<SystemCall*> system_call) const& {
423 BuildChildrenPaths(system_call);
424 return children_paths_;
425 }
426
427 // Return an observable which emits the direct children only.
428 // The children entries are now read from disk (with readdir) if they weren't read previously.
429 // Movable overload.
GetChildrenEntriesiorap::inode2filename::DirectoryEntry430 std::vector<ResultT> GetChildrenEntries(borrowed<SystemCall*> system_call) && {
431 BuildChildrenPaths(system_call);
432 return std::move(children_paths_);
433 }
434
435 // Returns a (lazy) observable that emits every single node, in pre-order,
436 // rooted at this tree.
437 //
438 // New entries are only read from disk (with e.g. readdir) when more values are pulled
439 // from the observable. Only the direct children of any entry are read at any time.
440 //
441 // The emission can be stopped prematurely by unsubscribing from the observable.
442 // This means the maximum amount of 'redundant' IO reads is bounded by the children count
443 // of all entries emitted thus far minus entries actually emitted.
444 ObservableT GetSubTreePreOrderEntries(borrowed<SystemCall*> system_call) const;
445
446 private:
447 // Out-of-line definition to avoid circular type dependency.
448 void BuildChildrenPaths(borrowed<SystemCall*> system_call) const;
449
450 // We need to lazily initialize children_paths_ only when we try to read them.
451 //
452 // Assuming the underlying file system doesn't change (which isn't strictly true),
453 // the directory children are referentially transparent.
454 //
455 // In practice we do not need to distinguish between the file contents changing out
456 // from under us in this code, so we don't need the more strict requirements.
457 mutable std::vector<ResultT> children_paths_;
458 mutable bool children_initialized_{false};
459
460 friend std::ostream& operator<<(std::ostream& os, const DirectoryEntry& d);
461 };
462
operator <<(std::ostream & os,const DirectoryEntry & d)463 std::ostream& operator<<(std::ostream& os, const DirectoryEntry& d) {
464 os << "DirectoryEntry{" << d.filename << ",ino:" << d.d_ino << ",type:" << d.d_type << "}";
465 return os;
466 }
467
// Result of reading one directory entry: either a DirectoryEntry or a DirectoryEntryError.
using DirectoryEntryResult = DirectoryEntry::ResultT;
469
470 // Read all directory entries and return it as a vector. This must be an eager operation,
471 // as readdir is not re-entrant.
472 //
473 // This could be considered as a limitation from the 'observable' perspective since
474 // one can end up reading unnecessary extra directory entries that are then never consumed.
475 //
476 // The following entries are skipped:
477 // - '.' self
478 // - ".." parent
479 //
480 // All DT types except the following are removed:
481 // * DT_LNK - symbolic link (empty children)
482 // * DT_REG - regular file (empty children)
483 // * DT_DIR - directory (has children)
484 static std::vector<DirectoryEntryResult>
ReadDirectoryEntriesFromDirectoryPath(std::string dirpath,borrowed<SystemCall * > system_call)485 ReadDirectoryEntriesFromDirectoryPath(std::string dirpath, borrowed<SystemCall*> system_call) {
486 DIR *dirp;
487 struct dirent *dp;
488
489 LOG(VERBOSE) << "ReadDirectoryEntriesFromDirectoryPath(" << dirpath << ")";
490
491 if ((dirp = system_call->opendir(dirpath.c_str())) == nullptr) {
492 PLOG(ERROR) << "Couldn't open directory: " << dirpath;
493 return {DirectoryEntryError{kOpenDir, errno, dirpath}};
494 }
495
496 // Read all the results up front because readdir is not re-entrant.
497 std::vector<DirectoryEntryResult> results;
498
499 // Get full path + the directory entry path.
500 auto child_path = [&] { return dirpath + "/" + dp->d_name; };
501
502 do {
503 errno = 0;
504 if ((dp = system_call->readdir(dirp)) != nullptr) {
505 if (dp->d_type == DT_DIR) {
506 if (strcmp(".", dp->d_name) == 0 || strcmp("..", dp->d_name) == 0) {
507 LOG(VERBOSE) << "Skip self/parent: " << dp->d_name;
508 continue;
509 }
510
511 LOG(VERBOSE) << "Find entry " << child_path()
512 << ", ino: " << dp->d_ino << ", type: " << dp->d_type;
513 results.push_back(DirectoryEntry{child_path(),
514 static_cast<ino_t>(dp->d_ino),
515 dp->d_type});
516 } else if (dp->d_type == DT_UNKNOWN) {
517 // This seems bad if it happens. We should probably do something about this.
518 LOG(WARNING) << "Found unknown DT entry: " << child_path();
519
520 results.push_back(DirectoryEntryError{kDtUnknown, /*errno*/0, child_path()});
521 } else if (dp->d_type == DT_LNK || dp->d_type == DT_REG) {
522 // Regular non-directory file entry.
523 results.push_back(DirectoryEntry{child_path(),
524 static_cast<ino_t>(dp->d_ino),
525 dp->d_type});
526 } else {
527 // Block device, character device, socket, etc...
528 LOG(VERBOSE) << "Skip DT entry of type: " << dp->d_type << " " << child_path();
529 }
530 } else if (errno != 0) {
531 PLOG(ERROR) << "Error reading directory entry in " << dirpath;
532
533 results.push_back(DirectoryEntryError{kReadDir, errno, dirpath});
534 }
535 } while (dp != nullptr);
536
537 if (system_call->closedir(dirp) < 0) {
538 PLOG(ERROR) << "Failed to close directory " << dirpath;
539 }
540
541 return results;
542 }
543
BuildChildrenPaths(borrowed<SystemCall * > system_call) const544 void DirectoryEntry::BuildChildrenPaths(borrowed<SystemCall*> system_call) const {
545 if (children_initialized_) {
546 return;
547 }
548
549 if (d_type == DT_DIR) {
550 children_paths_ = ReadDirectoryEntriesFromDirectoryPath(filename, system_call);
551 // TODO: consider using dependency injection here to substitute this function during testing?
552 }
553 }
554
555 struct InodeSearchParameters {
556 std::vector<Inode> inode_list;
557 std::vector<std::string> root_dirs;
558 };
559
560 // [IN]
561 // observable: expected<Value, Error>, ...
562 // [OUT]
563 // observable: Value, ...
564 //
565 // Any encountered 'Error' items are dropped after logging.
566 template <typename T>
MapExpectedOrLog(T && observable,::android::base::LogSeverity log_level)567 auto MapExpectedOrLog(T&& observable,
568 ::android::base::LogSeverity log_level) {
569 return observable.filter([log_level](const auto& result) {
570 if (result) {
571 return true;
572 } else {
573 LOG(log_level) << result.error();
574 return false;
575 }
576 }).map([](auto&& result) {
577 return IORAP_FORWARD_LAMBDA(result).value();
578 });
579 }
580
581 template <typename T>
MapExpectedOrLogError(T && observable)582 auto MapExpectedOrLogError(T&& observable) {
583 return MapExpectedOrLog(std::forward<T>(observable), ::android::base::ERROR);
584 }
585
// [IN]  observable: optional<Value> (anything with has_value()/value()), ...
// [OUT] observable: Value, ...
//
// Empty items are silently dropped; non-empty items are unwrapped.
template <typename T>
auto MapOptionalOrDrop(T&& observable) {
  // TODO: static_assert this isn't used with an unexpected.
  auto has_payload = [](const auto& item) { return item.has_value(); };
  auto unwrap = [](auto&& item) { return IORAP_FORWARD_LAMBDA(item).value(); };
  return observable.filter(std::move(has_payload)).map(std::move(unwrap));
}
595
596 template <typename T, typename F>
VisitValueOrLogError(T && expected,F && visit_func,const char * error_prefix="")597 auto VisitValueOrLogError(T&& expected, F&& visit_func, const char* error_prefix = "") {
598 if (!expected) {
599 LOG(ERROR) << error_prefix << " " << expected.error();
600 } else {
601 visit_func(std::forward<T>(expected).value());
602 }
603 // TODO: Could be good to make this more monadic by returning an optional.
604 }
605
606 template <typename TSimple, typename T, typename F>
TreeTraversalPreOrderObservableImpl(rx::subscriber<TSimple> dest,T && node,F && fn)607 void TreeTraversalPreOrderObservableImpl(rx::subscriber<TSimple> dest, T&& node, F&& fn) {
608 LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (begin) " << __PRETTY_FUNCTION__;
609
610 if (!dest.is_subscribed()) {
611 LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (unsubscribed)";
612 return;
613 } else {
614 LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (on_next node)";
615
616 // Copy the node here. This is less bad than it seems since we haven't yet
617 // calculated its children (except in the root), so its just doing a shallow memcpy (sizeof(T)).
618 //
619 // This assumes the children are calculated lazily, otherwise we'd need to have a separate
620 // NodeBody class which only holds the non-children elements.
621
622 TSimple copy = std::forward<T>(node);
623 dest.on_next(std::move(copy));
624
625 if (!node.has_value()) {
626 return;
627 }
628
629 // Whenever we call 'on_next' also check if we end up unsubscribing.
630 // This avoids the expensive call into the children.
631 if (!dest.is_subscribed()) {
632 LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (post-self unsubscribe)";
633 return;
634 }
635
636 // Eagerly get the childrem, moving them instead of copying them.
637 auto&& children = fn(std::forward<T>(node));
638 for (auto&& child : children) {
639 TreeTraversalPreOrderObservableImpl(dest, IORAP_FORWARD_LAMBDA(child), fn);
640 // TODO: double check this is doing the std::move properly for rvalues.
641
642 if (!dest.is_subscribed()) {
643 LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (unsubscribed in children)";
644 break;
645 }
646 };
647 }
648 }
649
650 // Creates an observable over all the nodes in the tree rooted at node.
651 // fn is a function that returns the children of that node.
652 //
653 // The items are emitted left-to-right pre-order, and stop early if the
654 // observable is unsubscribed from.
655 //
656 // Implementation requirement:
657 // typeof(node) -> expected<V, E> or optional<V> or similar.
658 // fn(node) -> iterable<typeof(node)>
659 //
660 // preorder(self):
661 // visit(self)
662 // for child in fn(self):
663 // preorder(child)
664 template <typename T, typename F>
TreeTraversalPreOrderObservable(T && node,F && fn)665 auto/*observable<T>*/ TreeTraversalPreOrderObservable(T&& node, F&& fn) {
666 LOG(VERBOSE) << "TreeTraversalPreOrderObservable: " << __PRETTY_FUNCTION__;
667
668 using T_simple = std::decay_t<T>;
669 return rx::observable<>::create<T_simple>(
670 // Copy node to avoid lifetime issues.
671 [node=node,fn=std::forward<F>(fn)](rx::subscriber<T_simple> dest) {
672 LOG(VERBOSE) << "TreeTraversalPreOrderObservable (lambda)";
673 TreeTraversalPreOrderObservableImpl<T_simple>(dest,
674 std::move(node),
675 std::move(fn));
676 dest.on_completed();
677 }
678 );
679 }
680
681 DirectoryEntry::ObservableT
GetSubTreePreOrderEntries(borrowed<SystemCall * > system_call) const682 DirectoryEntry::GetSubTreePreOrderEntries(borrowed<SystemCall*> system_call) const {
683 return TreeTraversalPreOrderObservable(
684 DirectoryEntryResult{*this},
685 [system_call=system_call](auto/*DirectoryEntryResult*/&& result)
686 -> std::vector<DirectoryEntryResult> {
687 if (!result) {
688 LOG(VERBOSE) << "GetSubTreePreOrderEntries (no value return)";
689 // Cannot have children when it was an error.
690 return {};
691 }
692 return
693 IORAP_FORWARD_LAMBDA(result)
694 .value()
695 .GetChildrenEntries(system_call);
696 });
697 }
698
// Failure reported by Stat(): the errno from the stat call and the path that was queried.
struct StatError {
  // errno at the time of failure. Defaults to 0 so a default-constructed value is well defined.
  int err_no{0};
  // The path passed to Stat().
  std::string path_name;
};
703
operator <<(std::ostream & os,const StatError & e)704 std::ostream& operator<<(std::ostream& os, const StatError& e) {
705 os << "StatError{" << e.err_no << "," << e.path_name
706 << ": " << strerror(e.err_no) << "}";
707 return os;
708 }
709
710 template <typename U = void> // suppress unused warning.
Stat(const std::string & path_name,borrowed<SystemCall * > system_call)711 static iorap::expected<struct stat, StatError> Stat(const std::string& path_name,
712 borrowed<SystemCall*> system_call) {
713 struct stat statbuf{};
714
715 // Call stat(2) in live code. Overridden in test code.
716 if (system_call->stat(path_name.c_str(), /*out*/&statbuf) == 0) {
717 return statbuf;
718 } else {
719 return iorap::unexpected(StatError{errno, path_name});
720 }
721 }
722
// Result type of Stat(): the 'struct stat' on success, or a StatError describing the failure.
using StatResult = iorap::expected<struct stat, StatError>;
724
725 // An inode's corresponding filename on the system.
726 struct SearchMatch {
727 Inode inode;
728 // Relative path joined with a root directory.
729 //
730 // Use absolute path root dirs to get back absolute path filenames.
731 // If relative, this is relative to the current working directory.
732 std::string filename;
733 };
734
operator <<(std::ostream & os,const SearchMatch & s)735 std::ostream& operator<<(std::ostream& os, const SearchMatch& s) {
736 os << "SearchMatch{" << s.inode << ", " << s.filename << "}";
737 return os;
738 }
739
740 struct SearchState {
741 // Emit 'match' Inodes corresponding to the ones here.
742 InodeSet inode_set;
743
744 // An inode matching one of the ones in inode_set was discovered in the most-recently
745 // emitted SearchState.
746 //
747 // The InodeSet removes any matching 'Inode'.
748 std::optional<SearchMatch> match;
749
750 SearchState() = default;
751 SearchState(SearchState&& other) = default;
752
753 // Do not copy this because copying InodeSet is excruciatingly slow.
754 SearchState(const SearchState& other) = delete;
755
756 // TODO: make sure this doesn't copy [inodes], as that would be unnecessarily expensive.
757 };
758
operator <<(std::ostream & os,const SearchState & s)759 std::ostream& operator<<(std::ostream& os, const SearchState& s) {
760 os << "SearchState{match:";
761 // Print the 'match' first. The InodeSet could be very large so it could be truncated in logs.
762 if (s.match) {
763 os << s.match.value();
764 } else {
765 os << "(none)";
766 }
767 os << ", inode_set:" << s.inode_set << "}";
768 return os;
769 }
770
771 // TODO: write operator<< etc.
772
773 // Return a lazy observable that will search for all filenames whose inodes
774 // match the inodes in inode_search_list.
775 //
776 // Every unmatched inode will be emitted as an unexpected at the end of the stream.
SearchDirectoriesForMatchingInodes(std::vector<std::string> root_dirs,std::vector<Inode> inode_search_list,borrowed<SystemCall * > system_call)777 auto/*[observable<InodeResult>, connectable]*/ SearchDirectoriesForMatchingInodes(
778 std::vector<std::string> root_dirs,
779 std::vector<Inode> inode_search_list,
780 borrowed<SystemCall*> system_call) {
781
782 // Create a (lazy) observable that will emit each DirectoryEntry that is a recursive subchild
783 // of root_dirs. Emission will be stopped when its unsubscribed from.
784 //
785 // This is done by calling readdir(3) lazily.
786 auto/*obs<DirectoryEntry>*/ find_all_subdir_entries = ([&]() {
787 DirectoryEntry sentinel = DirectoryEntry::CreateSentinel(std::move(root_dirs));
788 auto/*obs<DirectoryEntryResult*/ results = sentinel.GetSubTreePreOrderEntries(system_call);
789
790 // Drop any errors by logging them to logcat. "Unwrap" the expected into the underlying data.
791 auto/*obs<DirectoryEntry*>*/ expected_drop_errors = MapExpectedOrLogError(std::move(results));
792 return expected_drop_errors;
793 })();
794
795 // DirectoryEntry is missing the dev_t portion, so we may need to call scan(2) again
796 // to confirm the dev_t. We skip calling scan(2) when the ino_t does not match.
797 // InodeSet lets us optimally avoid calling scan(2).
798 std::shared_ptr<SearchState> initial = std::make_shared<SearchState>();
799 initial->inode_set = InodeSet::OfList(inode_search_list);
800
801 auto/*[observable<SearchState>,Connectable]*/ search_state_results = find_all_subdir_entries.scan(
802 std::move(initial),
803 [system_call=system_call](std::shared_ptr<SearchState> search_state,
804 const DirectoryEntry& dir_entry) {
805 LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#Scan "
806 << dir_entry << ", state: " << *search_state;
807
808 search_state->match = std::nullopt;
809
810 InodeSet* inodes = &search_state->inode_set;
811
812 // Find all the possible inodes across different devices.
813 InodeSet::ValueRange inode_list = inodes->FindInodeList(dir_entry.d_ino);
814
815 // This directory doesn't correspond to any inodes we are searching for.
816 if (!inode_list) {
817 return search_state;
818 }
819
820 StatResult maybe_stat = Stat(dir_entry.filename, system_call);
821 VisitValueOrLogError(maybe_stat, [&](const struct stat& stat_buf) {
822 // Try to match the specific inode. Usually this will not result in a match (nullopt).
823 std::optional<Inode> inode = inodes->FindAndRemoveInodeInList(inode_list, stat_buf);
824
825 if (inode) {
826 search_state->match = SearchMatch{inode.value(), dir_entry.filename};
827 }
828 });
829
830 return search_state;
831 }
832 // Avoid exhausting a potentially 'infinite' stream of files by terminating as soon
833 // as we find every single inode we care about.
834 ).take_while([](std::shared_ptr<SearchState> state) {
835 // Also emit the last item that caused the search set to go empty.
836 bool cond = !state->inode_set.Empty() || state->match;
837
838 if (WOULD_LOG(VERBOSE)) {
839 static int kCounter = 0;
840 LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#take_while (" << kCounter++ <<
841 ",is_empty:"
842 << state->inode_set.Empty() << ", match:" << state->match.has_value();
843 }
844 // Minor O(1) implementation inefficiency:
845 // (Too minor to fix but it can be strange if looking at the logs or readdir traces).
846 //
847 // Note, because we return 'true' after the search set went empty,
848 // the overall stream graph still pulls from search_state_results exactly once more:
849 //
850 // This means that for cond to go to false, we would've read one extra item and then discarded
851 // it. If that item was the first child of a directory, that means we essentially did
852 // one redundant pass of doing a readdir.
853 // In other words if the search set goes to empty while the current item is a directory,
854 //
855 // it will definitely readdir on it at least once as we try to get the first child in
856 // OnTreeTraversal.
857 //
858 // This could be fixed with a 'take_until(Predicate)' operator which doesn't discard
859 // the last item when the condition becomes false. However rxcpp seems to lack this operator,
860 // whereas RxJava has it.
861
862 if (!cond) {
863 LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#take_while "
864 << "should now terminate for " << *state;
865 }
866
867 return cond;
868 }).publish();
869 // The publish here is mandatory. The stream is consumed twice (once by matched and once by
870 // unmatched streams). Without the publish, once all items from 'matched' were consumed it would
871 // start another instance of 'search_state_results' (i.e. it appears as if the search
872 // is restarted).
873 //
874 // By using 'publish', the search_state_results is effectively shared by both downstream nodes.
875 // Note that this also requires the subscriber to additionally call #connect on the above stream,
876 // otherwise no work will happen.
877
878 // Lifetime notes:
879 //
880 // The the 'SearchState' is emitted into both below streams simultaneously.
881 // The 'unmatched_inode_values' only touches the inode_set.
882 // The 'matched_inode_values' only touches the match.
883 // Either stream can 'std::move' from those fields because they don't move each other's fields.
884 auto/*observable<InodeResult>*/ matched_inode_values = search_state_results
885 .filter([](std::shared_ptr<SearchState> search_state) {
886 return search_state->match.has_value(); })
887 .map([](std::shared_ptr<SearchState> search_state) {
888 return std::move(search_state->match.value()); })
889 // observable<SearchMatch>
890 .map([](SearchMatch search_match) {
891 return InodeResult::makeSuccess(search_match.inode, std::move(search_match.filename));
892 }); // observable<InodeResult>
893
894 auto/*observable<?>*/ unmatched_inode_values = search_state_results
895 // The 'last' SearchState is the one that contains all the remaining inodes.
896 .take_last(1) // observable<SearchState>
897 .flat_map([](std::shared_ptr<SearchState> search_state) {
898 LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#unmatched -- flat_map";
899 // Aside: Could've used a move here if the inodes weren't so lightweight already.
900 return search_state->inode_set.IterateValues(); })
901 // observable<Inode>
902 .map([](const Inode& inode) {
903 LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#unmatched -- map";
904 return InodeResult::makeFailure(inode, InodeResult::kCouldNotFindFilename);
905 });
906 // observable<InodeResult>
907
908 // The matched and unmatched InodeResults are emitted together.
909 // Use merge, not concat, because we need both observables to be subscribed to simultaneously.
910
911 auto/*observable<InodeResult*/ all_inode_results =
912 matched_inode_values.merge(unmatched_inode_values);
913
914 // Now that all mid-stream observables have been connected, turn the Connectable observable
915 // into a regular observable.
916
917 // The caller has to call 'connect' on the search_state_results after subscribing
918 // and before any work can actually start.
919 return std::make_pair(all_inode_results, search_state_results);
920 }
921
FindFilenamesFromInodes(std::vector<std::string> root_directories,std::vector<Inode> inode_list,SearchMode mode) const922 rxcpp::observable<InodeResult> SearchDirectories::FindFilenamesFromInodes(
923 std::vector<std::string> root_directories,
924 std::vector<Inode> inode_list,
925 SearchMode mode) const {
926 DCHECK(mode == SearchMode::kInProcessDirect) << " other modes not implemented yet";
927
928 auto/*observable[2]*/ [inode_results, connectable] = SearchDirectoriesForMatchingInodes(
929 std::move(root_directories),
930 std::move(inode_list),
931 system_call_);
932
933 return inode_results.ref_count(connectable);
934 }
935
// The wrapper below could probably be avoided with an auto_connect operator, which rxcpp
// doesn't seem to have.
//
// No other way was found to avoid it, or at least to allow connecting on the primary
// observable (instead of a secondary side-observable):
//
// - With the obvious publish+ref_count, the unmerged stream gets no items emitted into it.
// - Calling ref_count later turns everything into a no-op.
// - Calling connect too early misses the subscription.
944 template <typename T>
945 struct RxAnyConnectableFromObservable : public SearchDirectories::RxAnyConnectable {
connectiorap::inode2filename::RxAnyConnectableFromObservable946 virtual void connect() override {
947 observable.connect();
948 }
949
~RxAnyConnectableFromObservableiorap::inode2filename::RxAnyConnectableFromObservable950 virtual ~RxAnyConnectableFromObservable() {}
951
RxAnyConnectableFromObservableiorap::inode2filename::RxAnyConnectableFromObservable952 RxAnyConnectableFromObservable(rxcpp::connectable_observable<T> observable)
953 : observable(observable) {
954 }
955
956 rxcpp::connectable_observable<T> observable;
957 };
958
959 // Type deduction helper.
960 template <typename T>
961 std::unique_ptr<SearchDirectories::RxAnyConnectable>
MakeRxAnyConnectableFromObservable(rxcpp::connectable_observable<T> observable)962 MakeRxAnyConnectableFromObservable(rxcpp::connectable_observable<T> observable) {
963 SearchDirectories::RxAnyConnectable* ptr = new RxAnyConnectableFromObservable<T>{observable};
964 return std::unique_ptr<SearchDirectories::RxAnyConnectable>{ptr};
965 }
966
967 std::pair<rxcpp::observable<InodeResult>, std::unique_ptr<SearchDirectories::RxAnyConnectable>>
FindFilenamesFromInodesPair(std::vector<std::string> root_directories,std::vector<Inode> inode_list,SearchMode mode) const968 SearchDirectories::FindFilenamesFromInodesPair(
969 std::vector<std::string> root_directories,
970 std::vector<Inode> inode_list,
971 SearchMode mode) const {
972 DCHECK(mode == SearchMode::kInProcessDirect) << " other modes not implemented yet";
973
974 auto/*observable[2]*/ [inode_results, connectable] = SearchDirectoriesForMatchingInodes(
975 std::move(root_directories),
976 std::move(inode_list),
977 system_call_);
978
979 std::unique_ptr<SearchDirectories::RxAnyConnectable> connectable_ptr =
980 MakeRxAnyConnectableFromObservable(connectable.as_dynamic());
981
982 return {inode_results, std::move(connectable_ptr)};
983 }
984
985 rxcpp::observable<InodeResult>
FindFilenamesFromInodes(std::vector<std::string> root_directories,rxcpp::observable<Inode> inodes,SearchMode mode) const986 SearchDirectories::FindFilenamesFromInodes(std::vector<std::string> root_directories,
987 rxcpp::observable<Inode> inodes,
988 SearchMode mode) const {
989
990 // It's inefficient to search for inodes until the full search list is available,
991 // so first reduce to a vector so we can access all the inodes simultaneously.
992 return inodes.reduce(std::vector<Inode>{},
993 [](std::vector<Inode> vec, Inode inode) {
994 vec.push_back(inode);
995 return vec;
996 },
997 [](std::vector<Inode> v){
998 return v; // TODO: use an identity function
999 })
1000 .flat_map([root_directories=std::move(root_directories), mode, self=*this]
1001 (std::vector<Inode> vec) {
1002 // All borrowed values (e.g. SystemCall) must outlive the observable.
1003 return self.FindFilenamesFromInodes(root_directories, vec, mode);
1004 }
1005 );
1006 }
1007
EmitAllInodesFromDirectories(std::vector<std::string> root_dirs,borrowed<SystemCall * > system_call)1008 auto/*[observable<InodeResult>]*/ EmitAllInodesFromDirectories(
1009 std::vector<std::string> root_dirs,
1010 borrowed<SystemCall*> system_call) {
1011
1012 // Create a (lazy) observable that will emit each DirectoryEntry that is a recursive subchild
1013 // of root_dirs. Emission will be stopped when its unsubscribed from.
1014 //
1015 // This is done by calling readdir(3) lazily.
1016 auto/*obs<DirectoryEntry>*/ find_all_subdir_entries = ([&]() {
1017 DirectoryEntry sentinel = DirectoryEntry::CreateSentinel(std::move(root_dirs));
1018 auto/*obs<DirectoryEntryResult*/ results = sentinel.GetSubTreePreOrderEntries(system_call);
1019
1020 // Drop any errors by logging them to logcat. "Unwrap" the expected into the underlying data.
1021 auto/*obs<DirectoryEntry*>*/ expected_drop_errors = MapExpectedOrLogError(std::move(results));
1022 return expected_drop_errors;
1023 })();
1024
1025 // Fill in -1 for the dev_t since readdir only returns the ino_t.
1026 // The caller of this function is expected to call stat(2) later on to fill in
1027 // the full data.
1028 return find_all_subdir_entries.map([](DirectoryEntry e) {
1029 return InodeResult::makeSuccess(Inode::FromDeviceAndInode(-1, e.d_ino), std::move(e.filename));
1030 });
1031 }
1032
1033 rxcpp::observable<InodeResult>
ListAllFilenames(std::vector<std::string> root_directories) const1034 SearchDirectories::ListAllFilenames(std::vector<std::string> root_directories) const {
1035 // TODO: refactor implementation into DiskScanDataSource.
1036 return EmitAllInodesFromDirectories(std::move(root_directories),
1037 /*borrowed*/system_call_);
1038 }
1039
1040 struct FilterState {
1041 // Emit 'match' Inodes corresponding to the ones here.
1042 InodeSet inode_set;
1043
1044 // An inode matching one of the ones in inode_set was discovered in the most-recently
1045 // emitted SearchState.
1046 //
1047 // The InodeSet removes any matching 'Inode'.
1048 std::optional<InodeResult> match;
1049
1050 FilterState() = default;
1051 FilterState(FilterState&& other) = default;
1052
1053 // Copying the InodeSet is expensive, so forbid any copies.
1054 FilterState(const FilterState& other) = delete;
1055 };
1056
operator <<(std::ostream & os,const FilterState & s)1057 std::ostream& operator<<(std::ostream& os, const FilterState& s) {
1058 os << "FilterState{match:";
1059 // Print the 'match' first. The InodeSet could be very large so it could be truncated in logs.
1060 if (s.match) {
1061 os << s.match.value();
1062 } else {
1063 os << "(none)";
1064 }
1065 os << ", inode_set:" << s.inode_set << "}";
1066 return os;
1067 }
1068
FilterFilenamesForSpecificInodes(rxcpp::observable<InodeResult> all_inodes,std::vector<Inode> inode_list,bool missing_device_number,bool needs_verification) const1069 rxcpp::observable<InodeResult> SearchDirectories::FilterFilenamesForSpecificInodes(
1070 rxcpp::observable<InodeResult> all_inodes,
1071 std::vector<Inode> inode_list,
1072 bool missing_device_number, // missing dev_t portion?
1073 bool needs_verification) const {
1074 // TODO: refactor into InodeResolver
1075
1076 borrowed<SystemCall*> system_call = system_call_;
1077
1078 // InodeResult may be missing the dev_t portion, so we may need to call scan(2) again
1079 // to confirm the dev_t. We skip calling scan(2) when the ino_t does not match.
1080 // InodeSet lets us optimally avoid calling scan(2).
1081 std::shared_ptr<FilterState> initial = std::make_shared<FilterState>();
1082 initial->inode_set = InodeSet::OfList(inode_list);
1083
1084 auto/*[observable<FilterState>,Connectable]*/ filter_state_results = all_inodes.scan(
1085 std::move(initial),
1086 [system_call, missing_device_number]
1087 (std::shared_ptr<FilterState> filter_state, InodeResult inode_result) {
1088 LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#Scan "
1089 << inode_result << ", state: " << *filter_state;
1090
1091 filter_state->match = std::nullopt;
1092
1093 InodeSet* inodes = &filter_state->inode_set;
1094
1095 // Find all the possible (dev_t, ino_t) potential needles given an ino_t in the haystack.
1096 InodeSet::ValueRange inode_list = inodes->FindInodeList(inode_result.inode.inode);
1097
1098 // This inode result doesn't correspond to any inodes we are searching for.
1099 if (!inode_list) {
1100 // Drop the result and keep going.
1101 return filter_state;
1102 }
1103
1104 if (missing_device_number) {
1105 // Need to fill in dev_t by calling stat(2).
1106 VisitValueOrLogError(std::move(inode_result.data), [&](std::string filename) {
1107 StatResult maybe_stat = Stat(filename, system_call);
1108 VisitValueOrLogError(maybe_stat, [&](const struct stat& stat_buf) {
1109 // Try to match the specific inode. Usually this will not result in a match (nullopt).
1110 std::optional<Inode> inode = inodes->FindAndRemoveInodeInList(inode_list, stat_buf);
1111
1112 if (inode) {
1113 filter_state->match = InodeResult::makeSuccess(inode.value(), std::move(filename));
1114 }
1115 });
1116
1117 // Note: stat errors are logged here to make the error closer to the occurrence.
1118 // In theory, we could just return it as an InodeResult but then the error would
1119 // just get logged elsewhere.
1120 });
1121 } else {
1122 // Trust the dev_t in InodeResult is valid. Later passes can verify it.
1123
1124 // Try to match the specific inode. Usually this will not result in a match (nullopt).
1125 std::optional<Inode> inode =
1126 inodes->FindAndRemoveInodeInList(inode_list, inode_result.inode);
1127
1128 if (inode) {
1129 filter_state->match = inode_result;
1130 }
1131
1132 // Note that the InodeResult doesn't necessarily need to have a valid filename here.
1133 // If the earlier pass returned an error-ed result, this will forward the error code.
1134 }
1135
1136 return filter_state;
1137 }
1138 // Avoid exhausting a potentially 'infinite' stream of files by terminating as soon
1139 // as we find every single inode we care about.
1140 ).take_while([](std::shared_ptr<FilterState> state) {
1141 // Also emit the last item that caused the search set to go empty.
1142 bool cond = !state->inode_set.Empty() || state->match;
1143
1144 if (WOULD_LOG(VERBOSE)) {
1145 static int kCounter = 0;
1146 LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#take_while (" << kCounter++ <<
1147 ",is_empty:"
1148 << state->inode_set.Empty() << ", match:" << state->match.has_value();
1149 }
1150 // Minor O(1) implementation inefficiency:
1151 // (Too minor to fix but it can be strange if looking at the logs or readdir traces).
1152 //
1153 // Note, because we return 'true' after the search set went empty,
1154 // the overall stream graph still pulls from filter_state_results exactly once more:
1155 //
1156 // This means that for cond to go to false, we would've read one extra item and then discarded
1157 // it. If that item was the first child of a directory, that means we essentially did
1158 // one redundant pass of doing a readdir.
1159 // In other words if the search set goes to empty while the current item is a directory,
1160 //
1161 // it will definitely readdir on it at least once as we try to get the first child in
1162 // OnTreeTraversal.
1163 //
1164 // This could be fixed with a 'take_until(Predicate)' operator which doesn't discard
1165 // the last item when the condition becomes false. However rxcpp seems to lack this operator,
1166 // whereas RxJava has it.
1167
1168 if (!cond) {
1169 LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#take_while "
1170 << "should now terminate for " << *state;
1171 }
1172
1173 return cond;
1174 }).publish();
1175 // The publish here is mandatory. The stream is consumed twice (once by matched and once by
1176 // unmatched streams). Without the publish, once all items from 'matched' were consumed it would
1177 // start another instance of 'filter_state_results' (i.e. it appears as if the search
1178 // is restarted).
1179 //
1180 // By using 'publish', the filter_state_results is effectively shared by both downstream nodes.
1181 // Note that this also requires the subscriber to additionally call #connect on the above stream,
1182 // otherwise no work will happen.
1183
1184 // Lifetime notes:
1185 //
1186 // The the 'FilterState' is emitted into both below streams simultaneously.
1187 // The 'unmatched_inode_values' only touches the inode_set.
1188 // The 'matched_inode_values' only touches the match.
1189 // Either stream can 'std::move' from those fields because they don't move each other's fields.
1190 auto/*observable<InodeResult>*/ matched_inode_values = filter_state_results
1191 .filter([](std::shared_ptr<FilterState> filter_state) {
1192 return filter_state->match.has_value(); })
1193 .map([](std::shared_ptr<FilterState> filter_state) {
1194 return std::move(filter_state->match.value()); });
1195 // observable<InodeResult>
1196
1197 auto/*observable<?>*/ unmatched_inode_values = filter_state_results
1198 // The 'last' FilterState is the one that contains all the remaining inodes.
1199 .take_last(1) // observable<FilterState>
1200 .flat_map([](std::shared_ptr<FilterState> filter_state) {
1201 LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#unmatched -- flat_map";
1202 // Aside: Could've used a move here if the inodes weren't so lightweight already.
1203 return filter_state->inode_set.IterateValues(); })
1204 // observable<Inode>
1205 .map([](const Inode& inode) {
1206 LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#unmatched -- map";
1207 return InodeResult::makeFailure(inode, InodeResult::kCouldNotFindFilename);
1208 });
1209 // observable<InodeResult>
1210
1211 // The matched and unmatched InodeResults are emitted together.
1212 // Use merge, not concat, because we need both observables to be subscribed to simultaneously.
1213
1214 auto/*observable<InodeResult*/ all_inode_results =
1215 matched_inode_values.merge(unmatched_inode_values);
1216
1217 // Verify the inode results by calling stat(2).
1218 // Unverified results are turned into an error.
1219
1220 auto/*observable<InodeResult>*/ verified_inode_results =
1221 all_inode_results.map([needs_verification, system_call](InodeResult result) {
1222 if (!needs_verification || !result) {
1223 // Skip verification if requested, or if the result didn't have a filename.
1224 return result;
1225 }
1226
1227 const std::string& filename = result.data.value();
1228 StatResult maybe_stat = Stat(filename, system_call);
1229
1230 if (maybe_stat)
1231 {
1232 if (result.inode == Inode::FromDeviceAndInode(maybe_stat->st_dev, maybe_stat->st_ino)) {
1233 return result;
1234 } else {
1235 LOG(WARNING)
1236 << "FilterFilenamesForSpecificInodes#verified fail out-of-date inode: " << result;
1237 return InodeResult::makeFailure(result.inode, InodeResult::kVerificationFailed);
1238 }
1239 } else {
1240 // Forward stat errors directly, as it could be a missing security rule,
1241 // but turn -ENOENT into casual verification errors.
1242 const StatError& err = maybe_stat.error();
1243 int error_code = err.err_no;
1244 if (err.err_no == ENOENT) {
1245 error_code = InodeResult::kVerificationFailed;
1246
1247 // TODO: Don't LOG(WARNING) here because this could be very common if we
1248 // access the data much much later after the initial results were read in.
1249 LOG(WARNING)
1250 << "FilterFilenamesForSpecificInodes#verified fail out-of-date filename: " << result;
1251 } else {
1252 LOG(ERROR)
1253 << "FilterFilenamesForSpecificInodes#verified stat(2) failure: " << err;
1254 }
1255
1256 return InodeResult::makeFailure(result.inode, error_code);
1257 }
1258 });
1259
1260 // Now that all mid-stream observables have been connected, turn the Connectable observable
1261 // into a regular observable.
1262 return verified_inode_results.ref_count(filter_state_results);
1263 }
1264
// Maps every InodeResult of 'all_inodes' through (without any filtering by inode):
//
// missing_device_number: for each result with a filename, stat(2) it and emit a new
//   InodeResult built from the stat's (st_dev, st_ino). Results without a filename, or whose
//   stat(2) fails, are logged and dropped.
// otherwise: each InodeResult is forwarded unchanged (including error-ed results).
// needs_verification: successful results are re-checked with stat(2); a mismatching inode or
//   ENOENT becomes kVerificationFailed, and other stat errors forward their errno.
rxcpp::observable<InodeResult> SearchDirectories::EmitAllFilenames(
    rxcpp::observable<InodeResult> all_inodes,
    bool missing_device_number,  // missing dev_t portion?
    bool needs_verification) const {
  // TODO: refactor into InodeResolver

  borrowed<SystemCall*> system_call = system_call_;

  // InodeResult may be missing the dev_t portion, so we may need to call stat(2) again
  // to confirm the dev_t.

  // nullopt means "drop this item" (filtered out below).
  using EmitAllState = std::optional<InodeResult>;

  auto/*observable<EmitAllState>*/ all_inode_results = all_inodes.map(
      [system_call, missing_device_number](InodeResult inode_result) {
        LOG(VERBOSE) << "EmitAllFilenames#map "
                     << inode_result;

        // Could fail if the device number is missing _and_ stat(2) fails.
        EmitAllState match = std::nullopt;

        if (missing_device_number) {
          // Need to fill in dev_t by calling stat(2).
          VisitValueOrLogError(std::move(inode_result.data), [&](std::string filename) {
            StatResult maybe_stat = Stat(filename, system_call);
            VisitValueOrLogError(maybe_stat, [&](const struct stat& stat_buf) {
              Inode inode = Inode::FromDeviceAndInode(stat_buf.st_dev, stat_buf.st_ino);
              match = InodeResult::makeSuccess(inode, std::move(filename));
            });

            // Note: stat errors are logged here to make the error closer to the occurrence.
            // In theory, we could just return it as an InodeResult but then the error would
            // just get logged elsewhere.
          });
        } else {
          // Trust the dev_t in InodeResult is valid. Later passes can verify it.
          match = std::move(inode_result);

          // Note that the InodeResult doesn't necessarily need to have a valid filename here.
          // If the earlier pass returned an error-ed result, this will forward the error code.
        }

        return match;  // implicit move.
      }
  );

  // Drop the nullopt states and unwrap the rest.
  auto/*observable<InodeResult>*/ matched_inode_values = all_inode_results
      .filter([](const EmitAllState& filter_state) { return filter_state.has_value(); })
      .map([](EmitAllState& filter_state) { return std::move(filter_state.value()); });
      // observable<InodeResult>

  // Verify the inode results by calling stat(2).
  // Unverified results are turned into an error.

  auto/*observable<InodeResult>*/ verified_inode_results =
      matched_inode_values.map([needs_verification, system_call](InodeResult result) {
        if (!needs_verification || !result) {
          // Skip verification if requested, or if the result didn't have a filename.
          return result;
        }

        const std::string& filename = result.data.value();
        StatResult maybe_stat = Stat(filename, system_call);

        if (maybe_stat)
        {
          if (result.inode == Inode::FromDeviceAndInode(maybe_stat->st_dev, maybe_stat->st_ino)) {
            return result;
          } else {
            LOG(WARNING)
                << "EmitAllFilenames#verified fail out-of-date inode: " << result;
            return InodeResult::makeFailure(result.inode, InodeResult::kVerificationFailed);
          }
        } else {
          // Forward stat errors directly, as it could be a missing security rule,
          // but turn -ENOENT into casual verification errors.
          const StatError& err = maybe_stat.error();
          int error_code = err.err_no;
          if (err.err_no == ENOENT) {
            error_code = InodeResult::kVerificationFailed;

            // TODO: Don't LOG(WARNING) here because this could be very common if we
            // access the data much much later after the initial results were read in.
            LOG(WARNING)
                << "EmitAllFilenames#verified fail out-of-date filename: " << result;
          } else {
            LOG(ERROR)
                << "EmitAllFilenames#verified stat(2) failure: " << err;
          }

          return InodeResult::makeFailure(result.inode, error_code);
        }
      });

  // TODO: refactor this function some more with the Find(inode_set) equivalent.

  // Unlike FilterFilenamesForSpecificInodes, nothing here is published: the stream is consumed
  // by a single downstream chain, so no connect/ref_count step is needed and the result is
  // returned as-is.
  return verified_inode_results;
}
1365
1366 } // namespace iorap::inode2filename
1367