• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::time::driver::{Handle, TimerEntry};
2 use crate::time::{error::Error, Duration, Instant};
3 use crate::util::trace;
4 
5 use pin_project_lite::pin_project;
6 use std::future::Future;
7 use std::panic::Location;
8 use std::pin::Pin;
9 use std::task::{self, Poll};
10 
11 cfg_trace! {
12     use crate::time::driver::ClockTime;
13 }
14 
15 /// Waits until `deadline` is reached.
16 ///
17 /// No work is performed while awaiting on the sleep future to complete. `Sleep`
18 /// operates at millisecond granularity and should not be used for tasks that
19 /// require high-resolution timers.
20 ///
21 /// To run something regularly on a schedule, see [`interval`].
22 ///
23 /// # Cancellation
24 ///
25 /// Canceling a sleep instance is done by dropping the returned future. No additional
26 /// cleanup work is required.
27 ///
28 /// # Examples
29 ///
30 /// Wait 100ms and print "100 ms have elapsed".
31 ///
32 /// ```
33 /// use tokio::time::{sleep_until, Instant, Duration};
34 ///
35 /// #[tokio::main]
36 /// async fn main() {
37 ///     sleep_until(Instant::now() + Duration::from_millis(100)).await;
38 ///     println!("100 ms have elapsed");
39 /// }
40 /// ```
41 ///
42 /// See the documentation for the [`Sleep`] type for more examples.
43 ///
44 /// [`Sleep`]: struct@crate::time::Sleep
45 /// [`interval`]: crate::time::interval()
46 // Alias for old name in 0.x
47 #[cfg_attr(docsrs, doc(alias = "delay_until"))]
48 #[cfg_attr(tokio_track_caller, track_caller)]
sleep_until(deadline: Instant) -> Sleep49 pub fn sleep_until(deadline: Instant) -> Sleep {
50     return Sleep::new_timeout(deadline, trace::caller_location());
51 }
52 
53 /// Waits until `duration` has elapsed.
54 ///
55 /// Equivalent to `sleep_until(Instant::now() + duration)`. An asynchronous
56 /// analog to `std::thread::sleep`.
57 ///
58 /// No work is performed while awaiting on the sleep future to complete. `Sleep`
59 /// operates at millisecond granularity and should not be used for tasks that
60 /// require high-resolution timers.
61 ///
62 /// To run something regularly on a schedule, see [`interval`].
63 ///
64 /// The maximum duration for a sleep is 68719476734 milliseconds (approximately 2.2 years).
65 ///
66 /// # Cancellation
67 ///
68 /// Canceling a sleep instance is done by dropping the returned future. No additional
69 /// cleanup work is required.
70 ///
71 /// # Examples
72 ///
73 /// Wait 100ms and print "100 ms have elapsed".
74 ///
75 /// ```
76 /// use tokio::time::{sleep, Duration};
77 ///
78 /// #[tokio::main]
79 /// async fn main() {
80 ///     sleep(Duration::from_millis(100)).await;
81 ///     println!("100 ms have elapsed");
82 /// }
83 /// ```
84 ///
85 /// See the documentation for the [`Sleep`] type for more examples.
86 ///
87 /// [`Sleep`]: struct@crate::time::Sleep
88 /// [`interval`]: crate::time::interval()
89 // Alias for old name in 0.x
90 #[cfg_attr(docsrs, doc(alias = "delay_for"))]
91 #[cfg_attr(docsrs, doc(alias = "wait"))]
92 #[cfg_attr(tokio_track_caller, track_caller)]
sleep(duration: Duration) -> Sleep93 pub fn sleep(duration: Duration) -> Sleep {
94     let location = trace::caller_location();
95 
96     match Instant::now().checked_add(duration) {
97         Some(deadline) => Sleep::new_timeout(deadline, location),
98         None => Sleep::new_timeout(Instant::far_future(), location),
99     }
100 }
101 
102 pin_project! {
103     /// Future returned by [`sleep`](sleep) and [`sleep_until`](sleep_until).
104     ///
105     /// This type does not implement the `Unpin` trait, which means that if you
106     /// use it with [`select!`] or by calling `poll`, you have to pin it first.
107     /// If you use it with `.await`, this does not apply.
108     ///
109     /// # Examples
110     ///
111     /// Wait 100ms and print "100 ms have elapsed".
112     ///
113     /// ```
114     /// use tokio::time::{sleep, Duration};
115     ///
116     /// #[tokio::main]
117     /// async fn main() {
118     ///     sleep(Duration::from_millis(100)).await;
119     ///     println!("100 ms have elapsed");
120     /// }
121     /// ```
122     ///
123     /// Use with [`select!`]. Pinning the `Sleep` with [`tokio::pin!`] is
124     /// necessary when the same `Sleep` is selected on multiple times.
125     /// ```no_run
126     /// use tokio::time::{self, Duration, Instant};
127     ///
128     /// #[tokio::main]
129     /// async fn main() {
130     ///     let sleep = time::sleep(Duration::from_millis(10));
131     ///     tokio::pin!(sleep);
132     ///
133     ///     loop {
134     ///         tokio::select! {
135     ///             () = &mut sleep => {
136     ///                 println!("timer elapsed");
137     ///                 sleep.as_mut().reset(Instant::now() + Duration::from_millis(50));
138     ///             },
139     ///         }
140     ///     }
141     /// }
142     /// ```
143     /// Use in a struct with boxing. By pinning the `Sleep` with a `Box`, the
144     /// `HasSleep` struct implements `Unpin`, even though `Sleep` does not.
145     /// ```
146     /// use std::future::Future;
147     /// use std::pin::Pin;
148     /// use std::task::{Context, Poll};
149     /// use tokio::time::Sleep;
150     ///
151     /// struct HasSleep {
152     ///     sleep: Pin<Box<Sleep>>,
153     /// }
154     ///
155     /// impl Future for HasSleep {
156     ///     type Output = ();
157     ///
158     ///     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
159     ///         self.sleep.as_mut().poll(cx)
160     ///     }
161     /// }
162     /// ```
163     /// Use in a struct with pin projection. This method avoids the `Box`, but
164     /// the `HasSleep` struct will not be `Unpin` as a consequence.
165     /// ```
166     /// use std::future::Future;
167     /// use std::pin::Pin;
168     /// use std::task::{Context, Poll};
169     /// use tokio::time::Sleep;
170     /// use pin_project_lite::pin_project;
171     ///
172     /// pin_project! {
173     ///     struct HasSleep {
174     ///         #[pin]
175     ///         sleep: Sleep,
176     ///     }
177     /// }
178     ///
179     /// impl Future for HasSleep {
180     ///     type Output = ();
181     ///
182     ///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
183     ///         self.project().sleep.poll(cx)
184     ///     }
185     /// }
186     /// ```
187     ///
188     /// [`select!`]: ../macro.select.html
189     /// [`tokio::pin!`]: ../macro.pin.html
190     // Alias for old name in 0.2
191     #[cfg_attr(docsrs, doc(alias = "Delay"))]
192     #[derive(Debug)]
193     #[must_use = "futures do nothing unless you `.await` or poll them"]
194     pub struct Sleep {
195         inner: Inner,
196 
197         // The link between the `Sleep` instance and the timer that drives it.
198         #[pin]
199         entry: TimerEntry,
200     }
201 }
202 
203 cfg_trace! {
204     #[derive(Debug)]
205     struct Inner {
206         deadline: Instant,
207         resource_span: tracing::Span,
208         async_op_span: tracing::Span,
209         time_source: ClockTime,
210     }
211 }
212 
213 cfg_not_trace! {
214     #[derive(Debug)]
215     struct Inner {
216         deadline: Instant,
217     }
218 }
219 
220 impl Sleep {
221     #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
new_timeout( deadline: Instant, location: Option<&'static Location<'static>>, ) -> Sleep222     pub(crate) fn new_timeout(
223         deadline: Instant,
224         location: Option<&'static Location<'static>>,
225     ) -> Sleep {
226         let handle = Handle::current();
227         let entry = TimerEntry::new(&handle, deadline);
228 
229         #[cfg(all(tokio_unstable, feature = "tracing"))]
230         let inner = {
231             let time_source = handle.time_source().clone();
232             let deadline_tick = time_source.deadline_to_tick(deadline);
233             let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);
234 
235             #[cfg(tokio_track_caller)]
236             let location = location.expect("should have location if tracking caller");
237 
238             #[cfg(tokio_track_caller)]
239             let resource_span = tracing::trace_span!(
240                 "runtime.resource",
241                 concrete_type = "Sleep",
242                 kind = "timer",
243                 loc.file = location.file(),
244                 loc.line = location.line(),
245                 loc.col = location.column(),
246             );
247 
248             #[cfg(not(tokio_track_caller))]
249             let resource_span =
250                 tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer");
251 
252             let async_op_span =
253                 tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout");
254 
255             tracing::trace!(
256                 target: "runtime::resource::state_update",
257                 parent: resource_span.id(),
258                 duration = duration,
259                 duration.unit = "ms",
260                 duration.op = "override",
261             );
262 
263             Inner {
264                 deadline,
265                 resource_span,
266                 async_op_span,
267                 time_source,
268             }
269         };
270 
271         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
272         let inner = Inner { deadline };
273 
274         Sleep { inner, entry }
275     }
276 
far_future(location: Option<&'static Location<'static>>) -> Sleep277     pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
278         Self::new_timeout(Instant::far_future(), location)
279     }
280 
281     /// Returns the instant at which the future will complete.
deadline(&self) -> Instant282     pub fn deadline(&self) -> Instant {
283         self.inner.deadline
284     }
285 
286     /// Returns `true` if `Sleep` has elapsed.
287     ///
288     /// A `Sleep` instance is elapsed when the requested duration has elapsed.
is_elapsed(&self) -> bool289     pub fn is_elapsed(&self) -> bool {
290         self.entry.is_elapsed()
291     }
292 
293     /// Resets the `Sleep` instance to a new deadline.
294     ///
295     /// Calling this function allows changing the instant at which the `Sleep`
296     /// future completes without having to create new associated state.
297     ///
298     /// This function can be called both before and after the future has
299     /// completed.
300     ///
301     /// To call this method, you will usually combine the call with
302     /// [`Pin::as_mut`], which lets you call the method without consuming the
303     /// `Sleep` itself.
304     ///
305     /// # Example
306     ///
307     /// ```
308     /// use tokio::time::{Duration, Instant};
309     ///
310     /// # #[tokio::main(flavor = "current_thread")]
311     /// # async fn main() {
312     /// let sleep = tokio::time::sleep(Duration::from_millis(10));
313     /// tokio::pin!(sleep);
314     ///
315     /// sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
316     /// # }
317     /// ```
318     ///
319     /// See also the top-level examples.
320     ///
321     /// [`Pin::as_mut`]: fn@std::pin::Pin::as_mut
reset(self: Pin<&mut Self>, deadline: Instant)322     pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
323         self.reset_inner(deadline)
324     }
325 
reset_inner(self: Pin<&mut Self>, deadline: Instant)326     fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
327         let me = self.project();
328         me.entry.reset(deadline);
329         (*me.inner).deadline = deadline;
330 
331         #[cfg(all(tokio_unstable, feature = "tracing"))]
332         {
333             me.inner.async_op_span =
334                 tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
335 
336             tracing::trace!(
337                 target: "runtime::resource::state_update",
338                 parent: me.inner.resource_span.id(),
339                 duration = {
340                     let now = me.inner.time_source.now();
341                     let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
342                     deadline_tick.checked_sub(now).unwrap_or(0)
343                 },
344                 duration.unit = "ms",
345                 duration.op = "override",
346             );
347         }
348     }
349 
350     cfg_not_trace! {
351         fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
352             let me = self.project();
353 
354             // Keep track of task budget
355             let coop = ready!(crate::coop::poll_proceed(cx));
356 
357             me.entry.poll_elapsed(cx).map(move |r| {
358                 coop.made_progress();
359                 r
360             })
361         }
362     }
363 
364     cfg_trace! {
365         fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
366             let me = self.project();
367             // Keep track of task budget
368             let coop = ready!(trace_poll_op!(
369                 "poll_elapsed",
370                 crate::coop::poll_proceed(cx),
371                 me.inner.resource_span.id(),
372             ));
373 
374             let result =  me.entry.poll_elapsed(cx).map(move |r| {
375                 coop.made_progress();
376                 r
377             });
378 
379             trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id())
380         }
381     }
382 }
383 
384 impl Future for Sleep {
385     type Output = ();
386 
387     // `poll_elapsed` can return an error in two cases:
388     //
389     // - AtCapacity: this is a pathological case where far too many
390     //   sleep instances have been scheduled.
391     // - Shutdown: No timer has been setup, which is a mis-use error.
392     //
393     // Both cases are extremely rare, and pretty accurately fit into
394     // "logic errors", so we just panic in this case. A user couldn't
395     // really do much better if we passed the error onwards.
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>396     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
397         #[cfg(all(tokio_unstable, feature = "tracing"))]
398         let _span = self.inner.async_op_span.clone().entered();
399 
400         match ready!(self.as_mut().poll_elapsed(cx)) {
401             Ok(()) => Poll::Ready(()),
402             Err(e) => panic!("timer error: {}", e),
403         }
404     }
405 }
406