• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Futures compatibility for [`tracing`].
2 //!
3 //! # Overview
4 //!
5 //! [`tracing`] is a framework for instrumenting Rust programs to collect
6 //! structured, event-based diagnostic information. This crate provides utilities
7 //! for using `tracing` to instrument asynchronous code written using futures and
8 //! async/await.
9 //!
10 //! The crate provides the following traits:
11 //!
12 //! * [`Instrument`] allows a `tracing` [span] to be attached to a future, sink,
13 //!   stream, or executor.
14 //!
15 //! * [`WithSubscriber`] allows a `tracing` [`Subscriber`] to be attached to a
16 //!   future, sink, stream, or executor.
17 //!
18 //! *Compiler support: [requires `rustc` 1.49+][msrv]*
19 //!
20 //! [msrv]: #supported-rust-versions
21 //!
22 //! # Feature flags
23 //!
24 //! This crate provides a number of feature flags that enable compatibility
25 //! features with other crates in the asynchronous ecosystem:
26 //!
27 //! - `tokio`: Enables compatibility with the `tokio` crate, including
28 //!    [`Instrument`] and [`WithSubscriber`] implementations for
29 //!    `tokio::executor::Executor`, `tokio::runtime::Runtime`, and
30 //!    `tokio::runtime::current_thread`. Enabled by default.
31 //! - `tokio-executor`: Enables compatibility with the `tokio-executor`
32 //!    crate, including [`Instrument`] and [`WithSubscriber`]
33 //!    implementations for types implementing `tokio_executor::Executor`.
34 //!    This is intended primarily for use in crates which depend on
35 //!    `tokio-executor` rather than `tokio`; in general the `tokio` feature
36 //!    should be used instead.
37 //! - `std-future`: Enables compatibility with `std::future::Future`.
38 //! - `futures-01`: Enables compatibility with version 0.1.x of the [`futures`]
39 //!   crate.
40 //! - `futures-03`: Enables compatibility with version 0.3.x of the `futures`
41 //!   crate's `Spawn` and `LocalSpawn` traits.
42 //! - `tokio-alpha`: Enables compatibility with `tokio` 0.2's alpha releases,
43 //!   including the `tokio` 0.2 `Executor` and `TypedExecutor` traits.
44 //! - `std`: Depend on the Rust standard library.
45 //!
46 //!   `no_std` users may disable this feature with `default-features = false`:
47 //!
48 //!   ```toml
49 //!   [dependencies]
50 //!   tracing-futures = { version = "0.2.5", default-features = false }
51 //!   ```
52 //!
53 //! The `tokio`, `std-future` and `std` features are enabled by default.
54 //!
55 //! [span]: tracing::span!
56 //! [`Subscriber`]: tracing::subscriber
57 //! [`futures`]: https://crates.io/crates/futures
58 //!
59 //! ## Supported Rust Versions
60 //!
61 //! Tracing is built against the latest stable release. The minimum supported
62 //! version is 1.49. The current Tracing version is not guaranteed to build on
63 //! Rust versions earlier than the minimum supported version.
64 //!
65 //! Tracing follows the same compiler support policies as the rest of the Tokio
66 //! project. The current stable Rust compiler and the three most recent minor
67 //! versions before it will always be supported. For example, if the current
68 //! stable compiler version is 1.45, the minimum supported version will not be
69 //! increased past 1.42, three minor versions prior. Increasing the minimum
70 //! supported compiler version is not considered a semver breaking change as
71 //! long as doing so complies with this policy.
72 //!
73 #![doc(html_root_url = "https://docs.rs/tracing-futures/0.2.5")]
74 #![doc(
75     html_logo_url = "https://raw.githubusercontent.com/tokio-rs/tracing/master/assets/logo-type.png",
76     issue_tracker_base_url = "https://github.com/tokio-rs/tracing/issues/"
77 )]
78 #![warn(
79     missing_debug_implementations,
80     missing_docs,
81     rust_2018_idioms,
82     unreachable_pub,
83     bad_style,
84     const_err,
85     dead_code,
86     improper_ctypes,
87     non_shorthand_field_patterns,
88     no_mangle_generic_items,
89     overflowing_literals,
90     path_statements,
91     patterns_in_fns_without_body,
92     private_in_public,
93     unconditional_recursion,
94     unused,
95     unused_allocation,
96     unused_comparisons,
97     unused_parens,
98     while_true
99 )]
100 #![cfg_attr(not(feature = "std"), no_std)]
101 #![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]
102 #[cfg(feature = "std-future")]
103 use pin_project_lite::pin_project;
104 
105 pub(crate) mod stdlib;
106 
107 #[cfg(feature = "std-future")]
108 use crate::stdlib::{pin::Pin, task::Context};
109 
110 #[cfg(feature = "std")]
111 use tracing::{dispatcher, Dispatch};
112 
113 use tracing::Span;
114 
115 /// Implementations for `Instrument`ed future executors.
116 pub mod executor;
117 
118 /// Extension trait allowing futures, streams, sinks, and executors to be
119 /// instrumented with a `tracing` [span].
120 ///
121 /// [span]: mod@tracing::span
122 pub trait Instrument: Sized {
123     /// Instruments this type with the provided `Span`, returning an
124     /// `Instrumented` wrapper.
125     ///
126     /// If the instrumented type is a future, stream, or sink, the attached `Span`
127     /// will be [entered] every time it is polled. If the instrumented type
128     /// is a future executor, every future spawned on that executor will be
129     /// instrumented by the attached `Span`.
130     ///
131     /// # Examples
132     ///
133     /// Instrumenting a future:
134     ///
135     // TODO: ignored until async-await is stable...
136     /// ```rust,ignore
137     /// use tracing_futures::Instrument;
138     ///
139     /// # async fn doc() {
140     /// let my_future = async {
141     ///     // ...
142     /// };
143     ///
144     /// my_future
145     ///     .instrument(tracing::info_span!("my_future"))
146     ///     .await
147     /// # }
148     /// ```
149     ///
150     /// [entered]: tracing::Span::enter
instrument(self, span: Span) -> Instrumented<Self>151     fn instrument(self, span: Span) -> Instrumented<Self> {
152         Instrumented { inner: self, span }
153     }
154 
155     /// Instruments this type with the [current] `Span`, returning an
156     /// `Instrumented` wrapper.
157     ///
158     /// If the instrumented type is a future, stream, or sink, the attached `Span`
159     /// will be [entered] every time it is polled. If the instrumented type
160     /// is a future executor, every future spawned on that executor will be
161     /// instrumented by the attached `Span`.
162     ///
163     /// This can be used to propagate the current span when spawning a new future.
164     ///
165     /// # Examples
166     ///
167     // TODO: ignored until async-await is stable...
168     /// ```rust,ignore
169     /// use tracing_futures::Instrument;
170     ///
171     /// # async fn doc() {
172     /// let span = tracing::info_span!("my_span");
173     /// let _enter = span.enter();
174     ///
175     /// // ...
176     ///
177     /// let future = async {
178     ///     tracing::debug!("this event will occur inside `my_span`");
179     ///     // ...
180     /// };
181     /// tokio::spawn(future.in_current_span());
182     /// # }
183     /// ```
184     ///
185     /// [current]: tracing::Span::current
186     /// [entered]: tracing::Span::enter
187     #[inline]
in_current_span(self) -> Instrumented<Self>188     fn in_current_span(self) -> Instrumented<Self> {
189         self.instrument(Span::current())
190     }
191 }
192 
/// Extension trait that lets futures, streams, and sinks carry a `tracing`
/// [`Subscriber`].
///
/// [`Subscriber`]: tracing::Subscriber
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub trait WithSubscriber: Sized {
    /// Wraps this value together with the provided [`Subscriber`] in a
    /// `WithDispatch`.
    ///
    /// The `subscriber` argument is converted into a `Dispatch` before being
    /// stored. When the wrapped value is a future, stream, or sink, the
    /// attached subscriber is set as the [default] while it is being polled.
    /// When the wrapped value is an executor, the subscriber is set as the
    /// default for any futures spawned on that executor.
    ///
    /// [`Subscriber`]: tracing::Subscriber
    /// [default]: tracing::dispatcher#setting-the-default-subscriber
    fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
    where
        S: Into<Dispatch>,
    {
        let dispatch = subscriber.into();
        WithDispatch {
            inner: self,
            dispatch,
        }
    }

    /// Wraps this value together with the current [default] [`Subscriber`] in
    /// a `WithDispatch`.
    ///
    /// When the wrapped value is a future, stream, or sink, the attached
    /// subscriber is set as the [default] while it is being polled.
    /// When the wrapped value is an executor, the subscriber is set as the
    /// default for any futures spawned on that executor.
    ///
    /// This is a convenient way to carry the current dispatcher context over
    /// to a newly spawned future.
    ///
    /// [`Subscriber`]: tracing::Subscriber
    /// [default]: tracing::dispatcher#setting-the-default-subscriber
    #[inline]
    fn with_current_subscriber(self) -> WithDispatch<Self> {
        // Take an owned handle to whatever dispatcher is the default right now.
        let dispatch = dispatcher::get_default(Dispatch::clone);
        WithDispatch {
            inner: self,
            dispatch,
        }
    }
}
241 
#[cfg(feature = "std-future")]
pin_project! {
    /// Wrapper pairing a future, stream, sink, or executor with the `tracing`
    /// span it has been instrumented with.
    #[derive(Clone, Debug)]
    pub struct Instrumented<T> {
        // Pin-projected so that a pinned wrapper can hand out a pinned `inner`.
        #[pin]
        inner: T,
        span: Span,
    }
}
252 
253 /// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
254 #[cfg(not(feature = "std-future"))]
255 #[derive(Debug, Clone)]
256 pub struct Instrumented<T> {
257     inner: T,
258     span: Span,
259 }
260 
#[cfg(all(feature = "std", feature = "std-future"))]
pin_project! {
    /// Wrapper pairing a future, stream, sink, or executor with the `tracing`
    /// subscriber (held as a `Dispatch`) it has been instrumented with.
    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
    #[derive(Debug, Clone)]
    pub struct WithDispatch<T> {
        // Pin-projected so that a pinned wrapper can hand out a pinned `inner`.
        #[pin]
        inner: T,
        dispatch: Dispatch,
    }
}
273 
/// Wrapper pairing a future, stream, sink, or executor with the `tracing`
/// subscriber (held as a `Dispatch`) it has been instrumented with.
///
/// This definition is compiled when `std` is enabled but `std-future` is
/// not, so it carries no pin projection.
#[cfg(all(feature = "std", not(feature = "std-future")))]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[derive(Debug, Clone)]
pub struct WithDispatch<T> {
    inner: T,
    dispatch: Dispatch,
}
283 
284 impl<T: Sized> Instrument for T {}
285 
#[cfg(feature = "std-future")]
#[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
impl<T> crate::stdlib::future::Future for Instrumented<T>
where
    T: crate::stdlib::future::Future,
{
    type Output = T::Output;

    /// Polls the wrapped future while the attached span is entered.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
        let projected = self.project();
        // The guard lives until this method returns, so the whole inner poll
        // happens inside the span.
        let _guard = projected.span.enter();
        crate::stdlib::future::Future::poll(projected.inner, cx)
    }
}
297 
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T> futures_01::Future for Instrumented<T>
where
    T: futures_01::Future,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped futures 0.1 future while the attached span is entered.
    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
        // Guard is dropped on return, after the inner poll has completed.
        let _guard = self.span.enter();
        futures_01::Future::poll(&mut self.inner)
    }
}
309 
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T> futures_01::Stream for Instrumented<T>
where
    T: futures_01::Stream,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped futures 0.1 stream while the attached span is entered.
    fn poll(&mut self) -> futures_01::Poll<Option<Self::Item>, Self::Error> {
        // Guard is dropped on return, after the inner poll has completed.
        let _guard = self.span.enter();
        futures_01::Stream::poll(&mut self.inner)
    }
}
321 
#[cfg(feature = "futures-01")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-01")))]
impl<T> futures_01::Sink for Instrumented<T>
where
    T: futures_01::Sink,
{
    type SinkItem = T::SinkItem;
    type SinkError = T::SinkError;

    /// Forwards `item` to the wrapped sink while the attached span is entered.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> futures_01::StartSend<Self::SinkItem, Self::SinkError> {
        let _guard = self.span.enter();
        futures_01::Sink::start_send(&mut self.inner, item)
    }

    /// Drives the wrapped sink's `poll_complete` while the attached span is
    /// entered.
    fn poll_complete(&mut self) -> futures_01::Poll<(), Self::SinkError> {
        let _guard = self.span.enter();
        futures_01::Sink::poll_complete(&mut self.inner)
    }
}
341 
#[cfg(all(feature = "futures-03", feature = "std-future"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
impl<T> futures::Stream for Instrumented<T>
where
    T: futures::Stream,
{
    type Item = T::Item;

    /// Polls the wrapped stream for its next item while the attached span is
    /// entered.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Option<Self::Item>> {
        let projected = self.project();
        // Guard is dropped on return, after the inner poll has completed.
        let _guard = projected.span.enter();
        futures::Stream::poll_next(projected.inner, cx)
    }
}
356 
#[cfg(all(feature = "futures-03", feature = "std-future"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-03", feature = "std-future"))))]
impl<I, T> futures::Sink<I> for Instrumented<T>
where
    // Stated once; the original repeated this bound both inline on `T` and in
    // the `where` clause.
    T: futures::Sink<I>,
{
    type Error = T::Error;

    /// Forwards `poll_ready` to the wrapped sink while the attached span is
    /// entered.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let this = self.project();
        // The guard is dropped on return, so the inner call runs in the span.
        let _enter = this.span.enter();
        T::poll_ready(this.inner, cx)
    }

    /// Forwards `item` to the wrapped sink's `start_send` while the attached
    /// span is entered.
    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
        let this = self.project();
        let _enter = this.span.enter();
        T::start_send(this.inner, item)
    }

    /// Forwards `poll_flush` to the wrapped sink while the attached span is
    /// entered.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let this = self.project();
        let _enter = this.span.enter();
        T::poll_flush(this.inner, cx)
    }

    /// Forwards `poll_close` to the wrapped sink while the attached span is
    /// entered.
    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> futures::task::Poll<Result<(), Self::Error>> {
        let this = self.project();
        let _enter = this.span.enter();
        T::poll_close(this.inner, cx)
    }
}
398 
399 impl<T> Instrumented<T> {
400     /// Borrows the `Span` that this type is instrumented by.
span(&self) -> &Span401     pub fn span(&self) -> &Span {
402         &self.span
403     }
404 
405     /// Mutably borrows the `Span` that this type is instrumented by.
span_mut(&mut self) -> &mut Span406     pub fn span_mut(&mut self) -> &mut Span {
407         &mut self.span
408     }
409 
410     /// Borrows the wrapped type.
inner(&self) -> &T411     pub fn inner(&self) -> &T {
412         &self.inner
413     }
414 
415     /// Mutably borrows the wrapped type.
inner_mut(&mut self) -> &mut T416     pub fn inner_mut(&mut self) -> &mut T {
417         &mut self.inner
418     }
419 
420     /// Get a pinned reference to the wrapped type.
421     #[cfg(feature = "std-future")]
422     #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
inner_pin_ref(self: Pin<&Self>) -> Pin<&T>423     pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
424         self.project_ref().inner
425     }
426 
427     /// Get a pinned mutable reference to the wrapped type.
428     #[cfg(feature = "std-future")]
429     #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T>430     pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
431         self.project().inner
432     }
433 
434     /// Consumes the `Instrumented`, returning the wrapped type.
435     ///
436     /// Note that this drops the span.
into_inner(self) -> T437     pub fn into_inner(self) -> T {
438         self.inner
439     }
440 }
441 
// Blanket implementation: every sized type picks up the default
// `with_subscriber` and `with_current_subscriber` methods (`Sized` is the
// implicit bound on `T`).
#[cfg(feature = "std")]
impl<T> WithSubscriber for T {}
444 
#[cfg(all(feature = "futures-01", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "futures-01", feature = "std"))))]
impl<T> futures_01::Future for WithDispatch<T>
where
    T: futures_01::Future,
{
    type Item = T::Item;
    type Error = T::Error;

    /// Polls the wrapped futures 0.1 future inside
    /// `dispatcher::with_default`, using the attached dispatcher.
    fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
        // Borrow the two fields separately so the closure can hold the future
        // mutably while the dispatcher is borrowed for the call.
        let wrapped = &mut self.inner;
        let dispatch = &self.dispatch;
        dispatcher::with_default(dispatch, || wrapped.poll())
    }
}
456 
#[cfg(all(feature = "std-future", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "std-future", feature = "std"))))]
impl<T> crate::stdlib::future::Future for WithDispatch<T>
where
    T: crate::stdlib::future::Future,
{
    type Output = T::Output;

    /// Polls the wrapped future inside `dispatcher::with_default`, using the
    /// attached dispatcher.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> crate::stdlib::task::Poll<Self::Output> {
        let projected = self.project();
        // Split the projection into its parts before building the closure so
        // the closure only captures the pinned future.
        let (dispatch, wrapped) = (projected.dispatch, projected.inner);
        dispatcher::with_default(dispatch, || wrapped.poll(cx))
    }
}
469 
#[cfg(feature = "std")]
impl<T> WithDispatch<T> {
    /// Wraps another future, stream, sink, or executor with a clone of the
    /// dispatcher attached to this `WithDispatch`.
    pub fn with_dispatch<U>(&self, inner: U) -> WithDispatch<U> {
        let dispatch = self.dispatch.clone();
        WithDispatch { inner, dispatch }
    }

