/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "binder/package_version_map.h"
#include "common/debug.h"
#include "common/expected.h"
#include "common/printer.h"
#include "common/rx_async.h"
#include "common/property.h"
#include "common/trace.h"
#include "db/app_component_name.h"
#include "db/file_models.h"
#include "db/models.h"
#include "maintenance/controller.h"
#include "maintenance/db_cleaner.h"
#include "manager/event_manager.h"
#include "perfetto/rx_producer.h"
#include "prefetcher/read_ahead.h"
#include "prefetcher/task_id.h"

// NOTE(review): the targets of the angle-bracket includes were lost; the list
// below is reconstructed from the symbols this file uses. Confirm against the
// build before landing.
#include <android-base/chrono_utils.h>
#include <android-base/properties.h>
#include <android-base/strings.h>
#include <rxcpp/rx.hpp>
#include <server_configurable_flags/get_flags.h>
#include <utils/misc.h>
#include <utils/Trace.h>

#include <algorithm>
#include <atomic>
#include <filesystem>
#include <functional>
#include <optional>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <vector>

using rxcpp::observe_on_one_worker;

namespace iorap::manager {

using binder::AppLaunchEvent;
using binder::DexOptEvent;
using binder::JobScheduledEvent;
using binder::RequestId;
using binder::TaskResult;

using common::AsyncPool;
using common::RxAsync;

using perfetto::PerfettoStreamCommand;
using perfetto::PerfettoTraceProto;

using db::AppComponentName;

const constexpr bool kExcludeDexFilesDefault = true;

// Process-wide switches, re-read on every app launch event by
// AppLaunchEventState::OnNewEvent.
static std::atomic<bool> s_tracing_allowed{false};
static std::atomic<bool> s_readahead_allowed{false};
static std::atomic<uint64_t> s_min_traces{3};

// Holds the set of package names that must be ignored by the event manager.
struct PackageBlacklister {
  // Parses a semicolon-separated list of package names, e.g. "x.y.z;foo.bar.baz".
  // Surrounding whitespace is trimmed from each entry and empty entries are dropped.
  PackageBlacklister(std::string blacklist_string) {
    LOG(VERBOSE) << "Configuring package blacklister with string: " << blacklist_string;

    std::vector<std::string> split = ::android::base::Split(blacklist_string, ";");

    // Ignore any l/r whitespace or empty strings.
    for (const std::string& s : split) {
      std::string t = ::android::base::Trim(s);
      if (!t.empty()) {
        LOG(INFO) << "Blacklisted package: " << t << "; will not optimize.";
        packages_.push_back(std::move(t));
      }
    }
  }

  PackageBlacklister() = default;

  // Returns true iff package_name is exactly equal to one of the configured entries.
  bool IsBlacklisted(const std::string& package_name) const {
    return std::find(packages_.begin(), packages_.end(), package_name) != packages_.end();
  }

  bool IsBlacklisted(const AppComponentName& component_name) const {
    return IsBlacklisted(component_name.package);
  }

  // An empty optional is never considered blacklisted.
  bool IsBlacklisted(const std::optional<AppComponentName>& component_name) const {
    return component_name.has_value() && IsBlacklisted(component_name->package);
  }

 private:
  std::vector<std::string> packages_;
};

// NOTE(review): template arguments were lost; reconstructed as
// package name -> version. Confirm.
using PackageVersionMap = std::unordered_map<std::string, int64_t>;

// Main logic of the #OnAppLaunchEvent scan method.
//
// All functions are called from the same thread as the event manager
// functions.
//
// This is a data type, it's moved (std::move) around from one iteration
// of #scan to another.
struct AppLaunchEventState {
  std::optional<AppComponentName> component_name_;
  // Sequence ID is shared amongst the same app launch sequence,
  // but changes whenever a new app launch sequence begins.
  size_t sequence_id_ = static_cast<size_t>(-1);
  std::optional<AppLaunchEvent::Temperature> temperature_;

  // Push data to perfetto rx chain for associating
  // the raw_trace with the history_id.
  std::optional<rxcpp::subscriber<int>> history_id_subscriber_;
  rxcpp::observable<int> history_id_observable_;

  // NOTE(review): element types reconstructed as uint64_t — confirm against
  // AppLaunchHistoryModel::Insert.
  std::optional<uint64_t> intent_started_ns_;
  std::optional<uint64_t> total_time_ns_;

  // Used by kReportFullyDrawn to find the right history_id.
  // We assume no interleaving between different sequences.
  // This assumption is checked in the Java service code.
  std::optional<int> recent_history_id_;

  // labeled as 'shared' due to rx not being able to handle move-only objects.
  // lifetime: in practice equivalent to unique_ptr.
  std::shared_ptr<prefetcher::ReadAhead> read_ahead_;
  bool allowed_readahead_{true};
  bool is_read_ahead_{false};
  std::optional<prefetcher::TaskId> read_ahead_task_;

  bool allowed_tracing_{true};
  bool is_tracing_{false};
  std::optional<rxcpp::composite_subscription> rx_lifetime_;
  std::vector<rxcpp::composite_subscription> rx_in_flight_;

  PackageBlacklister package_blacklister_{};

  borrowed<perfetto::RxProducerFactory*> perfetto_factory_;  // not null
  borrowed<observe_on_one_worker*> thread_;  // not null
  borrowed<observe_on_one_worker*> io_thread_;  // not null
  borrowed<AsyncPool*> async_pool_;  // not null
  std::shared_ptr<binder::PackageVersionMap> version_map_;

  // Stores the borrowed collaborators (all must be non-null) and creates the
  // ReadAhead instance owned by this state.
  explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory,
                               bool allowed_readahead,
                               bool allowed_tracing,
                               PackageBlacklister package_blacklister,
                               borrowed<observe_on_one_worker*> thread,
                               borrowed<observe_on_one_worker*> io_thread,
                               borrowed<AsyncPool*> async_pool,
                               std::shared_ptr<binder::PackageVersionMap> version_map)
    : read_ahead_{std::make_shared<prefetcher::ReadAhead>()}
  {
    perfetto_factory_ = perfetto_factory;
    DCHECK(perfetto_factory_ != nullptr);

    allowed_readahead_ = allowed_readahead;
    allowed_tracing_ = allowed_tracing;

    package_blacklister_ = package_blacklister;

    thread_ = thread;
    DCHECK(thread_ != nullptr);

    io_thread_ = io_thread;
    DCHECK(io_thread_ != nullptr);

    async_pool_ = async_pool;
    DCHECK(async_pool_ != nullptr);

    version_map_ = version_map;
    DCHECK(version_map_ != nullptr);
  }

