1 //
2 // Copyright (C) 2021 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 //! ProfCollect tracing scheduler.
18
19 use std::fs;
20 use std::mem;
21 use std::path::Path;
22 use std::sync::mpsc::{sync_channel, SyncSender};
23 use std::sync::Arc;
24 use std::sync::Mutex;
25 use std::thread;
26 use std::time::{Duration, Instant};
27
28 use crate::config::{Config, LOG_FILE, PROFILE_OUTPUT_DIR, TRACE_OUTPUT_DIR};
29 use crate::trace_provider::{self, TraceProvider};
30 use anyhow::{anyhow, ensure, Context, Result};
31
32 pub struct Scheduler {
33 /// Signal to terminate the periodic collection worker thread, None if periodic collection is
34 /// not scheduled.
35 termination_ch: Option<SyncSender<()>>,
36 /// The preferred trace provider for the system.
37 trace_provider: Arc<Mutex<dyn TraceProvider + Send>>,
38 provider_ready_callbacks: Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>,
39 }
40
41 impl Scheduler {
new() -> Result<Self>42 pub fn new() -> Result<Self> {
43 let p = trace_provider::get_trace_provider()?;
44 p.lock().map_err(|e| anyhow!(e.to_string()))?.set_log_file(&LOG_FILE);
45 Ok(Scheduler {
46 termination_ch: None,
47 trace_provider: p,
48 provider_ready_callbacks: Arc::new(Mutex::new(Vec::new())),
49 })
50 }
51
is_scheduled(&self) -> bool52 fn is_scheduled(&self) -> bool {
53 self.termination_ch.is_some()
54 }
55
schedule_periodic(&mut self, config: &Config) -> Result<()>56 pub fn schedule_periodic(&mut self, config: &Config) -> Result<()> {
57 ensure!(!self.is_scheduled(), "Already scheduled.");
58
59 let (sender, receiver) = sync_channel(1);
60 self.termination_ch = Some(sender);
61
62 // Clone config and trace_provider ARC for the worker thread.
63 let config = config.clone();
64 let trace_provider = self.trace_provider.clone();
65
66 thread::spawn(move || {
67 loop {
68 match receiver.recv_timeout(config.collection_interval) {
69 Ok(_) => break,
70 Err(_) => {
71 // Did not receive a termination signal, initiate trace event.
72 if check_space_limit(&TRACE_OUTPUT_DIR, &config).unwrap() {
73 trace_provider.lock().unwrap().trace(
74 &TRACE_OUTPUT_DIR,
75 "periodic",
76 &config.sampling_period,
77 &config.binary_filter,
78 );
79 }
80 }
81 }
82 }
83 });
84 Ok(())
85 }
86
terminate_periodic(&mut self) -> Result<()>87 pub fn terminate_periodic(&mut self) -> Result<()> {
88 self.termination_ch
89 .as_ref()
90 .ok_or_else(|| anyhow!("Not scheduled"))?
91 .send(())
92 .context("Scheduler worker disappeared.")?;
93 self.termination_ch = None;
94 Ok(())
95 }
96
one_shot(&self, config: &Config, tag: &str) -> Result<()>97 pub fn one_shot(&self, config: &Config, tag: &str) -> Result<()> {
98 let trace_provider = self.trace_provider.clone();
99 if check_space_limit(&TRACE_OUTPUT_DIR, config)? {
100 trace_provider.lock().unwrap().trace(
101 &TRACE_OUTPUT_DIR,
102 tag,
103 &config.sampling_period,
104 &config.binary_filter,
105 );
106 }
107 Ok(())
108 }
109
process(&self, config: &Config) -> Result<()>110 pub fn process(&self, config: &Config) -> Result<()> {
111 let trace_provider = self.trace_provider.clone();
112 trace_provider
113 .lock()
114 .unwrap()
115 .process(&TRACE_OUTPUT_DIR, &PROFILE_OUTPUT_DIR, &config.binary_filter)
116 .context("Failed to process profiles.")?;
117 Ok(())
118 }
119
get_trace_provider_name(&self) -> &'static str120 pub fn get_trace_provider_name(&self) -> &'static str {
121 self.trace_provider.lock().unwrap().get_name()
122 }
123
is_provider_ready(&self) -> bool124 pub fn is_provider_ready(&self) -> bool {
125 self.trace_provider.lock().unwrap().is_ready()
126 }
127
register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>)128 pub fn register_provider_ready_callback(&self, cb: Box<dyn FnOnce() + Send>) {
129 let mut locked_callbacks = self.provider_ready_callbacks.lock().unwrap();
130 locked_callbacks.push(cb);
131 if locked_callbacks.len() == 1 {
132 self.start_thread_waiting_for_provider_ready();
133 }
134 }
135
start_thread_waiting_for_provider_ready(&self)136 fn start_thread_waiting_for_provider_ready(&self) {
137 let provider = self.trace_provider.clone();
138 let callbacks = self.provider_ready_callbacks.clone();
139
140 thread::spawn(move || {
141 let start_time = Instant::now();
142 loop {
143 let elapsed = Instant::now().duration_since(start_time);
144 if provider.lock().unwrap().is_ready() {
145 break;
146 }
147 // Decide check period based on how long we have waited:
148 // For the first 10s waiting, check every 100ms (likely to work on EVT devices).
149 // For the first 10m waiting, check every 10s (likely to work on DVT devices).
150 // For others, check every 10m.
151 let sleep_duration = if elapsed < Duration::from_secs(10) {
152 Duration::from_millis(100)
153 } else if elapsed < Duration::from_secs(60 * 10) {
154 Duration::from_secs(10)
155 } else {
156 Duration::from_secs(60 * 10)
157 };
158 thread::sleep(sleep_duration);
159 }
160
161 let mut locked_callbacks = callbacks.lock().unwrap();
162 let v = mem::take(&mut *locked_callbacks);
163 for cb in v {
164 cb();
165 }
166 });
167 }
168
clear_trace_log(&self) -> Result<()>169 pub fn clear_trace_log(&self) -> Result<()> {
170 let provider = self.trace_provider.lock().map_err(|e| anyhow!(e.to_string()))?;
171 provider.reset_log_file();
172 let mut result = Ok(());
173 if LOG_FILE.exists() {
174 result = fs::remove_file(*LOG_FILE).map_err(|e| anyhow!(e));
175 }
176 provider.set_log_file(&LOG_FILE);
177 result
178 }
179 }
180
181 /// Run if space usage is under limit.
check_space_limit(path: &Path, config: &Config) -> Result<bool>182 fn check_space_limit(path: &Path, config: &Config) -> Result<bool> {
183 // Returns the size of a directory, non-recursive.
184 let dir_size = |path| -> Result<u64> {
185 fs::read_dir(path)?.try_fold(0, |acc, file| {
186 let metadata = file?.metadata()?;
187 let size = if metadata.is_file() { metadata.len() } else { 0 };
188 Ok(acc + size)
189 })
190 };
191
192 if dir_size(path)? > config.max_trace_limit {
193 log::error!("trace storage exhausted.");
194 return Ok(false);
195 }
196 Ok(true)
197 }
198