// Copyright 2020, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module implements the handling of async tasks.
//! The worker thread has a high priority and a low priority queue. Adding a job to either
//! will cause one thread to be spawned if none exists. As a compromise between performance
//! and resource consumption, the thread will linger for about 30 seconds after it has
//! processed all tasks before it terminates.
//! Note that low priority tasks are processed only when the high priority queue is empty.

use std::{any::Any, any::TypeId, time::Duration};
use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    sync::{Condvar, Mutex, MutexGuard},
    thread,
};

#[derive(Debug, PartialEq, Eq)]
enum State {
    Exiting,
    Running,
}

/// The Shelf allows async tasks to store state across invocations.
/// Note: Store elves at your own peril ;-).
#[derive(Debug, Default)]
pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);

impl Shelf {
    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
        self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
    }

    /// Get a mutable reference to the shelved data of type T. Returns Some if a T was
    /// previously inserted using put, get_mut, or get_or_put_with.
    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
        self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>())
    }

    /// Removes the entry of the given type and returns the stored data if it existed.
    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
        self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b))
    }

    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
        self.0
            .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
            .and_then(|v| v.downcast::<T>().ok().map(|b| *b))
    }

    /// Gets a mutable reference to the entry of the given type, creating it with its
    /// default value if necessary. The type must implement Default.
    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
        self.0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send>)
            .downcast_mut::<T>()
            .unwrap()
    }

    /// Gets a mutable reference to the entry of the given type or creates it using the
    /// `init` function. `init` is not executed if the entry already exists.
    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
    where
        F: FnOnce() -> T,
    {
        self.0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>)
            .downcast_mut::<T>()
            .unwrap()
    }
}
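// A minimal sketch of the Shelf contract described above: the shelf holds at most one
// value per concrete type, keyed by TypeId, and get_or_put_with only runs its init
// closure when the slot for that type is empty. Illustrative only; the canonical
// coverage lives in the tests module at the end of this file.
#[cfg(test)]
mod shelf_contract_sketch {
    use super::Shelf;

    #[test]
    fn one_slot_per_type() {
        let mut shelf = Shelf::default();
        // Two values of the same type share one slot; the second put evicts the first.
        assert_eq!(shelf.put(1u32), None);
        assert_eq!(shelf.put(2u32), Some(1u32));
        // A value of a different type occupies its own slot.
        assert_eq!(shelf.put(true), None);
        assert_eq!(shelf.get_downcast_ref::<u32>(), Some(&2u32));
        // The init closure runs only on the first call for a given type.
        assert_eq!(*shelf.get_or_put_with(|| 7i64), 7);
        assert_eq!(*shelf.get_or_put_with(|| 9i64), 7);
    }
}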
struct AsyncTaskState {
    state: State,
    thread: Option<thread::JoinHandle<()>>,
    timeout: Duration,
    hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
    lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
    idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
    /// The shelf allows tasks to store state across invocations. It is passed to each
    /// invocation of each task. Tasks need to cooperate on the types they use for
    /// storing state.
    shelf: Option<Shelf>,
}

/// AsyncTask spawns one worker thread on demand to process jobs inserted into
/// a low and a high priority work queue. Each queue is processed FIFO, and the low
/// priority queue is processed only if the high priority queue is empty.
/// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
/// all scheduled requests are guaranteed to be serialized with respect to one another.
pub struct AsyncTask {
    state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
}

impl Default for AsyncTask {
    fn default() -> Self {
        Self::new(Duration::from_secs(30))
    }
}
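// A hedged sketch of the serialization guarantee stated in the AsyncTask doc comment
// above: a single worker thread drains both queues, so queued jobs never run
// concurrently and unsynchronized updates to a shared shelf entry cannot race.
// Illustrative only; all names are local to this sketch.
#[cfg(test)]
mod serialization_sketch {
    use super::AsyncTask;
    use std::sync::mpsc::channel;

    #[test]
    fn jobs_are_serialized() {
        let at = AsyncTask::default();
        // 50 increments from each queue, all against the same u32 shelf entry.
        for _ in 0..50 {
            at.queue_hi(|shelf| *shelf.get_mut::<u32>() += 1);
            at.queue_lo(|shelf| *shelf.get_mut::<u32>() += 1);
        }
        // A final low priority job reports the counter. It is queued last and the low
        // priority queue is FIFO, so it runs after all other jobs have completed.
        let (sender, receiver) = channel();
        at.queue_lo(move |shelf| sender.send(*shelf.get_mut::<u32>()).unwrap());
        assert_eq!(receiver.recv().unwrap(), 100);
    }
}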
impl AsyncTask {
    /// Construct an [`AsyncTask`] with a specific timeout value.
    pub fn new(timeout: Duration) -> Self {
        Self {
            state: Arc::new((
                Condvar::new(),
                Mutex::new(AsyncTaskState {
                    state: State::Exiting,
                    thread: None,
                    timeout,
                    hi_prio_req: VecDeque::new(),
                    lo_prio_req: VecDeque::new(),
                    idle_fns: Vec::new(),
                    shelf: None,
                }),
            )),
        }
    }

    /// Adds a one-off job to the high priority queue. High priority jobs are
    /// completed before low priority jobs and can overtake low priority jobs
    /// that are still queued, but they cannot preempt a low priority job that
    /// is already running.
    pub fn queue_hi<F>(&self, f: F)
    where
        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
    {
        self.queue(f, true)
    }

    /// Adds a one-off job to the low priority queue. Low priority jobs are
    /// completed after high priority jobs and are not executed as long as high
    /// priority jobs are present. Jobs always run to completion and are never
    /// preempted by high priority jobs.
    pub fn queue_lo<F>(&self, f: F)
    where
        F: FnOnce(&mut Shelf) + Send + 'static,
    {
        self.queue(f, false)
    }

    /// Adds an idle callback. This will be invoked whenever the worker becomes
    /// idle (all high and low priority jobs have been performed).
    pub fn add_idle<F>(&self, f: F)
    where
        F: Fn(&mut Shelf) + Send + Sync + 'static,
    {
        let (ref _condvar, ref state) = *self.state;
        let mut state = state.lock().unwrap();
        state.idle_fns.push(Arc::new(f));
    }

    fn queue<F>(&self, f: F, hi_prio: bool)
    where
        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
    {
        let (ref condvar, ref state) = *self.state;
        let mut state = state.lock().unwrap();

        if hi_prio {
            state.hi_prio_req.push_back(Box::new(f));
        } else {
            state.lo_prio_req.push_back(Box::new(f));
        }

        if state.state != State::Running {
            self.spawn_thread(&mut state);
        }
        drop(state);
        condvar.notify_all();
    }

    fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
        if let Some(t) = state.thread.take() {
            t.join().expect("AsyncTask panicked.");
        }

        let cloned_state = self.state.clone();
        let timeout_period = state.timeout;

        state.thread = Some(thread::spawn(move || {
            let (ref condvar, ref state) = *cloned_state;

            enum Action {
                QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
                IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
            }
            let mut done_idle = false;

            // When the worker starts, it takes the shelf and puts it on the stack.
            let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
            loop {
                if let Some(action) = {
                    let state = state.lock().unwrap();
                    if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
                        // No jobs queued so invoke the idle callbacks.
                        Some(Action::IdleFns(state.idle_fns.clone()))
                    } else {
                        // Wait for either a queued job to arrive or a timeout.
                        let (mut state, timeout) = condvar
                            .wait_timeout_while(state, timeout_period, |state| {
                                state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
                            })
                            .unwrap();
                        match (
                            state.hi_prio_req.pop_front(),
                            state.lo_prio_req.is_empty(),
                            timeout.timed_out(),
                        ) {
                            (Some(f), _, _) => Some(Action::QueuedFn(f)),
                            (None, false, _) => {
                                state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f))
                            }
                            (None, true, true) => {
                                // When the worker exits it puts the shelf back into the shared
                                // state for the next worker to use. So state is preserved not
                                // only across invocations but also across worker thread shutdown.
                                state.shelf = Some(shelf);
                                state.state = State::Exiting;
                                break;
                            }
                            (None, true, false) => None,
                        }
                    }
                } {
                    // Now that the lock has been dropped, perform the action.
                    match action {
                        Action::QueuedFn(f) => {
                            f(&mut shelf);
                            done_idle = false;
                        }
                        Action::IdleFns(idle_fns) => {
                            for idle_fn in idle_fns {
                                idle_fn(&mut shelf);
                            }
                            done_idle = true;
                        }
                    }
                }
            }
        }));
        state.state = State::Running;
    }
}
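// A hedged sketch of the queue_hi/queue_lo contract documented above: a high priority
// job overtakes low priority jobs that are still queued, but it never preempts a low
// priority job that has already started running. Illustrative only; channel names are
// local to this sketch.
#[cfg(test)]
mod priority_contract_sketch {
    use super::AsyncTask;
    use std::sync::mpsc::channel;

    #[test]
    fn hi_overtakes_queued_but_does_not_preempt_running() {
        let at = AsyncTask::default();
        let (lo_started_tx, lo_started_rx) = channel();
        let (release_tx, release_rx) = channel::<()>();
        let (order_tx, order_rx) = channel();

        // A low priority job that is running when the high priority job is queued.
        let order_tx_running = order_tx.clone();
        at.queue_lo(move |_shelf| {
            lo_started_tx.send(()).unwrap();
            // Block until released; a high priority job queued meanwhile must wait.
            release_rx.recv().unwrap();
            order_tx_running.send("lo_running").unwrap();
        });
        // A second low priority job, queued before the high priority job.
        let order_tx_queued = order_tx.clone();
        at.queue_lo(move |_shelf| order_tx_queued.send("lo_queued").unwrap());

        lo_started_rx.recv().unwrap();
        // Queued while the first low priority job runs: it cannot preempt that job,
        // but it does overtake the second, still queued, low priority job.
        at.queue_hi(move |_shelf| order_tx.send("hi").unwrap());
        release_tx.send(()).unwrap();

        assert_eq!(order_rx.recv().unwrap(), "lo_running");
        assert_eq!(order_rx.recv().unwrap(), "hi");
        assert_eq!(order_rx.recv().unwrap(), "lo_queued");
    }
}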
#[cfg(test)]
mod tests {
    use super::{AsyncTask, Shelf};
    use std::sync::{
        mpsc::{channel, sync_channel, RecvTimeoutError},
        Arc,
    };
    use std::time::Duration;

    #[test]
    fn test_shelf() {
        let mut shelf = Shelf::default();

        let s = "A string".to_string();
        assert_eq!(shelf.put(s), None);

        let s2 = "Another string".to_string();
        assert_eq!(shelf.put(s2), Some("A string".to_string()));

        // Put something of a different type on the shelf.
        #[derive(Debug, PartialEq, Eq)]
        struct Elf {
            pub name: String,
        }
        let e1 = Elf { name: "Glorfindel".to_string() };
        assert_eq!(shelf.put(e1), None);

        // The String value is still on the shelf.
        let s3 = shelf.get_downcast_ref::<String>().unwrap();
        assert_eq!(s3, "Another string");

        // As is the Elf.
        {
            let e2 = shelf.get_downcast_mut::<Elf>().unwrap();
            assert_eq!(e2.name, "Glorfindel");
            e2.name = "Celeborn".to_string();
        }

        // Take the Elf off the shelf.
        let e3 = shelf.remove_downcast_ref::<Elf>().unwrap();
        assert_eq!(e3.name, "Celeborn");

        assert_eq!(shelf.remove_downcast_ref::<Elf>(), None);

        // No u64 value has been put on the shelf, so getting one gives the default value.
        {
            let i = shelf.get_mut::<u64>();
            assert_eq!(*i, 0);
            *i = 42;
        }
        let i2 = shelf.get_downcast_ref::<u64>().unwrap();
        assert_eq!(*i2, 42);

        // No i32 value has ever been seen near the shelf.
        assert_eq!(shelf.get_downcast_ref::<i32>(), None);
        assert_eq!(shelf.get_downcast_mut::<i32>(), None);
        assert_eq!(shelf.remove_downcast_ref::<i32>(), None);
    }

    #[test]
    fn test_async_task() {
        let at = AsyncTask::default();

        // First queue up a job that blocks until we release it, to avoid
        // unpredictable synchronization.
        let (start_sender, start_receiver) = channel();
        at.queue_hi(move |shelf| {
            start_receiver.recv().unwrap();
            // Put a trace vector on the shelf.
            shelf.put(Vec::<String>::new());
        });

        // Queue up some high-priority and low-priority jobs.
        for i in 0..3 {
            let j = i;
            at.queue_lo(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("L{}", j));
            });
            let j = i;
            at.queue_hi(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("H{}", j));
            });
        }

        // Finally queue up a low priority job that emits the trace.
        let (trace_sender, trace_receiver) = channel();
        at.queue_lo(move |shelf| {
            let trace = shelf.get_downcast_ref::<Vec<String>>().unwrap();
            trace_sender.send(trace.clone()).unwrap();
        });

        // Ready, set, go.
        start_sender.send(()).unwrap();
        let trace = trace_receiver.recv().unwrap();

        assert_eq!(trace, vec!["H0", "H1", "H2", "L0", "L1", "L2"]);
    }
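    // The worker loop parks the shelf back into the shared state when it exits on
    // timeout, so shelved data survives worker shutdown. A hedged sketch of that
    // behaviour, using a short timeout so the worker has a chance to exit between
    // jobs; the assertion holds even if the worker has not timed out yet.
    #[test]
    fn test_async_task_shelf_survives_worker_restart() {
        let at = AsyncTask::new(Duration::from_millis(50));

        let (first_done_sender, first_done_receiver) = channel();
        at.queue_hi(move |shelf| {
            *shelf.get_mut::<u32>() = 7;
            first_done_sender.send(()).unwrap();
        });
        first_done_receiver.recv().unwrap();

        // Give the worker ample time to hit its idle timeout and terminate.
        std::thread::sleep(Duration::from_millis(300));

        let (second_done_sender, second_done_receiver) = channel();
        at.queue_hi(move |shelf| {
            // The value written before the (probable) worker restart is still there.
            second_done_sender.send(*shelf.get_mut::<u32>()).unwrap();
        });
        assert_eq!(second_done_receiver.recv().unwrap(), 7);
    }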
    #[test]
    fn test_async_task_chain() {
        let at = Arc::new(AsyncTask::default());
        let (sender, receiver) = channel();
        // Queue up a job that will queue up another job. This confirms
        // that the job is not invoked with any internal AsyncTask locks held.
        let at_clone = at.clone();
        at.queue_hi(move |_shelf| {
            at_clone.queue_lo(move |_shelf| {
                sender.send(()).unwrap();
            });
        });
        receiver.recv().unwrap();
    }

    #[test]
    #[should_panic]
    fn test_async_task_panic() {
        let at = AsyncTask::default();
        at.queue_hi(|_shelf| {
            panic!("Panic from queued job");
        });
        // Queue another job afterwards to ensure that the async thread gets joined.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }

    #[test]
    fn test_async_task_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        // Need a SyncSender as it is Send+Sync.
        let (idle_done_sender, idle_done_receiver) = sync_channel::<()>(3);
        at.add_idle(move |_shelf| {
            idle_done_sender.send(()).unwrap();
        });

        // Queue up some high-priority and low-priority jobs that take time.
        for _i in 0..3 {
            at.queue_lo(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
            at.queue_hi(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
        }
        // Final low-priority job.
        let (done_sender, done_receiver) = channel();
        at.queue_lo(move |_shelf| {
            done_sender.send(()).unwrap();
        });

        // Nothing happens until the last job completes.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();

        // Idle callback not executed again even if we wait for a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(3)),
            Err(RecvTimeoutError::Timeout)
        );

        // However, if more work is done then there's another chance to go idle.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            std::thread::sleep(Duration::from_millis(500));
            done_sender.send(()).unwrap();
        });
        // Idle callback not immediately executed, because the high priority
        // job is taking a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();
    }
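    // Idle callbacks receive the same Shelf as queued jobs, so they can observe state
    // left behind by the jobs that just ran. A hedged sketch of that interaction; the
    // callback only reports once something is on the shelf, so the assertion does not
    // depend on whether an idle round happens before the job runs.
    #[test]
    fn test_async_task_idle_sees_job_state() {
        let at = AsyncTask::default();
        let (idle_sender, idle_receiver) = sync_channel::<String>(1);
        at.add_idle(move |shelf| {
            // Report whatever the last job left on the shelf, if anything.
            if let Some(s) = shelf.get_downcast_ref::<String>() {
                let _ = idle_sender.try_send(s.clone());
            }
        });
        at.queue_hi(|shelf| {
            shelf.put("job ran".to_string());
        });
        assert_eq!(idle_receiver.recv().unwrap(), "job ran");
    }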
    #[test]
    fn test_async_task_multiple_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        let (idle_sender, idle_receiver) = sync_channel::<i32>(5);
        // Queue a high priority job to start things off.
        at.queue_hi(|_shelf| {
            std::thread::sleep(Duration::from_millis(500));
        });

        // Multiple idle callbacks.
        for i in 0..3 {
            let idle_sender = idle_sender.clone();
            at.add_idle(move |_shelf| {
                idle_sender.send(i).unwrap();
            });
        }

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        // Wait for a moment and the idle jobs should have run.
        std::thread::sleep(Duration::from_secs(1));

        let mut results = Vec::new();
        while let Ok(i) = idle_receiver.recv_timeout(Duration::from_millis(1)) {
            results.push(i);
        }
        assert_eq!(results, [0, 1, 2]);
    }

    #[test]
    fn test_async_task_idle_queues_job() {
        let at = Arc::new(AsyncTask::new(Duration::from_secs(1)));
        let at_clone = at.clone();
        let (idle_sender, idle_receiver) = sync_channel::<i32>(100);
        // Add an idle callback that queues a low-priority job.
        at.add_idle(move |shelf| {
            at_clone.queue_lo(|_shelf| {
                // Slow things down so the channel doesn't fill up.
                std::thread::sleep(Duration::from_millis(50));
            });
            let i = shelf.get_mut::<i32>();
            idle_sender.send(*i).unwrap();
            *i += 1;
        });

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1500)),
            Err(RecvTimeoutError::Timeout)
        );

        // Once we queue a normal job, things start.
        at.queue_hi(|_shelf| {});
        assert_eq!(0, idle_receiver.recv_timeout(Duration::from_millis(200)).unwrap());

        // The idle callback queues a job, and completion of that job
        // means the task is going idle again...so the idle callback will
        // be called repeatedly.
        assert_eq!(1, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
        assert_eq!(2, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
        assert_eq!(3, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
    }

    #[test]
    #[should_panic]
    fn test_async_task_idle_panic() {
        let at = AsyncTask::new(Duration::from_secs(1));
        let (idle_sender, idle_receiver) = sync_channel::<()>(3);
        // Add an idle callback that panics.
        at.add_idle(move |_shelf| {
            idle_sender.send(()).unwrap();
            panic!("Panic from idle callback");
        });
        // Queue a job to trigger idleness and the ensuing panic.
        at.queue_hi(|_shelf| {});
        idle_receiver.recv().unwrap();

        // Queue another job afterwards to ensure that the async thread gets joined
        // and the panic detected.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }
}