  // Updates the values in this struct only as a side effect.
  //
  // May create and fire a new rx chain on the same threads as passed
  // in by the constructors.
  void OnNewEvent(const AppLaunchEvent& event) {
    LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event;

    android::ScopedTrace trace_db_init{ATRACE_TAG_ACTIVITY_MANAGER,
                                       "IorapNativeService::OnAppLaunchEvent"};

    using Type = AppLaunchEvent::Type;

    DCHECK_GE(event.sequence_id, 0);
    sequence_id_ = static_cast<size_t>(event.sequence_id);
    // Pick up the latest process-wide switches for this event.
    allowed_readahead_ = s_readahead_allowed;
    allowed_tracing_ = s_tracing_allowed;

    switch (event.type) {
      case Type::kIntentStarted: {
        const std::string& package_name = event.intent_proto->component().package_name();
        const std::string& class_name = event.intent_proto->component().class_name();
        AppComponentName component_name{package_name, class_name};
        component_name = component_name.Canonicalize();
        component_name_ = component_name;

        if (package_blacklister_.IsBlacklisted(component_name)) {
          LOG(DEBUG) << "kIntentStarted: package " << component_name.package
                     << " ignored due to blacklisting.";
          break;
        }

        // Create a new history ID chain for each new app start-up sequence.
        auto history_id_observable = rxcpp::observable<>::create<int>(
          [&](rxcpp::subscriber<int> subscriber) {
            history_id_subscriber_ = std::move(subscriber);
            LOG(VERBOSE) << " set up the history id subscriber ";
          })
          .tap([](int history_id) { LOG(VERBOSE) << " tap rx history id = " << history_id; })
          .replay(1);  // Remember the history id in case we subscribe too late.

        history_id_observable_ = history_id_observable;

        // Immediately turn observable hot, creating the subscriber.
        history_id_observable.connect();

        DCHECK(!IsTracing());

        // The time should be set before perfetto tracing.
        // Record the timestamp even no perfetto tracing is triggered,
        // because the tracing may start in the following ActivityLaunched
        // event. Otherwise, there will be no starting timestamp and
        // trace without starting timestamp is not considered for compilation.
        if (event.timestamp_nanos >= 0) {
          intent_started_ns_ = event.timestamp_nanos;
        } else {
          LOG(WARNING) << "Negative event timestamp: " << event.timestamp_nanos;
        }
        break;
      }
      case Type::kIntentFailed:
        if (package_blacklister_.IsBlacklisted(component_name_)) {
          LOG(VERBOSE) << "kIntentFailed: package " << component_name_->package
                       << " ignored due to blacklisting.";
          break;
        }

        // Terminate the history id chain with an error so downstream
        // subscribers do not wait for an id that will never arrive.
        if (history_id_subscriber_) {
          history_id_subscriber_->on_error(rxcpp::util::make_error_ptr(
            std::ios_base::failure("Aborting due to intent failed")));
          history_id_subscriber_ = std::nullopt;
        }

        break;
      case Type::kActivityLaunched: {
        // TODO add test in Android framework to verify this.
        const std::string& title =
            event.activity_record_proto->window_token().window_container().identifier().title();
        if (!AppComponentName::HasAppComponentName(title)) {
          // Proto comment claim this is sometimes a window title.
          // We need the actual 'package/component' here, so just ignore it if it's a title.
          LOG(WARNING) << "App launched without a component name: " << event;
          break;
        }

        AppComponentName component_name = AppComponentName::FromString(title);
        component_name = component_name.Canonicalize();
        component_name_ = component_name;

        if (package_blacklister_.IsBlacklisted(component_name_)) {
          LOG(VERBOSE) << "kActivityLaunched: package " << component_name_->package
                       << " ignored due to blacklisting.";
          break;
        }

        // Cancel tracing for warm/hot.
        // Restart tracing if the activity was unexpected.

        AppLaunchEvent::Temperature temperature = event.temperature;
        temperature_ = temperature;
        if (temperature != AppLaunchEvent::Temperature::kCold) {
          LOG(DEBUG) << "AppLaunchEventState#OnNewEvent don't trace due to non-cold temperature";
        } else if (!IsTracing() && !IsReadAhead()) {  // and the temperature is Cold.
          // Start late trace when intent didn't have a component name
          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace";

          if (allowed_readahead_ && !IsReadAhead()) {
            StartReadAhead(sequence_id_, component_name);
          }
          // Tracing is only started when no readahead task was begun above.
          if (allowed_tracing_ && !IsTracing() && !IsReadAhead()) {
            rx_lifetime_ = StartTracing(std::move(component_name));
          }
        } else {
          // FIXME: match actual component name against intent component name.
          // abort traces if they don't match.

          if (allowed_tracing_) {
            LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing";
          }
          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already doing readahead";
        }
        break;
      }
      case Type::kActivityLaunchFinished:
        if (package_blacklister_.IsBlacklisted(component_name_)) {
          LOG(VERBOSE) << "kActivityLaunchFinished: package " << component_name_->package
                       << " ignored due to blacklisting.";
          break;
        }

        if (event.timestamp_nanos >= 0) {
          total_time_ns_ = event.timestamp_nanos;
        }
        RecordDbLaunchHistory(event.activity_record_proto->proc_id());
        // Finish tracing and collect trace buffer.
        //
        // TODO: this happens automatically when perfetto finishes its
        // trace duration.
        if (IsTracing()) {
          MarkPendingTrace();
        }
        FinishReadAhead();
        break;
      case Type::kActivityLaunchCancelled:
        if (package_blacklister_.IsBlacklisted(component_name_)) {
          LOG(VERBOSE) << "kActivityLaunchCancelled: package " << component_name_->package
                       << " ignored due to blacklisting.";
          break;
        }

        // Abort tracing.
        AbortTrace();
        AbortReadAhead();
        break;
      case Type::kReportFullyDrawn: {
        if (package_blacklister_.IsBlacklisted(component_name_)) {
          LOG(VERBOSE) << "kReportFullyDrawn: package " << component_name_->package
                       << " ignored due to blacklisting.";
          break;
        }

        if (!recent_history_id_) {
          LOG(WARNING) << "Dangling kReportFullyDrawn event";
          return;
        }
        // Guard like the other handlers: a negative value would otherwise be
        // converted into a huge unsigned timestamp and written to the db.
        if (event.timestamp_nanos >= 0) {
          UpdateReportFullyDrawn(*recent_history_id_,
                                 static_cast<uint64_t>(event.timestamp_nanos));
        } else {
          LOG(WARNING) << "Negative event timestamp: " << event.timestamp_nanos;
        }
        recent_history_id_ = std::nullopt;
        break;
      }
      default:
        DCHECK(false) << "invalid type: " << event;  // binder layer should've rejected this.
        LOG(ERROR) << "invalid type: " << event;  // binder layer should've rejected this.
    }
  }

  // Is there an in-flight readahead task currently?
  bool IsReadAhead() const {
    return read_ahead_task_.has_value();
  }

