1 //! Flycheck provides the functionality needed to run `cargo check` or 2 //! another compatible command (f.x. clippy) in a background thread and provide 3 //! LSP diagnostics based on the output of the command. 4 5 #![warn(rust_2018_idioms, unused_lifetimes, semicolon_in_expressions_from_macros)] 6 7 use std::{ 8 fmt, io, 9 process::{ChildStderr, ChildStdout, Command, Stdio}, 10 time::Duration, 11 }; 12 13 use command_group::{CommandGroup, GroupChild}; 14 use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; 15 use paths::AbsPathBuf; 16 use rustc_hash::FxHashMap; 17 use serde::Deserialize; 18 use stdx::process::streaming_output; 19 20 pub use cargo_metadata::diagnostic::{ 21 Applicability, Diagnostic, DiagnosticCode, DiagnosticLevel, DiagnosticSpan, 22 DiagnosticSpanMacroExpansion, 23 }; 24 25 #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] 26 pub enum InvocationStrategy { 27 Once, 28 #[default] 29 PerWorkspace, 30 } 31 32 #[derive(Clone, Debug, Default, PartialEq, Eq)] 33 pub enum InvocationLocation { 34 Root(AbsPathBuf), 35 #[default] 36 Workspace, 37 } 38 39 #[derive(Clone, Debug, PartialEq, Eq)] 40 pub enum FlycheckConfig { 41 CargoCommand { 42 command: String, 43 target_triples: Vec<String>, 44 all_targets: bool, 45 no_default_features: bool, 46 all_features: bool, 47 features: Vec<String>, 48 extra_args: Vec<String>, 49 extra_env: FxHashMap<String, String>, 50 ansi_color_output: bool, 51 }, 52 CustomCommand { 53 command: String, 54 args: Vec<String>, 55 extra_env: FxHashMap<String, String>, 56 invocation_strategy: InvocationStrategy, 57 invocation_location: InvocationLocation, 58 }, 59 } 60 61 impl fmt::Display for FlycheckConfig { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 63 match self { 64 FlycheckConfig::CargoCommand { command, .. } => write!(f, "cargo {command}"), 65 FlycheckConfig::CustomCommand { command, args, .. } => { 66 write!(f, "{command} {}", args.join(" ")) 67 } 68 } 69 } 70 } 71 72 /// Flycheck wraps the shared state and communication machinery used for 73 /// running `cargo check` (or other compatible command) and providing 74 /// diagnostics based on the output. 75 /// The spawned thread is shut down when this struct is dropped. 76 #[derive(Debug)] 77 pub struct FlycheckHandle { 78 // XXX: drop order is significant 79 sender: Sender<StateChange>, 80 _thread: stdx::thread::JoinHandle, 81 id: usize, 82 } 83 84 impl FlycheckHandle { spawn( id: usize, sender: Box<dyn Fn(Message) + Send>, config: FlycheckConfig, workspace_root: AbsPathBuf, ) -> FlycheckHandle85 pub fn spawn( 86 id: usize, 87 sender: Box<dyn Fn(Message) + Send>, 88 config: FlycheckConfig, 89 workspace_root: AbsPathBuf, 90 ) -> FlycheckHandle { 91 let actor = FlycheckActor::new(id, sender, config, workspace_root); 92 let (sender, receiver) = unbounded::<StateChange>(); 93 let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) 94 .name("Flycheck".to_owned()) 95 .spawn(move || actor.run(receiver)) 96 .expect("failed to spawn thread"); 97 FlycheckHandle { id, sender, _thread: thread } 98 } 99 100 /// Schedule a re-start of the cargo check worker. restart(&self)101 pub fn restart(&self) { 102 self.sender.send(StateChange::Restart).unwrap(); 103 } 104 105 /// Stop this cargo check worker. cancel(&self)106 pub fn cancel(&self) { 107 self.sender.send(StateChange::Cancel).unwrap(); 108 } 109 id(&self) -> usize110 pub fn id(&self) -> usize { 111 self.id 112 } 113 } 114 115 pub enum Message { 116 /// Request adding a diagnostic with fixes included to a file 117 AddDiagnostic { id: usize, workspace_root: AbsPathBuf, diagnostic: Diagnostic }, 118 119 /// Request check progress notification to client 120 Progress { 121 /// Flycheck instance ID 122 id: usize, 123 progress: Progress, 124 }, 125 } 126 127 impl fmt::Debug for Message { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 129 match self { 130 Message::AddDiagnostic { id, workspace_root, diagnostic } => f 131 .debug_struct("AddDiagnostic") 132 .field("id", id) 133 .field("workspace_root", workspace_root) 134 .field("diagnostic_code", &diagnostic.code.as_ref().map(|it| &it.code)) 135 .finish(), 136 Message::Progress { id, progress } => { 137 f.debug_struct("Progress").field("id", id).field("progress", progress).finish() 138 } 139 } 140 } 141 } 142 143 #[derive(Debug)] 144 pub enum Progress { 145 DidStart, 146 DidCheckCrate(String), 147 DidFinish(io::Result<()>), 148 DidCancel, 149 DidFailToRestart(String), 150 } 151 152 enum StateChange { 153 Restart, 154 Cancel, 155 } 156 157 /// A [`FlycheckActor`] is a single check instance of a workspace. 158 struct FlycheckActor { 159 /// The workspace id of this flycheck instance. 160 id: usize, 161 sender: Box<dyn Fn(Message) + Send>, 162 config: FlycheckConfig, 163 /// Either the workspace root of the workspace we are flychecking, 164 /// or the project root of the project. 165 root: AbsPathBuf, 166 /// CargoHandle exists to wrap around the communication needed to be able to 167 /// run `cargo check` without blocking. Currently the Rust standard library 168 /// doesn't provide a way to read sub-process output without blocking, so we 169 /// have to wrap sub-processes output handling in a thread and pass messages 170 /// back over a channel. 171 cargo_handle: Option<CargoHandle>, 172 } 173 174 enum Event { 175 RequestStateChange(StateChange), 176 CheckEvent(Option<CargoMessage>), 177 } 178 179 impl FlycheckActor { new( id: usize, sender: Box<dyn Fn(Message) + Send>, config: FlycheckConfig, workspace_root: AbsPathBuf, ) -> FlycheckActor180 fn new( 181 id: usize, 182 sender: Box<dyn Fn(Message) + Send>, 183 config: FlycheckConfig, 184 workspace_root: AbsPathBuf, 185 ) -> FlycheckActor { 186 tracing::info!(%id, ?workspace_root, "Spawning flycheck"); 187 FlycheckActor { id, sender, config, root: workspace_root, cargo_handle: None } 188 } 189 report_progress(&self, progress: Progress)190 fn report_progress(&self, progress: Progress) { 191 self.send(Message::Progress { id: self.id, progress }); 192 } 193 next_event(&self, inbox: &Receiver<StateChange>) -> Option<Event>194 fn next_event(&self, inbox: &Receiver<StateChange>) -> Option<Event> { 195 let check_chan = self.cargo_handle.as_ref().map(|cargo| &cargo.receiver); 196 if let Ok(msg) = inbox.try_recv() { 197 // give restarts a preference so check outputs don't block a restart or stop 198 return Some(Event::RequestStateChange(msg)); 199 } 200 select! { 201 recv(inbox) -> msg => msg.ok().map(Event::RequestStateChange), 202 recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())), 203 } 204 } 205 run(mut self, inbox: Receiver<StateChange>)206 fn run(mut self, inbox: Receiver<StateChange>) { 207 'event: while let Some(event) = self.next_event(&inbox) { 208 match event { 209 Event::RequestStateChange(StateChange::Cancel) => { 210 tracing::debug!(flycheck_id = self.id, "flycheck cancelled"); 211 self.cancel_check_process(); 212 } 213 Event::RequestStateChange(StateChange::Restart) => { 214 // Cancel the previously spawned process 215 self.cancel_check_process(); 216 while let Ok(restart) = inbox.recv_timeout(Duration::from_millis(50)) { 217 // restart chained with a stop, so just cancel 218 if let StateChange::Cancel = restart { 219 continue 'event; 220 } 221 } 222 223 let command = self.check_command(); 224 tracing::debug!(?command, "will restart flycheck"); 225 match CargoHandle::spawn(command) { 226 Ok(cargo_handle) => { 227 tracing::debug!( 228 command = ?self.check_command(), 229 "did restart flycheck" 230 ); 231 self.cargo_handle = Some(cargo_handle); 232 self.report_progress(Progress::DidStart); 233 } 234 Err(error) => { 235 self.report_progress(Progress::DidFailToRestart(format!( 236 "Failed to run the following command: {:?} error={}", 237 self.check_command(), 238 error 239 ))); 240 } 241 } 242 } 243 Event::CheckEvent(None) => { 244 tracing::debug!(flycheck_id = self.id, "flycheck finished"); 245 246 // Watcher finished 247 let cargo_handle = self.cargo_handle.take().unwrap(); 248 let res = cargo_handle.join(); 249 if res.is_err() { 250 tracing::error!( 251 "Flycheck failed to run the following command: {:?}", 252 self.check_command() 253 ); 254 } 255 self.report_progress(Progress::DidFinish(res)); 256 } 257 Event::CheckEvent(Some(message)) => match message { 258 CargoMessage::CompilerArtifact(msg) => { 259 tracing::trace!( 260 flycheck_id = self.id, 261 artifact = msg.target.name, 262 "artifact received" 263 ); 264 self.report_progress(Progress::DidCheckCrate(msg.target.name)); 265 } 266 267 CargoMessage::Diagnostic(msg) => { 268 tracing::trace!( 269 flycheck_id = self.id, 270 message = msg.message, 271 "diagnostic received" 272 ); 273 self.send(Message::AddDiagnostic { 274 id: self.id, 275 workspace_root: self.root.clone(), 276 diagnostic: msg, 277 }); 278 } 279 }, 280 } 281 } 282 // If we rerun the thread, we need to discard the previous check results first 283 self.cancel_check_process(); 284 } 285 cancel_check_process(&mut self)286 fn cancel_check_process(&mut self) { 287 if let Some(cargo_handle) = self.cargo_handle.take() { 288 tracing::debug!( 289 command = ?self.check_command(), 290 "did cancel flycheck" 291 ); 292 cargo_handle.cancel(); 293 self.report_progress(Progress::DidCancel); 294 } 295 } 296 check_command(&self) -> Command297 fn check_command(&self) -> Command { 298 let (mut cmd, args) = match &self.config { 299 FlycheckConfig::CargoCommand { 300 command, 301 target_triples, 302 no_default_features, 303 all_targets, 304 all_features, 305 extra_args, 306 features, 307 extra_env, 308 ansi_color_output, 309 } => { 310 let mut cmd = Command::new(toolchain::cargo()); 311 cmd.arg(command); 312 cmd.current_dir(&self.root); 313 cmd.arg("--workspace"); 314 315 cmd.arg(if *ansi_color_output { 316 "--message-format=json-diagnostic-rendered-ansi" 317 } else { 318 "--message-format=json" 319 }); 320 321 cmd.arg("--manifest-path"); 322 cmd.arg(self.root.join("Cargo.toml").as_os_str()); 323 324 for target in target_triples { 325 cmd.args(["--target", target.as_str()]); 326 } 327 if *all_targets { 328 cmd.arg("--all-targets"); 329 } 330 if *all_features { 331 cmd.arg("--all-features"); 332 } else { 333 if *no_default_features { 334 cmd.arg("--no-default-features"); 335 } 336 if !features.is_empty() { 337 cmd.arg("--features"); 338 cmd.arg(features.join(" ")); 339 } 340 } 341 cmd.envs(extra_env); 342 (cmd, extra_args) 343 } 344 FlycheckConfig::CustomCommand { 345 command, 346 args, 347 extra_env, 348 invocation_strategy, 349 invocation_location, 350 } => { 351 let mut cmd = Command::new(command); 352 cmd.envs(extra_env); 353 354 match invocation_location { 355 InvocationLocation::Workspace => { 356 match invocation_strategy { 357 InvocationStrategy::Once => { 358 cmd.current_dir(&self.root); 359 } 360 InvocationStrategy::PerWorkspace => { 361 // FIXME: cmd.current_dir(&affected_workspace); 362 cmd.current_dir(&self.root); 363 } 364 } 365 } 366 InvocationLocation::Root(root) => { 367 cmd.current_dir(root); 368 } 369 } 370 371 (cmd, args) 372 } 373 }; 374 375 cmd.args(args); 376 cmd 377 } 378 send(&self, check_task: Message)379 fn send(&self, check_task: Message) { 380 (self.sender)(check_task); 381 } 382 } 383 384 struct JodGroupChild(GroupChild); 385 386 impl Drop for JodGroupChild { drop(&mut self)387 fn drop(&mut self) { 388 _ = self.0.kill(); 389 _ = self.0.wait(); 390 } 391 } 392 393 /// A handle to a cargo process used for fly-checking. 394 struct CargoHandle { 395 /// The handle to the actual cargo process. As we cannot cancel directly from with 396 /// a read syscall dropping and therefore terminating the process is our best option. 397 child: JodGroupChild, 398 thread: stdx::thread::JoinHandle<io::Result<(bool, String)>>, 399 receiver: Receiver<CargoMessage>, 400 } 401 402 impl CargoHandle { spawn(mut command: Command) -> std::io::Result<CargoHandle>403 fn spawn(mut command: Command) -> std::io::Result<CargoHandle> { 404 command.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); 405 let mut child = command.group_spawn().map(JodGroupChild)?; 406 407 let stdout = child.0.inner().stdout.take().unwrap(); 408 let stderr = child.0.inner().stderr.take().unwrap(); 409 410 let (sender, receiver) = unbounded(); 411 let actor = CargoActor::new(sender, stdout, stderr); 412 let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) 413 .name("CargoHandle".to_owned()) 414 .spawn(move || actor.run()) 415 .expect("failed to spawn thread"); 416 Ok(CargoHandle { child, thread, receiver }) 417 } 418 cancel(mut self)419 fn cancel(mut self) { 420 let _ = self.child.0.kill(); 421 let _ = self.child.0.wait(); 422 } 423 join(mut self) -> io::Result<()>424 fn join(mut self) -> io::Result<()> { 425 let _ = self.child.0.kill(); 426 let exit_status = self.child.0.wait()?; 427 let (read_at_least_one_message, error) = self.thread.join()?; 428 if read_at_least_one_message || exit_status.success() { 429 Ok(()) 430 } else { 431 Err(io::Error::new(io::ErrorKind::Other, format!( 432 "Cargo watcher failed, the command produced no valid metadata (exit code: {exit_status:?}):\n{error}" 433 ))) 434 } 435 } 436 } 437 438 struct CargoActor { 439 sender: Sender<CargoMessage>, 440 stdout: ChildStdout, 441 stderr: ChildStderr, 442 } 443 444 impl CargoActor { new(sender: Sender<CargoMessage>, stdout: ChildStdout, stderr: ChildStderr) -> CargoActor445 fn new(sender: Sender<CargoMessage>, stdout: ChildStdout, stderr: ChildStderr) -> CargoActor { 446 CargoActor { sender, stdout, stderr } 447 } 448 run(self) -> io::Result<(bool, String)>449 fn run(self) -> io::Result<(bool, String)> { 450 // We manually read a line at a time, instead of using serde's 451 // stream deserializers, because the deserializer cannot recover 452 // from an error, resulting in it getting stuck, because we try to 453 // be resilient against failures. 454 // 455 // Because cargo only outputs one JSON object per line, we can 456 // simply skip a line if it doesn't parse, which just ignores any 457 // erroneous output. 458 459 let mut stdout_errors = String::new(); 460 let mut stderr_errors = String::new(); 461 let mut read_at_least_one_stdout_message = false; 462 let mut read_at_least_one_stderr_message = false; 463 let process_line = |line: &str, error: &mut String| { 464 // Try to deserialize a message from Cargo or Rustc. 465 let mut deserializer = serde_json::Deserializer::from_str(line); 466 deserializer.disable_recursion_limit(); 467 if let Ok(message) = JsonMessage::deserialize(&mut deserializer) { 468 match message { 469 // Skip certain kinds of messages to only spend time on what's useful 470 JsonMessage::Cargo(message) => match message { 471 cargo_metadata::Message::CompilerArtifact(artifact) if !artifact.fresh => { 472 self.sender.send(CargoMessage::CompilerArtifact(artifact)).unwrap(); 473 } 474 cargo_metadata::Message::CompilerMessage(msg) => { 475 self.sender.send(CargoMessage::Diagnostic(msg.message)).unwrap(); 476 } 477 _ => (), 478 }, 479 JsonMessage::Rustc(message) => { 480 self.sender.send(CargoMessage::Diagnostic(message)).unwrap(); 481 } 482 } 483 return true; 484 } 485 486 error.push_str(line); 487 error.push('\n'); 488 false 489 }; 490 let output = streaming_output( 491 self.stdout, 492 self.stderr, 493 &mut |line| { 494 if process_line(line, &mut stdout_errors) { 495 read_at_least_one_stdout_message = true; 496 } 497 }, 498 &mut |line| { 499 if process_line(line, &mut stderr_errors) { 500 read_at_least_one_stderr_message = true; 501 } 502 }, 503 ); 504 505 let read_at_least_one_message = 506 read_at_least_one_stdout_message || read_at_least_one_stderr_message; 507 let mut error = stdout_errors; 508 error.push_str(&stderr_errors); 509 match output { 510 Ok(_) => Ok((read_at_least_one_message, error)), 511 Err(e) => Err(io::Error::new(e.kind(), format!("{e:?}: {error}"))), 512 } 513 } 514 } 515 516 enum CargoMessage { 517 CompilerArtifact(cargo_metadata::Artifact), 518 Diagnostic(Diagnostic), 519 } 520 521 #[derive(Deserialize)] 522 #[serde(untagged)] 523 enum JsonMessage { 524 Cargo(cargo_metadata::Message), 525 Rustc(Diagnostic), 526 } 527