• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "compiler/compiler.h"

#include "common/debug.h"
#include "common/expected.h"

#include "perfetto/rx_producer.h"  // TODO: refactor BinaryWireProtobuf to separate header.
#include "inode2filename/inode.h"
#include "inode2filename/search_directories.h"
#include "serialize/protobuf_io.h"

#include <android-base/unique_fd.h>
#include <android-base/parseint.h>
#include <android-base/file.h>

#include <perfetto/trace/trace.pb.h>  // ::perfetto::protos::Trace
#include <perfetto/trace/trace_packet.pb.h>  // ::perfetto::protos::TracePacket

#include "rxcpp/rx.hpp"

#include <algorithm>
#include <cstdint>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <regex>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include <sched.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <syscall.h>
#include <fcntl.h>
#include <unistd.h>
45 
46 namespace iorap::compiler {
47 
48 using Inode = iorap::inode2filename::Inode;
49 using InodeResult = iorap::inode2filename::InodeResult;
50 using SearchDirectories = iorap::inode2filename::SearchDirectories;
51 
52 template <typename T>
53 using ProtobufPtr = iorap::perfetto::ProtobufPtr<T>;
54 
55 struct PerfettoTraceProtoInfo {
56   /* The perfetto trace proto. */
57   ::iorap::perfetto::PerfettoTraceProto proto;
58   /*
59    * The timestamp limit of the trace.
60    * It's used to truncate the trace file.
61    */
62   uint64_t timestamp_limit_ns;
63 };
64 
65 struct PerfettoTracePtrInfo {
66   /* Deserialized protobuf data containing the perfetto trace. */
67   ProtobufPtr<::perfetto::protos::Trace> trace_ptr;
68   /*
69    * The timestamp limit of the trace.
70    * It's used to truncate the trace file.
71    */
72   uint64_t timestamp_limit_ns;
73 };
74 
75 // Attempt to read protobufs from the filenames.
76 // Emits one (or none) protobuf for each filename, in the same order as the filenames.
77 // On any errors, the items are dropped (errors are written to the error LOG).
78 //
79 // All work is done on the same Coordinator as the Subscriber.
80 template <typename ProtoT /*extends MessageLite*/>
ReadProtosFromFileNames(rxcpp::observable<CompilationInput> file_infos)81 auto/*observable<PerfettoTracePtrInfo>*/ ReadProtosFromFileNames(
82     rxcpp::observable<CompilationInput> file_infos) {
83   using BinaryWireProtoT = ::iorap::perfetto::PerfettoTraceProto;
84 
85   return file_infos
86     .map([](const CompilationInput& file_info) ->
87          std::optional<PerfettoTraceProtoInfo> {
88       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << file_info.filename
89                    << " TimeStampLimit "<< file_info.timestamp_limit_ns << " [begin]";
90       std::optional<BinaryWireProtoT> maybe_proto =
91           BinaryWireProtoT::ReadFullyFromFile(file_info.filename);
92       if (!maybe_proto) {
93         LOG(ERROR) << "Failed to read file: " << file_info.filename;
94         return std::nullopt;
95       }
96       return {{std::move(maybe_proto.value()), file_info.timestamp_limit_ns}};
97     })
98     .filter([](const std::optional<PerfettoTraceProtoInfo>& proto_info) {
99       return proto_info.has_value();
100     })
101     .map([](std::optional<PerfettoTraceProtoInfo>& proto_info) ->
102          PerfettoTraceProtoInfo {
103       return proto_info.value();
104     })  // TODO: refactor to something that flattens the optional, and logs in one operator.
105     .map([](PerfettoTraceProtoInfo& proto_info) ->
106          std::optional<PerfettoTracePtrInfo> {
107       std::optional<ProtobufPtr<ProtoT>> t = proto_info.proto.template MaybeUnserialize<ProtoT>();
108       if (!t) {
109         LOG(ERROR) << "Failed to parse protobuf: ";  // TODO: filename.
110         return std::nullopt;
111       }
112       return {{std::move(t.value()), proto_info.timestamp_limit_ns}};
113     })
114     .filter([](const std::optional<PerfettoTracePtrInfo>& trace_info) {
115       return trace_info.has_value();
116     })
117     .map([](std::optional<PerfettoTracePtrInfo>& trace_info) ->
118          PerfettoTracePtrInfo {
119       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]";
120       return trace_info.value();
121       // TODO: protobufs have no move constructor. this might be inefficient?
122     });
123 
124 /*
125   return filenames
126     .map([](const std::string& filename) {
127       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << filename << " [begin]";
128       std::optional<BinaryWireProtoT> maybe_proto =
129           BinaryWireProtoT::ReadFullyFromFile(filename);
130       if (!maybe_proto) {
131         LOG(ERROR) << "Failed to read file: " << filename;
132       }
133 
134       std::unique_ptr<BinaryWireProtoT> ptr;
135       if (maybe_proto) {
136         ptr.reset(new BinaryWireProtoT{std::move(*maybe_proto)});
137       }
138       return ptr;
139     })
140     .filter([](const std::unique_ptr<BinaryWireProtoT>& proto) {
141       return proto != nullptr;
142     })
143     .map([](std::unique_ptr<BinaryWireProtoT>& proto) {
144       std::optional<ProtoT> t = proto->template MaybeUnserialize<ProtoT>();
145       if (!t) {
146         LOG(ERROR) << "Failed to parse protobuf: ";  // TODO: filename.
147       }
148       return t;
149     })
150     .filter([](const std::optional<ProtoT>& proto) {
151       return proto.has_value();
152     })
153     .map([](std::optional<ProtoT> proto) -> ProtoT {
154       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]";
155       return std::move(proto.value());
156       // TODO: protobufs have no move constructor. this might be inefficient?
157     });
158     */
159 }
160 
ReadPerfettoTraceProtos(std::vector<CompilationInput> file_infos)161 auto/*observable<PerfettoTracePtrInfo>*/ ReadPerfettoTraceProtos(
162     std::vector<CompilationInput> file_infos) {
163   auto filename_obs = rxcpp::observable<>::iterate(std::move(file_infos));
164   rxcpp::observable<PerfettoTracePtrInfo> obs =
165       ReadProtosFromFileNames<::perfetto::protos::Trace>(std::move(filename_obs));
166   return obs;
167 }
168 
169 // A flattened data representation of an MmFileMap*FtraceEvent.
170 // This representation is used for streaming processing.
171 //
172 // Note: Perfetto applies a 'union' over all possible fields on all possible devices
173 // (and uses the max sizeof per-field).
174 //
175 // Since all protobuf fields are optional, fields not present on a particular device are always
176 // null
177 struct PageCacheFtraceEvent {
178   /*
179    * Ftrace buffer-specific
180    */
181   uint32_t cpu;  // e.g. 0-7 for the cpu core number.
182 
183   /*
184    * Ftrace-event general data
185    */
186 
187   // Nanoseconds since an epoch.
188   // Epoch is configurable by writing into trace_clock.
189   // By default this timestamp is CPU local.
190   uint64_t timestamp;
191   // Kernel pid (do not confuse with userspace pid aka tgid)
192   uint32_t pid;
193 
194   // Tagged by our code while parsing the ftraces:
195   uint64_t timestamp_relative;  // timestamp relative to first ftrace within a Trace protobuf.
196   bool add_to_page_cache;  // AddToPageCache=true, DeleteFromPageCache=false.
197 
198   /*
199    * mm_filemap-specific data
200    *
201    * Fields are common:
202    * - MmFilemapAddToPageCacheFtraceEvent
203    * - MmFilemapDeleteFromPageCacheFtraceEvent
204    */
205   uint64_t pfn;    // page frame number (physical) - null on some devices, e.g. marlin
206   uint64_t i_ino;  // inode number (use in conjunction with s_dev)
207   uint64_t index;  // offset into file: this is a multiple of the page size (usually 4096).
208   uint64_t s_dev;  // (dev_t) device number
209   uint64_t page;   // struct page*. - null on some devices, e.g. blueline.
210 
inodeiorap::compiler::PageCacheFtraceEvent211   Inode inode() const {
212     return Inode::FromDeviceAndInode(static_cast<dev_t>(s_dev),
213                                      static_cast<ino_t>(i_ino));
214   }
215 };
216 
operator <<(std::ostream & os,const PageCacheFtraceEvent & e)217 std::ostream& operator<<(std::ostream& os, const PageCacheFtraceEvent& e) {
218   os << "{";
219   os << "cpu:" << e.cpu << ",";
220   os << "timestamp:" << e.timestamp << ",";
221   os << "pid:" << e.pid << ",";
222   os << "timestamp_relative:" << e.timestamp_relative << ",";
223   os << "add_to_page_cache:" << e.add_to_page_cache << ",";
224   os << "pfn:" << e.pfn << ",";
225   os << "i_ino:" << e.i_ino << ",";
226   os << "index:" << e.index << ",";
227   os << "s_dev:" << e.s_dev << ",";
228   os << "page:" << e.page;
229   os << "}";
230 
231   return os;
232 }
233 
234 /*
235  * Gets the start timestamp.
236  *
237  * It is the minimium timestamp.
238  */
GetStartTimestamp(const::perfetto::protos::Trace & trace)239 std::optional<uint64_t> GetStartTimestamp(const ::perfetto::protos::Trace& trace) {
240   std::optional<uint64_t> timestamp_relative_start;
241   // Traverse each timestamp to get the minimium one.
242   for (const ::perfetto::protos::TracePacket& packet : trace.packet()) {
243     if (packet.has_timestamp()) {
244       timestamp_relative_start = timestamp_relative_start?
245           std::min(*timestamp_relative_start, packet.timestamp()) : packet.timestamp();
246     }
247     if (!packet.has_ftrace_events()) {
248       continue;
249     }
250     const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle =
251         packet.ftrace_events();
252     for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) {
253       if (event.has_timestamp()) {
254         timestamp_relative_start = timestamp_relative_start?
255             std::min(*timestamp_relative_start, event.timestamp()) : event.timestamp();
256       }
257     }
258   }
259   return timestamp_relative_start;
260 }
261 
262 /*
263  * sample blueline output:
264  *
265  * $ adb shell cat /d/tracing/events/filemap/mm_filemap_add_to_page_cache/format
266  *
267  * name: mm_filemap_add_to_page_cache
268  * ID: 178
269  * format:
270  * 	field:unsigned short common_type;	offset:0;	size:2;	signed:0;
271  * 	field:unsigned char common_flags;	offset:2;	size:1;	signed:0;
272  * 	field:unsigned char common_preempt_count;	offset:3;	size:1;	signed:0;
273  * 	field:int common_pid;	offset:4;	size:4;	signed:1;
274  *
275  * 	field:unsigned long pfn;	offset:8;	size:8;	signed:0;
276  * 	field:unsigned long i_ino;	offset:16;	size:8;	signed:0;
277  * 	field:unsigned long index;	offset:24;	size:8;	signed:0;
278  * 	field:dev_t s_dev;	offset:32;	size:4;	signed:0;
279  *
280  * print fmt: "dev %d:%d ino %lx page=%p pfn=%lu ofs=%lu", ((unsigned int) ((REC->s_dev) >> 20)),
281  *            ((unsigned int) ((REC->s_dev) & ((1U << 20) - 1))), REC->i_ino,
282  *             (((struct page *)(((0xffffffffffffffffUL) - ((1UL) << ((39) - 1)) + 1) -
283  *                 ((1UL) << ((39) - 12 - 1 + 6))) - (memstart_addr >> 12)) + (REC->pfn)),
284  *            REC->pfn, REC->index << 12
285  */
286 
SelectPageCacheFtraceEvents(PerfettoTracePtrInfo trace_info)287 auto /*observable<PageCacheFtraceEvent>*/ SelectPageCacheFtraceEvents(
288     PerfettoTracePtrInfo trace_info) {
289   const ::perfetto::protos::Trace& trace = *(trace_info.trace_ptr);
290 
291   constexpr bool kDebugFunction = true;
292 
293   return rxcpp::observable<>::create<PageCacheFtraceEvent>(
294       [trace=std::move(trace), timestamp_limit_ns=trace_info.timestamp_limit_ns]
295       (rxcpp::subscriber<PageCacheFtraceEvent> sub) {
296     uint64_t timestamp = 0;
297     uint64_t timestamp_relative = 0;
298 
299     std::optional<uint64_t> timestamp_relative_start = GetStartTimestamp(trace);
300     uint32_t cpu = 0;
301     uint32_t pid = 0;
302     bool add_to_page_cache = true;
303 
304     auto on_next_page_cache_event = [&](const auto& mm_event) {
305       PageCacheFtraceEvent out;
306       out.timestamp = timestamp;
307       out.cpu = cpu;
308       out.pid = pid;
309 
310       out.timestamp_relative = timestamp_relative;
311       out.add_to_page_cache = add_to_page_cache;
312 
313       out.pfn = mm_event.pfn();
314       out.i_ino = mm_event.i_ino();
315       out.index = mm_event.index();
316       out.s_dev = mm_event.s_dev();
317       out.page = mm_event.page();
318 
319       sub.on_next(std::move(out));
320     };
321 
322     for (const ::perfetto::protos::TracePacket& packet : trace.packet()) {
323       // Break out of all loops if we are unsubscribed.
324       if (!sub.is_subscribed()) {
325         if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents unsubscribe";
326         return;
327       }
328 
329       if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents TracePacket";
330 
331       if (packet.has_timestamp()) {
332         timestamp_relative_start = timestamp_relative_start.value_or(packet.timestamp());
333         timestamp = packet.timestamp();  // XX: should we call 'has_timestamp()' ?
334       } else {
335         timestamp = 0;
336       }
337 
338       if (packet.has_ftrace_events()) {
339         const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle =
340             packet.ftrace_events();
341 
342         cpu = ftrace_event_bundle.cpu();  // XX: has_cpu ?
343 
344         for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) {
345           // Break out of all loops if we are unsubscribed.
346           if (!sub.is_subscribed()) {
347             return;
348           }
349 
350           if (event.has_timestamp()) {
351             timestamp = event.timestamp();
352             if(timestamp > timestamp_limit_ns) {
353               LOG(VERBOSE) << "The timestamp is " << timestamp <<
354                            ", which exceeds the limit "<< timestamp_limit_ns;
355               continue;
356             }
357           } else {
358             DCHECK(packet.has_timestamp() == false)
359                 << "Timestamp in outer packet but not inner packet";
360             // XX: use timestamp from the perfetto TracePacket ???
361             // REVIEWERS: not sure if this is ok, does it use the same clock source and
362             // is the packet data going to be the same clock sample as the Ftrace event?
363           }
364 
365           if (timestamp_relative_start){
366             timestamp_relative = timestamp - *timestamp_relative_start;
367           } else {
368             timestamp_relative = 0;
369           }
370 
371           pid = event.pid();  // XX: has_pid ?
372 
373           if (event.has_mm_filemap_add_to_page_cache()) {
374             add_to_page_cache = true;
375 
376             const ::perfetto::protos::MmFilemapAddToPageCacheFtraceEvent& mm_event =
377                 event.mm_filemap_add_to_page_cache();
378 
379             on_next_page_cache_event(mm_event);
380           } else if (event.has_mm_filemap_delete_from_page_cache()) {
381             add_to_page_cache = false;
382 
383             const ::perfetto::protos::MmFilemapDeleteFromPageCacheFtraceEvent& mm_event =
384                 event.mm_filemap_delete_from_page_cache();
385 
386             on_next_page_cache_event(mm_event);
387           }
388         }
389       } else {
390         if (kDebugFunction) {
391           LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents no ftrace event bundle";
392         }
393       }
394     }
395 
396     if (kDebugFunction) {
397       LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents#on_completed";
398     }
399 
400     // Let subscriber know there are no more items.
401     sub.on_completed();
402   });
403 }
404 
SelectDistinctInodesFromTraces(rxcpp::observable<PerfettoTracePtrInfo> traces)405 auto /*observable<Inode*/ SelectDistinctInodesFromTraces(
406     rxcpp::observable<PerfettoTracePtrInfo> traces) {
407   // Emit only unique (s_dev, i_ino) pairs from all Trace protos.
408   auto obs = traces
409     .flat_map([](PerfettoTracePtrInfo trace) {
410       rxcpp::observable<PageCacheFtraceEvent> obs = SelectPageCacheFtraceEvents(std::move(trace));
411       // FIXME: dont check this in
412       // return obs;
413       //return obs.take(100);   // for faster development
414       return obs;
415     })  // TODO: Upstream bug? using []()::perfetto::protos::Trace&) causes a compilation error.
416     .map([](const PageCacheFtraceEvent& event) -> Inode {
417       return Inode::FromDeviceAndInode(static_cast<dev_t>(event.s_dev),
418                                        static_cast<ino_t>(event.i_ino));
419     })
420     .tap([](const Inode& inode) {
421       LOG(VERBOSE) << "SelectDistinctInodesFromTraces (pre-distinct): " << inode;
422     })
423     .distinct()  // observable<Inode>*/
424     ;
425 
426   return obs;
427 }
428 // TODO: static assert checks for convertible return values.
429 
ResolveInodesToFileNames(rxcpp::observable<Inode> inodes,inode2filename::InodeResolverDependencies dependencies)430 auto/*observable<InodeResult>*/ ResolveInodesToFileNames(
431     rxcpp::observable<Inode> inodes,
432     inode2filename::InodeResolverDependencies dependencies) {
433   std::shared_ptr<inode2filename::InodeResolver> inode_resolver =
434       inode2filename::InodeResolver::Create(std::move(dependencies));
435   return inode_resolver->FindFilenamesFromInodes(std::move(inodes));
436 }
437 
438 using InodeMap = std::unordered_map<Inode, std::string /*filename*/>;
ReduceResolvedInodesToMap(rxcpp::observable<InodeResult> inode_results)439 auto /*just observable<InodeMap>*/ ReduceResolvedInodesToMap(
440       rxcpp::observable<InodeResult> inode_results) {
441   return inode_results.reduce(
442     InodeMap{},
443     [](InodeMap m, InodeResult result) {
444       if (result) {
445         LOG(VERBOSE) << "compiler::ReduceResolvedInodesToMap insert " << result;
446         m.insert({std::move(result.inode), std::move(result.data.value())});
447       } else {
448         // TODO: side stats for how many of these are failed to resolve?
449         LOG(WARNING) << "compiler: Failed to resolve inode, " << result;
450       }
451       return m;
452     },
453     [](InodeMap m) {
454       return m;  // TODO: use an identity function
455     }); // emits exactly 1 InodeMap value.
456 }
457 
458 struct ResolvedPageCacheFtraceEvent {
459   std::string filename;
460   PageCacheFtraceEvent event;
461 };
462 
operator <<(std::ostream & os,const ResolvedPageCacheFtraceEvent & e)463 std::ostream& operator<<(std::ostream& os, const ResolvedPageCacheFtraceEvent& e) {
464   os << "{";
465   os << "filename:\"" << e.filename << "\",";
466   os << e.event;
467   os << "}";
468 
469   return os;
470 }
471 
472 struct CombinedState {
473   CombinedState() = default;
CombinedStateiorap::compiler::CombinedState474   explicit CombinedState(InodeMap inode_map) : inode_map{std::move(inode_map)} {}
CombinedStateiorap::compiler::CombinedState475   explicit CombinedState(PageCacheFtraceEvent event) : ftrace_event{std::move(event)} {}
476 
CombinedStateiorap::compiler::CombinedState477   CombinedState(InodeMap inode_map, PageCacheFtraceEvent event)
478     : inode_map(std::move(inode_map)),
479       ftrace_event{std::move(event)} {}
480 
481   std::optional<InodeMap> inode_map;
482   std::optional<PageCacheFtraceEvent> ftrace_event;
483 
HasAlliorap::compiler::CombinedState484   bool HasAll() const {
485     return inode_map.has_value() && ftrace_event.has_value();
486   }
487 
GetInodeMapiorap::compiler::CombinedState488   const InodeMap& GetInodeMap() const {
489     DCHECK(HasAll());
490     return inode_map.value();
491   }
492 
GetInodeMapiorap::compiler::CombinedState493   InodeMap& GetInodeMap() {
494     DCHECK(HasAll());
495     return inode_map.value();
496   }
497 
GetEventiorap::compiler::CombinedState498   const PageCacheFtraceEvent& GetEvent() const {
499     DCHECK(HasAll());
500     return ftrace_event.value();
501   }
502 
GetEventiorap::compiler::CombinedState503   PageCacheFtraceEvent& GetEvent() {
504     DCHECK(HasAll());
505     return ftrace_event.value();
506   }
507 
Mergeiorap::compiler::CombinedState508   void Merge(CombinedState&& other) {
509     if (other.inode_map) {
510       inode_map = std::move(other.inode_map);
511     }
512     if (other.ftrace_event) {
513       ftrace_event = std::move(other.ftrace_event);
514     }
515   }
516 };
517 
operator <<(std::ostream & os,const CombinedState & s)518 std::ostream& operator<<(std::ostream& os, const CombinedState& s) {
519   os << "CombinedState{inode_map:";
520   if (s.inode_map) {
521     os << "|sz=" << (s.inode_map.value().size()) << "|";
522   } else {
523     os << "(null)";
524   }
525   os << ",event:";
526   if (s.ftrace_event) {
527     //os << s.ftrace_event.value().timestamp << "ns";
528     os << s.ftrace_event.value();
529   } else {
530     os << "(null)";
531   }
532   os << "}";
533   return os;
534 }
535 
ResolvePageCacheEntriesFromProtos(rxcpp::observable<PerfettoTracePtrInfo> traces,inode2filename::InodeResolverDependencies dependencies)536 auto/*observable<ResolvedPageCacheFtraceEvent>*/ ResolvePageCacheEntriesFromProtos(
537     rxcpp::observable<PerfettoTracePtrInfo> traces,
538     inode2filename::InodeResolverDependencies dependencies) {
539 
540   // 1st chain = emits exactly 1 InodeMap.
541 
542   // [proto, proto, proto...] -> [inode, inode, inode, ...]
543   auto/*observable<Inode>*/ distinct_inodes = SelectDistinctInodesFromTraces(traces);
544   rxcpp::observable<Inode> distinct_inodes_obs = distinct_inodes.as_dynamic();
545   // [inode, inode, inode, ...] -> [(inode, {filename|error}), ...]
546   auto/*observable<InodeResult>*/ inode_names = ResolveInodesToFileNames(distinct_inodes_obs,
547                                                                          std::move(dependencies));
548   // rxcpp has no 'join' operators, so do a manual join with concat.
549   auto/*observable<InodeMap>*/ inode_name_map = ReduceResolvedInodesToMap(inode_names);
550 
551   // 2nd chain = emits all PageCacheFtraceEvent
552   auto/*observable<PageCacheFtraceEvent>*/ page_cache_ftrace_events = traces
553     .flat_map([](PerfettoTracePtrInfo trace) {
554       rxcpp::observable<PageCacheFtraceEvent> obs = SelectPageCacheFtraceEvents(std::move(trace));
555       return obs;
556     });
557 
558   auto inode_name_map_precombine = inode_name_map
559     .map([](InodeMap inode_map) {
560       LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#inode_name_map_precombine ";
561       return CombinedState{std::move(inode_map)};
562     });
563 
564   auto page_cache_ftrace_events_precombine = page_cache_ftrace_events
565     .map([](PageCacheFtraceEvent event) {
566       LOG(VERBOSE)
567           << "compiler::ResolvePageCacheEntriesFromProtos#page_cache_ftrace_events_precombine "
568           << event;
569       return CombinedState{std::move(event)};
570     });
571 
572   // Combine 1st+2nd chain.
573   //
574   // concat subscribes to each observable, waiting until its completed, before subscribing
575   // to the next observable and waiting again.
576   //
577   // During all this, every #on_next is immediately forwarded to the downstream observables.
578   // In our case, we want to block until InodeNameMap is ready, and re-iterate all ftrace events.
579   auto/*observable<ResolvedPageCacheFtraceEvent>*/ resolved_events = inode_name_map_precombine
580     .concat(page_cache_ftrace_events_precombine)
581     .scan(CombinedState{},
582           [](CombinedState current_state, CombinedState delta_state) {
583             LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#scan "
584                           << "current=" << current_state << ","
585                           << "delta=" << delta_state;
586             // IT0    = (,)               + (InodeMap,)
587             // IT1    = (InodeMap,)       + (,Event)
588             // IT2..N = (InodeMap,Event1) + (,Event2)
589             current_state.Merge(std::move(delta_state));
590             return current_state;
591           })
592     .filter([](const CombinedState& state) {
593       return state.HasAll();
594     })
595     .map([](CombinedState& state) -> std::optional<ResolvedPageCacheFtraceEvent> {
596       PageCacheFtraceEvent& event = state.GetEvent();
597       const InodeMap& inode_map = state.GetInodeMap();
598 
599       auto it = inode_map.find(event.inode());
600       if (it != inode_map.end()) {
601         std::string filename = it->second;
602         LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos combine_latest " << event;
603         return ResolvedPageCacheFtraceEvent{std::move(filename), std::move(event)};
604       } else {
605         LOG(ERROR) << "compiler: FtraceEvent's inode did not have resolved filename: " << event;
606         return std::nullopt;
607       }
608     })
609     .filter(
610       [](std::optional<ResolvedPageCacheFtraceEvent> maybe_event) {
611         return maybe_event.has_value();
612       })
613     .map([](std::optional<ResolvedPageCacheFtraceEvent> maybe_event) {
614       return std::move(maybe_event.value());
615     });
616     // -> observable<ResolvedPageCacheFtraceEvent>
617 
618   return resolved_events;
619 }
620 
namespace detail {

// Strict "a < b" for strings: plain lexicographical (char-by-char) ordering.
bool multiless_one(const std::string& a, const std::string& b) {
  return std::lexicographical_compare(a.begin(), a.end(), b.begin(), b.end());
}

// Strict "a < b" for any other type, via std::less on the decayed type.
template <typename T>
constexpr bool multiless_one(T&& a, T&& b) {
  using Less = std::less<std::decay_t<T>>;
  return Less{}(std::forward<T>(a), std::forward<T>(b));
}

// Base case: [] < [] is always false.
constexpr bool multiless() {
  return false;
}

// The first (a, b) pair decides the result unless its two values are equal. In that case
// the comparison falls through to the remaining pairs in |args|.
template <typename T, typename ... Args>
constexpr bool multiless(T&& a, T&& b, Args&&... args) {
  const bool first_pair_differs = (a != b);
  if (first_pair_differs) {
    return multiless_one(std::forward<T>(a), std::forward<T>(b));
  }
  return multiless(std::forward<Args>(args)...);
}

}  // namespace detail