  // Gets the compiled trace.
  // If a compiled trace exists in sqlite, use that one. Otherwise, try
  // to find a prebuilt one.
  //
  // Returns the path of an existing file, or nullopt when the package version
  // cannot be determined or neither location has the file.
  std::optional<std::string> GetCompiledTrace(const AppComponentName& component_name) {
    ScopedFormatTrace atrace_get_compiled_trace(ATRACE_TAG_ACTIVITY_MANAGER, "GetCompiledTrace");
    // Firstly, try to find the compiled trace from sqlite.
    android::base::Timer timer{};
    db::DbHandle db{db::SchemaModel::GetSingleton()};
    std::optional<int64_t> version =
        version_map_->GetOrQueryPackageVersion(component_name.package);
    if (!version) {
      LOG(DEBUG) << "The version is NULL, maybe package manager is down.";
      return std::nullopt;
    }
    db::VersionedComponentName vcn{component_name.package,
                                   component_name.activity_name,
                                   *version};

    std::optional<db::PrefetchFileModel> compiled_trace =
        db::PrefetchFileModel::SelectByVersionedComponentName(db, vcn);

    std::chrono::milliseconds duration_ms = timer.duration();
    LOG(DEBUG) << "EventManager: Looking up compiled trace done in "
               << duration_ms.count()  // the count of ticks.
               << "ms.";

    if (compiled_trace) {
      if (std::filesystem::exists(compiled_trace->file_path)) {
        return compiled_trace->file_path;
      } else {
        LOG(DEBUG) << "Compiled trace in sqlite doesn't exists. file_path: "
                   << compiled_trace->file_path;
      }
    }

    LOG(DEBUG) << "Cannot find compiled trace in sqlite for package_name: "
               << component_name.package
               << " activity_name: "
               << component_name.activity_name;

    // If sqlite doesn't have the compiled trace, try the prebuilt path.
    std::string file_path = "/product/iorap-trace/";
    file_path += component_name.ToMakeFileSafeEncodedPkgString();
    file_path += ".compiled_trace.pb";

    if (std::filesystem::exists(file_path)) {
      return file_path;
    }

    LOG(DEBUG) << "Prebuilt compiled trace doesn't exists. file_path: " << file_path;

    return std::nullopt;
  }

  // Begins a readahead task for component_name if a compiled trace can be found;
  // otherwise leaves the state untouched.
  void StartReadAhead(size_t id, const AppComponentName& component_name) {
    DCHECK(allowed_readahead_);
    DCHECK(!IsReadAhead());

    std::optional<std::string> file_path = GetCompiledTrace(component_name);
    if (!file_path) {
      LOG(VERBOSE) << "Cannot find a compiled trace.";
      return;
    }

    prefetcher::TaskId task{id, *file_path};
    read_ahead_->BeginTask(task);
    // TODO: non-void return signature?

    read_ahead_task_ = std::move(task);
  }

  // Finishes the in-flight readahead task, if any.
  void FinishReadAhead() {
    // if no readahead task exist, do nothing.
    if (!IsReadAhead()){
      return;
    }

    read_ahead_->FinishTask(*read_ahead_task_);
    read_ahead_task_ = std::nullopt;
  }

  // Currently identical to FinishReadAhead.
  void AbortReadAhead() {
    FinishReadAhead();
  }

  bool IsTracing() const {
    return is_tracing_;
  }

  // Starts a perfetto trace for component_name and subscribes a chain that
  // writes the resulting trace to a file and records it in the raw_traces table.
  //
  // Returns the subscription of that chain, or nullopt when the package version
  // is unknown or the database reports that no more traces are needed.
  std::optional<rxcpp::composite_subscription> StartTracing(
      AppComponentName component_name) {
    DCHECK(allowed_tracing_);
    DCHECK(!IsTracing());

    // Use the parameter (not the optional member component_name_) so this
    // function does not depend on the member having been set by the caller.
    std::optional<int64_t> version =
        version_map_->GetOrQueryPackageVersion(component_name.package);
    if (!version) {
      LOG(DEBUG) << "The version is NULL, maybe package manager is down.";
      return std::nullopt;
    }
    db::VersionedComponentName versioned_component_name{component_name.package,
                                                        component_name.activity_name,
                                                        *version};
    db::DbHandle db{db::SchemaModel::GetSingleton()};
    {
      ScopedFormatTrace atrace_traces_number_check(
          ATRACE_TAG_ACTIVITY_MANAGER, "IorapNativeService::CheckPerfettoTracesNnumber");
      // Just return if we have enough perfetto traces.
      if (!db::PerfettoTraceFileModel::NeedMorePerfettoTraces(
              db, versioned_component_name)) {
        return std::nullopt;
      }
    }

    auto /*observable<PerfettoStreamCommand>*/ perfetto_commands =
      rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing)
          // wait 1x
          .concat(
              // Pick a value longer than the perfetto config delay_ms, so that we send
              // 'kShutdown' after tracing has already finished.
              rxcpp::observable<>::interval(std::chrono::milliseconds(10000))
                  .take(2)  // kStopTracing, kShutdown.
                  .map([](int value) {
                         // value is 1,2,3,...
                         return static_cast<PerfettoStreamCommand>(value);  // 1,2, ...
                       })
          );

    auto /*observable<PerfettoTraceProto>*/ trace_proto_stream =
        perfetto_factory_->CreateTraceStream(perfetto_commands);
    // This immediately connects to perfetto asynchronously.
    //
    // TODO: create a perfetto handle earlier, to minimize perfetto startup latency.

    rxcpp::composite_subscription lifetime;

