1 //
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 //! ProfCollect tracing scheduler.
18
19 use std::fs;
20 use std::mem;
21 use std::path::Path;
22 use std::sync::Arc;
23 use std::sync::Mutex;
24 use std::thread;
25 use std::time::{Duration, Instant};
26
27 use crate::config::{get_sampling_period, Config, LOG_FILE, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
28 use crate::trace_provider::{self, TraceProvider};
29 use anyhow::{anyhow, Context, Result};
30
31 pub struct Scheduler {
32 /// The preferred trace provider for the system.
33 trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
34 provider_ready_callbacks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>,
35 }
36
37 impl Scheduler {
new() -> Result<Self>38 pub fn new() -> Result<Self> {
39 let p = trace_provider::get_trace_provider()?;
40 p.lock().map_err(|e| anyhow!(e.to_string()))?.set_log_file(&LOG_FILE);
41 Ok(Scheduler {
42 trace_provider: p,
43 provider_ready_callbacks: Arc::new(Mutex::new(Vec::new())),
44 })
45 }
46
trace_system(&self, config: &Config, tag: &str) -> Result<()>47 pub fn trace_system(&self, config: &Config, tag: &str) -> Result<()> {
48 let trace_provider = self.trace_provider.clone();
49 if check_space_limit(&TRACE_OUTPUT_DIR, config)? {
50 trace_provider.lock().unwrap().trace_system(
51 &TRACE_OUTPUT_DIR,
52 tag,
53 &get_sampling_period(),
54 &config.binary_filter,
55 );
56 }
57 Ok(())
58 }
59
trace_process( &self, config: &Config, tag: &str, processes: &str, samplng_period: f32, ) -> Result<()>60 pub fn trace_process(
61 &self,
62 config: &Config,
63 tag: &str,
64 processes: &str,
65 samplng_period: f32,
66 ) -> Result<()> {
67 let trace_provider = self.trace_provider.clone();
68 let duration = match samplng_period {
69 0.0 => get_sampling_period(),
70 _ => Duration::from_millis(samplng_period as u64),
71 };
72 if check_space_limit(&TRACE_OUTPUT_DIR, config)? {
73 trace_provider.lock().unwrap().trace_process(
74 &TRACE_OUTPUT_DIR,
75 tag,
76 &duration,
77 processes,
78 );
79 }
80 Ok(())
81 }
82
process(&self, config: &Config) -> Result<()>83 pub fn process(&self, config: &Config) -> Result<()> {
84 let trace_provider = self.trace_provider.clone();
85 trace_provider
86 .lock()
87 .unwrap()
88 .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter)
89 .context("Failed to process profiles.")?;
90 Ok(())
91 }
92
get_trace_provider_name(&self) -> &'static str93 pub fn get_trace_provider_name(&self) -> &'static str {
94 self.trace_provider.lock().unwrap().get_name()
95 }
96
is_provider_ready(&self) -> bool97 pub fn is_provider_ready(&self) -> bool {
98 self.trace_provider.lock().unwrap().is_ready()
99 }
100
register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>)101 pub fn register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>) {
102 let mut locked_callbacks = self.provider_ready_callbacks.lock().unwrap();
103 locked_callbacks.push(cb);
104 if locked_callbacks.len() == 1 {
105 self.start_thread_waiting_for_provider_ready();
106 }
107 }
108
start_thread_waiting_for_provider_ready(&self)109 fn start_thread_waiting_for_provider_ready(&self) {
110 let provider = self.trace_provider.clone();
111 let callbacks = self.provider_ready_callbacks.clone();
112
113 thread::spawn(move || {
114 let start_time = Instant::now();
115 loop {
116 let elapsed = Instant::now().duration_since(start_time);
117 if provider.lock().unwrap().is_ready() {
118 break;
119 }
120 // Decide check period based on how long we have waited:
121 // For the first 10s waiting, check every 100ms (likely to work on EVT devices).
122 // For the first 10m waiting, check every 10s (likely to work on DVT devices).
123 // For others, check every 10m.
124 let sleep_duration = if elapsed < Duration::from_secs(10) {
125 Duration::from_millis(100)
126 } else if elapsed < Duration::from_secs(60 * 10) {
127 Duration::from_secs(10)
128 } else {
129 Duration::from_secs(60 * 10)
130 };
131 thread::sleep(sleep_duration);
132 }
133
134 let mut locked_callbacks = callbacks.lock().unwrap();
135 let v = mem::take(&mut *locked_callbacks);
136 for cb in v {
137 cb();
138 }
139 });
140 }
141
clear_trace_log(&self) -> Result<()>142 pub fn clear_trace_log(&self) -> Result<()> {
143 let provider = self.trace_provider.lock().map_err(|e| anyhow!(e.to_string()))?;
144 provider.reset_log_file();
145 let mut result = Ok(());
146 if LOG_FILE.exists() {
147 result = fs::remove_file(*LOG_FILE).map_err(|e| anyhow!(e));
148 }
149 provider.set_log_file(&LOG_FILE);
150 result
151 }
152 }
153
154 /// Run if space usage is under limit.
check_space_limit(path: &Path, config: &Config) -> Result<bool>155 fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
156 // Returns the size of a directory, non-recursive.
157 let dir_size = |path| -> Result<u64> {
158 fs::read_dir(path)?.try_fold(0, |acc, file| {
159 let metadata = file?.metadata()?;
160 let size = if metadata.is_file() { metadata.len() } else { 0 };
161 Ok(acc + size)
162 })
163 };
164
165 if dir_size(path)? > config.max_trace_limit_mb * 1024 * 1024 {
166 log::error!("trace storage exhausted.");
167 return Ok(false);
168 }
169 Ok(true)
170 }
171