• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! This is a small server which is intended to run inside of an emulator or
2 //! on a remote test device. This server pairs with the `remote-test-client`
3 //! program in this repository. The `remote-test-client` connects to this
4 //! server over a TCP socket and performs work such as:
5 //!
6 //! 1. Pushing shared libraries to the server
7 //! 2. Running tests through the server
8 //!
9 //! The server supports running tests concurrently and also supports tests
10 //! themselves having support libraries. All data over the TCP sockets is in a
11 //! basically custom format suiting our needs.
12 
13 #[cfg(not(windows))]
14 use std::fs::Permissions;
15 use std::net::SocketAddr;
16 #[cfg(not(windows))]
17 use std::os::unix::prelude::*;
18 
19 use std::cmp;
20 use std::env;
21 use std::fs::{self, File};
22 use std::io::prelude::*;
23 use std::io::{self, BufReader};
24 use std::net::{TcpListener, TcpStream};
25 use std::path::{Path, PathBuf};
26 use std::process::{Command, ExitStatus, Stdio};
27 use std::str;
28 use std::sync::atomic::{AtomicUsize, Ordering};
29 use std::sync::{Arc, Mutex};
30 use std::thread;
31 
32 macro_rules! t {
33     ($e:expr) => {
34         match $e {
35             Ok(e) => e,
36             Err(e) => panic!("{} failed with {}", stringify!($e), e),
37         }
38     };
39 }
40 
41 static TEST: AtomicUsize = AtomicUsize::new(0);
42 const RETRY_INTERVAL: u64 = 1;
43 const NUMBER_OF_RETRIES: usize = 5;
44 
45 #[derive(Copy, Clone)]
46 struct Config {
47     verbose: bool,
48     sequential: bool,
49     batch: bool,
50     bind: SocketAddr,
51 }
52 
53 impl Config {
default() -> Config54     pub fn default() -> Config {
55         Config {
56             verbose: false,
57             sequential: false,
58             batch: false,
59             bind: if cfg!(target_os = "android") || cfg!(windows) {
60                 ([0, 0, 0, 0], 12345).into()
61             } else {
62                 ([10, 0, 2, 15], 12345).into()
63             },
64         }
65     }
66 
parse_args() -> Config67     pub fn parse_args() -> Config {
68         let mut config = Config::default();
69 
70         let args = env::args().skip(1);
71         let mut next_is_bind = false;
72         for argument in args {
73             match &argument[..] {
74                 bind if next_is_bind => {
75                     config.bind = t!(bind.parse());
76                     next_is_bind = false;
77                 }
78                 "--bind" => next_is_bind = true,
79                 "--sequential" => config.sequential = true,
80                 "--batch" => config.batch = true,
81                 "--verbose" | "-v" => config.verbose = true,
82                 "--help" | "-h" => {
83                     show_help();
84                     std::process::exit(0);
85                 }
86                 arg => panic!("unknown argument: {}, use `--help` for known arguments", arg),
87             }
88         }
89         if next_is_bind {
90             panic!("missing value for --bind");
91         }
92 
93         config
94     }
95 }
96 
show_help()97 fn show_help() {
98     eprintln!(
99         r#"Usage:
100 
101 {} [OPTIONS]
102 
103 OPTIONS:
104     --bind <IP>:<PORT>   Specify IP address and port to listen for requests, e.g. "0.0.0.0:12345"
105     --sequential         Run only one test at a time
106     --batch              Send stdout and stderr in batch instead of streaming
107     -v, --verbose        Show status messages
108     -h, --help           Show this help screen
109 "#,
110         std::env::args().next().unwrap()
111     );
112 }
113 
print_verbose(s: &str, conf: Config)114 fn print_verbose(s: &str, conf: Config) {
115     if conf.verbose {
116         println!("{}", s);
117     }
118 }
119 
main()120 fn main() {
121     let config = Config::parse_args();
122     println!("starting test server");
123 
124     let listener = bind_socket(config.bind);
125     let (work, tmp): (PathBuf, PathBuf) = if cfg!(target_os = "android") {
126         ("/data/local/tmp/work".into(), "/data/local/tmp/work/tmp".into())
127     } else {
128         let mut work_dir = env::temp_dir();
129         work_dir.push("work");
130         let mut tmp_dir = work_dir.clone();
131         tmp_dir.push("tmp");
132         (work_dir, tmp_dir)
133     };
134     println!("listening on {}!", config.bind);
135 
136     t!(fs::create_dir_all(&work));
137     t!(fs::create_dir_all(&tmp));
138 
139     let lock = Arc::new(Mutex::new(()));
140 
141     for socket in listener.incoming() {
142         let mut socket = t!(socket);
143         let mut buf = [0; 4];
144         if socket.read_exact(&mut buf).is_err() {
145             continue;
146         }
147         if &buf[..] == b"ping" {
148             print_verbose("Received ping", config);
149             t!(socket.write_all(b"pong"));
150         } else if &buf[..] == b"push" {
151             handle_push(socket, &work, config);
152         } else if &buf[..] == b"run " {
153             let lock = lock.clone();
154             let work = work.clone();
155             let tmp = tmp.clone();
156             let f = move || handle_run(socket, &work, &tmp, &lock, config);
157             if config.sequential {
158                 f();
159             } else {
160                 thread::spawn(f);
161             }
162         } else {
163             panic!("unknown command {:?}", buf);
164         }
165     }
166 }
167 
bind_socket(addr: SocketAddr) -> TcpListener168 fn bind_socket(addr: SocketAddr) -> TcpListener {
169     for _ in 0..(NUMBER_OF_RETRIES - 1) {
170         if let Ok(x) = TcpListener::bind(addr) {
171             return x;
172         }
173         std::thread::sleep(std::time::Duration::from_secs(RETRY_INTERVAL));
174     }
175     TcpListener::bind(addr).unwrap()
176 }
177 
handle_push(socket: TcpStream, work: &Path, config: Config)178 fn handle_push(socket: TcpStream, work: &Path, config: Config) {
179     let mut reader = BufReader::new(socket);
180     let dst = recv(&work, &mut reader);
181     print_verbose(&format!("push {:#?}", dst), config);
182 
183     let mut socket = reader.into_inner();
184     t!(socket.write_all(b"ack "));
185 }
186 
187 struct RemoveOnDrop<'a> {
188     inner: &'a Path,
189 }
190 
191 impl Drop for RemoveOnDrop<'_> {
drop(&mut self)192     fn drop(&mut self) {
193         t!(fs::remove_dir_all(self.inner));
194     }
195 }
196 
handle_run(socket: TcpStream, work: &Path, tmp: &Path, lock: &Mutex<()>, config: Config)197 fn handle_run(socket: TcpStream, work: &Path, tmp: &Path, lock: &Mutex<()>, config: Config) {
198     let mut arg = Vec::new();
199     let mut reader = BufReader::new(socket);
200 
201     // Allocate ourselves a directory that we'll delete when we're done to save
202     // space.
203     let n = TEST.fetch_add(1, Ordering::SeqCst);
204     let path = work.join(format!("test{}", n));
205     t!(fs::create_dir(&path));
206     let _a = RemoveOnDrop { inner: &path };
207 
208     // First up we'll get a list of arguments delimited with 0 bytes. An empty
209     // argument means that we're done.
210     let mut args = Vec::new();
211     while t!(reader.read_until(0, &mut arg)) > 1 {
212         args.push(t!(str::from_utf8(&arg[..arg.len() - 1])).to_string());
213         arg.truncate(0);
214     }
215 
216     // Next we'll get a bunch of env vars in pairs delimited by 0s as well
217     let mut env = Vec::new();
218     arg.truncate(0);
219     while t!(reader.read_until(0, &mut arg)) > 1 {
220         let key_len = arg.len() - 1;
221         let val_len = t!(reader.read_until(0, &mut arg)) - 1;
222         {
223             let key = &arg[..key_len];
224             let val = &arg[key_len + 1..][..val_len];
225             let key = t!(str::from_utf8(key)).to_string();
226             let val = t!(str::from_utf8(val)).to_string();
227             env.push((key, val));
228         }
229         arg.truncate(0);
230     }
231 
232     // The section of code from here down to where we drop the lock is going to
233     // be a critical section for us. On Linux you can't execute a file which is
234     // open somewhere for writing, as you'll receive the error "text file busy".
235     // Now here we never have the text file open for writing when we spawn it,
236     // so why do we still need a critical section?
237     //
238     // Process spawning first involves a `fork` on Unix, which clones all file
239     // descriptors into the child process. This means that it's possible for us
240     // to open the file for writing (as we're downloading it), then some other
241     // thread forks, then we close the file and try to exec. At that point the
242     // other thread created a child process with the file open for writing, and
243     // we attempt to execute it, so we get an error.
244     //
245     // This race is resolve by ensuring that only one thread can write the file
246     // and spawn a child process at once. Kinda an unfortunate solution, but we
247     // don't have many other choices with this sort of setup!
248     //
249     // In any case the lock is acquired here, before we start writing any files.
250     // It's then dropped just after we spawn the child. That way we don't lock
251     // the execution of the child, just the creation of its files.
252     let lock = lock.lock();
253 
254     // Next there's a list of dynamic libraries preceded by their filenames.
255     while t!(reader.fill_buf())[0] != 0 {
256         recv(&path, &mut reader);
257     }
258     assert_eq!(t!(reader.read(&mut [0])), 1);
259 
260     // Finally we'll get the binary. The other end will tell us how big the
261     // binary is and then we'll download it all to the exe path we calculated
262     // earlier.
263     let exe = recv(&path, &mut reader);
264     print_verbose(&format!("run {:#?}", exe), config);
265 
266     let mut cmd = Command::new(&exe);
267     cmd.args(args);
268     cmd.envs(env);
269 
270     // On windows, libraries are just searched in the executable directory,
271     // system directories, PWD, and PATH, in that order. PATH is the only one
272     // we can change for this.
273     let library_path = if cfg!(windows) { "PATH" } else { "LD_LIBRARY_PATH" };
274 
275     // Support libraries were uploaded to `work` earlier, so make sure that's
276     // in `LD_LIBRARY_PATH`. Also include our own current dir which may have
277     // had some libs uploaded.
278     let mut paths = vec![work.to_owned(), path.clone()];
279     if let Some(library_path) = env::var_os(library_path) {
280         paths.extend(env::split_paths(&library_path));
281     }
282     cmd.env(library_path, env::join_paths(paths).unwrap());
283 
284     // Some tests assume RUST_TEST_TMPDIR exists
285     cmd.env("RUST_TEST_TMPDIR", tmp.to_owned());
286 
287     let socket = Arc::new(Mutex::new(reader.into_inner()));
288 
289     let status = if config.batch {
290         let child =
291             t!(cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()).output());
292         batch_copy(&child.stdout, 0, &*socket);
293         batch_copy(&child.stderr, 1, &*socket);
294         child.status
295     } else {
296         // Spawn the child and ferry over stdout/stderr to the socket in a framed
297         // fashion (poor man's style)
298         let mut child =
299             t!(cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()).spawn());
300         drop(lock);
301         let mut stdout = child.stdout.take().unwrap();
302         let mut stderr = child.stderr.take().unwrap();
303         let socket2 = socket.clone();
304         let thread = thread::spawn(move || my_copy(&mut stdout, 0, &*socket2));
305         my_copy(&mut stderr, 1, &*socket);
306         thread.join().unwrap();
307         t!(child.wait())
308     };
309 
310     // Finally send over the exit status.
311     let (which, code) = get_status_code(&status);
312 
313     t!(socket.lock().unwrap().write_all(&[
314         which,
315         (code >> 24) as u8,
316         (code >> 16) as u8,
317         (code >> 8) as u8,
318         (code >> 0) as u8,
319     ]));
320 }
321 
322 #[cfg(not(windows))]
get_status_code(status: &ExitStatus) -> (u8, i32)323 fn get_status_code(status: &ExitStatus) -> (u8, i32) {
324     match status.code() {
325         Some(n) => (0, n),
326         None => (1, status.signal().unwrap()),
327     }
328 }
329 
330 #[cfg(windows)]
get_status_code(status: &ExitStatus) -> (u8, i32)331 fn get_status_code(status: &ExitStatus) -> (u8, i32) {
332     (0, status.code().unwrap())
333 }
334 
recv<B: BufRead>(dir: &Path, io: &mut B) -> PathBuf335 fn recv<B: BufRead>(dir: &Path, io: &mut B) -> PathBuf {
336     let mut filename = Vec::new();
337     t!(io.read_until(0, &mut filename));
338 
339     // We've got some tests with *really* long names. We try to name the test
340     // executable the same on the target as it is on the host to aid with
341     // debugging, but the targets we're emulating are often more restrictive
342     // than the hosts as well.
343     //
344     // To ensure we can run a maximum number of tests without modifications we
345     // just arbitrarily truncate the filename to 50 bytes. That should
346     // hopefully allow us to still identify what's running while staying under
347     // the filesystem limits.
348     let len = cmp::min(filename.len() - 1, 50);
349     let dst = dir.join(t!(str::from_utf8(&filename[..len])));
350     let amt = read_u32(io) as u64;
351     t!(io::copy(&mut io.take(amt), &mut t!(File::create(&dst))));
352     set_permissions(&dst);
353     dst
354 }
355 
356 #[cfg(not(windows))]
set_permissions(path: &Path)357 fn set_permissions(path: &Path) {
358     t!(fs::set_permissions(&path, Permissions::from_mode(0o755)));
359 }
360 #[cfg(windows)]
set_permissions(_path: &Path)361 fn set_permissions(_path: &Path) {}
362 
my_copy(src: &mut dyn Read, which: u8, dst: &Mutex<dyn Write>)363 fn my_copy(src: &mut dyn Read, which: u8, dst: &Mutex<dyn Write>) {
364     let mut b = [0; 1024];
365     loop {
366         let n = t!(src.read(&mut b));
367         let mut dst = dst.lock().unwrap();
368         t!(dst.write_all(&create_header(which, n as u32)));
369         if n > 0 {
370             t!(dst.write_all(&b[..n]));
371         } else {
372             break;
373         }
374     }
375 }
376 
batch_copy(buf: &[u8], which: u8, dst: &Mutex<dyn Write>)377 fn batch_copy(buf: &[u8], which: u8, dst: &Mutex<dyn Write>) {
378     let n = buf.len();
379     let mut dst = dst.lock().unwrap();
380     t!(dst.write_all(&create_header(which, n as u32)));
381     if n > 0 {
382         t!(dst.write_all(buf));
383         // Marking buf finished
384         t!(dst.write_all(&[which, 0, 0, 0, 0,]));
385     }
386 }
387 
create_header(which: u8, n: u32) -> [u8; 5]388 const fn create_header(which: u8, n: u32) -> [u8; 5] {
389     let bytes = n.to_be_bytes();
390     [which, bytes[0], bytes[1], bytes[2], bytes[3]]
391 }
392 
read_u32(r: &mut dyn Read) -> u32393 fn read_u32(r: &mut dyn Read) -> u32 {
394     let mut len = [0; 4];
395     t!(r.read_exact(&mut len));
396     u32::from_be_bytes(len)
397 }
398