    auto stream_via_threads = trace_proto_stream
      .tap([](const PerfettoTraceProto& trace_proto) {
             LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)";
           })
      .combine_latest(history_id_observable_)
      .observe_on(*thread_)   // All work prior to 'observe_on' is handled on thread_.
      .subscribe_on(*thread_)   // All work prior to 'observe_on' is handled on thread_.
      .observe_on(*io_thread_)  // Write data on an idle-class-priority thread.
      .tap([](std::tuple<PerfettoTraceProto, int> trace_tuple) {
             LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)";
           });

    lifetime = RxAsync::SubscribeAsync(*async_pool_,
        std::move(stream_via_threads),
        /*on_next*/[versioned_component_name]
        (std::tuple<PerfettoTraceProto, int> trace_tuple) {
          PerfettoTraceProto& trace_proto = std::get<0>(trace_tuple);
          int history_id = std::get<1>(trace_tuple);

          db::PerfettoTraceFileModel file_model =
            db::PerfettoTraceFileModel::CalculateNewestFilePath(versioned_component_name);

          std::string file_path = file_model.FilePath();

          ScopedFormatTrace atrace_write_to_file(ATRACE_TAG_ACTIVITY_MANAGER,
                                                 "Perfetto Write Trace To File %s",
                                                 file_path.c_str());

          if (!file_model.MkdirWithParents()) {
            LOG(ERROR) << "Cannot save TraceBuffer; failed to mkdirs " << file_path;
            return;
          }

          if (!trace_proto.WriteFullyToFile(file_path)) {
            LOG(ERROR) << "Failed to save TraceBuffer to " << file_path;
          } else {
            LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path;

            ScopedFormatTrace atrace_update_raw_traces_table(
                ATRACE_TAG_ACTIVITY_MANAGER,
                "update raw_traces table history_id = %d",
                history_id);
            db::DbHandle db{db::SchemaModel::GetSingleton()};
            std::optional<db::RawTraceModel> raw_trace =
                db::RawTraceModel::Insert(db, history_id, file_path);

            if (!raw_trace) {
              LOG(ERROR) << "Failed to insert raw_traces for " << file_path;
            } else {
              LOG(VERBOSE) << "Inserted into db: " << *raw_trace;
            }
          }
        },
        /*on_error*/[](rxcpp::util::error_ptr err) {
          LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err);
        });

    is_tracing_ = true;

    return lifetime;
  }

  // Stops the current trace (if any) by unsubscribing its rx chain.
  void AbortTrace() {
    LOG(VERBOSE) << "AppLaunchEventState - AbortTrace";

    // if the tracing is not running, do nothing.
    if (!IsTracing()){
      return;
    }

    is_tracing_ = false;
    if (rx_lifetime_) {
      // TODO: it would be good to call perfetto Destroy.

      rx_in_flight_.erase(std::remove(rx_in_flight_.begin(),
                                      rx_in_flight_.end(),
                                      *rx_lifetime_),
                          rx_in_flight_.end());

      LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe";
      rx_lifetime_->unsubscribe();

      rx_lifetime_.reset();
    }
  }

  // Hands the current trace subscription over to rx_in_flight_ so the trace
  // can finish on its own, and marks this state as no longer tracing.
  void MarkPendingTrace() {
    LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace";
    DCHECK(is_tracing_);
    DCHECK(rx_lifetime_.has_value());

    if (rx_lifetime_) {
      LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved";
      // Don't unsubscribe because that would cause the perfetto TraceBuffer
      // to get dropped on the floor.
      //
      // Instead, we want to let it finish and write it out to a file.
      rx_in_flight_.push_back(*std::move(rx_lifetime_));
      rx_lifetime_.reset();
    } else {
      LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty";
    }

    is_tracing_ = false;
    // FIXME: how do we clear this vector?
  }

  // Inserts the launch history row and publishes its id (or an error) on the
  // history id chain created by kIntentStarted.
  void RecordDbLaunchHistory(int32_t pid) {
    std::optional<db::AppLaunchHistoryModel> history = InsertDbLaunchHistory(pid);

    // RecordDbLaunchHistory happens-after kIntentStarted
    if (!history_id_subscriber_.has_value()) {
      LOG(WARNING) << "Logic error? Should always have a subscriber here.";
      return;
    }

    // Ensure that the history id rx chain is terminated either with an error or with
    // the newly inserted app_launch_histories.id
    if (!history) {
      history_id_subscriber_->on_error(rxcpp::util::make_error_ptr(
          std::ios_base::failure("Failed to insert history id")));
      recent_history_id_ = std::nullopt;
    } else {
      // Note: we must have already subscribed, or this value will disappear.
      LOG(VERBOSE) << "history_id_subscriber on_next history_id=" << history->id;
      history_id_subscriber_->on_next(history->id);
      history_id_subscriber_->on_completed();

      recent_history_id_ = history->id;
    }
    history_id_subscriber_ = std::nullopt;
  }

  // Selects or inserts the activity row, then inserts an app_launch_histories
  // row for the current launch. Returns nullopt if any prerequisite is missing
  // or a database step fails.
  std::optional<db::AppLaunchHistoryModel> InsertDbLaunchHistory(int32_t pid) {
    // TODO: deferred queue into a different lower priority thread.
    if (!component_name_ || !temperature_) {
      LOG(VERBOSE) << "Skip RecordDbLaunchHistory, no component name available.";

      return std::nullopt;
    }

    android::ScopedTrace trace{ATRACE_TAG_ACTIVITY_MANAGER,
                               "IorapNativeService::RecordDbLaunchHistory"};
    db::DbHandle db{db::SchemaModel::GetSingleton()};

    using namespace iorap::db;

    std::optional<int64_t> version =
        version_map_->GetOrQueryPackageVersion(component_name_->package);
    if (!version) {
      LOG(DEBUG) << "The version is NULL, maybe package manager is down.";
      return std::nullopt;
    }
    std::optional<ActivityModel> activity =
        ActivityModel::SelectOrInsert(db,
                                      component_name_->package,
                                      *version,
                                      component_name_->activity_name);

    if (!activity) {
      LOG(WARNING) << "Failed to query activity row for : " << *component_name_;
      return std::nullopt;
    }

    // NOTE(review): cast target reconstructed as the db-side temperature enum — confirm.
    auto temp = static_cast<db::AppLaunchHistoryModel::Temperature>(*temperature_);

    std::optional<AppLaunchHistoryModel> alh =
        AppLaunchHistoryModel::Insert(db,
                                      activity->id,
                                      temp,
                                      IsTracing(),
                                      IsReadAhead(),
                                      intent_started_ns_,
                                      total_time_ns_,
                                      // ReportFullyDrawn event normally occurs after this. Need update later.
                                      /* report_fully_drawn_ns= */ std::nullopt,
                                      pid);
    if (!alh) {
      LOG(WARNING) << "Failed to insert app_launch_histories row";
      return std::nullopt;
    }

    LOG(VERBOSE) << "RecordDbLaunchHistory: " << *alh;
    return alh;
  }

  // Writes the report-fully-drawn timestamp into the given history row.
  void UpdateReportFullyDrawn(int history_id, uint64_t timestamp_ns) {
    LOG(DEBUG) << "Update kReportFullyDrawn for history_id:"
               << history_id
               << " timestamp_ns: "
               << timestamp_ns;

    android::ScopedTrace trace{ATRACE_TAG_ACTIVITY_MANAGER,
                               "IorapNativeService::UpdateReportFullyDrawn"};
    db::DbHandle db{db::SchemaModel::GetSingleton()};

    bool result =
        db::AppLaunchHistoryModel::UpdateReportFullyDrawn(db,
                                                          history_id,
                                                          timestamp_ns);

    if (!result) {
      LOG(WARNING) << "Failed to update app_launch_histories row";
    }
  }
};

// Small state machine that validates the order of incoming app launch events
// before they are forwarded, based on the type of the previously seen event.
struct AppLaunchEventDefender {
  binder::AppLaunchEvent::Type last_event_type_{binder::AppLaunchEvent::Type::kUninitialized};

  enum class Result {
    kAccept,     // Pass-through the new event.
    kOverwrite,  // Overwrite the new event with a different event.
    kReject      // Completely reject the new event, it will not be delivered.
  };