// Returns [A0...An] < [B0...Bn]: a lexicographic, field-by-field comparison.
// Arguments are passed interleaved, in the order A0,B0,A1,B1,...,An,Bn. Within each
// pair, Ai and Bi must deduce to the same type.
template <typename ... Args>
constexpr bool multiless(Args&&... args) {
  return detail::multiless(std::forward<Args>(args)...);
}
654 
655 struct CompilerPageCacheEvent {
656   std::string filename;
657   uint64_t timestamp_relative;  // use relative timestamp because absolute values aren't comparable
658                                 // across different trace protos.
659                                 // relative timestamps can be said to be 'approximately' comparable.
660                                 // assuming we compare the same application startup's trace times.
661   bool add_to_page_cache;  // AddToPageCache=true, DeleteFromPageCache=false.
662   uint64_t index;          // offset into file: this is a multiple of the page size (usually 4096).
663 
664   // All other data from the ftrace is dropped because we don't currently use it in the
665   // compiler algorithms.
666 
667   CompilerPageCacheEvent() = default;
CompilerPageCacheEventiorap::compiler::CompilerPageCacheEvent668   CompilerPageCacheEvent(const ResolvedPageCacheFtraceEvent& resolved)
669     : CompilerPageCacheEvent(resolved.filename, resolved.event) {
670   }
671 
CompilerPageCacheEventiorap::compiler::CompilerPageCacheEvent672   CompilerPageCacheEvent(ResolvedPageCacheFtraceEvent&& resolved)
673     : CompilerPageCacheEvent(std::move(resolved.filename), std::move(resolved.event)) {
674   }
675 
676   // Compare all fields (except the timestamp field).
LessIgnoringTimestampiorap::compiler::CompilerPageCacheEvent677   static bool LessIgnoringTimestamp(const CompilerPageCacheEvent& a,
678                                     const CompilerPageCacheEvent& b) {
679     return multiless(a.filename, b.filename,
680                      a.add_to_page_cache, b.add_to_page_cache,
681                      a.index, b.index);
682   }
683 
684   // Compare all fields. Timestamps get highest precedence.
operator <iorap::compiler::CompilerPageCacheEvent685   bool operator<(const CompilerPageCacheEvent& rhs) const {
686     return multiless(timestamp_relative, rhs.timestamp_relative,
687                      filename, rhs.filename,
688                      add_to_page_cache, rhs.add_to_page_cache,
689                      index, rhs.index);
690   }
691 
692  private:
CompilerPageCacheEventiorap::compiler::CompilerPageCacheEvent693   CompilerPageCacheEvent(std::string filename, const PageCacheFtraceEvent& event)
694     : filename(std::move(filename)),
695       timestamp_relative(event.timestamp_relative),
696       add_to_page_cache(event.add_to_page_cache),
697       index(event.index) {
698    }
699 };
700 
operator <<(std::ostream & os,const CompilerPageCacheEvent & e)701 std::ostream& operator<<(std::ostream& os, const CompilerPageCacheEvent& e) {
702   os << "{";
703   os << "filename:\"" << e.filename << "\",";
704   os << "timestamp:" << e.timestamp_relative << ",";
705   os << "add_to_page_cache:" << e.add_to_page_cache << ",";
706   os << "index:" << e.index;
707   os << "}";
708   return os;
709 }
710 
711 // Filter an observable chain of 'ResolvedPageCacheFtraceEvent'
712 // into an observable chain of 'ResolvedPageCacheFtraceEvent'.
713 //
714 // Any items emitted by the input chain that match the regular expression
715 // specified by blacklist_filter are not emitted into the output chain.
ApplyBlacklistToPageCacheEvents(rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events,std::optional<std::string> blacklist_filter)716 auto/*observable<ResolvedPageCacheFtraceEvent>*/ ApplyBlacklistToPageCacheEvents(
717     rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events,
718     std::optional<std::string> blacklist_filter) {
719   bool has_re = blacklist_filter.has_value();
720   // default regex engine is ecmascript.
721   std::regex reg_exp{blacklist_filter ? *blacklist_filter : std::string("")};
722 
723   return resolved_events.filter(
724     [reg_exp, has_re](const ResolvedPageCacheFtraceEvent& event) {
725       if (!has_re) {
726         return true;
727       }
728       // Remove any entries that match the regex in --blacklist-filter/-bf.
729       bool res = std::regex_search(event.filename, reg_exp);
730       if (res) {
731         LOG(VERBOSE) << "Blacklist filter removed '" << event.filename << "' from chain.";
732       }
733       return !res;
734     });
735 }
736 
737 // Compile an observable chain of 'ResolvedPageCacheFtraceEvent' into
738 // an observable chain of distinct, timestamp-ordered, CompilerPageCacheEvent.
739 //
740 // This is a reducing operation: No items are emitted until resolved_events is completed.
CompilePageCacheEvents(rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events)741 auto/*observable<CompilerPageCacheEvent>*/ CompilePageCacheEvents(
742     rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events) {
743 
744   struct CompilerPageCacheEventIgnoringTimestampLess {
745     bool operator()(const CompilerPageCacheEvent& lhs,
746                     const CompilerPageCacheEvent& rhs) const {
747       return CompilerPageCacheEvent::LessIgnoringTimestamp(lhs, rhs);
748     }
749   };
750 
751   // Greedy O(N) compilation algorithm.
752   //
753   // This produces an inoptimal result (e.g. a small timestamp
754   // that might occur only 1% of the time nevertheless wins out), but the
755   // algorithm itself is quite simple, and doesn't require any heuristic tuning.
756 
757   // First pass: *Merge* into set that ignores the timestamp value for order, but retains
758   //             the smallest timestamp value if the same key is re-inserted.
759   using IgnoreTimestampForOrderingSet =
760       std::set<CompilerPageCacheEvent, CompilerPageCacheEventIgnoringTimestampLess>;
761   // Second pass: *Sort* data by smallest timestamp first.
762   using CompilerPageCacheEventSet =
763       std::set<CompilerPageCacheEvent>;
764 
765   return resolved_events
766     .map(
767       [](ResolvedPageCacheFtraceEvent event) {
768         // Drop all the extra metadata like pid, cpu, etc.
769         // When we merge we could keep a list of the original data, but there is no advantage
770         // to doing so.
771         return CompilerPageCacheEvent{std::move(event)};
772       }
773     )
774    .reduce(
775     IgnoreTimestampForOrderingSet{},
776     [](IgnoreTimestampForOrderingSet set, CompilerPageCacheEvent event) {
777       // Add each event to the set, keying by everything but the timestamp.
778       // If the key is already inserted, re-insert with the smaller timestamp value.
779       auto it = set.find(event);
780 
781       if (it == set.end()) {
782         // Need to insert new element.
783         set.insert(std::move(event));
784       } else if (it->timestamp_relative > event.timestamp_relative) {
785         // Replace existing element: the new element has a smaller timestamp.
786         it = set.erase(it);
787         // Amortized O(1) time if insertion happens in the position before the hint.
788         set.insert(it, std::move(event));
789       } // else: Skip insertion. Element already present with the minimum timestamp.
790 
791       return set;
792     },
793     [](IgnoreTimestampForOrderingSet set) {
794       // Extract all elements from 'set', re-insert into 'ts_set'.
795       // The values are now ordered by timestamp (and then the rest of the fields).
796       CompilerPageCacheEventSet ts_set;
797       ts_set.merge(std::move(set));
798 
799 
800       std::shared_ptr<CompilerPageCacheEventSet> final_set{
801           new CompilerPageCacheEventSet{std::move(ts_set)}};
802       return final_set;
803       // return ts_set;
804     })  // observable<CompilerPageCacheEventSet> (just)
805   .flat_map(
806     [](std::shared_ptr<CompilerPageCacheEventSet> final_set) {
807       // TODO: flat_map seems to make a copy of the parameter _every single iteration_
808       // without the shared_ptr it would just make a copy of the set every time it went
809       // through the iterate function.
810       // Causing absurdly slow compile times x1000 slower than we wanted.
811       // TODO: file a bug upstream and/or fix upstream.
812       CompilerPageCacheEventSet& ts_set = *final_set;
813     // [](CompilerPageCacheEventSet& ts_set) {
814       LOG(DEBUG) << "compiler: Merge-pass completed (" << ts_set.size() << " entries).";
815       //return rxcpp::sources::iterate(std::move(ts_set));
816       return rxcpp::sources::iterate(ts_set).map([](CompilerPageCacheEvent e) { return e; });
817     }
818   );   // observable<CompilerPageCacheEvent>
819 }
820 
821 /** Makes a vector of info that includes filename and timestamp limit. */
MakeCompilationInputs(std::vector<std::string> input_file_names,std::vector<uint64_t> timestamp_limit_ns)822 std::vector<CompilationInput> MakeCompilationInputs(
823     std::vector<std::string> input_file_names,
824     std::vector<uint64_t> timestamp_limit_ns){
825   // If the timestamp limit is empty, set the limit to max value
826   // for each trace file.
827   if (timestamp_limit_ns.empty()) {
828     for (size_t i = 0; i < input_file_names.size(); i++) {
829       timestamp_limit_ns.push_back(std::numeric_limits<uint64_t>::max());
830     }
831   }
832   DCHECK_EQ(input_file_names.size(), timestamp_limit_ns.size());
833   std::vector<CompilationInput> file_infos;
834   for (size_t i = 0; i < input_file_names.size(); i++) {
835     file_infos.push_back({input_file_names[i], timestamp_limit_ns[i]});
836   }
837   return file_infos;
838 }
839 
PerformCompilation(std::vector<CompilationInput> perfetto_traces,std::string output_file_name,bool output_proto,std::optional<std::string> blacklist_filter,inode2filename::InodeResolverDependencies dependencies)840 bool PerformCompilation(std::vector<CompilationInput> perfetto_traces,
841                         std::string output_file_name,
842                         bool output_proto,
843                         std::optional<std::string> blacklist_filter,
844                         inode2filename::InodeResolverDependencies dependencies) {
845   auto trace_protos = ReadPerfettoTraceProtos(std::move(perfetto_traces));
846   auto resolved_events = ResolvePageCacheEntriesFromProtos(std::move(trace_protos),
847                                                            std::move(dependencies));
848   auto filtered_events =
849       ApplyBlacklistToPageCacheEvents(std::move(resolved_events), blacklist_filter);
850   auto compiled_events = CompilePageCacheEvents(std::move(filtered_events));
851 
852   std::ofstream ofs;
853   if (!output_file_name.empty()) {
854 
855     if (!output_proto) {
856       ofs.open(output_file_name);
857 
858       if (!ofs) {
859         LOG(ERROR) << "compiler: Failed to open output file for writing: " << output_file_name;
860         return false;
861       }
862     }
863   }
864 
865   auto trace_file_proto = serialize::ArenaPtr<serialize::proto::TraceFile>::Make();
866 
867   // Fast lookup of filename -> FileIndex id.
868   std::unordered_map<std::string, int64_t /*file handle id*/> file_path_map;
869   int64_t file_handle_id = 0;
870 
871   int counter = 0;
872   compiled_events
873     // .as_blocking()
874     .tap([&](CompilerPageCacheEvent& event) {
875       if (!output_proto) {
876         return;
877       }
878 
879       if (!event.add_to_page_cache) {
880         // Skip DeleteFromPageCache events, they are only used for intermediate.
881         return;
882       }
883 
884       DCHECK(trace_file_proto->mutable_index() != nullptr);
885       serialize::proto::TraceFileIndex& index = *trace_file_proto->mutable_index();
886       int64_t file_handle;
887 
888       // Add TraceFileIndexEntry if it doesn't exist.
889 
890       auto it = file_path_map.find(event.filename);
891       if (it == file_path_map.end()) {
892         file_handle = file_handle_id++;
893         file_path_map[event.filename] = file_handle;
894 
895         serialize::proto::TraceFileIndexEntry* entry = index.add_entries();
896         DCHECK(entry != nullptr);
897         entry->set_id(file_handle);
898         entry->set_file_name(event.filename);
899 
900         if (kIsDebugBuild) {
901           int i = static_cast<int>(file_handle);
902           const serialize::proto::TraceFileIndexEntry& entry_ex = index.entries(i);
903           DCHECK_EQ(entry->id(), entry_ex.id());
904           DCHECK_EQ(entry->file_name(), entry_ex.file_name());
905         }
906       } else {
907         file_handle = it->second;
908       }
909       int kPageSize = 4096;  // TODO: don't hardcode the page size.
910 
911       // Add TraceFileEntry.
912       DCHECK(trace_file_proto->mutable_list() != nullptr);
913       serialize::proto::TraceFileEntry* entry = trace_file_proto->mutable_list()->add_entries();
914       DCHECK(entry != nullptr);
915 
916       entry->set_index_id(file_handle);
917       // Page index -> file offset in bytes.
918       entry->set_file_offset(static_cast<int64_t>(event.index) * kPageSize);
919       entry->set_file_length(kPageSize);
920     })
921     .subscribe([&](CompilerPageCacheEvent event) {
922       if (!output_proto) {
923         if (output_file_name.empty()) {
924           LOG(INFO) << "CompilerPageCacheEvent" << event << std::endl;
925         } else {
926           ofs << event << "\n";  // TODO: write in proto format instead.
927         }
928       }
929       ++counter;
930     });
931 
932   if (output_proto) {
933     LOG(DEBUG) << "compiler: WriteFully to begin into " << output_file_name;
934     ::google::protobuf::MessageLite& message = *trace_file_proto.get();
935     if (auto res = serialize::ProtobufIO::WriteFully(message, output_file_name); !res) {
936       errno = res.error();
937       PLOG(ERROR) << "compiler: Failed to write protobuf to file: " << output_file_name;
938       return false;
939     } else {
940       LOG(INFO) << "compiler: Wrote protobuf " << output_file_name;
941     }
942   }
943 
944   LOG(DEBUG) << "compiler: Compilation completed (" << counter << " events).";
945 
946   return true;
947 }
948 
949 }  // namespace iorap::compiler
950