    /// Returns a shared reference to the `Dispatch` attached to this wrapper.
    pub fn dispatch(&self) -> &Dispatch {
        &self.dispatch
    }

    /// Projects a pinned shared reference to the wrapper into a pinned shared
    /// reference to the wrapped value.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_ref(self: Pin<&Self>) -> Pin<&T> {
        let projected = self.project_ref();
        projected.inner
    }

    /// Projects a pinned mutable reference to the wrapper into a pinned
    /// mutable reference to the wrapped value.
    #[cfg(feature = "std-future")]
    #[cfg_attr(docsrs, doc(cfg(feature = "std-future")))]
    pub fn inner_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
        let projected = self.project();
        projected.inner
    }

    /// Returns a shared reference to the wrapped value.
    pub fn inner(&self) -> &T {
        &self.inner
    }

    /// Returns a mutable reference to the wrapped value.
    pub fn inner_mut(&mut self) -> &mut T {
        &mut self.inner
    }

    /// Unwraps the `WithDispatch`, handing back the wrapped value.
    pub fn into_inner(self) -> T {
        self.inner
    }
}
514 
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_mock::*;

    /// Tests for the `futures` 0.1 implementations on `Instrumented`.
    #[cfg(feature = "futures-01")]
    mod futures_01_tests {
        use futures_01::{future, stream, task, Async, Future, Stream};
        use tracing::subscriber::with_default;

        use super::*;

        /// Test future that reports `NotReady` on every poll except the
        /// `finish_at`-th one, on which it yields the stored result.
        struct PollN<T, E> {
            and_return: Option<Result<T, E>>,
            finish_at: usize,
            polls: usize,
        }

        impl PollN<(), ()> {
            /// Builds a `PollN` that resolves to `Ok(())` on poll number
            /// `finish_at`.
            fn new_ok(finish_at: usize) -> Self {
                Self::resolving_to(Ok(()), finish_at)
            }

            /// Builds a `PollN` that resolves to `Err(())` on poll number
            /// `finish_at`.
            fn new_err(finish_at: usize) -> Self {
                Self::resolving_to(Err(()), finish_at)
            }

            /// Shared constructor: starts with zero polls recorded.
            fn resolving_to(outcome: Result<(), ()>, finish_at: usize) -> Self {
                Self {
                    and_return: Some(outcome),
                    finish_at,
                    polls: 0,
                }
            }
        }

        impl<T, E> futures_01::Future for PollN<T, E> {
            type Item = T;
            type Error = E;

            fn poll(&mut self) -> futures_01::Poll<Self::Item, Self::Error> {
                self.polls += 1;
                if self.polls != self.finish_at {
                    // Not done yet: ask to be polled again right away.
                    task::current().notify();
                    return Ok(Async::NotReady);
                }
                let outcome = self.and_return.take().expect("polled after ready");
                outcome.map(Async::Ready)
            }
        }

        #[test]
        fn future_enter_exit_is_reasonable() {
            let foo_span = || span::mock().named("foo");
            // Two polls are expected, hence two enter/exit pairs before the drop.
            let (subscriber, handle) = subscriber::mock()
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .drop_span(foo_span())
                .done()
                .run_with_handle();
            with_default(subscriber, || {
                let instrumented = PollN::new_ok(2).instrument(tracing::trace_span!("foo"));
                instrumented.wait().unwrap();
            });
            handle.assert_finished();
        }

        #[test]
        fn future_error_ends_span() {
            let foo_span = || span::mock().named("foo");
            // Same shape as the success case: the error arrives on the second poll.
            let (subscriber, handle) = subscriber::mock()
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .drop_span(foo_span())
                .done()
                .run_with_handle();
            with_default(subscriber, || {
                let instrumented = PollN::new_err(2).instrument(tracing::trace_span!("foo"));
                instrumented.wait().unwrap_err();
            });

            handle.assert_finished();
        }

        #[test]
        fn stream_enter_exit_is_reasonable() {
            let foo_span = || span::mock().named("foo");
            // Four enter/exit pairs are expected for the three-item stream.
            let (subscriber, handle) = subscriber::mock()
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .drop_span(foo_span())
                .run_with_handle();
            with_default(subscriber, || {
                let instrumented = stream::iter_ok::<_, ()>(&[1, 2, 3])
                    .instrument(tracing::trace_span!("foo"));
                instrumented.for_each(|_| future::ok(())).wait().unwrap();
            });
            handle.assert_finished();
        }

        // NOTE(review): the test below is disabled and kept only for reference.
        // It depends on `tokio::runtime::Runtime`; confirm whether it should be
        // restored or removed.
        //
        // #[test]
        // fn span_follows_future_onto_threadpool() {
        //     let (subscriber, handle) = subscriber::mock()
        //         .enter(span::mock().named("a"))
        //         .enter(span::mock().named("b"))
        //         .exit(span::mock().named("b"))
        //         .enter(span::mock().named("b"))
        //         .exit(span::mock().named("b"))
        //         .drop_span(span::mock().named("b"))
        //         .exit(span::mock().named("a"))
        //         .drop_span(span::mock().named("a"))
        //         .done()
        //         .run_with_handle();
        //     let mut runtime = tokio::runtime::Runtime::new().unwrap();
        //     with_default(subscriber, || {
        //         tracing::trace_span!("a").in_scope(|| {
        //             let future = PollN::new_ok(2)
        //                 .instrument(tracing::trace_span!("b"))
        //                 .map(|_| {
        //                     tracing::trace_span!("c").in_scope(|| {
        //                         // "c" happens _outside_ of the instrumented future's
        //                         // span, so we don't expect it.
        //                     })
        //                 });
        //             runtime.block_on(Box::new(future)).unwrap();
        //         })
        //     });
        //     handle.assert_finished();
        // }
    }

    /// Tests for the `futures` 0.3 implementations on `Instrumented`.
    #[cfg(all(feature = "futures-03", feature = "std-future"))]
    mod futures_03_tests {
        use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt};
        use tracing::subscriber::with_default;

        use super::*;

        #[test]
        fn stream_enter_exit_is_reasonable() {
            let foo_span = || span::mock().named("foo");
            // Four enter/exit pairs are expected for the three-item stream.
            let (subscriber, handle) = subscriber::mock()
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .drop_span(foo_span())
                .run_with_handle();
            with_default(subscriber, || {
                let instrumented =
                    Instrument::instrument(stream::iter(&[1, 2, 3]), tracing::trace_span!("foo"));
                instrumented
                    .for_each(|_| future::ready(()))
                    .now_or_never()
                    .unwrap();
            });
            handle.assert_finished();
        }

        #[test]
        fn sink_enter_exit_is_reasonable() {
            let foo_span = || span::mock().named("foo");
            // Three enter/exit pairs are expected for a single `send`.
            let (subscriber, handle) = subscriber::mock()
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .enter(foo_span())
                .exit(foo_span())
                .drop_span(foo_span())
                .run_with_handle();
            with_default(subscriber, || {
                Instrument::instrument(sink::drain(), tracing::trace_span!("foo"))
                    .send(1u8)
                    .now_or_never()
                    .unwrap()
                    .unwrap()
            });
            handle.assert_finished();
        }
    }
}
712