  // Decides what to do with `event` given the previously seen event type.
  //
  // On kOverwrite, `*overwrite` holds a copy of `event` whose type has been
  // replaced; `overwrite` must not be null.
  Result OnAppLaunchEvent(binder::RequestId request_id,
                          const binder::AppLaunchEvent& event,
                          binder::AppLaunchEvent* overwrite) {
    using Type = binder::AppLaunchEvent::Type;
    CHECK(overwrite != nullptr);

    // Ensure only legal transitions are allowed.
    switch (last_event_type_) {
      case Type::kUninitialized:
      case Type::kIntentFailed:
      case Type::kActivityLaunchCancelled:
      case Type::kReportFullyDrawn: {  // From a terminal state, only go to kIntentStarted
        if (event.type != Type::kIntentStarted) {
          LOG(DEBUG) << "Rejecting transition from " << last_event_type_ << " to " << event.type;
          last_event_type_ = Type::kUninitialized;
          return Result::kReject;
        } else {
          LOG(VERBOSE) << "Accept transition from " << last_event_type_ << " to " << event.type;
          last_event_type_ = event.type;
          return Result::kAccept;
        }
      }
      case Type::kIntentStarted: {
        if (event.type == Type::kIntentFailed ||
            event.type == Type::kActivityLaunched) {
          LOG(VERBOSE) << "Accept transition from " << last_event_type_ << " to " << event.type;
          last_event_type_ = event.type;
          return Result::kAccept;
        } else {
          LOG(DEBUG) << "Overwriting transition from kIntentStarted to "
                     << event.type << " into kIntentFailed";
          last_event_type_ = Type::kIntentFailed;

          *overwrite = event;
          overwrite->type = Type::kIntentFailed;
          return Result::kOverwrite;
        }
      }
      case Type::kActivityLaunched: {
        if (event.type == Type::kActivityLaunchFinished ||
            event.type == Type::kActivityLaunchCancelled) {
          LOG(VERBOSE) << "Accept transition from " << last_event_type_ << " to " << event.type;
          last_event_type_ = event.type;
          return Result::kAccept;
        } else {
          LOG(DEBUG) << "Overwriting transition from kActivityLaunched to "
                     << event.type << " into kActivityLaunchCancelled";
          last_event_type_ = Type::kActivityLaunchCancelled;

          *overwrite = event;
          overwrite->type = Type::kActivityLaunchCancelled;
          return Result::kOverwrite;
        }
      }
      case Type::kActivityLaunchFinished: {
        if (event.type == Type::kIntentStarted ||
            event.type == Type::kReportFullyDrawn) {
          LOG(VERBOSE) << "Accept transition from " << last_event_type_ << " to " << event.type;
          last_event_type_ = event.type;
          return Result::kAccept;
        } else {
          LOG(DEBUG) << "Rejecting transition from " << last_event_type_ << " to " << event.type;
          last_event_type_ = Type::kUninitialized;
          return Result::kReject;
        }
      }
    }

    // Not reachable for the event types handled above. Kept so that control can
    // never flow off the end of this non-void function if last_event_type_
    // somehow holds a value that is not listed in the switch.
    LOG(ERROR) << "Rejecting transition from unexpected state " << last_event_type_
               << " to " << event.type;
    last_event_type_ = Type::kUninitialized;
    return Result::kReject;
  }
};

// Convert callback pattern into reactive pattern.
struct AppLaunchEventSubject {
  using RefWrapper = std::reference_wrapper<const AppLaunchEvent>;

  AppLaunchEventSubject() {}

  // Stores the subscriber and then publishes readiness; must be called at most once.
  void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) {
    DCHECK(ready_ != true) << "Cannot Subscribe twice";

    subscriber_ = std::move(subscriber);

    // Release edge of synchronizes-with AcquireIsReady.
    ready_.store(true);
  }

  // Forwards the event to the subscriber; dropped if nobody has subscribed yet
  // or the subscriber has unsubscribed.
  void OnNext(const AppLaunchEvent& e) {
    if (!AcquireIsReady()) {
      return;
    }

    if (!subscriber_->is_subscribed()) {
      return;
    }

    /*
     * TODO: fix upstream.
     *
     * Rx workaround: this fails to compile when
     * the observable is a reference type:
     *
     * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const'
     *   virtual void on_next(T&&) const {};
     *
     * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here
     *   virtual void on_next(T&) const {};
     *
     * (The workaround is to use reference_wrapper instead
     *  of const AppLaunchEvent&)
     */
    subscriber_->on_next(std::cref(e));
  }

  void OnCompleted() {
    if (!AcquireIsReady()) {
      return;
    }

    subscriber_->on_completed();
  }

 private:
  bool AcquireIsReady() {
    // Synchronizes-with the release-edge in Subscribe.
    // This can happen much later, only once the subscription actually happens.

    // However, as far as I know, 'rxcpp::subscriber' is not thread safe,
    // (but the observable chain itself can be made thread-safe via #observe_on, etc).
    // so we must avoid reading it until it has been fully synchronized.
    //
    // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
    // to make it simpler.
    return ready_.load();
  }

  // TODO: also track the RequestId ?

  std::atomic<bool> ready_{false};

  std::optional<rxcpp::subscriber<RefWrapper>> subscriber_;
};

