• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! Provides helpers that make it easier to process virtio queues on an async executor.
6 
7 #![cfg_attr(windows, allow(dead_code))]
8 
9 use anyhow::bail;
10 use anyhow::Context;
11 use base::warn;
12 use cros_async::AsyncResult;
13 use cros_async::Executor;
14 use cros_async::TaskHandle;
15 use futures::future::AbortHandle;
16 use futures::future::Abortable;
17 use futures::future::Pending;
18 use futures::Future;
19 
20 /// A queue for which processing can be started on an async executor.
21 ///
22 /// `T` is the resource type of the queue, i.e. the device-specific data it needs in order to run.
23 /// For instance, a block device will likely need a file to provide its data.
24 pub enum AsyncQueueState<T: 'static> {
25     /// Queue is currently stopped.
26     Stopped(T),
27     /// Queue is being processed as a `Task` on an `Executor`, and can be stopped by aborting the
28     /// `AbortHandle`.
29     Running((TaskHandle<T>, Executor, AbortHandle)),
30     /// Something terrible happened and this queue is in a non-recoverable state.
31     Broken,
32 }
33 
34 impl<T: 'static> AsyncQueueState<T> {
35     /// Start processing of the queue on `ex`, or stop and restart it with the new parameters if
36     /// it was already running.
37     ///
38     /// `fut_provider` is a closure that is passed the resource of the queue, as well as a
39     /// `Abortable` future. It must return a `Future` that takes ownership of the device's resource
40     /// and processes the queue for as long as possible, but immediately quits and returns the
41     /// device resource when the `Abortable` is signaled.
42     ///
43     /// If `fut_provider` or the `Future` it returns end with an error, the queue is considered
44     /// broken and cannot be used anymore.
45     ///
46     /// The task is only scheduled and no processing actually starts in this method. The task is
47     /// scheduled locally, which implies that `ex` must be run on the current thread.
start< U: Future<Output = T> + 'static, F: FnOnce(T, Abortable<Pending<()>>) -> anyhow::Result<U>, >( &mut self, ex: &Executor, fut_provider: F, ) -> anyhow::Result<()>48     pub fn start<
49         U: Future<Output = T> + 'static,
50         F: FnOnce(T, Abortable<Pending<()>>) -> anyhow::Result<U>,
51     >(
52         &mut self,
53         ex: &Executor,
54         fut_provider: F,
55     ) -> anyhow::Result<()> {
56         if matches!(self, AsyncQueueState::Running(_)) {
57             warn!("queue is already running, stopping it first");
58             self.stop().context("while trying to restart queue")?;
59         }
60 
61         let resource = match std::mem::replace(self, AsyncQueueState::Broken) {
62             AsyncQueueState::Stopped(resource) => resource,
63             _ => bail!("queue is in a bad state and cannot be started"),
64         };
65 
66         let (wait_fut, abort_handle) = futures::future::abortable(futures::future::pending::<()>());
67         let queue_future = fut_provider(resource, wait_fut)?;
68         let task = ex.spawn_local(queue_future);
69 
70         *self = AsyncQueueState::Running((task, ex.clone(), abort_handle));
71         Ok(())
72     }
73 
74     /// Stops a previously started queue.
75     ///
76     /// The executor on which the task has been started will be run if needed in order to retrieve
77     /// the queue's resource.
78     ///
79     /// Returns `true` if the queue was running, `false` if it wasn't.
stop(&mut self) -> AsyncResult<bool>80     pub fn stop(&mut self) -> AsyncResult<bool> {
81         match std::mem::replace(self, AsyncQueueState::Broken) {
82             AsyncQueueState::Running((task, ex, handle)) => {
83                 // Abort the task and run it to completion to retrieve the queue's resource.
84                 handle.abort();
85                 let resource = ex.run_until(task)?;
86                 *self = AsyncQueueState::Stopped(resource);
87                 Ok(true)
88             }
89             state => {
90                 *self = state;
91                 Ok(false)
92             }
93         }
94     }
95 }
96