• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 //! ProfCollect tracing scheduler.
18 
19 use std::fs;
20 use std::mem;
21 use std::path::Path;
22 use std::sync::mpsc::{sync_channel, SyncSender};
23 use std::sync::Arc;
24 use std::sync::Mutex;
25 use std::thread;
26 use std::time::{Duration, Instant};
27 
28 use crate::config::{Config, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
29 use crate::trace_provider::{self, TraceProvider};
30 use anyhow::{anyhow, ensure, Context, Result};
31 
32 pub struct Scheduler {
33     /// Signal to terminate the periodic collection worker thread, None if periodic collection is
34     /// not scheduled.
35     termination_ch: Option<SyncSender<()>>,
36     /// The preferred trace provider for the system.
37     trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
38     provider_ready_callbacks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>,
39 }
40 
41 impl Scheduler {
new() -> Result<Self>42     pub fn new() -> Result<Self> {
43         let p = trace_provider::get_trace_provider()?;
44         Ok(Scheduler {
45             termination_ch: None,
46             trace_provider: p,
47             provider_ready_callbacks: Arc::new(Mutex::new(Vec::new())),
48         })
49     }
50 
is_scheduled(&self) -> bool51     fn is_scheduled(&self) -> bool {
52         self.termination_ch.is_some()
53     }
54 
schedule_periodic(&mut self, config: &Config) -> Result<()>55     pub fn schedule_periodic(&mut self, config: &Config) -> Result<()> {
56         ensure!(!self.is_scheduled(), "Already scheduled.");
57 
58         let (sender, receiver) = sync_channel(1);
59         self.termination_ch = Some(sender);
60 
61         // Clone config and trace_provider ARC for the worker thread.
62         let config = config.clone();
63         let trace_provider = self.trace_provider.clone();
64 
65         thread::spawn(move || {
66             loop {
67                 match receiver.recv_timeout(config.collection_interval) {
68                     Ok(_) => break,
69                     Err(_) => {
70                         // Did not receive a termination signal, initiate trace event.
71                         if check_space_limit(*TRACE_OUTPUT_DIR, &config).unwrap() {
72                             trace_provider.lock().unwrap().trace(
73                                 &TRACE_OUTPUT_DIR,
74                                 "periodic",
75                                 &config.sampling_period,
76                             );
77                         }
78                     }
79                 }
80             }
81         });
82         Ok(())
83     }
84 
terminate_periodic(&mut self) -> Result<()>85     pub fn terminate_periodic(&mut self) -> Result<()> {
86         self.termination_ch
87             .as_ref()
88             .ok_or_else(|| anyhow!("Not scheduled"))?
89             .send(())
90             .context("Scheduler worker disappeared.")?;
91         self.termination_ch = None;
92         Ok(())
93     }
94 
one_shot(&self, config: &Config, tag: &str) -> Result<()>95     pub fn one_shot(&self, config: &Config, tag: &str) -> Result<()> {
96         let trace_provider = self.trace_provider.clone();
97         if check_space_limit(*TRACE_OUTPUT_DIR, config)? {
98             trace_provider.lock().unwrap().trace(&TRACE_OUTPUT_DIR, tag, &config.sampling_period);
99         }
100         Ok(())
101     }
102 
process(&self, config: &Config) -> Result<()>103     pub fn process(&self, config: &Config) -> Result<()> {
104         let trace_provider = self.trace_provider.clone();
105         trace_provider
106             .lock()
107             .unwrap()
108             .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter)
109             .context("Failed to process profiles.")?;
110         Ok(())
111     }
112 
get_trace_provider_name(&self) -> &'static str113     pub fn get_trace_provider_name(&self) -> &'static str {
114         self.trace_provider.lock().unwrap().get_name()
115     }
116 
is_provider_ready(&self) -> bool117     pub fn is_provider_ready(&self) -> bool {
118         self.trace_provider.lock().unwrap().is_ready()
119     }
120 
register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>)121     pub fn register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>) {
122         let mut locked_callbacks = self.provider_ready_callbacks.lock().unwrap();
123         locked_callbacks.push(cb);
124         if locked_callbacks.len() == 1 {
125             self.start_thread_waiting_for_provider_ready();
126         }
127     }
128 
start_thread_waiting_for_provider_ready(&self)129     fn start_thread_waiting_for_provider_ready(&self) {
130         let provider = self.trace_provider.clone();
131         let callbacks = self.provider_ready_callbacks.clone();
132 
133         thread::spawn(move || {
134             let start_time = Instant::now();
135             loop {
136                 let elapsed = Instant::now().duration_since(start_time);
137                 if provider.lock().unwrap().is_ready() {
138                     break;
139                 }
140                 // Decide check period based on how long we have waited:
141                 // For the first 10s waiting, check every 100ms (likely to work on EVT devices).
142                 // For the first 10m waiting, check every 10s (likely to work on DVT devices).
143                 // For others, check every 10m.
144                 let sleep_duration = if elapsed < Duration::from_secs(10) {
145                     Duration::from_millis(100)
146                 } else if elapsed < Duration::from_secs(60 * 10) {
147                     Duration::from_secs(10)
148                 } else {
149                     Duration::from_secs(60 * 10)
150                 };
151                 thread::sleep(sleep_duration);
152             }
153 
154             let mut locked_callbacks = callbacks.lock().unwrap();
155             let v = mem::take(&mut *locked_callbacks);
156             for cb in v {
157                 cb();
158             }
159         });
160     }
161 }
162 
163 /// Run if space usage is under limit.
check_space_limit(path: &Path, config: &Config) -> Result<bool>164 fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
165     // Returns the size of a directory, non-recursive.
166     let dir_size = |path| -> Result<u64> {
167         fs::read_dir(path)?.try_fold(0, |acc, file| {
168             let metadata = file?.metadata()?;
169             let size = if metadata.is_file() { metadata.len() } else { 0 };
170             Ok(acc + size)
171         })
172     };
173 
174     if dir_size(path)? > config.max_trace_limit {
175         log::error!("trace storage exhausted.");
176         return Ok(false);
177     }
178     Ok(true)
179 }
180