// Convert callback pattern into reactive pattern.
struct JobScheduledEventSubject { JobScheduledEventSubject() {} void Subscribe(rxcpp::subscriber> subscriber) { DCHECK(ready_ != true) << "Cannot Subscribe twice"; subscriber_ = std::move(subscriber); // Release edge of synchronizes-with AcquireIsReady. ready_.store(true); } void OnNext(RequestId request_id, JobScheduledEvent e) { if (!AcquireIsReady()) { return; } if (!subscriber_->is_subscribed()) { return; } subscriber_->on_next(std::pair{std::move(request_id), std::move(e)}); } void OnCompleted() { if (!AcquireIsReady()) { return; } subscriber_->on_completed(); } private: bool AcquireIsReady() { // Synchronizes-with the release-edge in Subscribe. // This can happen much later, only once the subscription actually happens. // However, as far as I know, 'rxcpp::subscriber' is not thread safe, // (but the observable chain itself can be made thread-safe via #observe_on, etc). // so we must avoid reading it until it has been fully synchronized. // // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics, // to make it simpler. return ready_.load(); } // TODO: also track the RequestId ? 
std::atomic ready_{false}; std::optional>> subscriber_; }; std::ostream& operator<<(std::ostream& os, const android::content::pm::PackageChangeEvent& event) { os << "PackageChangeEvent{"; os << "packageName=" << event.packageName << ","; os << "version=" << event.version << ","; os << "lastUpdateTimeMillis=" << event.lastUpdateTimeMillis; os << "}"; return os; } class EventManager::Impl { public: Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory) : perfetto_factory_(perfetto_factory), worker_thread_(rxcpp::observe_on_new_thread()), worker_thread2_(rxcpp::observe_on_new_thread()), io_thread_(perfetto::ObserveOnNewIoThread()) { // Try to create version map RetryCreateVersionMap(); iorap::common::StderrLogPrinter printer{"iorapd"}; RefreshSystemProperties(printer); rx_lifetime_ = InitializeRxGraph(); rx_lifetime_jobs_ = InitializeRxGraphForJobScheduledEvents(); android::add_sysprop_change_callback(&Impl::OnSyspropChanged, /*priority*/-10000); } void RetryCreateVersionMap() { android::base::Timer timer{}; version_map_ = binder::PackageVersionMap::Create(); std::chrono::milliseconds duration_ms = timer.duration(); LOG(DEBUG) << "Got versions for " << version_map_->Size() << " packages in " << duration_ms.count() << "ms"; } void SetTaskResultCallbacks(std::shared_ptr callbacks) { DCHECK(callbacks_.expired()); callbacks_ = callbacks; } void Join() { async_pool_.Join(); } bool OnAppLaunchEvent(RequestId request_id, const AppLaunchEvent& event) { LOG(VERBOSE) << "EventManager::OnAppLaunchEvent(" << "request_id=" << request_id.request_id << "," << event; // Filter any incoming events through a defender that enforces // that all state transitions are as contractually documented in // ActivityMetricsLaunchObserver's javadoc. 
AppLaunchEvent overwrite_event{}; AppLaunchEventDefender::Result result = app_launch_event_defender_.OnAppLaunchEvent(request_id, event, /*out*/&overwrite_event); switch (result) { case AppLaunchEventDefender::Result::kAccept: app_launch_event_subject_.OnNext(event); return true; case AppLaunchEventDefender::Result::kOverwrite: app_launch_event_subject_.OnNext(overwrite_event); return false; case AppLaunchEventDefender::Result::kReject: // Intentionally left-empty: we drop the event completely. return false; } // In theory returns BAD_VALUE to the other side of this binder connection. // In practice we use 'oneway' flags so this doesn't matter on a regular build. return false; } bool OnDexOptEvent(RequestId request_id, const DexOptEvent& event) { LOG(VERBOSE) << "EventManager::OnDexOptEvent(" << "request_id=" << request_id.request_id << "," << event.package_name << ")"; if (common::ExcludeDexFiles(kExcludeDexFilesDefault)) { LOG(VERBOSE) << "Dex files are excluded. Skip the purging."; return true; } return PurgePackage(event.package_name); } bool OnJobScheduledEvent(RequestId request_id, const JobScheduledEvent& event) { LOG(VERBOSE) << "EventManager::OnJobScheduledEvent(" << "request_id=" << request_id.request_id << ",event=TODO)."; job_scheduled_event_subject_.OnNext(std::move(request_id), event); return true; // No errors. } bool OnPackageChanged(const android::content::pm::PackageChangeEvent& event) { LOG(DEBUG) << "Received " << event; if (event.isDeleted) { // Do nothing if the package is deleted rignt now. // The package will be removed from db during maintenance. return true; } // Update the version map. if (version_map_->Update(event.packageName, event.version)) { return true; } // Sometimes a package is updated without any version change. // Clean it up in this case. 
db::DbHandle db{db::SchemaModel::GetSingleton()};
    db::CleanUpFilesForPackage(db, event.packageName, event.version);
    return true;
  }

  // Writes diagnostic output to |printer| by delegating, in order, to
  // ReadAhead::Dump, PerfettoConsumerImpl::Dump and maintenance::Dump
  // (the last one is also given the SchemaModel singleton).
  void Dump(/*borrow*/::android::Printer& printer) {
    ::iorap::prefetcher::ReadAhead::Dump(printer);
    ::iorap::perfetto::PerfettoConsumerImpl::Dump(/*borrow*/printer);
    ::iorap::maintenance::Dump(db::SchemaModel::GetSingleton(), printer);
  }

  // Builds the rx chain that processes app launch events.
  //
  // The chain wraps app_launch_event_subject_ in an observable, folds every
  // event into an AppLaunchEventState via scan (calling OnNewEvent on it),
  // and attaches an observer that deliberately does nothing.
  //
  // Returns the composite_subscription that was handed to subscribe().
  rxcpp::composite_subscription InitializeRxGraph() {
    LOG(VERBOSE) << "EventManager::InitializeRxGraph";

    // The lambda captures by reference and uses the member
    // app_launch_event_subject_ each time it is invoked with a subscriber.
    app_launch_events_ = rxcpp::observable<>::create(
      [&](rxcpp::subscriber subscriber) {
        app_launch_event_subject_.Subscribe(std::move(subscriber));
      });

    rxcpp::composite_subscription lifetime;

    // Only a warning is logged here; the flags themselves are passed on to
    // the initial state below.
    if (!tracing_allowed_) {
      LOG(WARNING) << "Tracing disabled by system property";
    }
    if (!readahead_allowed_) {
      LOG(WARNING) << "Readahead disabled by system property";
    }

    // Seed value for scan; it is moved into the chain below.
    AppLaunchEventState initial_state{&perfetto_factory_,
                                      readahead_allowed_,
                                      tracing_allowed_,
                                      package_blacklister_,
                                      &worker_thread2_,
                                      &io_thread_,
                                      &async_pool_,
                                      version_map_};
    app_launch_events_
      .subscribe_on(worker_thread_)
      .scan(std::move(initial_state),
            // The state is taken and returned by value on every event.
            [](AppLaunchEventState state, AppLaunchEventRefWrapper event) {
              state.OnNewEvent(event.get());
              return state;
            })
      .subscribe(/*out*/lifetime, [](const AppLaunchEventState& state) {
                   // Intentionally left blank.
                   (void)state;
                 });

    return lifetime;
  }

  // Runs the maintenance code to compile perfetto traces to compiled
  // trace for a package.
  //
  // When |should_update_versions| is true, the package version map is
  // refreshed (UpdateAll) and maintenance::CleanUpDatabase is run first.
  // Compilation for |package_name| then always runs; |output_text|,
  // |inode_textcache|, |verbose|, |recompile| and |min_traces| are forwarded
  // unchanged into maintenance::ControllerParameters.
  //
  // Each phase is wrapped in its own ScopedFormatTrace.
  void StartMaintenance(bool output_text,
                        std::optional inode_textcache,
                        bool verbose,
                        bool recompile,
                        uint64_t min_traces,
                        std::string package_name,
                        bool should_update_versions) {
    ScopedFormatTrace atrace_bg_scope(ATRACE_TAG_PACKAGE_MANAGER,
                                      "Background Job Scope");

    db::DbHandle db{db::SchemaModel::GetSingleton()};
    if (should_update_versions) {
      {
        ScopedFormatTrace atrace_update_versions(ATRACE_TAG_PACKAGE_MANAGER,
                                                 "Update package versions map cache");
        // Update the version map.
        version_map_->UpdateAll();
      }

      {
        ScopedFormatTrace atrace_cleanup_db(ATRACE_TAG_PACKAGE_MANAGER,
                                            "Clean up obsolete data in database");
        // Cleanup the obsolete data in the database.
        maintenance::CleanUpDatabase(db, version_map_);
      }
    }

    {
      ScopedFormatTrace atrace_compile_apps(ATRACE_TAG_PACKAGE_MANAGER,
                                            "Compile apps on device");
      // Compilation
      maintenance::ControllerParameters params{
        output_text,
        inode_textcache,
        verbose,
        recompile,
        min_traces,
        std::make_shared(),
        common::ExcludeDexFiles(kExcludeDexFilesDefault)};

      LOG(DEBUG) << "StartMaintenance: min_traces=" << min_traces;
      // The result of the compilation is not inspected here.
      maintenance::CompileSingleAppOnDevice(db, params, package_name);
    }
  }

  // Builds the rx chain that processes job scheduled events.
  //
  // For every (request, job event) pair the chain:
  //   1. reports kBegan through NotifyProgress and runs StartMaintenance for
  //      the event's package (first tap),
  //   2. logs that the job is being processed (second tap, placeholder),
  //   3. reports kCompleted through NotifyComplete (on_next).
  // The on_error handler is compiled out with #if 0.
  //
  // Returns the composite_subscription that was handed to subscribe().
  rxcpp::composite_subscription InitializeRxGraphForJobScheduledEvents() {
    LOG(VERBOSE) << "EventManager::InitializeRxGraphForJobScheduledEvents";

    using RequestAndJobEvent = std::pair;

    // The lambda captures by reference and uses the member
    // job_scheduled_event_subject_ each time it is invoked with a subscriber.
    job_scheduled_events_ = rxcpp::observable<>::create(
      [&](rxcpp::subscriber subscriber) {
        job_scheduled_event_subject_.Subscribe(std::move(subscriber));
      });

    rxcpp::composite_subscription lifetime;

    job_scheduled_events_
      .observe_on(worker_thread_)  // async handling.
      .tap([this](const RequestAndJobEvent& e) {
        LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(1) - job begins";
        this->NotifyProgress(e.first, TaskResult{TaskResult::State::kBegan});

        LOG(VERBOSE) << "Compile " << std::get<1>(e).package_name;
        // min_traces comes from the static s_min_traces; the package name and
        // should_update_versions come from the job event itself.
        StartMaintenance(/*output_text=*/false,
                         /*inode_textcache=*/std::nullopt,
                         /*verbose=*/false,
                         /*recompile=*/false,
                         s_min_traces,
                         std::get<1>(e).package_name,
                         std::get<1>(e).should_update_versions);

        // TODO: probably this shouldn't be emitted until most of the usual DCHECKs
        // (for example, validate a job isn't already started, the request is not reused, etc).
        // In this way we could block from the client until it sees 'kBegan' and Log.wtf otherwise.
      })
      .tap([](const RequestAndJobEvent& e) {
        // TODO. Actual work.
        LOG(VERBOSE) << "EventManager#JobScheduledEvent#tap(2) - job is being processed";

        // TODO: abort functionality for in-flight jobs.
        //
        // maybe something like scan that returns an observable + flat map to that job.
        // then we could unsubscribe from the scan to do a partial abort? need to try it and see if it works.
        //
        // other option is to create a new outer subscription for each job id which seems less ideal.
      })
      .subscribe(/*out*/lifetime,
        /*on_next*/
        [this](const RequestAndJobEvent& e) {
          LOG(VERBOSE) << "EventManager#JobScheduledEvent#subscribe - job completed";
          this->NotifyComplete(e.first, TaskResult{TaskResult::State::kCompleted});
        }
// The error handler below is disabled; see the FIXMEs inside for what is missing.
#if 0
        ,
        /*on_error*/
        [](rxcpp::util::error_ptr err) {
          LOG(ERROR) << "Scheduled job event failed: " << rxcpp::util::what(err);

          //std::shared_ptr callbacks = callbacks_.lock();
          //if (callbacks != nullptr) {
            // FIXME: How do we get the request ID back out of the error? Seems like a problem.
            // callbacks->OnComplete(, TaskResult{TaskResult::kError});
            // We may have to wrap with an iorap::expected instead of using on_error.
          //}

          // FIXME: need to add a 'OnErrorResumeNext' operator?
          DCHECK(false) << "forgot to implement OnErrorResumeNext";
        }
#endif
      );

    // TODO: error output should happen via an observable.

    return lifetime;
  }

  // Forwards |request_id| and |result| to TaskResultCallbacks::OnComplete.
  // If the weak callbacks_ pointer can no longer be locked, only a warning is
  // logged and the result is dropped.
  void NotifyComplete(RequestId request_id, TaskResult result) {
    std::shared_ptr callbacks = callbacks_.lock();
    if (callbacks != nullptr) {
      callbacks->OnComplete(std::move(request_id), std::move(result));
    } else {
      LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early";
    }
  }

  // Forwards |request_id| and |result| to TaskResultCallbacks::OnProgress.
  // If the weak callbacks_ pointer can no longer be locked, only a warning is
  // logged and the result is dropped.
  void NotifyProgress(RequestId request_id, TaskResult result) {
    std::shared_ptr callbacks = callbacks_.lock();
    if (callbacks != nullptr) {
      callbacks->OnProgress(std::move(request_id), std::move(result));
    } else {
      LOG(WARNING) << "EventManager: TaskResultCallbacks may have been released early";
    }
  }

  // Currently only logs that it was called.
  static void OnSyspropChanged() {
    LOG(DEBUG) << "OnSyspropChanged";
  }

  // Re-reads the configuration values used by this class, mirrors them into
  // the file-level statics (s_tracing_allowed, s_readahead_allowed,
  // s_min_traces), prints each value to |printer|, and rebuilds
  // package_blacklister_ from the configured blacklist string.
  void RefreshSystemProperties(::android::Printer& printer) {
    // TODO: read all properties from one config class.
    // PH properties do not work if they contain ".". "_" was instead used here.
    tracing_allowed_ = common::IsTracingEnabled(/*default_value=*/"false");
    s_tracing_allowed = tracing_allowed_;
    printer.printFormatLine("iorapd.perfetto.enable = %s", tracing_allowed_ ? "true" : "false");

    readahead_allowed_ = common::IsReadAheadEnabled(/*default_value=*/"false");
    s_readahead_allowed = readahead_allowed_;
    printer.printFormatLine("iorapd.readahead.enable = %s", s_readahead_allowed ? "true" : "false");

    // NOTE(review): the fallback used here is 1, while s_min_traces is
    // initialized to 3 at its declaration -- confirm which one is intended.
    s_min_traces =
        ::android::base::GetUintProperty("iorapd.maintenance.min_traces", /*default*/1);
    // Copy into a plain uint64_t before handing it to the printf-style call.
    uint64_t min_traces = s_min_traces;
    printer.printFormatLine("iorapd.maintenance.min_traces = %" PRIu64, min_traces);

    printer.printFormatLine("iorapd.exclude_dex_files = %s",
                            common::ExcludeDexFiles(kExcludeDexFilesDefault) ? "true" : "false");

    // The value of the "iorapd.blacklist_packages" property is passed as the
    // third argument to GetServerConfigurableFlag.
    package_blacklister_ = PackageBlacklister{
        /* Semicolon-separated string list of blacklisted packages, e.g.
         * "foo.bar.baz;com.fake.name" would blacklist {"foo.bar.baz", "com.fake.name"} packages.
         *
         * Blacklisted packages are ignored by iorapd.
         */
        server_configurable_flags::GetServerConfigurableFlag(
            common::ph_namespace,
            "iorap_blacklisted_packages",
            ::android::base::GetProperty("iorapd.blacklist_packages",
                                         /*default*/""))
    };

    LOG(DEBUG) << "RefreshSystemProperties";
  }

  // Overload taking a printer; |printer| is unused and the call is forwarded
  // to PurgePackage(package_name).
  bool PurgePackage(::android::Printer& printer, const std::string& package_name) {
    (void)printer;
    return PurgePackage(package_name);
  }

  // Calls db::CleanUpFilesForPackage for |package_name| on a handle to the
  // SchemaModel singleton, then logs. Always returns true.
  bool PurgePackage(const std::string& package_name) {
    db::DbHandle db{db::SchemaModel::GetSingleton()};
    db::CleanUpFilesForPackage(db, package_name);
    LOG(DEBUG) << "PurgePackage: " << package_name;
    return true;
  }

  // Compiles a single package on device and returns the result of
  // maintenance::CompileSingleAppOnDevice. |printer| is unused.
  bool CompilePackage(::android::Printer& printer, const std::string& package_name) {
    (void)printer;

    ScopedFormatTrace atrace_compile_app(ATRACE_TAG_PACKAGE_MANAGER,
                                         "Compile one app on device");

    // WOULD_LOG(VERBOSE) sits in the slot where StartMaintenance passes its
    // |verbose| argument; min_traces comes from the static s_min_traces.
    maintenance::ControllerParameters params{
      /*output_text*/false,
      /*inode_textcache*/std::nullopt,
      WOULD_LOG(VERBOSE),
      /*recompile*/false,
      s_min_traces,
      std::make_shared(),
      common::ExcludeDexFiles(kExcludeDexFilesDefault)};

    db::DbHandle db{db::SchemaModel::GetSingleton()};
    bool res = maintenance::CompileSingleAppOnDevice(db, std::move(params), package_name);
    LOG(DEBUG) << "CompilePackage: " << package_name;

    return res;
  }

  // Both flags are overwritten by RefreshSystemProperties.
  bool readahead_allowed_{true};

  // Held by reference; its address is passed into AppLaunchEventState.
  perfetto::RxProducerFactory& perfetto_factory_;
  bool tracing_allowed_{true};

  // Rebuilt by RefreshSystemProperties.
  PackageBlacklister package_blacklister_{};

  std::weak_ptr callbacks_;  // avoid cycles with weakptr.

  using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper;
  // Assigned in InitializeRxGraph.
  rxcpp::observable app_launch_events_;
  AppLaunchEventSubject app_launch_event_subject_;
  AppLaunchEventDefender app_launch_event_defender_;

  // Assigned in InitializeRxGraphForJobScheduledEvents.
  rxcpp::observable> job_scheduled_events_;
  JobScheduledEventSubject job_scheduled_event_subject_;

  rxcpp::observable completed_requests_;

  // regular-priority thread to handle binder callbacks.
  observe_on_one_worker worker_thread_;
  observe_on_one_worker worker_thread2_;
  // low priority idle-class thread for IO operations.
  observe_on_one_worker io_thread_;
  // async futures pool for async rx operations.
  AsyncPool async_pool_;

  rxcpp::composite_subscription rx_lifetime_;  // app launch events
  rxcpp::composite_subscription rx_lifetime_jobs_;  // job scheduled events

  // package version map
  std::shared_ptr version_map_;

//INTENTIONAL_COMPILER_ERROR_HERE:
  // FIXME:
  // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function
  // that the main thread can call. This would subscribe on all the observables we internally
  // have here (probably on an event-manager-dedicated thread for simplicity).
  //
  // ideally we'd just reuse the binder thread to handle the events but I'm not super sure,
  // maybe this already works with the identity_current_thread coordination?
};
using Impl = EventManager::Impl;

// Constructs the EventManager together with its Impl, which is given
// |perfetto_factory|.
EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory)
    : impl_(new Impl(perfetto_factory)) {}

