• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![allow(clippy::declare_interior_mutable_const)]
3 #![cfg(all(feature = "full", tokio_unstable))]
4 
5 #[cfg(not(target_os = "wasi"))]
6 use std::error::Error;
7 use std::future::Future;
8 use std::pin::Pin;
9 use std::task::{Context, Poll};
10 #[cfg(not(target_os = "wasi"))]
11 use tokio::runtime::{Builder, Runtime};
12 use tokio::sync::oneshot;
13 use tokio::task::{self, Id, LocalSet};
14 
15 #[cfg(not(target_os = "wasi"))]
16 mod support {
17     pub mod panic;
18 }
19 #[cfg(not(target_os = "wasi"))]
20 use support::panic::test_panic;
21 
22 #[tokio::test(flavor = "current_thread")]
task_id_spawn()23 async fn task_id_spawn() {
24     tokio::spawn(async { println!("task id: {}", task::id()) })
25         .await
26         .unwrap();
27 }
28 
29 #[cfg(not(target_os = "wasi"))]
30 #[tokio::test(flavor = "current_thread")]
task_id_spawn_blocking()31 async fn task_id_spawn_blocking() {
32     task::spawn_blocking(|| println!("task id: {}", task::id()))
33         .await
34         .unwrap();
35 }
36 
37 #[tokio::test(flavor = "current_thread")]
task_id_collision_current_thread()38 async fn task_id_collision_current_thread() {
39     let handle1 = tokio::spawn(async { task::id() });
40     let handle2 = tokio::spawn(async { task::id() });
41 
42     let (id1, id2) = tokio::join!(handle1, handle2);
43     assert_ne!(id1.unwrap(), id2.unwrap());
44 }
45 
46 #[cfg(not(target_os = "wasi"))]
47 #[tokio::test(flavor = "multi_thread")]
task_id_collision_multi_thread()48 async fn task_id_collision_multi_thread() {
49     let handle1 = tokio::spawn(async { task::id() });
50     let handle2 = tokio::spawn(async { task::id() });
51 
52     let (id1, id2) = tokio::join!(handle1, handle2);
53     assert_ne!(id1.unwrap(), id2.unwrap());
54 }
55 
56 #[tokio::test(flavor = "current_thread")]
task_ids_match_current_thread()57 async fn task_ids_match_current_thread() {
58     let (tx, rx) = oneshot::channel();
59     let handle = tokio::spawn(async {
60         let id = rx.await.unwrap();
61         assert_eq!(id, task::id());
62     });
63     tx.send(handle.id()).unwrap();
64     handle.await.unwrap();
65 }
66 
67 #[cfg(not(target_os = "wasi"))]
68 #[tokio::test(flavor = "multi_thread")]
task_ids_match_multi_thread()69 async fn task_ids_match_multi_thread() {
70     let (tx, rx) = oneshot::channel();
71     let handle = tokio::spawn(async {
72         let id = rx.await.unwrap();
73         assert_eq!(id, task::id());
74     });
75     tx.send(handle.id()).unwrap();
76     handle.await.unwrap();
77 }
78 
79 #[cfg(not(target_os = "wasi"))]
80 #[tokio::test(flavor = "multi_thread")]
task_id_future_destructor_completion()81 async fn task_id_future_destructor_completion() {
82     struct MyFuture {
83         tx: Option<oneshot::Sender<Id>>,
84     }
85 
86     impl Future for MyFuture {
87         type Output = ();
88 
89         fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
90             Poll::Ready(())
91         }
92     }
93 
94     impl Drop for MyFuture {
95         fn drop(&mut self) {
96             let _ = self.tx.take().unwrap().send(task::id());
97         }
98     }
99 
100     let (tx, rx) = oneshot::channel();
101     let handle = tokio::spawn(MyFuture { tx: Some(tx) });
102     let id = handle.id();
103     handle.await.unwrap();
104     assert_eq!(rx.await.unwrap(), id);
105 }
106 
107 #[cfg(not(target_os = "wasi"))]
108 #[tokio::test(flavor = "multi_thread")]
task_id_future_destructor_abort()109 async fn task_id_future_destructor_abort() {
110     struct MyFuture {
111         tx: Option<oneshot::Sender<Id>>,
112     }
113 
114     impl Future for MyFuture {
115         type Output = ();
116 
117         fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
118             Poll::Pending
119         }
120     }
121     impl Drop for MyFuture {
122         fn drop(&mut self) {
123             let _ = self.tx.take().unwrap().send(task::id());
124         }
125     }
126 
127     let (tx, rx) = oneshot::channel();
128     let handle = tokio::spawn(MyFuture { tx: Some(tx) });
129     let id = handle.id();
130     handle.abort();
131     assert!(handle.await.unwrap_err().is_cancelled());
132     assert_eq!(rx.await.unwrap(), id);
133 }
134 
135 #[tokio::test(flavor = "current_thread")]
task_id_output_destructor_handle_dropped_before_completion()136 async fn task_id_output_destructor_handle_dropped_before_completion() {
137     struct MyOutput {
138         tx: Option<oneshot::Sender<Id>>,
139     }
140 
141     impl Drop for MyOutput {
142         fn drop(&mut self) {
143             let _ = self.tx.take().unwrap().send(task::id());
144         }
145     }
146 
147     struct MyFuture {
148         tx: Option<oneshot::Sender<Id>>,
149     }
150 
151     impl Future for MyFuture {
152         type Output = MyOutput;
153 
154         fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
155             Poll::Ready(MyOutput { tx: self.tx.take() })
156         }
157     }
158 
159     let (tx, mut rx) = oneshot::channel();
160     let handle = tokio::spawn(MyFuture { tx: Some(tx) });
161     let id = handle.id();
162     drop(handle);
163     assert!(rx.try_recv().is_err());
164     assert_eq!(rx.await.unwrap(), id);
165 }
166 
167 #[tokio::test(flavor = "current_thread")]
task_id_output_destructor_handle_dropped_after_completion()168 async fn task_id_output_destructor_handle_dropped_after_completion() {
169     struct MyOutput {
170         tx: Option<oneshot::Sender<Id>>,
171     }
172 
173     impl Drop for MyOutput {
174         fn drop(&mut self) {
175             let _ = self.tx.take().unwrap().send(task::id());
176         }
177     }
178 
179     struct MyFuture {
180         tx_output: Option<oneshot::Sender<Id>>,
181         tx_future: Option<oneshot::Sender<()>>,
182     }
183 
184     impl Future for MyFuture {
185         type Output = MyOutput;
186 
187         fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
188             let _ = self.tx_future.take().unwrap().send(());
189             Poll::Ready(MyOutput {
190                 tx: self.tx_output.take(),
191             })
192         }
193     }
194 
195     let (tx_output, mut rx_output) = oneshot::channel();
196     let (tx_future, rx_future) = oneshot::channel();
197     let handle = tokio::spawn(MyFuture {
198         tx_output: Some(tx_output),
199         tx_future: Some(tx_future),
200     });
201     let id = handle.id();
202     rx_future.await.unwrap();
203     assert!(rx_output.try_recv().is_err());
204     drop(handle);
205     assert_eq!(rx_output.await.unwrap(), id);
206 }
207 
208 #[test]
task_try_id_outside_task()209 fn task_try_id_outside_task() {
210     assert_eq!(None, task::try_id());
211 }
212 
213 #[cfg(not(target_os = "wasi"))]
214 #[test]
task_try_id_inside_block_on()215 fn task_try_id_inside_block_on() {
216     let rt = Runtime::new().unwrap();
217     rt.block_on(async {
218         assert_eq!(None, task::try_id());
219     });
220 }
221 
222 #[tokio::test(flavor = "current_thread")]
task_id_spawn_local()223 async fn task_id_spawn_local() {
224     LocalSet::new()
225         .run_until(async {
226             task::spawn_local(async { println!("task id: {}", task::id()) })
227                 .await
228                 .unwrap();
229         })
230         .await
231 }
232 
233 #[tokio::test(flavor = "current_thread")]
task_id_nested_spawn_local()234 async fn task_id_nested_spawn_local() {
235     LocalSet::new()
236         .run_until(async {
237             task::spawn_local(async {
238                 let parent_id = task::id();
239                 LocalSet::new()
240                     .run_until(async {
241                         task::spawn_local(async move {
242                             assert_ne!(parent_id, task::id());
243                         })
244                         .await
245                         .unwrap();
246                     })
247                     .await;
248                 assert_eq!(parent_id, task::id());
249             })
250             .await
251             .unwrap();
252         })
253         .await;
254 }
255 
256 #[cfg(not(target_os = "wasi"))]
257 #[tokio::test(flavor = "multi_thread")]
task_id_block_in_place_block_on_spawn()258 async fn task_id_block_in_place_block_on_spawn() {
259     task::spawn(async {
260         let parent_id = task::id();
261 
262         task::block_in_place(move || {
263             let rt = Builder::new_current_thread().build().unwrap();
264             rt.block_on(rt.spawn(async move {
265                 assert_ne!(parent_id, task::id());
266             }))
267             .unwrap();
268         });
269 
270         assert_eq!(parent_id, task::id());
271     })
272     .await
273     .unwrap();
274 }
275 
276 #[cfg(not(target_os = "wasi"))]
277 #[test]
task_id_outside_task_panic_caller() -> Result<(), Box<dyn Error>>278 fn task_id_outside_task_panic_caller() -> Result<(), Box<dyn Error>> {
279     let panic_location_file = test_panic(|| {
280         let _ = task::id();
281     });
282 
283     // The panic location should be in this file
284     assert_eq!(&panic_location_file.unwrap(), file!());
285 
286     Ok(())
287 }
288 
289 #[cfg(not(target_os = "wasi"))]
290 #[test]
task_id_inside_block_on_panic_caller() -> Result<(), Box<dyn Error>>291 fn task_id_inside_block_on_panic_caller() -> Result<(), Box<dyn Error>> {
292     let panic_location_file = test_panic(|| {
293         let rt = Runtime::new().unwrap();
294         rt.block_on(async {
295             task::id();
296         });
297     });
298 
299     // The panic location should be in this file
300     assert_eq!(&panic_location_file.unwrap(), file!());
301 
302     Ok(())
303 }
304