1 // Copyright 2022 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 //! Provides helpers that make it easier to process virtio queues on an async executor. 6 7 #![cfg_attr(windows, allow(dead_code))] 8 9 use anyhow::bail; 10 use anyhow::Context; 11 use base::warn; 12 use cros_async::AsyncResult; 13 use cros_async::Executor; 14 use cros_async::TaskHandle; 15 use futures::future::AbortHandle; 16 use futures::future::Abortable; 17 use futures::future::Pending; 18 use futures::Future; 19 20 /// A queue for which processing can be started on an async executor. 21 /// 22 /// `T` is the resource type of the queue, i.e. the device-specific data it needs in order to run. 23 /// For instance, a block device will likely need a file to provide its data. 24 pub enum AsyncQueueState<T: 'static> { 25 /// Queue is currently stopped. 26 Stopped(T), 27 /// Queue is being processed as a `Task` on an `Executor`, and can be stopped by aborting the 28 /// `AbortHandle`. 29 Running((TaskHandle<T>, Executor, AbortHandle)), 30 /// Something terrible happened and this queue is in a non-recoverable state. 31 Broken, 32 } 33 34 impl<T: 'static> AsyncQueueState<T> { 35 /// Start processing of the queue on `ex`, or stop and restart it with the new parameters if 36 /// it was already running. 37 /// 38 /// `fut_provider` is a closure that is passed the resource of the queue, as well as a 39 /// `Abortable` future. It must return a `Future` that takes ownership of the device's resource 40 /// and processes the queue for as long as possible, but immediately quits and returns the 41 /// device resource when the `Abortable` is signaled. 42 /// 43 /// If `fut_provider` or the `Future` it returns end with an error, the queue is considered 44 /// broken and cannot be used anymore. 45 /// 46 /// The task is only scheduled and no processing actually starts in this method. The task is 47 /// scheduled locally, which implies that `ex` must be run on the current thread. start< U: Future<Output = T> + 'static, F: FnOnce(T, Abortable<Pending<()>>) -> anyhow::Result<U>, >( &mut self, ex: &Executor, fut_provider: F, ) -> anyhow::Result<()>48 pub fn start< 49 U: Future<Output = T> + 'static, 50 F: FnOnce(T, Abortable<Pending<()>>) -> anyhow::Result<U>, 51 >( 52 &mut self, 53 ex: &Executor, 54 fut_provider: F, 55 ) -> anyhow::Result<()> { 56 if matches!(self, AsyncQueueState::Running(_)) { 57 warn!("queue is already running, stopping it first"); 58 self.stop().context("while trying to restart queue")?; 59 } 60 61 let resource = match std::mem::replace(self, AsyncQueueState::Broken) { 62 AsyncQueueState::Stopped(resource) => resource, 63 _ => bail!("queue is in a bad state and cannot be started"), 64 }; 65 66 let (wait_fut, abort_handle) = futures::future::abortable(futures::future::pending::<()>()); 67 let queue_future = fut_provider(resource, wait_fut)?; 68 let task = ex.spawn_local(queue_future); 69 70 *self = AsyncQueueState::Running((task, ex.clone(), abort_handle)); 71 Ok(()) 72 } 73 74 /// Stops a previously started queue. 75 /// 76 /// The executor on which the task has been started will be run if needed in order to retrieve 77 /// the queue's resource. 78 /// 79 /// Returns `true` if the queue was running, `false` if it wasn't. stop(&mut self) -> AsyncResult<bool>80 pub fn stop(&mut self) -> AsyncResult<bool> { 81 match std::mem::replace(self, AsyncQueueState::Broken) { 82 AsyncQueueState::Running((task, ex, handle)) => { 83 // Abort the task and run it to completion to retrieve the queue's resource. 84 handle.abort(); 85 let resource = ex.run_until(task)?; 86 *self = AsyncQueueState::Stopped(resource); 87 Ok(true) 88 } 89 state => { 90 *self = state; 91 Ok(false) 92 } 93 } 94 } 95 } 96