/* * Copyright (C) 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "perfetto/ext/base/threading/spawn.h" #include #include "perfetto/base/task_runner.h" #include "perfetto/ext/base/thread_checker.h" #include "perfetto/ext/base/threading/future.h" #include "perfetto/ext/base/threading/poll.h" #include "perfetto/ext/base/threading/stream.h" namespace perfetto { namespace base { // Represents a future which is being polled to completion. Owned by // SpawnHandle. class PolledFuture { public: explicit PolledFuture(TaskRunner* task_runner, Future future) : task_runner_(task_runner), future_(std::move(future)) { PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread()); PollUntilFinish(); } ~PolledFuture() { PERFETTO_DCHECK_THREAD(thread_checker); ClearFutureAndWatches(interested_); } private: PolledFuture(PolledFuture&&) = delete; PolledFuture& operator=(PolledFuture&&) = delete; void PollUntilFinish() { PERFETTO_DCHECK(task_runner_->RunsTasksOnCurrentThread()); auto pre_poll_interested = std::move(interested_); interested_.clear(); FuturePollResult res = future_->Poll(&context_); if (!res.IsPending()) { ClearFutureAndWatches(pre_poll_interested); return; } for (PlatformHandle fd : SetDifference(pre_poll_interested, interested_)) { task_runner_->RemoveFileDescriptorWatch(fd); } auto weak_this = weak_ptr_factory_.GetWeakPtr(); for (PlatformHandle fd : SetDifference(interested_, pre_poll_interested)) { task_runner_->AddFileDescriptorWatch(fd, [weak_this, fd]() { if (!weak_this) { return; } weak_this->ready_ = {fd}; weak_this->PollUntilFinish(); }); } } void ClearFutureAndWatches(const FlatSet& interested) { future_ = std::nullopt; for (PlatformHandle fd : interested) { task_runner_->RemoveFileDescriptorWatch(fd); } interested_.clear(); ready_.clear(); } static std::vector SetDifference( const FlatSet& f, const FlatSet& s) { std::vector out(f.size()); auto it = std::set_difference(f.begin(), f.end(), s.begin(), s.end(), out.begin()); out.resize(static_cast(std::distance(out.begin(), it))); return out; } TaskRunner* const task_runner_ = nullptr; std::optional> future_; FlatSet interested_; FlatSet ready_; PollContext context_{&interested_, &ready_}; PERFETTO_THREAD_CHECKER(thread_checker) // Keep this last. WeakPtrFactory weak_ptr_factory_{this}; }; SpawnHandle::SpawnHandle(TaskRunner* task_runner, std::function()> fn) : task_runner_(task_runner), polled_future_(std::make_shared>()) { task_runner->PostTask( [t = task_runner, fn = std::move(fn), p = polled_future_]() mutable { p->reset(new PolledFuture(t, fn())); }); } SpawnHandle::~SpawnHandle() { task_runner_->PostTask( [f = std::move(polled_future_)]() mutable { f.reset(); }); } } // namespace base } // namespace perfetto