// Creates an EventManager backed by function-local static perfetto objects
// (an injector and a producer factory built from it).
std::shared_ptr EventManager::Create() {
  static perfetto::PerfettoDependencies::Injector injector{
    perfetto::PerfettoDependencies::CreateComponent
  };
  static perfetto::RxProducerFactory producer_factory{
    /*borrow*/injector
  };
  return EventManager::Create(/*borrow*/producer_factory);
}

// Creates an EventManager that uses the caller-supplied |perfetto_factory|.
std::shared_ptr EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) {
  std::shared_ptr p{new EventManager{/*borrow*/perfetto_factory}};
  return p;
}

// The remaining public methods forward directly to the matching Impl method.

void EventManager::SetTaskResultCallbacks(std::shared_ptr callbacks) {
  return impl_->SetTaskResultCallbacks(std::move(callbacks));
}

void EventManager::Join() {
  return impl_->Join();
}

bool EventManager::OnAppLaunchEvent(RequestId request_id,
                                    const AppLaunchEvent& event) {
  return impl_->OnAppLaunchEvent(request_id, event);
}

bool EventManager::OnDexOptEvent(RequestId request_id,
                                 const DexOptEvent& event) {
  return impl_->OnDexOptEvent(request_id, event);
}

bool EventManager::OnJobScheduledEvent(RequestId request_id,
                                       const JobScheduledEvent& event) {
  return impl_->OnJobScheduledEvent(request_id, event);
}

bool EventManager::OnPackageChanged(const android::content::pm::PackageChangeEvent& event) {
  return impl_->OnPackageChanged(event);
}

void EventManager::Dump(/*borrow*/::android::Printer& printer) {
  return impl_->Dump(printer);
}

void EventManager::RefreshSystemProperties(::android::Printer& printer) {
  return impl_->RefreshSystemProperties(printer);
}

bool EventManager::PurgePackage(::android::Printer& printer, const std::string& package_name) {
  return impl_->PurgePackage(printer, package_name);
}

bool EventManager::CompilePackage(::android::Printer& printer, const std::string& package_name) {
  return impl_->CompilePackage(printer, package_name);
}

}  // namespace iorap::manager