//! Futures compatibility for [`tracing`].
//!
//! # Overview
//!
//! [`tracing`] is a framework for instrumenting Rust programs to collect
//! structured, event-based diagnostic information. This crate provides utilities
//! for using `tracing` to instrument asynchronous code written using futures and
//! async/await.
//!
//! The crate provides the following traits:
//!
//! * [`Instrument`] allows a `tracing` [span] to be attached to a future, sink,
//!   stream, or executor.
//!
//! * [`WithSubscriber`] allows a `tracing` [`Subscriber`] to be attached to a
//!   future, sink, stream, or executor.
//!
//! *Compiler support: [requires `rustc` 1.49+][msrv]*
//!
//! [msrv]: #supported-rust-versions
//!
//! # Feature flags
//!
//! This crate provides a number of feature flags that enable compatibility
//! features with other crates in the asynchronous ecosystem:
//!
//! - `tokio`: Enables compatibility with the `tokio` crate, including
//!   [`Instrument`] and [`WithSubscriber`] implementations for
//!   `tokio::executor::Executor`, `tokio::runtime::Runtime`, and
//!   `tokio::runtime::current_thread`. Enabled by default.
//! - `tokio-executor`: Enables compatibility with the `tokio-executor`
//!   crate, including [`Instrument`] and [`WithSubscriber`]
//!   implementations for types implementing `tokio_executor::Executor`.
//!   This is intended primarily for use in crates which depend on
//!   `tokio-executor` rather than `tokio`; in general the `tokio` feature
//!   should be used instead.
//! - `std-future`: Enables compatibility with `std::future::Future`.
//! - `futures-01`: Enables compatibility with version 0.1.x of the [`futures`]
//!   crate.
//! - `futures-03`: Enables compatibility with version 0.3.x of the `futures`
//!   crate's `Spawn` and `LocalSpawn` traits.
//! - `tokio-alpha`: Enables compatibility with `tokio` 0.2's alpha releases,
//!   including the `tokio` 0.2 `Executor` and `TypedExecutor` traits.
//! - `std`: Depend on the Rust standard library.
//!
//!   `no_std` users may disable this feature with `default-features = false`:
//!
//!   ```toml
//!   [dependencies]
//!   tracing-futures = { version = "0.2.5", default-features = false }
//!   ```
//!
//! The `tokio`, `std-future` and `std` features are enabled by default.
//!
//! [span]: mod@tracing::span
//! [`Subscriber`]: tracing::Subscriber
//! [`futures`]: https://crates.io/crates/futures
//!
//! ## Supported Rust Versions
//!
//! Tracing is built against the latest stable release. The minimum supported
//! version is 1.49. The current Tracing version is not guaranteed to build on
//! Rust versions earlier than the minimum supported version.
//!
//! Tracing follows the same compiler support policies as the rest of the Tokio
//! project. The current stable Rust compiler and the three most recent minor
//! versions before it will always be supported. For example, if the current
//! stable compiler version is 1.45, the minimum supported version will not be
//! increased past 1.42, three minor versions prior. Increasing the minimum
//! supported compiler version is not considered a semver breaking change as
//! long as doing so complies with this policy.
//!
73 #![doc(html_root_url = "https://docs.rs/tracing-futures/0.2.5")] 74 #![doc( 75 html_logo_url = "https://raw.githubusercontent.com/tokio-rs/tracing/master/assets/logo-type.png", 76 issue_tracker_base_url = "https://github.com/tokio-rs/tracing/issues/" 77 )] 78 #![warn( 79 missing_debug_implementations, 80 missing_docs, 81 rust_2018_idioms, 82 unreachable_pub, 83 bad_style, 84 const_err, 85 dead_code, 86 improper_ctypes, 87 non_shorthand_field_patterns, 88 no_mangle_generic_items, 89 overflowing_literals, 90 path_statements, 91 patterns_in_fns_without_body, 92 private_in_public, 93 unconditional_recursion, 94 unused, 95 unused_allocation, 96 unused_comparisons, 97 unused_parens, 98 while_true 99 )] 100 #![cfg_attr(not(feature = "std"), no_std)] 101 #![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))] 102 #[cfg(feature = "std-future")] 103 use pin_project_lite::pin_project; 104 105 pub(crate) mod stdlib; 106 107 #[cfg(feature = "std-future")] 108 use crate::stdlib::{pin::Pin, task::Context}; 109 110 #[cfg(feature = "std")] 111 use tracing::{dispatcher, Dispatch}; 112 113 use tracing::Span; 114 115 /// Implementations for `Instrument`ed future executors. 116 pub mod executor; 117 118 /// Extension trait allowing futures, streams, sinks, and executors to be 119 /// instrumented with a `tracing` [span]. 120 /// 121 /// [span]: mod@tracing::span 122 pub trait Instrument: Sized { 123 /// Instruments this type with the provided `Span`, returning an 124 /// `Instrumented` wrapper. 125 /// 126 /// If the instrumented type is a future, stream, or sink, the attached `Span` 127 /// will be [entered] every time it is polled. If the instrumented type 128 /// is a future executor, every future spawned on that executor will be 129 /// instrumented by the attached `Span`. 130 /// 131 /// # Examples 132 /// 133 /// Instrumenting a future: 134 /// 135 // TODO: ignored until async-await is stable... 
136 /// ```rust,ignore 137 /// use tracing_futures::Instrument; 138 /// 139 /// # async fn doc() { 140 /// let my_future = async { 141 /// // ... 142 /// }; 143 /// 144 /// my_future 145 /// .instrument(tracing::info_span!("my_future")) 146 /// .await 147 /// # } 148 /// ``` 149 /// 150 /// [entered]: tracing::Span::enter instrument(self, span: Span) -> Instrumented<Self>151 fn instrument(self, span: Span) -> Instrumented<Self> { 152 Instrumented { inner: self, span } 153 } 154 155 /// Instruments this type with the [current] `Span`, returning an 156 /// `Instrumented` wrapper. 157 /// 158 /// If the instrumented type is a future, stream, or sink, the attached `Span` 159 /// will be [entered] every time it is polled. If the instrumented type 160 /// is a future executor, every future spawned on that executor will be 161 /// instrumented by the attached `Span`. 162 /// 163 /// This can be used to propagate the current span when spawning a new future. 164 /// 165 /// # Examples 166 /// 167 // TODO: ignored until async-await is stable... 168 /// ```rust,ignore 169 /// use tracing_futures::Instrument; 170 /// 171 /// # async fn doc() { 172 /// let span = tracing::info_span!("my_span"); 173 /// let _enter = span.enter(); 174 /// 175 /// // ... 176 /// 177 /// let future = async { 178 /// tracing::debug!("this event will occur inside `my_span`"); 179 /// // ... 180 /// }; 181 /// tokio::spawn(future.in_current_span()); 182 /// # } 183 /// ``` 184 /// 185 /// [current]: tracing::Span::current 186 /// [entered]: tracing::Span::enter 187 #[inline] in_current_span(self) -> Instrumented<Self>188 fn in_current_span(self) -> Instrumented<Self> { 189 self.instrument(Span::current()) 190 } 191 } 192 193 /// Extension trait allowing futures, streams, and sinks to be instrumented with 194 /// a `tracing` [`Subscriber`]. 
195 /// 196 /// [`Subscriber`]: tracing::Subscriber 197 #[cfg(feature = "std")] 198 #[cfg_attr(docsrs, doc(cfg(feature = "std")))] 199 pub trait WithSubscriber: Sized { 200 /// Attaches the provided [`Subscriber`] to this type, returning a 201 /// `WithDispatch` wrapper. 202 /// 203 /// When the wrapped type is a future, stream, or sink, the attached 204 /// subscriber will be set as the [default] while it is being polled. 205 /// When the wrapped type is an executor, the subscriber will be set as the 206 /// default for any futures spawned on that executor. 207 /// 208 /// [`Subscriber`]: tracing::Subscriber 209 /// [default]: tracing::dispatcher#setting-the-default-subscriber with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>,210 fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> 211 where 212 S: Into<Dispatch>, 213 { 214 WithDispatch { 215 inner: self, 216 dispatch: subscriber.into(), 217 } 218 } 219 220 /// Attaches the current [default] [`Subscriber`] to this type, returning a 221 /// `WithDispatch` wrapper. 222 /// 223 /// When the wrapped type is a future, stream, or sink, the attached 224 /// subscriber will be set as the [default] while it is being polled. 225 /// When the wrapped type is an executor, the subscriber will be set as the 226 /// default for any futures spawned on that executor. 227 /// 228 /// This can be used to propagate the current dispatcher context when 229 /// spawning a new future. 230 /// 231 /// [`Subscriber`]: tracing::Subscriber 232 /// [default]: tracing::dispatcher#setting-the-default-subscriber 233 #[inline] with_current_subscriber(self) -> WithDispatch<Self>234 fn with_current_subscriber(self) -> WithDispatch<Self> { 235 WithDispatch { 236 inner: self, 237 dispatch: dispatcher::get_default(|default| default.clone()), 238 } 239 } 240 } 241 242 #[cfg(feature = "std-future")] 243 pin_project! 
{ 244 /// A future, stream, sink, or executor that has been instrumented with a `tracing` span. 245 #[derive(Debug, Clone)] 246 pub struct Instrumented<T> { 247 #[pin] 248 inner: T, 249 span: Span, 250 } 251 } 252 253 /// A future, stream, sink, or executor that has been instrumented with a `tracing` span. 254 #[cfg(not(feature = "std-future"))] 255 #[derive(Debug, Clone)] 256 pub struct Instrumented<T> { 257 inner: T, 258 span: Span, 259 } 260 261 #[cfg(all(feature = "std", feature = "std-future"))] 262 pin_project! { 263 /// A future, stream, sink, or executor that has been instrumented with a 264 /// `tracing` subscriber. 265 #[cfg_attr(docsrs, doc(cfg(feature = "std")))] 266 #[derive(Clone, Debug)] 267 pub struct WithDispatch<T> { 268 #[pin] 269 inner: T, 270 dispatch: Dispatch, 271 } 272 } 273 274 /// A future, stream, sink, or executor that has been instrumented with a 275 /// `tracing` subscriber. 276 #[cfg(all(feature = "std", not(feature = "std-future")))] 277 #[cfg_attr(docsrs, doc(cfg(feature = "std")))] 278 #[derive(Clone, Debug)] 279 pub struct WithDispatch<T> { 280 inner: T, 281 dispatch: Dispatch, 282 } 283 284 impl<T: Sized> Instrument for T {} 285 286 #[cfg(feature = "std-future")] 287 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))] 288 impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for Instrumented<T> { 289 type Output = T::Output; 290 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output>291 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> { 292 let this = self.project(); 293 let _enter = this.span.enter(); 294 this.inner.poll(cx) 295 } 296 } 297 298 #[cfg(feature = "futures-01")] 299 #[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))] 300 impl<T: futures_01::Future> futures_01::Future for Instrumented<T> { 301 type Item = T::Item; 302 type Error = T::Error; 303 poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error>304 fn 
poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> { 305 let _enter = self.span.enter(); 306 self.inner.poll() 307 } 308 } 309 310 #[cfg(feature = "futures-01")] 311 #[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))] 312 impl<T: futures_01::Stream> futures_01::Stream for Instrumented<T> { 313 type Item = T::Item; 314 type Error = T::Error; 315 poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error>316 fn poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error> { 317 let _enter = self.span.enter(); 318 self.inner.poll() 319 } 320 } 321 322 #[cfg(feature = "futures-01")] 323 #[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))] 324 impl<T: futures_01::Sink> futures_01::Sink for Instrumented<T> { 325 type SinkItem = T::SinkItem; 326 type SinkError = T::SinkError; 327 start_send( &mut self, item: Self::SinkItem, ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError>328 fn start_send( 329 &mut self, 330 item: Self::SinkItem, 331 ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError> { 332 let _enter = self.span.enter(); 333 self.inner.start_send(item) 334 } 335 poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError>336 fn poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError> { 337 let _enter = self.span.enter(); 338 self.inner.poll_complete() 339 } 340 } 341 342 #[cfg(all(feature = "futures-03", feature = "std-future"))] 343 #[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))] 344 impl<T: futures::Stream> futures::Stream for Instrumented<T> { 345 type Item = T::Item; 346 poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> futures::task::Poll<Option<Self::Item>>347 fn poll_next( 348 self: Pin<&mut Self>, 349 cx: &mut Context<'_>, 350 ) -> futures::task::Poll<Option<Self::Item>> { 351 let this = self.project(); 352 let _enter = this.span.enter(); 353 T::poll_next(this.inner, cx) 354 } 355 } 356 357 #[cfg(all(feature = "futures-03", feature = "std-future"))] 358 
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))] 359 impl<I, T: futures::Sink<I>> futures::Sink<I> for Instrumented<T> 360 where 361 T: futures::Sink<I>, 362 { 363 type Error = T::Error; 364 poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> futures::task::Poll<Result<(), Self::Error>>365 fn poll_ready( 366 self: Pin<&mut Self>, 367 cx: &mut Context<'_>, 368 ) -> futures::task::Poll<Result<(), Self::Error>> { 369 let this = self.project(); 370 let _enter = this.span.enter(); 371 T::poll_ready(this.inner, cx) 372 } 373 start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error>374 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { 375 let this = self.project(); 376 let _enter = this.span.enter(); 377 T::start_send(this.inner, item) 378 } 379 poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> futures::task::Poll<Result<(), Self::Error>>380 fn poll_flush( 381 self: Pin<&mut Self>, 382 cx: &mut Context<'_>, 383 ) -> futures::task::Poll<Result<(), Self::Error>> { 384 let this = self.project(); 385 let _enter = this.span.enter(); 386 T::poll_flush(this.inner, cx) 387 } 388 poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> futures::task::Poll<Result<(), Self::Error>>389 fn poll_close( 390 self: Pin<&mut Self>, 391 cx: &mut Context<'_>, 392 ) -> futures::task::Poll<Result<(), Self::Error>> { 393 let this = self.project(); 394 let _enter = this.span.enter(); 395 T::poll_close(this.inner, cx) 396 } 397 } 398 399 impl<T> Instrumented<T> { 400 /// Borrows the `Span` that this type is instrumented by. span(&self) -> &Span401 pub fn span(&self) -> &Span { 402 &self.span 403 } 404 405 /// Mutably borrows the `Span` that this type is instrumented by. span_mut(&mut self) -> &mut Span406 pub fn span_mut(&mut self) -> &mut Span { 407 &mut self.span 408 } 409 410 /// Borrows the wrapped type. 
inner(&self) -> &T411 pub fn inner(&self) -> &T { 412 &self.inner 413 } 414 415 /// Mutably borrows the wrapped type. inner_mut(&mut self) -> &mut T416 pub fn inner_mut(&mut self) -> &mut T { 417 &mut self.inner 418 } 419 420 /// Get a pinned reference to the wrapped type. 421 #[cfg(feature = "std-future")] 422 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))] inner_pin_ref(self: Pin<&Self>) -> Pin<&T>423 pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> { 424 self.project_ref().inner 425 } 426 427 /// Get a pinned mutable reference to the wrapped type. 428 #[cfg(feature = "std-future")] 429 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))] inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>430 pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 431 self.project().inner 432 } 433 434 /// Consumes the `Instrumented`, returning the wrapped type. 435 /// 436 /// Note that this drops the span. into_inner(self) -> T437 pub fn into_inner(self) -> T { 438 self.inner 439 } 440 } 441 442 #[cfg(feature = "std")] 443 impl<T: Sized> WithSubscriber for T {} 444 445 #[cfg(all(feature = "futures-01", feature = "std"))] 446 #[cfg_attr(docsrs, doc(cfg(all(feature = "futures-01", feature = "std"))))] 447 impl<T: futures_01::Future> futures_01::Future for WithDispatch<T> { 448 type Item = T::Item; 449 type Error = T::Error; 450 poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error>451 fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> { 452 let inner = &mut self.inner; 453 dispatcher::with_default(&self.dispatch, || inner.poll()) 454 } 455 } 456 457 #[cfg(all(feature = "std-future", feature = "std"))] 458 #[cfg_attr(docsrs, doc(cfg(all(feature = "std-future", feature = "std"))))] 459 impl<T: crate::stdlib::future::Future> crate::stdlib::future::Future for WithDispatch<T> { 460 type Output = T::Output; 461 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output>462 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) 
-> crate::stdlib::task::Poll<Self::Output> { 463 let this = self.project(); 464 let dispatch = this.dispatch; 465 let future = this.inner; 466 dispatcher::with_default(dispatch, || future.poll(cx)) 467 } 468 } 469 470 #[cfg(feature = "std")] 471 impl<T> WithDispatch<T> { 472 /// Wrap a future, stream, sink or executor with the same subscriber as this WithDispatch. with_dispatch<U>(&self, inner: U) -> WithDispatch<U>473 pub fn with_dispatch<U>(&self, inner: U) -> WithDispatch<U> { 474 WithDispatch { 475 dispatch: self.dispatch.clone(), 476 inner, 477 } 478 } 479 480 /// Borrows the `Dispatch` that this type is instrumented by. dispatch(&self) -> &Dispatch481 pub fn dispatch(&self) -> &Dispatch { 482 &self.dispatch 483 } 484 485 /// Get a pinned reference to the wrapped type. 486 #[cfg(feature = "std-future")] 487 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))] inner_pin_ref(self: Pin<&Self>) -> Pin<&T>488 pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> { 489 self.project_ref().inner 490 } 491 492 /// Get a pinned mutable reference to the wrapped type. 493 #[cfg(feature = "std-future")] 494 #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))] inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>495 pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { 496 self.project().inner 497 } 498 499 /// Borrows the wrapped type. inner(&self) -> &T500 pub fn inner(&self) -> &T { 501 &self.inner 502 } 503 504 /// Mutably borrows the wrapped type. inner_mut(&mut self) -> &mut T505 pub fn inner_mut(&mut self) -> &mut T { 506 &mut self.inner 507 } 508 509 /// Consumes the `WithDispatch`, returning the wrapped type. 
into_inner(self) -> T510 pub fn into_inner(self) -> T { 511 self.inner 512 } 513 } 514 515 #[cfg(test)] 516 mod tests { 517 use super::*; 518 use tracing_mock::*; 519 520 #[cfg(feature = "futures-01")] 521 mod futures_01_tests { 522 use futures_01::{future, stream, task, Async, Future, Stream}; 523 use tracing::subscriber::with_default; 524 525 use super::*; 526 527 struct PollN<T, E> { 528 and_return: Option<Result<T, E>>, 529 finish_at: usize, 530 polls: usize, 531 } 532 533 impl PollN<(), ()> { new_ok(finish_at: usize) -> Self534 fn new_ok(finish_at: usize) -> Self { 535 Self { 536 and_return: Some(Ok(())), 537 finish_at, 538 polls: 0, 539 } 540 } 541 new_err(finish_at: usize) -> Self542 fn new_err(finish_at: usize) -> Self { 543 Self { 544 and_return: Some(Err(())), 545 finish_at, 546 polls: 0, 547 } 548 } 549 } 550 551 impl<T, E> futures_01::Future for PollN<T, E> { 552 type Item = T; 553 type Error = E; poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error>554 fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> { 555 self.polls += 1; 556 if self.polls == self.finish_at { 557 self.and_return 558 .take() 559 .expect("polled after ready") 560 .map(Async::Ready) 561 } else { 562 task::current().notify(); 563 Ok(Async::NotReady) 564 } 565 } 566 } 567 568 #[test] future_enter_exit_is_reasonable()569 fn future_enter_exit_is_reasonable() { 570 let (subscriber, handle) = subscriber::mock() 571 .enter(span::mock().named("foo")) 572 .exit(span::mock().named("foo")) 573 .enter(span::mock().named("foo")) 574 .exit(span::mock().named("foo")) 575 .drop_span(span::mock().named("foo")) 576 .done() 577 .run_with_handle(); 578 with_default(subscriber, || { 579 PollN::new_ok(2) 580 .instrument(tracing::trace_span!("foo")) 581 .wait() 582 .unwrap(); 583 }); 584 handle.assert_finished(); 585 } 586 587 #[test] future_error_ends_span()588 fn future_error_ends_span() { 589 let (subscriber, handle) = subscriber::mock() 590 .enter(span::mock().named("foo")) 591 
.exit(span::mock().named("foo")) 592 .enter(span::mock().named("foo")) 593 .exit(span::mock().named("foo")) 594 .drop_span(span::mock().named("foo")) 595 .done() 596 .run_with_handle(); 597 with_default(subscriber, || { 598 PollN::new_err(2) 599 .instrument(tracing::trace_span!("foo")) 600 .wait() 601 .unwrap_err(); 602 }); 603 604 handle.assert_finished(); 605 } 606 607 #[test] stream_enter_exit_is_reasonable()608 fn stream_enter_exit_is_reasonable() { 609 let (subscriber, handle) = subscriber::mock() 610 .enter(span::mock().named("foo")) 611 .exit(span::mock().named("foo")) 612 .enter(span::mock().named("foo")) 613 .exit(span::mock().named("foo")) 614 .enter(span::mock().named("foo")) 615 .exit(span::mock().named("foo")) 616 .enter(span::mock().named("foo")) 617 .exit(span::mock().named("foo")) 618 .drop_span(span::mock().named("foo")) 619 .run_with_handle(); 620 with_default(subscriber, || { 621 stream::iter_ok::<_, ()>(&[1, 2, 3]) 622 .instrument(tracing::trace_span!("foo")) 623 .for_each(|_| future::ok(())) 624 .wait() 625 .unwrap(); 626 }); 627 handle.assert_finished(); 628 } 629 630 // #[test] 631 // fn span_follows_future_onto_threadpool() { 632 // let (subscriber, handle) = subscriber::mock() 633 // .enter(span::mock().named("a")) 634 // .enter(span::mock().named("b")) 635 // .exit(span::mock().named("b")) 636 // .enter(span::mock().named("b")) 637 // .exit(span::mock().named("b")) 638 // .drop_span(span::mock().named("b")) 639 // .exit(span::mock().named("a")) 640 // .drop_span(span::mock().named("a")) 641 // .done() 642 // .run_with_handle(); 643 // let mut runtime = tokio::runtime::Runtime::new().unwrap(); 644 // with_default(subscriber, || { 645 // tracing::trace_span!("a").in_scope(|| { 646 // let future = PollN::new_ok(2) 647 // .instrument(tracing::trace_span!("b")) 648 // .map(|_| { 649 // tracing::trace_span!("c").in_scope(|| { 650 // // "c" happens _outside_ of the instrumented future's 651 // // span, so we don't expect it. 
652 // }) 653 // }); 654 // runtime.block_on(Box::new(future)).unwrap(); 655 // }) 656 // }); 657 // handle.assert_finished(); 658 // } 659 } 660 661 #[cfg(all(feature = "futures-03", feature = "std-future"))] 662 mod futures_03_tests { 663 use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt}; 664 use tracing::subscriber::with_default; 665 666 use super::*; 667 668 #[test] stream_enter_exit_is_reasonable()669 fn stream_enter_exit_is_reasonable() { 670 let (subscriber, handle) = subscriber::mock() 671 .enter(span::mock().named("foo")) 672 .exit(span::mock().named("foo")) 673 .enter(span::mock().named("foo")) 674 .exit(span::mock().named("foo")) 675 .enter(span::mock().named("foo")) 676 .exit(span::mock().named("foo")) 677 .enter(span::mock().named("foo")) 678 .exit(span::mock().named("foo")) 679 .drop_span(span::mock().named("foo")) 680 .run_with_handle(); 681 with_default(subscriber, || { 682 Instrument::instrument(stream::iter(&[1, 2, 3]), tracing::trace_span!("foo")) 683 .for_each(|_| future::ready(())) 684 .now_or_never() 685 .unwrap(); 686 }); 687 handle.assert_finished(); 688 } 689 690 #[test] sink_enter_exit_is_reasonable()691 fn sink_enter_exit_is_reasonable() { 692 let (subscriber, handle) = subscriber::mock() 693 .enter(span::mock().named("foo")) 694 .exit(span::mock().named("foo")) 695 .enter(span::mock().named("foo")) 696 .exit(span::mock().named("foo")) 697 .enter(span::mock().named("foo")) 698 .exit(span::mock().named("foo")) 699 .drop_span(span::mock().named("foo")) 700 .run_with_handle(); 701 with_default(subscriber, || { 702 Instrument::instrument(sink::drain(), tracing::trace_span!("foo")) 703 .send(1u8) 704 .now_or_never() 705 .unwrap() 706 .unwrap() 707 }); 708 handle.assert_finished(); 709 } 710 } 711 } 712