// Copyright (C) 2019 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "compiler/compiler.h" #include "common/debug.h" #include "common/expected.h" #include "perfetto/rx_producer.h" // TODO: refactor BinaryWireProtobuf to separate header. #include "inode2filename/inode.h" #include "inode2filename/search_directories.h" #include "serialize/protobuf_io.h" #include #include #include #include // ::perfetto::protos::Trace #include // ::perfetto::protos::TracePacket #include "rxcpp/rx.hpp" #include #include #include #include #include #include #include #include #include #include #include namespace iorap::compiler { using Inode = iorap::inode2filename::Inode; using InodeResult = iorap::inode2filename::InodeResult; using SearchDirectories = iorap::inode2filename::SearchDirectories; template using ProtobufPtr = iorap::perfetto::ProtobufPtr; struct PerfettoTraceProtoInfo { /* The perfetto trace proto. */ ::iorap::perfetto::PerfettoTraceProto proto; /* * The timestamp limit of the trace. * It's used to truncate the trace file. */ uint64_t timestamp_limit_ns; /* * The pid of the app. * If positive, it's used to filter out other page cache events. */ int32_t pid; }; struct PerfettoTracePtrInfo { /* Deserialized protobuf data containing the perfetto trace. */ ProtobufPtr<::perfetto::protos::Trace> trace_ptr; /* * The timestamp limit of the trace. * It's used to truncate the trace file. */ uint64_t timestamp_limit_ns; /* * The pid of the app. * If positive, it's used to filter out other page cache events. */ int32_t pid; }; // Attempt to read protobufs from the filenames. // Emits one (or none) protobuf for each filename, in the same order as the filenames. // On any errors, the items are dropped (errors are written to the error LOG). // // All work is done on the same Coordinator as the Subscriber. template auto/*observable*/ ReadProtosFromFileNames( rxcpp::observable file_infos) { using BinaryWireProtoT = ::iorap::perfetto::PerfettoTraceProto; return file_infos .map([](const CompilationInput& file_info) -> std::optional { LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << file_info.filename << " TimeStampLimit "<< file_info.timestamp_limit_ns << " Pid " << file_info.pid << " [begin]"; std::optional maybe_proto = BinaryWireProtoT::ReadFullyFromFile(file_info.filename); if (!maybe_proto) { LOG(ERROR) << "Failed to read file: " << file_info.filename; return std::nullopt; } return {{std::move(maybe_proto.value()), file_info.timestamp_limit_ns, file_info.pid}}; }) .filter([](const std::optional& proto_info) { return proto_info.has_value(); }) .map([](std::optional& proto_info) -> PerfettoTraceProtoInfo { return proto_info.value(); }) // TODO: refactor to something that flattens the optional, and logs in one operator. .map([](PerfettoTraceProtoInfo& proto_info) -> std::optional { std::optional> t = proto_info.proto.template MaybeUnserialize(); if (!t) { LOG(ERROR) << "Failed to parse protobuf: "; // TODO: filename. return std::nullopt; } return {{std::move(t.value()), proto_info.timestamp_limit_ns, proto_info.pid}}; }) .filter([](const std::optional& trace_info) { return trace_info.has_value(); }) .map([](std::optional& trace_info) -> PerfettoTracePtrInfo { LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]"; return trace_info.value(); // TODO: protobufs have no move constructor. this might be inefficient? }); /* return filenames .map([](const std::string& filename) { LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << filename << " [begin]"; std::optional maybe_proto = BinaryWireProtoT::ReadFullyFromFile(filename); if (!maybe_proto) { LOG(ERROR) << "Failed to read file: " << filename; } std::unique_ptr ptr; if (maybe_proto) { ptr.reset(new BinaryWireProtoT{std::move(*maybe_proto)}); } return ptr; }) .filter([](const std::unique_ptr& proto) { return proto != nullptr; }) .map([](std::unique_ptr& proto) { std::optional t = proto->template MaybeUnserialize(); if (!t) { LOG(ERROR) << "Failed to parse protobuf: "; // TODO: filename. } return t; }) .filter([](const std::optional& proto) { return proto.has_value(); }) .map([](std::optional proto) -> ProtoT { LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]"; return std::move(proto.value()); // TODO: protobufs have no move constructor. this might be inefficient? }); */ } auto/*observable*/ ReadPerfettoTraceProtos( std::vector file_infos) { auto filename_obs = rxcpp::observable<>::iterate(std::move(file_infos)); rxcpp::observable obs = ReadProtosFromFileNames<::perfetto::protos::Trace>(std::move(filename_obs)); return obs; } // A flattened data representation of an MmFileMap*FtraceEvent. // This representation is used for streaming processing. // // Note: Perfetto applies a 'union' over all possible fields on all possible devices // (and uses the max sizeof per-field). // // Since all protobuf fields are optional, fields not present on a particular device are always // null struct PageCacheFtraceEvent { /* * Ftrace buffer-specific */ uint32_t cpu; // e.g. 0-7 for the cpu core number. /* * Ftrace-event general data */ // Nanoseconds since an epoch. // Epoch is configurable by writing into trace_clock. // By default this timestamp is CPU local. uint64_t timestamp; // Kernel pid (do not confuse with userspace pid aka tgid) uint32_t pid; // Tagged by our code while parsing the ftraces: uint64_t timestamp_relative; // timestamp relative to first ftrace within a Trace protobuf. bool add_to_page_cache; // AddToPageCache=true, DeleteFromPageCache=false. /* * mm_filemap-specific data * * Fields are common: * - MmFilemapAddToPageCacheFtraceEvent * - MmFilemapDeleteFromPageCacheFtraceEvent */ uint64_t pfn; // page frame number (physical) - null on some devices, e.g. marlin uint64_t i_ino; // inode number (use in conjunction with s_dev) uint64_t index; // offset into file: this is a multiple of the page size (usually 4096). uint64_t s_dev; // (dev_t) device number uint64_t page; // struct page*. - null on some devices, e.g. blueline. Inode inode() const { return Inode::FromDeviceAndInode(static_cast(s_dev), static_cast(i_ino)); } }; std::ostream& operator<<(std::ostream& os, const PageCacheFtraceEvent& e) { os << "{"; os << "cpu:" << e.cpu << ","; os << "timestamp:" << e.timestamp << ","; os << "pid:" << e.pid << ","; os << "timestamp_relative:" << e.timestamp_relative << ","; os << "add_to_page_cache:" << e.add_to_page_cache << ","; os << "pfn:" << e.pfn << ","; os << "i_ino:" << e.i_ino << ","; os << "index:" << e.index << ","; os << "s_dev:" << e.s_dev << ","; os << "page:" << e.page; os << "}"; return os; } /* * Gets the start timestamp. * * It is the minimium timestamp. */ std::optional GetStartTimestamp(const ::perfetto::protos::Trace& trace) { std::optional timestamp_relative_start; // Traverse each timestamp to get the minimium one. for (const ::perfetto::protos::TracePacket& packet : trace.packet()) { if (packet.has_timestamp()) { timestamp_relative_start = timestamp_relative_start? std::min(*timestamp_relative_start, packet.timestamp()) : packet.timestamp(); } if (!packet.has_ftrace_events()) { continue; } const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle = packet.ftrace_events(); for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) { if (event.has_timestamp()) { timestamp_relative_start = timestamp_relative_start? std::min(*timestamp_relative_start, event.timestamp()) : event.timestamp(); } } } return timestamp_relative_start; } /* * sample blueline output: * * $ adb shell cat /d/tracing/events/filemap/mm_filemap_add_to_page_cache/format * * name: mm_filemap_add_to_page_cache * ID: 178 * format: * field:unsigned short common_type; offset:0; size:2; signed:0; * field:unsigned char common_flags; offset:2; size:1; signed:0; * field:unsigned char common_preempt_count; offset:3; size:1; signed:0; * field:int common_pid; offset:4; size:4; signed:1; * * field:unsigned long pfn; offset:8; size:8; signed:0; * field:unsigned long i_ino; offset:16; size:8; signed:0; * field:unsigned long index; offset:24; size:8; signed:0; * field:dev_t s_dev; offset:32; size:4; signed:0; * * print fmt: "dev %d:%d ino %lx page=%p pfn=%lu ofs=%lu", ((unsigned int) ((REC->s_dev) >> 20)), * ((unsigned int) ((REC->s_dev) & ((1U << 20) - 1))), REC->i_ino, * (((struct page *)(((0xffffffffffffffffUL) - ((1UL) << ((39) - 1)) + 1) - * ((1UL) << ((39) - 12 - 1 + 6))) - (memstart_addr >> 12)) + (REC->pfn)), * REC->pfn, REC->index << 12 */ auto /*observable*/ SelectPageCacheFtraceEvents( PerfettoTracePtrInfo trace_info) { const ::perfetto::protos::Trace& trace = *(trace_info.trace_ptr); constexpr bool kDebugFunction = true; return rxcpp::observable<>::create( [trace=std::move(trace), timestamp_limit_ns=trace_info.timestamp_limit_ns, app_pid=trace_info.pid] (rxcpp::subscriber sub) { uint64_t timestamp = 0; uint64_t timestamp_relative = 0; std::optional timestamp_relative_start = GetStartTimestamp(trace); uint32_t cpu = 0; uint32_t pid = 0; bool add_to_page_cache = true; auto on_next_page_cache_event = [&](const auto& mm_event) { PageCacheFtraceEvent out; out.timestamp = timestamp; out.cpu = cpu; out.pid = pid; out.timestamp_relative = timestamp_relative; out.add_to_page_cache = add_to_page_cache; out.pfn = mm_event.pfn(); out.i_ino = mm_event.i_ino(); out.index = mm_event.index(); out.s_dev = mm_event.s_dev(); out.page = mm_event.page(); sub.on_next(std::move(out)); }; for (const ::perfetto::protos::TracePacket& packet : trace.packet()) { // Break out of all loops if we are unsubscribed. if (!sub.is_subscribed()) { if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents unsubscribe"; return; } if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents TracePacket"; if (packet.has_timestamp()) { timestamp_relative_start = timestamp_relative_start.value_or(packet.timestamp()); timestamp = packet.timestamp(); // XX: should we call 'has_timestamp()' ? } else { timestamp = 0; } if (packet.has_ftrace_events()) { const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle = packet.ftrace_events(); cpu = ftrace_event_bundle.cpu(); // XX: has_cpu ? for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) { // Break out of all loops if we are unsubscribed. if (!sub.is_subscribed()) { return; } if (app_pid >= 0 && (!event.has_pid() || event.pid() != static_cast(app_pid))) { continue; } if (event.has_timestamp()) { timestamp = event.timestamp(); if(timestamp > timestamp_limit_ns) { LOG(VERBOSE) << "The timestamp is " << timestamp << ", which exceeds the limit "<< timestamp_limit_ns; continue; } } else { DCHECK(packet.has_timestamp() == false) << "Timestamp in outer packet but not inner packet"; // XX: use timestamp from the perfetto TracePacket ??? // REVIEWERS: not sure if this is ok, does it use the same clock source and // is the packet data going to be the same clock sample as the Ftrace event? } if (timestamp_relative_start){ timestamp_relative = timestamp - *timestamp_relative_start; } else { timestamp_relative = 0; } pid = event.pid(); // XX: has_pid ? if (event.has_mm_filemap_add_to_page_cache()) { add_to_page_cache = true; const ::perfetto::protos::MmFilemapAddToPageCacheFtraceEvent& mm_event = event.mm_filemap_add_to_page_cache(); on_next_page_cache_event(mm_event); } else if (event.has_mm_filemap_delete_from_page_cache()) { add_to_page_cache = false; const ::perfetto::protos::MmFilemapDeleteFromPageCacheFtraceEvent& mm_event = event.mm_filemap_delete_from_page_cache(); on_next_page_cache_event(mm_event); } } } else { if (kDebugFunction) { LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents no ftrace event bundle"; } } } if (kDebugFunction) { LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents#on_completed"; } // Let subscriber know there are no more items. sub.on_completed(); }); } auto /*observable traces) { // Emit only unique (s_dev, i_ino) pairs from all Trace protos. auto obs = traces .flat_map([](PerfettoTracePtrInfo trace) { rxcpp::observable obs = SelectPageCacheFtraceEvents(std::move(trace)); // FIXME: dont check this in // return obs; //return obs.take(100); // for faster development return obs; }) // TODO: Upstream bug? using []()::perfetto::protos::Trace&) causes a compilation error. .map([](const PageCacheFtraceEvent& event) -> Inode { return Inode::FromDeviceAndInode(static_cast(event.s_dev), static_cast(event.i_ino)); }) .tap([](const Inode& inode) { LOG(VERBOSE) << "SelectDistinctInodesFromTraces (pre-distinct): " << inode; }) .distinct() // observable*/ ; return obs; } // TODO: static assert checks for convertible return values. auto/*observable*/ ResolveInodesToFileNames( rxcpp::observable inodes, inode2filename::InodeResolverDependencies dependencies) { std::shared_ptr inode_resolver = inode2filename::InodeResolver::Create(std::move(dependencies)); return inode_resolver->FindFilenamesFromInodes(std::move(inodes)); } using InodeMap = std::unordered_map; auto /*just observable*/ ReduceResolvedInodesToMap( rxcpp::observable inode_results) { return inode_results.reduce( InodeMap{}, [](InodeMap m, InodeResult result) { if (result) { LOG(VERBOSE) << "compiler::ReduceResolvedInodesToMap insert " << result; m.insert({std::move(result.inode), std::move(result.data.value())}); } else { // TODO: side stats for how many of these are failed to resolve? LOG(WARNING) << "compiler: Failed to resolve inode, " << result; } return m; }, [](InodeMap m) { return m; // TODO: use an identity function }); // emits exactly 1 InodeMap value. } struct ResolvedPageCacheFtraceEvent { std::string filename; PageCacheFtraceEvent event; }; std::ostream& operator<<(std::ostream& os, const ResolvedPageCacheFtraceEvent& e) { os << "{"; os << "filename:\"" << e.filename << "\","; os << e.event; os << "}"; return os; } struct CombinedState { CombinedState() = default; explicit CombinedState(InodeMap inode_map) : inode_map{std::move(inode_map)} {} explicit CombinedState(PageCacheFtraceEvent event) : ftrace_event{std::move(event)} {} CombinedState(InodeMap inode_map, PageCacheFtraceEvent event) : inode_map(std::move(inode_map)), ftrace_event{std::move(event)} {} std::optional inode_map; std::optional ftrace_event; bool HasAll() const { return inode_map.has_value() && ftrace_event.has_value(); } const InodeMap& GetInodeMap() const { DCHECK(HasAll()); return inode_map.value(); } InodeMap& GetInodeMap() { DCHECK(HasAll()); return inode_map.value(); } const PageCacheFtraceEvent& GetEvent() const { DCHECK(HasAll()); return ftrace_event.value(); } PageCacheFtraceEvent& GetEvent() { DCHECK(HasAll()); return ftrace_event.value(); } void Merge(CombinedState&& other) { if (other.inode_map) { inode_map = std::move(other.inode_map); } if (other.ftrace_event) { ftrace_event = std::move(other.ftrace_event); } } }; std::ostream& operator<<(std::ostream& os, const CombinedState& s) { os << "CombinedState{inode_map:"; if (s.inode_map) { os << "|sz=" << (s.inode_map.value().size()) << "|"; } else { os << "(null)"; } os << ",event:"; if (s.ftrace_event) { //os << s.ftrace_event.value().timestamp << "ns"; os << s.ftrace_event.value(); } else { os << "(null)"; } os << "}"; return os; } auto/*observable*/ ResolvePageCacheEntriesFromProtos( rxcpp::observable traces, inode2filename::InodeResolverDependencies dependencies) { // 1st chain = emits exactly 1 InodeMap. // [proto, proto, proto...] -> [inode, inode, inode, ...] auto/*observable*/ distinct_inodes = SelectDistinctInodesFromTraces(traces); rxcpp::observable distinct_inodes_obs = distinct_inodes.as_dynamic(); // [inode, inode, inode, ...] -> [(inode, {filename|error}), ...] auto/*observable*/ inode_names = ResolveInodesToFileNames(distinct_inodes_obs, std::move(dependencies)); // rxcpp has no 'join' operators, so do a manual join with concat. auto/*observable*/ inode_name_map = ReduceResolvedInodesToMap(inode_names); // 2nd chain = emits all PageCacheFtraceEvent auto/*observable*/ page_cache_ftrace_events = traces .flat_map([](PerfettoTracePtrInfo trace) { rxcpp::observable obs = SelectPageCacheFtraceEvents(std::move(trace)); return obs; }); auto inode_name_map_precombine = inode_name_map .map([](InodeMap inode_map) { LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#inode_name_map_precombine "; return CombinedState{std::move(inode_map)}; }); auto page_cache_ftrace_events_precombine = page_cache_ftrace_events .map([](PageCacheFtraceEvent event) { LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#page_cache_ftrace_events_precombine " << event; return CombinedState{std::move(event)}; }); // Combine 1st+2nd chain. // // concat subscribes to each observable, waiting until its completed, before subscribing // to the next observable and waiting again. // // During all this, every #on_next is immediately forwarded to the downstream observables. // In our case, we want to block until InodeNameMap is ready, and re-iterate all ftrace events. auto/*observable*/ resolved_events = inode_name_map_precombine .concat(page_cache_ftrace_events_precombine) .scan(CombinedState{}, [](CombinedState current_state, CombinedState delta_state) { LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#scan " << "current=" << current_state << "," << "delta=" << delta_state; // IT0 = (,) + (InodeMap,) // IT1 = (InodeMap,) + (,Event) // IT2..N = (InodeMap,Event1) + (,Event2) current_state.Merge(std::move(delta_state)); return current_state; }) .filter([](const CombinedState& state) { return state.HasAll(); }) .map([](CombinedState& state) -> std::optional { PageCacheFtraceEvent& event = state.GetEvent(); const InodeMap& inode_map = state.GetInodeMap(); auto it = inode_map.find(event.inode()); if (it != inode_map.end()) { std::string filename = it->second; LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos combine_latest " << event; return ResolvedPageCacheFtraceEvent{std::move(filename), std::move(event)}; } else { LOG(ERROR) << "compiler: FtraceEvent's inode did not have resolved filename: " << event; return std::nullopt; } }) .filter( [](std::optional maybe_event) { return maybe_event.has_value(); }) .map([](std::optional maybe_event) { return std::move(maybe_event.value()); }); // -> observable return resolved_events; } namespace detail { bool multiless_one(const std::string& a, const std::string& b) { return std::lexicographical_compare(a.begin(), a.end(), b.begin(), b.end()); } template constexpr bool multiless_one(T&& a, T&& b) { // a < b using std::less; // ADL return less>{}(std::forward(a), std::forward(b)); } constexpr bool multiless() { return false; // [] < [] is always false. } template constexpr bool multiless(T&& a, T&& b, Args&&... args) { if (a != b) { return multiless_one(std::forward(a), std::forward(b)); } else { return multiless(std::forward(args)...); } } } // namespace detail // Return [A0...An] < [B0...Bn] ; vector-like scalar comparison of each field. // Arguments are passed in the order A0,B0,A1,B1,...,An,Bn. template constexpr bool multiless(Args&&... args) { return detail::multiless(std::forward(args)...); } struct CompilerPageCacheEvent { std::string filename; uint64_t timestamp_relative; // use relative timestamp because absolute values aren't comparable // across different trace protos. // relative timestamps can be said to be 'approximately' comparable. // assuming we compare the same application startup's trace times. bool add_to_page_cache; // AddToPageCache=true, DeleteFromPageCache=false. uint64_t index; // offset into file: this is a multiple of the page size (usually 4096). // All other data from the ftrace is dropped because we don't currently use it in the // compiler algorithms. CompilerPageCacheEvent() = default; CompilerPageCacheEvent(const ResolvedPageCacheFtraceEvent& resolved) : CompilerPageCacheEvent(resolved.filename, resolved.event) { } CompilerPageCacheEvent(ResolvedPageCacheFtraceEvent&& resolved) : CompilerPageCacheEvent(std::move(resolved.filename), std::move(resolved.event)) { } // Compare all fields (except the timestamp field). static bool LessIgnoringTimestamp(const CompilerPageCacheEvent& a, const CompilerPageCacheEvent& b) { return multiless(a.filename, b.filename, a.add_to_page_cache, b.add_to_page_cache, a.index, b.index); } // Compare all fields. Timestamps get highest precedence. bool operator<(const CompilerPageCacheEvent& rhs) const { return multiless(timestamp_relative, rhs.timestamp_relative, filename, rhs.filename, add_to_page_cache, rhs.add_to_page_cache, index, rhs.index); } private: CompilerPageCacheEvent(std::string filename, const PageCacheFtraceEvent& event) : filename(std::move(filename)), timestamp_relative(event.timestamp_relative), add_to_page_cache(event.add_to_page_cache), index(event.index) { } }; std::ostream& operator<<(std::ostream& os, const CompilerPageCacheEvent& e) { os << "{"; os << "filename:\"" << e.filename << "\","; os << "timestamp:" << e.timestamp_relative << ","; os << "add_to_page_cache:" << e.add_to_page_cache << ","; os << "index:" << e.index; os << "}"; return os; } // Filter an observable chain of 'ResolvedPageCacheFtraceEvent' // into an observable chain of 'ResolvedPageCacheFtraceEvent'. // // Any items emitted by the input chain that match the regular expression // specified by blacklist_filter are not emitted into the output chain. auto/*observable*/ ApplyBlacklistToPageCacheEvents( rxcpp::observable resolved_events, std::optional blacklist_filter) { bool has_re = blacklist_filter.has_value(); // default regex engine is ecmascript. std::regex reg_exp{blacklist_filter ? *blacklist_filter : std::string("")}; return resolved_events.filter( [reg_exp, has_re](const ResolvedPageCacheFtraceEvent& event) { if (!has_re) { return true; } // Remove any entries that match the regex in --blacklist-filter/-bf. bool res = std::regex_search(event.filename, reg_exp); if (res) { LOG(VERBOSE) << "Blacklist filter removed '" << event.filename << "' from chain."; } return !res; }); } // Compile an observable chain of 'ResolvedPageCacheFtraceEvent' into // an observable chain of distinct, timestamp-ordered, CompilerPageCacheEvent. // // This is a reducing operation: No items are emitted until resolved_events is completed. auto/*observable*/ CompilePageCacheEvents( rxcpp::observable resolved_events) { struct CompilerPageCacheEventIgnoringTimestampLess { bool operator()(const CompilerPageCacheEvent& lhs, const CompilerPageCacheEvent& rhs) const { return CompilerPageCacheEvent::LessIgnoringTimestamp(lhs, rhs); } }; // Greedy O(N) compilation algorithm. // // This produces an inoptimal result (e.g. a small timestamp // that might occur only 1% of the time nevertheless wins out), but the // algorithm itself is quite simple, and doesn't require any heuristic tuning. // First pass: *Merge* into set that ignores the timestamp value for order, but retains // the smallest timestamp value if the same key is re-inserted. using IgnoreTimestampForOrderingSet = std::set; // Second pass: *Sort* data by smallest timestamp first. using CompilerPageCacheEventSet = std::set; return resolved_events .map( [](ResolvedPageCacheFtraceEvent event) { // Drop all the extra metadata like pid, cpu, etc. // When we merge we could keep a list of the original data, but there is no advantage // to doing so. return CompilerPageCacheEvent{std::move(event)}; } ) .reduce( IgnoreTimestampForOrderingSet{}, [](IgnoreTimestampForOrderingSet set, CompilerPageCacheEvent event) { // Add each event to the set, keying by everything but the timestamp. // If the key is already inserted, re-insert with the smaller timestamp value. auto it = set.find(event); if (it == set.end()) { // Need to insert new element. set.insert(std::move(event)); } else if (it->timestamp_relative > event.timestamp_relative) { // Replace existing element: the new element has a smaller timestamp. it = set.erase(it); // Amortized O(1) time if insertion happens in the position before the hint. set.insert(it, std::move(event)); } // else: Skip insertion. Element already present with the minimum timestamp. return set; }, [](IgnoreTimestampForOrderingSet set) { // Extract all elements from 'set', re-insert into 'ts_set'. // The values are now ordered by timestamp (and then the rest of the fields). CompilerPageCacheEventSet ts_set; ts_set.merge(std::move(set)); std::shared_ptr final_set{ new CompilerPageCacheEventSet{std::move(ts_set)}}; return final_set; // return ts_set; }) // observable (just) .flat_map( [](std::shared_ptr final_set) { // TODO: flat_map seems to make a copy of the parameter _every single iteration_ // without the shared_ptr it would just make a copy of the set every time it went // through the iterate function. // Causing absurdly slow compile times x1000 slower than we wanted. // TODO: file a bug upstream and/or fix upstream. CompilerPageCacheEventSet& ts_set = *final_set; // [](CompilerPageCacheEventSet& ts_set) { LOG(DEBUG) << "compiler: Merge-pass completed (" << ts_set.size() << " entries)."; //return rxcpp::sources::iterate(std::move(ts_set)); return rxcpp::sources::iterate(ts_set).map([](CompilerPageCacheEvent e) { return e; }); } ); // observable } /** Makes a vector of info that includes filename and timestamp limit. */ std::vector MakeCompilationInputs( std::vector input_file_names, std::vector timestamp_limit_ns, std::vector pids){ // If the timestamp limit is empty, set the limit to max value // for each trace file. if (timestamp_limit_ns.empty()) { for (size_t i = 0; i < input_file_names.size(); i++) { timestamp_limit_ns.push_back(std::numeric_limits::max()); } } // If the pids is empty, set all of them to -1. Because negative pid means any. if (pids.empty()) { for (size_t i = 0; i < input_file_names.size(); i++) { pids.push_back(-1); } } DCHECK_EQ(input_file_names.size(), timestamp_limit_ns.size()); std::vector file_infos; for (size_t i = 0; i < input_file_names.size(); i++) { file_infos.push_back({input_file_names[i], timestamp_limit_ns[i], pids[i]}); } return file_infos; } bool PerformCompilation(std::vector perfetto_traces, std::string output_file_name, bool output_proto, std::optional blacklist_filter, inode2filename::InodeResolverDependencies dependencies) { auto trace_protos = ReadPerfettoTraceProtos(std::move(perfetto_traces)); auto resolved_events = ResolvePageCacheEntriesFromProtos(std::move(trace_protos), std::move(dependencies)); auto filtered_events = ApplyBlacklistToPageCacheEvents(std::move(resolved_events), blacklist_filter); auto compiled_events = CompilePageCacheEvents(std::move(filtered_events)); std::ofstream ofs; if (!output_file_name.empty()) { if (!output_proto) { ofs.open(output_file_name); if (!ofs) { LOG(ERROR) << "compiler: Failed to open output file for writing: " << output_file_name; return false; } } } auto trace_file_proto = serialize::ArenaPtr::Make(); // Fast lookup of filename -> FileIndex id. std::unordered_map file_path_map; int64_t file_handle_id = 0; int counter = 0; compiled_events // .as_blocking() .tap([&](CompilerPageCacheEvent& event) { if (!output_proto) { return; } if (!event.add_to_page_cache) { // Skip DeleteFromPageCache events, they are only used for intermediate. return; } DCHECK(trace_file_proto->mutable_index() != nullptr); serialize::proto::TraceFileIndex& index = *trace_file_proto->mutable_index(); int64_t file_handle; // Add TraceFileIndexEntry if it doesn't exist. auto it = file_path_map.find(event.filename); if (it == file_path_map.end()) { file_handle = file_handle_id++; file_path_map[event.filename] = file_handle; serialize::proto::TraceFileIndexEntry* entry = index.add_entries(); DCHECK(entry != nullptr); entry->set_id(file_handle); entry->set_file_name(event.filename); if (kIsDebugBuild) { int i = static_cast(file_handle); const serialize::proto::TraceFileIndexEntry& entry_ex = index.entries(i); DCHECK_EQ(entry->id(), entry_ex.id()); DCHECK_EQ(entry->file_name(), entry_ex.file_name()); } } else { file_handle = it->second; } int kPageSize = 4096; // TODO: don't hardcode the page size. int entry_size = trace_file_proto->list().entries_size(); bool merged = false; if (entry_size > 0) { serialize::proto::TraceFileEntry* entry = trace_file_proto->mutable_list()->mutable_entries(entry_size-1); if (entry->index_id() == file_handle && entry->file_offset() + entry->file_length() == static_cast(event.index) * kPageSize) { entry->set_file_length(entry->file_length() + kPageSize); merged = true; } } if (!merged) { // Add TraceFileEntry. DCHECK(trace_file_proto->mutable_list() != nullptr); serialize::proto::TraceFileEntry* entry = trace_file_proto->mutable_list()->add_entries(); DCHECK(entry != nullptr); entry->set_index_id(file_handle); // Page index -> file offset in bytes. entry->set_file_offset(static_cast(event.index) * kPageSize); entry->set_file_length(kPageSize); } }) .subscribe([&](CompilerPageCacheEvent event) { if (!output_proto) { if (output_file_name.empty()) { LOG(INFO) << "CompilerPageCacheEvent" << event << std::endl; } else { ofs << event << "\n"; // TODO: write in proto format instead. } } ++counter; }); if (output_proto) { LOG(DEBUG) << "compiler: WriteFully to begin into " << output_file_name; ::google::protobuf::MessageLite& message = *trace_file_proto.get(); if (auto res = serialize::ProtobufIO::WriteFully(message, output_file_name); !res) { errno = res.error(); PLOG(ERROR) << "compiler: Failed to write protobuf to file: " << output_file_name; return false; } else { LOG(INFO) << "compiler: Wrote protobuf " << output_file_name; } } LOG(DEBUG) << "compiler: Compilation completed (" << counter << " events)."; return true; } } // namespace iorap::compiler