• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use futures::future::FutureExt;
5 use tokio::sync::oneshot;
6 use tokio::task::JoinSet;
7 use tokio::time::Duration;
8 
rt() -> tokio::runtime::Runtime9 fn rt() -> tokio::runtime::Runtime {
10     tokio::runtime::Builder::new_current_thread()
11         .build()
12         .unwrap()
13 }
14 
15 #[tokio::test(start_paused = true)]
test_with_sleep()16 async fn test_with_sleep() {
17     let mut set = JoinSet::new();
18 
19     for i in 0..10 {
20         set.spawn(async move { i });
21         assert_eq!(set.len(), 1 + i);
22     }
23     set.detach_all();
24     assert_eq!(set.len(), 0);
25 
26     assert!(matches!(set.join_next().await, None));
27 
28     for i in 0..10 {
29         set.spawn(async move {
30             tokio::time::sleep(Duration::from_secs(i as u64)).await;
31             i
32         });
33         assert_eq!(set.len(), 1 + i);
34     }
35 
36     let mut seen = [false; 10];
37     while let Some(res) = set.join_next().await.transpose().unwrap() {
38         seen[res] = true;
39     }
40 
41     for was_seen in &seen {
42         assert!(was_seen);
43     }
44     assert!(matches!(set.join_next().await, None));
45 
46     // Do it again.
47     for i in 0..10 {
48         set.spawn(async move {
49             tokio::time::sleep(Duration::from_secs(i as u64)).await;
50             i
51         });
52     }
53 
54     let mut seen = [false; 10];
55     while let Some(res) = set.join_next().await.transpose().unwrap() {
56         seen[res] = true;
57     }
58 
59     for was_seen in &seen {
60         assert!(was_seen);
61     }
62     assert!(matches!(set.join_next().await, None));
63 }
64 
65 #[tokio::test]
test_abort_on_drop()66 async fn test_abort_on_drop() {
67     let mut set = JoinSet::new();
68 
69     let mut recvs = Vec::new();
70 
71     for _ in 0..16 {
72         let (send, recv) = oneshot::channel::<()>();
73         recvs.push(recv);
74 
75         set.spawn(async {
76             // This task will never complete on its own.
77             futures::future::pending::<()>().await;
78             drop(send);
79         });
80     }
81 
82     drop(set);
83 
84     for recv in recvs {
85         // The task is aborted soon and we will receive an error.
86         assert!(recv.await.is_err());
87     }
88 }
89 
90 #[tokio::test]
alternating()91 async fn alternating() {
92     let mut set = JoinSet::new();
93 
94     assert_eq!(set.len(), 0);
95     set.spawn(async {});
96     assert_eq!(set.len(), 1);
97     set.spawn(async {});
98     assert_eq!(set.len(), 2);
99 
100     for _ in 0..16 {
101         let () = set.join_next().await.unwrap().unwrap();
102         assert_eq!(set.len(), 1);
103         set.spawn(async {});
104         assert_eq!(set.len(), 2);
105     }
106 }
107 
108 #[tokio::test(start_paused = true)]
abort_tasks()109 async fn abort_tasks() {
110     let mut set = JoinSet::new();
111     let mut num_canceled = 0;
112     let mut num_completed = 0;
113     for i in 0..16 {
114         let abort = set.spawn(async move {
115             tokio::time::sleep(Duration::from_secs(i as u64)).await;
116             i
117         });
118 
119         if i % 2 != 0 {
120             // abort odd-numbered tasks.
121             abort.abort();
122         }
123     }
124     loop {
125         match set.join_next().await {
126             Some(Ok(res)) => {
127                 num_completed += 1;
128                 assert_eq!(res % 2, 0);
129             }
130             Some(Err(e)) => {
131                 assert!(e.is_cancelled());
132                 num_canceled += 1;
133             }
134             None => break,
135         }
136     }
137 
138     assert_eq!(num_canceled, 8);
139     assert_eq!(num_completed, 8);
140 }
141 
142 #[test]
runtime_gone()143 fn runtime_gone() {
144     let mut set = JoinSet::new();
145     {
146         let rt = rt();
147         set.spawn_on(async { 1 }, rt.handle());
148         drop(rt);
149     }
150 
151     assert!(rt()
152         .block_on(set.join_next())
153         .unwrap()
154         .unwrap_err()
155         .is_cancelled());
156 }
157 
158 #[tokio::test(start_paused = true)]
abort_all()159 async fn abort_all() {
160     let mut set: JoinSet<()> = JoinSet::new();
161 
162     for _ in 0..5 {
163         set.spawn(futures::future::pending());
164     }
165     for _ in 0..5 {
166         set.spawn(async {
167             tokio::time::sleep(Duration::from_secs(1)).await;
168         });
169     }
170 
171     // The join set will now have 5 pending tasks and 5 ready tasks.
172     tokio::time::sleep(Duration::from_secs(2)).await;
173 
174     set.abort_all();
175     assert_eq!(set.len(), 10);
176 
177     let mut count = 0;
178     while let Some(res) = set.join_next().await {
179         if let Err(err) = res {
180             assert!(err.is_cancelled());
181         }
182         count += 1;
183     }
184     assert_eq!(count, 10);
185     assert_eq!(set.len(), 0);
186 }
187 
188 // This ensures that `join_next` works correctly when the coop budget is
189 // exhausted.
190 #[tokio::test(flavor = "current_thread")]
join_set_coop()191 async fn join_set_coop() {
192     // Large enough to trigger coop.
193     const TASK_NUM: u32 = 1000;
194 
195     static SEM: tokio::sync::Semaphore = tokio::sync::Semaphore::const_new(0);
196 
197     let mut set = JoinSet::new();
198 
199     for _ in 0..TASK_NUM {
200         set.spawn(async {
201             SEM.add_permits(1);
202         });
203     }
204 
205     // Wait for all tasks to complete.
206     //
207     // Since this is a `current_thread` runtime, there's no race condition
208     // between the last permit being added and the task completing.
209     let _ = SEM.acquire_many(TASK_NUM).await.unwrap();
210 
211     let mut count = 0;
212     let mut coop_count = 0;
213     loop {
214         match set.join_next().now_or_never() {
215             Some(Some(Ok(()))) => {}
216             Some(Some(Err(err))) => panic!("failed: {}", err),
217             None => {
218                 coop_count += 1;
219                 tokio::task::yield_now().await;
220                 continue;
221             }
222             Some(None) => break,
223         }
224 
225         count += 1;
226     }
227     assert!(coop_count >= 1);
228     assert_eq!(count, TASK_NUM);
229 }
230