• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))]
2 
3 //! Synchronization primitives for use in asynchronous contexts.
4 //!
5 //! Tokio programs tend to be organized as a set of [tasks] where each task
6 //! operates independently and may be executed on separate physical threads. The
7 //! synchronization primitives provided in this module permit these independent
8 //! tasks to communicate together.
9 //!
10 //! [tasks]: crate::task
11 //!
12 //! # Message passing
13 //!
14 //! The most common form of synchronization in a Tokio program is message
15 //! passing. Two tasks operate independently and send messages to each other to
16 //! synchronize. Doing so has the advantage of avoiding shared state.
17 //!
18 //! Message passing is implemented using channels. A channel supports sending a
19 //! message from one producer task to one or more consumer tasks. There are a
20 //! few flavors of channels provided by Tokio. Each channel flavor supports
21 //! different message passing patterns. When a channel supports multiple
22 //! producers, many separate tasks may **send** messages. When a channel
23 //! supports multiple consumers, many different separate tasks may **receive**
24 //! messages.
25 //!
26 //! Tokio provides many different channel flavors as different message passing
27 //! patterns are best handled with different implementations.
28 //!
29 //! ## `oneshot` channel
30 //!
31 //! The [`oneshot` channel][oneshot] supports sending a **single** value from a
32 //! single producer to a single consumer. This channel is usually used to send
33 //! the result of a computation to a waiter.
34 //!
35 //! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a
36 //! computation.
37 //!
38 //! ```
39 //! use tokio::sync::oneshot;
40 //!
41 //! async fn some_computation() -> String {
42 //!     "represents the result of the computation".to_string()
43 //! }
44 //!
45 //! #[tokio::main]
46 //! async fn main() {
47 //!     let (tx, rx) = oneshot::channel();
48 //!
49 //!     tokio::spawn(async move {
50 //!         let res = some_computation().await;
51 //!         tx.send(res).unwrap();
52 //!     });
53 //!
54 //!     // Do other work while the computation is happening in the background
55 //!
56 //!     // Wait for the computation result
57 //!     let res = rx.await.unwrap();
58 //! }
59 //! ```
60 //!
61 //! Note, if the task produces a computation result as its final
62 //! action before terminating, the [`JoinHandle`] can be used to
63 //! receive that value instead of allocating resources for the
64 //! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If
65 //! the task panics, the `Joinhandle` yields `Err` with the panic
66 //! cause.
67 //!
68 //! **Example:**
69 //!
70 //! ```
71 //! async fn some_computation() -> String {
72 //!     "the result of the computation".to_string()
73 //! }
74 //!
75 //! #[tokio::main]
76 //! async fn main() {
77 //!     let join_handle = tokio::spawn(async move {
78 //!         some_computation().await
79 //!     });
80 //!
81 //!     // Do other work while the computation is happening in the background
82 //!
83 //!     // Wait for the computation result
84 //!     let res = join_handle.await.unwrap();
85 //! }
86 //! ```
87 //!
88 //! [oneshot]: oneshot
89 //! [`JoinHandle`]: crate::task::JoinHandle
90 //!
91 //! ## `mpsc` channel
92 //!
93 //! The [`mpsc` channel][mpsc] supports sending **many** values from **many**
94 //! producers to a single consumer. This channel is often used to send work to a
95 //! task or to receive the result of many computations.
96 //!
97 //! This is also the channel you should use if you want to send many messages
98 //! from a single producer to a single consumer. There is no dedicated spsc
99 //! channel.
100 //!
101 //! **Example:** using an mpsc to incrementally stream the results of a series
102 //! of computations.
103 //!
104 //! ```
105 //! use tokio::sync::mpsc;
106 //!
107 //! async fn some_computation(input: u32) -> String {
108 //!     format!("the result of computation {}", input)
109 //! }
110 //!
111 //! #[tokio::main]
112 //! async fn main() {
113 //!     let (tx, mut rx) = mpsc::channel(100);
114 //!
115 //!     tokio::spawn(async move {
116 //!         for i in 0..10 {
117 //!             let res = some_computation(i).await;
118 //!             tx.send(res).await.unwrap();
119 //!         }
120 //!     });
121 //!
122 //!     while let Some(res) = rx.recv().await {
123 //!         println!("got = {}", res);
124 //!     }
125 //! }
126 //! ```
127 //!
128 //! The argument to `mpsc::channel` is the channel capacity. This is the maximum
129 //! number of values that can be stored in the channel pending receipt at any
130 //! given time. Properly setting this value is key in implementing robust
131 //! programs as the channel capacity plays a critical part in handling back
132 //! pressure.
133 //!
134 //! A common concurrency pattern for resource management is to spawn a task
135 //! dedicated to managing that resource and using message passing between other
136 //! tasks to interact with the resource. The resource may be anything that may
137 //! not be concurrently used. Some examples include a socket and program state.
138 //! For example, if multiple tasks need to send data over a single socket, spawn
139 //! a task to manage the socket and use a channel to synchronize.
140 //!
141 //! **Example:** sending data from many tasks over a single socket using message
142 //! passing.
143 //!
144 //! ```no_run
145 //! use tokio::io::{self, AsyncWriteExt};
146 //! use tokio::net::TcpStream;
147 //! use tokio::sync::mpsc;
148 //!
149 //! #[tokio::main]
150 //! async fn main() -> io::Result<()> {
151 //!     let mut socket = TcpStream::connect("www.example.com:1234").await?;
152 //!     let (tx, mut rx) = mpsc::channel(100);
153 //!
154 //!     for _ in 0..10 {
155 //!         // Each task needs its own `tx` handle. This is done by cloning the
156 //!         // original handle.
157 //!         let tx = tx.clone();
158 //!
159 //!         tokio::spawn(async move {
160 //!             tx.send(&b"data to write"[..]).await.unwrap();
161 //!         });
162 //!     }
163 //!
164 //!     // The `rx` half of the channel returns `None` once **all** `tx` clones
165 //!     // drop. To ensure `None` is returned, drop the handle owned by the
166 //!     // current task. If this `tx` handle is not dropped, there will always
167 //!     // be a single outstanding `tx` handle.
168 //!     drop(tx);
169 //!
170 //!     while let Some(res) = rx.recv().await {
171 //!         socket.write_all(res).await?;
172 //!     }
173 //!
174 //!     Ok(())
175 //! }
176 //! ```
177 //!
178 //! The [`mpsc`][mpsc] and [`oneshot`][oneshot] channels can be combined to
179 //! provide a request / response type synchronization pattern with a shared
180 //! resource. A task is spawned to synchronize a resource and waits on commands
181 //! received on a [`mpsc`][mpsc] channel. Each command includes a
182 //! [`oneshot`][oneshot] `Sender` on which the result of the command is sent.
183 //!
184 //! **Example:** use a task to synchronize a `u64` counter. Each task sends an
185 //! "fetch and increment" command. The counter value **before** the increment is
186 //! sent over the provided `oneshot` channel.
187 //!
188 //! ```
189 //! use tokio::sync::{oneshot, mpsc};
190 //! use Command::Increment;
191 //!
192 //! enum Command {
193 //!     Increment,
194 //!     // Other commands can be added here
195 //! }
196 //!
197 //! #[tokio::main]
198 //! async fn main() {
199 //!     let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
200 //!
201 //!     // Spawn a task to manage the counter
202 //!     tokio::spawn(async move {
203 //!         let mut counter: u64 = 0;
204 //!
205 //!         while let Some((cmd, response)) = cmd_rx.recv().await {
206 //!             match cmd {
207 //!                 Increment => {
208 //!                     let prev = counter;
209 //!                     counter += 1;
210 //!                     response.send(prev).unwrap();
211 //!                 }
212 //!             }
213 //!         }
214 //!     });
215 //!
216 //!     let mut join_handles = vec![];
217 //!
218 //!     // Spawn tasks that will send the increment command.
219 //!     for _ in 0..10 {
220 //!         let cmd_tx = cmd_tx.clone();
221 //!
222 //!         join_handles.push(tokio::spawn(async move {
223 //!             let (resp_tx, resp_rx) = oneshot::channel();
224 //!
225 //!             cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
226 //!             let res = resp_rx.await.unwrap();
227 //!
228 //!             println!("previous value = {}", res);
229 //!         }));
230 //!     }
231 //!
232 //!     // Wait for all tasks to complete
233 //!     for join_handle in join_handles.drain(..) {
234 //!         join_handle.await.unwrap();
235 //!     }
236 //! }
237 //! ```
238 //!
239 //! [mpsc]: mpsc
240 //!
241 //! ## `broadcast` channel
242 //!
243 //! The [`broadcast` channel] supports sending **many** values from
244 //! **many** producers to **many** consumers. Each consumer will receive
245 //! **each** value. This channel can be used to implement "fan out" style
246 //! patterns common with pub / sub or "chat" systems.
247 //!
248 //! This channel tends to be used less often than `oneshot` and `mpsc` but still
249 //! has its use cases.
250 //!
251 //! This is also the channel you should use if you want to broadcast values from
252 //! a single producer to many consumers. There is no dedicated spmc broadcast
253 //! channel.
254 //!
255 //! Basic usage
256 //!
257 //! ```
258 //! use tokio::sync::broadcast;
259 //!
260 //! #[tokio::main]
261 //! async fn main() {
262 //!     let (tx, mut rx1) = broadcast::channel(16);
263 //!     let mut rx2 = tx.subscribe();
264 //!
265 //!     tokio::spawn(async move {
266 //!         assert_eq!(rx1.recv().await.unwrap(), 10);
267 //!         assert_eq!(rx1.recv().await.unwrap(), 20);
268 //!     });
269 //!
270 //!     tokio::spawn(async move {
271 //!         assert_eq!(rx2.recv().await.unwrap(), 10);
272 //!         assert_eq!(rx2.recv().await.unwrap(), 20);
273 //!     });
274 //!
275 //!     tx.send(10).unwrap();
276 //!     tx.send(20).unwrap();
277 //! }
278 //! ```
279 //!
280 //! [`broadcast` channel]: crate::sync::broadcast
281 //!
282 //! ## `watch` channel
283 //!
284 //! The [`watch` channel] supports sending **many** values from a **single**
285 //! producer to **many** consumers. However, only the **most recent** value is
286 //! stored in the channel. Consumers are notified when a new value is sent, but
287 //! there is no guarantee that consumers will see **all** values.
288 //!
289 //! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1.
290 //!
291 //! Use cases for the [`watch` channel] include broadcasting configuration
292 //! changes or signalling program state changes, such as transitioning to
293 //! shutdown.
294 //!
295 //! **Example:** use a [`watch` channel] to notify tasks of configuration
296 //! changes. In this example, a configuration file is checked periodically. When
297 //! the file changes, the configuration changes are signalled to consumers.
298 //!
299 //! ```
300 //! use tokio::sync::watch;
301 //! use tokio::time::{self, Duration, Instant};
302 //!
303 //! use std::io;
304 //!
305 //! #[derive(Debug, Clone, Eq, PartialEq)]
306 //! struct Config {
307 //!     timeout: Duration,
308 //! }
309 //!
310 //! impl Config {
311 //!     async fn load_from_file() -> io::Result<Config> {
312 //!         // file loading and deserialization logic here
313 //! # Ok(Config { timeout: Duration::from_secs(1) })
314 //!     }
315 //! }
316 //!
317 //! async fn my_async_operation() {
318 //!     // Do something here
319 //! }
320 //!
321 //! #[tokio::main]
322 //! async fn main() {
323 //!     // Load initial configuration value
324 //!     let mut config = Config::load_from_file().await.unwrap();
325 //!
326 //!     // Create the watch channel, initialized with the loaded configuration
327 //!     let (tx, rx) = watch::channel(config.clone());
328 //!
329 //!     // Spawn a task to monitor the file.
330 //!     tokio::spawn(async move {
331 //!         loop {
332 //!             // Wait 10 seconds between checks
333 //!             time::sleep(Duration::from_secs(10)).await;
334 //!
335 //!             // Load the configuration file
336 //!             let new_config = Config::load_from_file().await.unwrap();
337 //!
338 //!             // If the configuration changed, send the new config value
339 //!             // on the watch channel.
340 //!             if new_config != config {
341 //!                 tx.send(new_config.clone()).unwrap();
342 //!                 config = new_config;
343 //!             }
344 //!         }
345 //!     });
346 //!
347 //!     let mut handles = vec![];
348 //!
349 //!     // Spawn tasks that runs the async operation for at most `timeout`. If
350 //!     // the timeout elapses, restart the operation.
351 //!     //
352 //!     // The task simultaneously watches the `Config` for changes. When the
353 //!     // timeout duration changes, the timeout is updated without restarting
354 //!     // the in-flight operation.
355 //!     for _ in 0..5 {
356 //!         // Clone a config watch handle for use in this task
357 //!         let mut rx = rx.clone();
358 //!
359 //!         let handle = tokio::spawn(async move {
360 //!             // Start the initial operation and pin the future to the stack.
361 //!             // Pinning to the stack is required to resume the operation
362 //!             // across multiple calls to `select!`
363 //!             let op = my_async_operation();
364 //!             tokio::pin!(op);
365 //!
366 //!             // Get the initial config value
367 //!             let mut conf = rx.borrow().clone();
368 //!
369 //!             let mut op_start = Instant::now();
370 //!             let sleep = time::sleep_until(op_start + conf.timeout);
371 //!             tokio::pin!(sleep);
372 //!
373 //!             loop {
374 //!                 tokio::select! {
375 //!                     _ = &mut sleep => {
376 //!                         // The operation elapsed. Restart it
377 //!                         op.set(my_async_operation());
378 //!
379 //!                         // Track the new start time
380 //!                         op_start = Instant::now();
381 //!
382 //!                         // Restart the timeout
383 //!                         sleep.set(time::sleep_until(op_start + conf.timeout));
384 //!                     }
385 //!                     _ = rx.changed() => {
386 //!                         conf = rx.borrow().clone();
387 //!
388 //!                         // The configuration has been updated. Update the
389 //!                         // `sleep` using the new `timeout` value.
390 //!                         sleep.as_mut().reset(op_start + conf.timeout);
391 //!                     }
392 //!                     _ = &mut op => {
393 //!                         // The operation completed!
394 //!                         return
395 //!                     }
396 //!                 }
397 //!             }
398 //!         });
399 //!
400 //!         handles.push(handle);
401 //!     }
402 //!
403 //!     for handle in handles.drain(..) {
404 //!         handle.await.unwrap();
405 //!     }
406 //! }
407 //! ```
408 //!
409 //! [`watch` channel]: mod@crate::sync::watch
410 //! [`broadcast` channel]: mod@crate::sync::broadcast
411 //!
412 //! # State synchronization
413 //!
414 //! The remaining synchronization primitives focus on synchronizing state.
415 //! These are asynchronous equivalents to versions provided by `std`. They
416 //! operate in a similar way as their `std` counterparts but will wait
417 //! asynchronously instead of blocking the thread.
418 //!
419 //! * [`Barrier`](Barrier) Ensures multiple tasks will wait for each other to
420 //!   reach a point in the program, before continuing execution all together.
421 //!
422 //! * [`Mutex`](Mutex) Mutual Exclusion mechanism, which ensures that at most
423 //!   one thread at a time is able to access some data.
424 //!
425 //! * [`Notify`](Notify) Basic task notification. `Notify` supports notifying a
426 //!   receiving task without sending data. In this case, the task wakes up and
427 //!   resumes processing.
428 //!
429 //! * [`RwLock`](RwLock) Provides a mutual exclusion mechanism which allows
430 //!   multiple readers at the same time, while allowing only one writer at a
431 //!   time. In some cases, this can be more efficient than a mutex.
432 //!
433 //! * [`Semaphore`](Semaphore) Limits the amount of concurrency. A semaphore
434 //!   holds a number of permits, which tasks may request in order to enter a
435 //!   critical section. Semaphores are useful for implementing limiting or
436 //!   bounding of any kind.
437 
438 cfg_sync! {
439     /// Named future types.
440     pub mod futures {
441         pub use super::notify::Notified;
442     }
443 
444     mod barrier;
445     pub use barrier::{Barrier, BarrierWaitResult};
446 
447     pub mod broadcast;
448 
449     pub mod mpsc;
450 
451     mod mutex;
452     pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard};
453 
454     pub(crate) mod notify;
455     pub use notify::Notify;
456 
457     pub mod oneshot;
458 
459     pub(crate) mod batch_semaphore;
460     pub use batch_semaphore::{AcquireError, TryAcquireError};
461 
462     mod semaphore;
463     pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};
464 
465     mod rwlock;
466     pub use rwlock::RwLock;
467     pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
468     pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
469     pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
470     pub use rwlock::read_guard::RwLockReadGuard;
471     pub use rwlock::write_guard::RwLockWriteGuard;
472     pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;
473 
474     mod task;
475     pub(crate) use task::AtomicWaker;
476 
477     mod once_cell;
478     pub use self::once_cell::{OnceCell, SetError};
479 
480     pub mod watch;
481 }
482 
483 cfg_not_sync! {
484     cfg_fs! {
485         pub(crate) mod batch_semaphore;
486         mod mutex;
487         pub(crate) use mutex::Mutex;
488     }
489 
490     #[cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))]
491     pub(crate) mod notify;
492 
493     #[cfg(any(feature = "rt", all(windows, feature = "process")))]
494     pub(crate) mod oneshot;
495 
496     cfg_atomic_waker_impl! {
497         mod task;
498         pub(crate) use task::AtomicWaker;
499     }
500 
501     #[cfg(any(feature = "signal", all(unix, feature = "process")))]
502     pub(crate) mod watch;
503 }
504 
505 /// Unit tests
506 #[cfg(test)]
507 mod tests;
508