• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Interface to the select mechanism.
2 
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_utils::Backoff;
9 
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17 
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
20 ///
21 /// Each field contains data associated with a specific channel flavor.
22 // This is a private API that is used by the select macro.
23 #[derive(Debug, Default)]
24 pub struct Token {
25     pub(crate) at: flavors::at::AtToken,
26     pub(crate) array: flavors::array::ArrayToken,
27     pub(crate) list: flavors::list::ListToken,
28     #[allow(dead_code)]
29     pub(crate) never: flavors::never::NeverToken,
30     pub(crate) tick: flavors::tick::TickToken,
31     pub(crate) zero: flavors::zero::ZeroToken,
32 }
33 
/// Identifier associated with an operation by a specific thread on a specific channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Creates an operation identifier from a mutable reference.
    ///
    /// This function essentially just turns the address of the reference into a number. The
    /// reference should point to a variable that is specific to the thread and the operation,
    /// and is alive for the entire duration of select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address of `r` is not greater than 2, because the values 0, 1 and 2 are
    /// reserved for the numerical representation of
    /// `Selected::{Waiting, Aborted, Disconnected}`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let val = r as *mut T as usize;
        // Make sure that the pointer address doesn't equal the numerical representation of
        // `Selected::{Waiting, Aborted, Disconnected}`.
        assert!(
            val > 2,
            "operation address collides with a reserved `Selected` value"
        );
        Operation(val)
    }
}
53 
54 /// Current state of a select or a blocking operation.
55 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
56 pub enum Selected {
57     /// Still waiting for an operation.
58     Waiting,
59 
60     /// The attempt to block the current thread has been aborted.
61     Aborted,
62 
63     /// An operation became ready because a channel is disconnected.
64     Disconnected,
65 
66     /// An operation became ready because a message can be sent or received.
67     Operation(Operation),
68 }
69 
70 impl From<usize> for Selected {
71     #[inline]
from(val: usize) -> Selected72     fn from(val: usize) -> Selected {
73         match val {
74             0 => Selected::Waiting,
75             1 => Selected::Aborted,
76             2 => Selected::Disconnected,
77             oper => Selected::Operation(Operation(oper)),
78         }
79     }
80 }
81 
82 impl Into<usize> for Selected {
83     #[inline]
into(self) -> usize84     fn into(self) -> usize {
85         match self {
86             Selected::Waiting => 0,
87             Selected::Aborted => 1,
88             Selected::Disconnected => 2,
89             Selected::Operation(Operation(val)) => val,
90         }
91     }
92 }
93 
94 /// A receiver or a sender that can participate in select.
95 ///
96 /// This is a handle that assists select in executing an operation, registration, deciding on the
97 /// appropriate deadline for blocking, etc.
98 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
99 pub trait SelectHandle {
100     /// Attempts to select an operation and returns `true` on success.
try_select(&self, token: &mut Token) -> bool101     fn try_select(&self, token: &mut Token) -> bool;
102 
103     /// Returns a deadline for an operation, if there is one.
deadline(&self) -> Option<Instant>104     fn deadline(&self) -> Option<Instant>;
105 
106     /// Registers an operation for execution and returns `true` if it is now ready.
register(&self, oper: Operation, cx: &Context) -> bool107     fn register(&self, oper: Operation, cx: &Context) -> bool;
108 
109     /// Unregisters an operation for execution.
unregister(&self, oper: Operation)110     fn unregister(&self, oper: Operation);
111 
112     /// Attempts to select an operation the thread got woken up for and returns `true` on success.
accept(&self, token: &mut Token, cx: &Context) -> bool113     fn accept(&self, token: &mut Token, cx: &Context) -> bool;
114 
115     /// Returns `true` if an operation can be executed without blocking.
is_ready(&self) -> bool116     fn is_ready(&self) -> bool;
117 
118     /// Registers an operation for readiness notification and returns `true` if it is now ready.
watch(&self, oper: Operation, cx: &Context) -> bool119     fn watch(&self, oper: Operation, cx: &Context) -> bool;
120 
121     /// Unregisters an operation for readiness notification.
unwatch(&self, oper: Operation)122     fn unwatch(&self, oper: Operation);
123 }
124 
125 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool126     fn try_select(&self, token: &mut Token) -> bool {
127         (**self).try_select(token)
128     }
129 
deadline(&self) -> Option<Instant>130     fn deadline(&self) -> Option<Instant> {
131         (**self).deadline()
132     }
133 
register(&self, oper: Operation, cx: &Context) -> bool134     fn register(&self, oper: Operation, cx: &Context) -> bool {
135         (**self).register(oper, cx)
136     }
137 
unregister(&self, oper: Operation)138     fn unregister(&self, oper: Operation) {
139         (**self).unregister(oper);
140     }
141 
accept(&self, token: &mut Token, cx: &Context) -> bool142     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
143         (**self).accept(token, cx)
144     }
145 
is_ready(&self) -> bool146     fn is_ready(&self) -> bool {
147         (**self).is_ready()
148     }
149 
watch(&self, oper: Operation, cx: &Context) -> bool150     fn watch(&self, oper: Operation, cx: &Context) -> bool {
151         (**self).watch(oper, cx)
152     }
153 
unwatch(&self, oper: Operation)154     fn unwatch(&self, oper: Operation) {
155         (**self).unwatch(oper)
156     }
157 }
158 
/// Determines when a select operation should time out.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    Now,

    /// Block forever.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
171 
172 /// Runs until one of the operations is selected, potentially blocking the current thread.
173 ///
174 /// Successful receive operations will have to be followed up by `channel::read()` and successful
175 /// send operations by `channel::write()`.
run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<(Token, usize, *const u8)>176 fn run_select(
177     handles: &mut [(&dyn SelectHandle, usize, *const u8)],
178     timeout: Timeout,
179 ) -> Option<(Token, usize, *const u8)> {
180     if handles.is_empty() {
181         // Wait until the timeout and return.
182         match timeout {
183             Timeout::Now => return None,
184             Timeout::Never => {
185                 utils::sleep_until(None);
186                 unreachable!();
187             }
188             Timeout::At(when) => {
189                 utils::sleep_until(Some(when));
190                 return None;
191             }
192         }
193     }
194 
195     // Shuffle the operations for fairness.
196     utils::shuffle(handles);
197 
198     // Create a token, which serves as a temporary variable that gets initialized in this function
199     // and is later used by a call to `channel::read()` or `channel::write()` that completes the
200     // selected operation.
201     let mut token = Token::default();
202 
203     // Try selecting one of the operations without blocking.
204     for &(handle, i, ptr) in handles.iter() {
205         if handle.try_select(&mut token) {
206             return Some((token, i, ptr));
207         }
208     }
209 
210     loop {
211         // Prepare for blocking.
212         let res = Context::with(|cx| {
213             let mut sel = Selected::Waiting;
214             let mut registered_count = 0;
215             let mut index_ready = None;
216 
217             if let Timeout::Now = timeout {
218                 cx.try_select(Selected::Aborted).unwrap();
219             }
220 
221             // Register all operations.
222             for (handle, i, _) in handles.iter_mut() {
223                 registered_count += 1;
224 
225                 // If registration returns `false`, that means the operation has just become ready.
226                 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
227                     // Try aborting select.
228                     sel = match cx.try_select(Selected::Aborted) {
229                         Ok(()) => {
230                             index_ready = Some(*i);
231                             Selected::Aborted
232                         }
233                         Err(s) => s,
234                     };
235                     break;
236                 }
237 
238                 // If another thread has already selected one of the operations, stop registration.
239                 sel = cx.selected();
240                 if sel != Selected::Waiting {
241                     break;
242                 }
243             }
244 
245             if sel == Selected::Waiting {
246                 // Check with each operation for how long we're allowed to block, and compute the
247                 // earliest deadline.
248                 let mut deadline: Option<Instant> = match timeout {
249                     Timeout::Now => return None,
250                     Timeout::Never => None,
251                     Timeout::At(when) => Some(when),
252                 };
253                 for &(handle, _, _) in handles.iter() {
254                     if let Some(x) = handle.deadline() {
255                         deadline = deadline.map(|y| x.min(y)).or(Some(x));
256                     }
257                 }
258 
259                 // Block the current thread.
260                 sel = cx.wait_until(deadline);
261             }
262 
263             // Unregister all registered operations.
264             for (handle, _, _) in handles.iter_mut().take(registered_count) {
265                 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
266             }
267 
268             match sel {
269                 Selected::Waiting => unreachable!(),
270                 Selected::Aborted => {
271                     // If an operation became ready during registration, try selecting it.
272                     if let Some(index_ready) = index_ready {
273                         for &(handle, i, ptr) in handles.iter() {
274                             if i == index_ready && handle.try_select(&mut token) {
275                                 return Some((i, ptr));
276                             }
277                         }
278                     }
279                 }
280                 Selected::Disconnected => {}
281                 Selected::Operation(_) => {
282                     // Find the selected operation.
283                     for (handle, i, ptr) in handles.iter_mut() {
284                         // Is this the selected operation?
285                         if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
286                         {
287                             // Try selecting this operation.
288                             if handle.accept(&mut token, cx) {
289                                 return Some((*i, *ptr));
290                             }
291                         }
292                     }
293                 }
294             }
295 
296             None
297         });
298 
299         // Return if an operation was selected.
300         if let Some((i, ptr)) = res {
301             return Some((token, i, ptr));
302         }
303 
304         // Try selecting one of the operations without blocking.
305         for &(handle, i, ptr) in handles.iter() {
306             if handle.try_select(&mut token) {
307                 return Some((token, i, ptr));
308             }
309         }
310 
311         match timeout {
312             Timeout::Now => return None,
313             Timeout::Never => {}
314             Timeout::At(when) => {
315                 if Instant::now() >= when {
316                     return None;
317                 }
318             }
319         }
320     }
321 }
322 
323 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<usize>324 fn run_ready(
325     handles: &mut [(&dyn SelectHandle, usize, *const u8)],
326     timeout: Timeout,
327 ) -> Option<usize> {
328     if handles.is_empty() {
329         // Wait until the timeout and return.
330         match timeout {
331             Timeout::Now => return None,
332             Timeout::Never => {
333                 utils::sleep_until(None);
334                 unreachable!();
335             }
336             Timeout::At(when) => {
337                 utils::sleep_until(Some(when));
338                 return None;
339             }
340         }
341     }
342 
343     // Shuffle the operations for fairness.
344     utils::shuffle(handles);
345 
346     loop {
347         let backoff = Backoff::new();
348         loop {
349             // Check operations for readiness.
350             for &(handle, i, _) in handles.iter() {
351                 if handle.is_ready() {
352                     return Some(i);
353                 }
354             }
355 
356             if backoff.is_completed() {
357                 break;
358             } else {
359                 backoff.snooze();
360             }
361         }
362 
363         // Check for timeout.
364         match timeout {
365             Timeout::Now => return None,
366             Timeout::Never => {}
367             Timeout::At(when) => {
368                 if Instant::now() >= when {
369                     return None;
370                 }
371             }
372         }
373 
374         // Prepare for blocking.
375         let res = Context::with(|cx| {
376             let mut sel = Selected::Waiting;
377             let mut registered_count = 0;
378 
379             // Begin watching all operations.
380             for (handle, _, _) in handles.iter_mut() {
381                 registered_count += 1;
382                 let oper = Operation::hook::<&dyn SelectHandle>(handle);
383 
384                 // If registration returns `false`, that means the operation has just become ready.
385                 if handle.watch(oper, cx) {
386                     sel = match cx.try_select(Selected::Operation(oper)) {
387                         Ok(()) => Selected::Operation(oper),
388                         Err(s) => s,
389                     };
390                     break;
391                 }
392 
393                 // If another thread has already chosen one of the operations, stop registration.
394                 sel = cx.selected();
395                 if sel != Selected::Waiting {
396                     break;
397                 }
398             }
399 
400             if sel == Selected::Waiting {
401                 // Check with each operation for how long we're allowed to block, and compute the
402                 // earliest deadline.
403                 let mut deadline: Option<Instant> = match timeout {
404                     Timeout::Now => unreachable!(),
405                     Timeout::Never => None,
406                     Timeout::At(when) => Some(when),
407                 };
408                 for &(handle, _, _) in handles.iter() {
409                     if let Some(x) = handle.deadline() {
410                         deadline = deadline.map(|y| x.min(y)).or(Some(x));
411                     }
412                 }
413 
414                 // Block the current thread.
415                 sel = cx.wait_until(deadline);
416             }
417 
418             // Unwatch all operations.
419             for (handle, _, _) in handles.iter_mut().take(registered_count) {
420                 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
421             }
422 
423             match sel {
424                 Selected::Waiting => unreachable!(),
425                 Selected::Aborted => {}
426                 Selected::Disconnected => {}
427                 Selected::Operation(_) => {
428                     for (handle, i, _) in handles.iter_mut() {
429                         let oper = Operation::hook::<&dyn SelectHandle>(handle);
430                         if sel == Selected::Operation(oper) {
431                             return Some(*i);
432                         }
433                     }
434                 }
435             }
436 
437             None
438         });
439 
440         // Return if an operation became ready.
441         if res.is_some() {
442             return res;
443         }
444     }
445 }
446 
447 /// Attempts to select one of the operations without blocking.
448 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
449 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>450 pub fn try_select<'a>(
451     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
452 ) -> Result<SelectedOperation<'a>, TrySelectError> {
453     match run_select(handles, Timeout::Now) {
454         None => Err(TrySelectError),
455         Some((token, index, ptr)) => Ok(SelectedOperation {
456             token,
457             index,
458             ptr,
459             _marker: PhantomData,
460         }),
461     }
462 }
463 
464 /// Blocks until one of the operations becomes ready and selects it.
465 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
466 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>467 pub fn select<'a>(
468     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
469 ) -> SelectedOperation<'a> {
470     if handles.is_empty() {
471         panic!("no operations have been added to `Select`");
472     }
473 
474     let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
475     SelectedOperation {
476         token,
477         index,
478         ptr,
479         _marker: PhantomData,
480     }
481 }
482 
483 /// Blocks for a limited time until one of the operations becomes ready and selects it.
484 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
485 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>486 pub fn select_timeout<'a>(
487     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
488     timeout: Duration,
489 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
490     match Instant::now().checked_add(timeout) {
491         Some(deadline) => select_deadline(handles, deadline),
492         None => Ok(select(handles)),
493     }
494 }
495 
496 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
497 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>498 pub(crate) fn select_deadline<'a>(
499     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
500     deadline: Instant,
501 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
502     match run_select(handles, Timeout::At(deadline)) {
503         None => Err(SelectTimeoutError),
504         Some((token, index, ptr)) => Ok(SelectedOperation {
505             token,
506             index,
507             ptr,
508             _marker: PhantomData,
509         }),
510     }
511 }
512 
513 /// Selects from a set of channel operations.
514 ///
515 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
516 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
517 /// among them is selected.
518 ///
519 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
520 /// when it will simply return an error because the channel is disconnected.
521 ///
522 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
523 /// dynamically created list of channel operations.
524 ///
525 /// [`select!`]: crate::select!
526 ///
527 /// Once a list of operations has been built with `Select`, there are two different ways of
528 /// proceeding:
529 ///
530 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
531 ///   the returned selected operation has already begun and **must** be completed. If we don't
532 ///   complete it, a panic will occur.
533 ///
534 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
535 ///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
536 ///   possible for another thread to make the operation not ready just before we try executing it,
537 ///   so it's wise to use a retry loop. However, note that these methods might return with success
538 ///   spuriously, so it's a good idea to always double check if the operation is really ready.
539 ///
540 /// # Examples
541 ///
542 /// Use [`select`] to receive a message from a list of receivers:
543 ///
544 /// ```
545 /// use crossbeam_channel::{Receiver, RecvError, Select};
546 ///
547 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
548 ///     // Build a list of operations.
549 ///     let mut sel = Select::new();
550 ///     for r in rs {
551 ///         sel.recv(r);
552 ///     }
553 ///
554 ///     // Complete the selected operation.
555 ///     let oper = sel.select();
556 ///     let index = oper.index();
557 ///     oper.recv(&rs[index])
558 /// }
559 /// ```
560 ///
561 /// Use [`ready`] to receive a message from a list of receivers:
562 ///
563 /// ```
564 /// use crossbeam_channel::{Receiver, RecvError, Select};
565 ///
566 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
567 ///     // Build a list of operations.
568 ///     let mut sel = Select::new();
569 ///     for r in rs {
570 ///         sel.recv(r);
571 ///     }
572 ///
573 ///     loop {
574 ///         // Wait until a receive operation becomes ready and try executing it.
575 ///         let index = sel.ready();
576 ///         let res = rs[index].try_recv();
577 ///
578 ///         // If the operation turns out not to be ready, retry.
579 ///         if let Err(e) = res {
580 ///             if e.is_empty() {
581 ///                 continue;
582 ///             }
583 ///         }
584 ///
585 ///         // Success!
586 ///         return res.map_err(|_| RecvError);
587 ///     }
588 /// }
589 /// ```
590 ///
591 /// [`try_select`]: Select::try_select
592 /// [`select`]: Select::select
593 /// [`select_timeout`]: Select::select_timeout
594 /// [`try_ready`]: Select::try_ready
595 /// [`ready`]: Select::ready
596 /// [`ready_timeout`]: Select::ready_timeout
597 pub struct Select<'a> {
598     /// A list of senders and receivers participating in selection.
599     handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
600 
601     /// The next index to assign to an operation.
602     next_index: usize,
603 }
604 
605 unsafe impl Send for Select<'_> {}
606 unsafe impl Sync for Select<'_> {}
607 
608 impl<'a> Select<'a> {
609     /// Creates an empty list of channel operations for selection.
610     ///
611     /// # Examples
612     ///
613     /// ```
614     /// use crossbeam_channel::Select;
615     ///
616     /// let mut sel = Select::new();
617     ///
618     /// // The list of operations is empty, which means no operation can be selected.
619     /// assert!(sel.try_select().is_err());
620     /// ```
new() -> Select<'a>621     pub fn new() -> Select<'a> {
622         Select {
623             handles: Vec::with_capacity(4),
624             next_index: 0,
625         }
626     }
627 
628     /// Adds a send operation.
629     ///
630     /// Returns the index of the added operation.
631     ///
632     /// # Examples
633     ///
634     /// ```
635     /// use crossbeam_channel::{unbounded, Select};
636     ///
637     /// let (s, r) = unbounded::<i32>();
638     ///
639     /// let mut sel = Select::new();
640     /// let index = sel.send(&s);
641     /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize642     pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
643         let i = self.next_index;
644         let ptr = s as *const Sender<_> as *const u8;
645         self.handles.push((s, i, ptr));
646         self.next_index += 1;
647         i
648     }
649 
650     /// Adds a receive operation.
651     ///
652     /// Returns the index of the added operation.
653     ///
654     /// # Examples
655     ///
656     /// ```
657     /// use crossbeam_channel::{unbounded, Select};
658     ///
659     /// let (s, r) = unbounded::<i32>();
660     ///
661     /// let mut sel = Select::new();
662     /// let index = sel.recv(&r);
663     /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize664     pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
665         let i = self.next_index;
666         let ptr = r as *const Receiver<_> as *const u8;
667         self.handles.push((r, i, ptr));
668         self.next_index += 1;
669         i
670     }
671 
672     /// Removes a previously added operation.
673     ///
674     /// This is useful when an operation is selected because the channel got disconnected and we
675     /// want to try again to select a different operation instead.
676     ///
677     /// If new operations are added after removing some, the indices of removed operations will not
678     /// be reused.
679     ///
680     /// # Panics
681     ///
682     /// An attempt to remove a non-existing or already removed operation will panic.
683     ///
684     /// # Examples
685     ///
686     /// ```
687     /// use crossbeam_channel::{unbounded, Select};
688     ///
689     /// let (s1, r1) = unbounded::<i32>();
690     /// let (_, r2) = unbounded::<i32>();
691     ///
692     /// let mut sel = Select::new();
693     /// let oper1 = sel.recv(&r1);
694     /// let oper2 = sel.recv(&r2);
695     ///
696     /// // Both operations are initially ready, so a random one will be executed.
697     /// let oper = sel.select();
698     /// assert_eq!(oper.index(), oper2);
699     /// assert!(oper.recv(&r2).is_err());
700     /// sel.remove(oper2);
701     ///
702     /// s1.send(10).unwrap();
703     ///
704     /// let oper = sel.select();
705     /// assert_eq!(oper.index(), oper1);
706     /// assert_eq!(oper.recv(&r1), Ok(10));
707     /// ```
remove(&mut self, index: usize)708     pub fn remove(&mut self, index: usize) {
709         assert!(
710             index < self.next_index,
711             "index out of bounds; {} >= {}",
712             index,
713             self.next_index,
714         );
715 
716         let i = self
717             .handles
718             .iter()
719             .enumerate()
720             .find(|(_, (_, i, _))| *i == index)
721             .expect("no operation with this index")
722             .0;
723 
724         self.handles.swap_remove(i);
725     }
726 
727     /// Attempts to select one of the operations without blocking.
728     ///
729     /// If an operation is ready, it is selected and returned. If multiple operations are ready at
730     /// the same time, a random one among them is selected. If none of the operations are ready, an
731     /// error is returned.
732     ///
733     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
734     /// even when it will simply return an error because the channel is disconnected.
735     ///
736     /// The selected operation must be completed with [`SelectedOperation::send`]
737     /// or [`SelectedOperation::recv`].
738     ///
739     /// # Examples
740     ///
741     /// ```
742     /// use crossbeam_channel::{unbounded, Select};
743     ///
744     /// let (s1, r1) = unbounded();
745     /// let (s2, r2) = unbounded();
746     ///
747     /// s1.send(10).unwrap();
748     /// s2.send(20).unwrap();
749     ///
750     /// let mut sel = Select::new();
751     /// let oper1 = sel.recv(&r1);
752     /// let oper2 = sel.recv(&r2);
753     ///
754     /// // Both operations are initially ready, so a random one will be executed.
755     /// let oper = sel.try_select();
756     /// match oper {
757     ///     Err(_) => panic!("both operations should be ready"),
758     ///     Ok(oper) => match oper.index() {
759     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
760     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
761     ///         _ => unreachable!(),
762     ///     }
763     /// }
764     /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>765     pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
766         try_select(&mut self.handles)
767     }
768 
769     /// Blocks until one of the operations becomes ready and selects it.
770     ///
771     /// Once an operation becomes ready, it is selected and returned. If multiple operations are
772     /// ready at the same time, a random one among them is selected.
773     ///
774     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
775     /// even when it will simply return an error because the channel is disconnected.
776     ///
777     /// The selected operation must be completed with [`SelectedOperation::send`]
778     /// or [`SelectedOperation::recv`].
779     ///
780     /// # Panics
781     ///
782     /// Panics if no operations have been added to `Select`.
783     ///
784     /// # Examples
785     ///
786     /// ```
787     /// use std::thread;
788     /// use std::time::Duration;
789     /// use crossbeam_channel::{unbounded, Select};
790     ///
791     /// let (s1, r1) = unbounded();
792     /// let (s2, r2) = unbounded();
793     ///
794     /// thread::spawn(move || {
795     ///     thread::sleep(Duration::from_secs(1));
796     ///     s1.send(10).unwrap();
797     /// });
798     /// thread::spawn(move || s2.send(20).unwrap());
799     ///
800     /// let mut sel = Select::new();
801     /// let oper1 = sel.recv(&r1);
802     /// let oper2 = sel.recv(&r2);
803     ///
804     /// // The second operation will be selected because it becomes ready first.
805     /// let oper = sel.select();
806     /// match oper.index() {
807     ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
808     ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
809     ///     _ => unreachable!(),
810     /// }
811     /// ```
select(&mut self) -> SelectedOperation<'a>812     pub fn select(&mut self) -> SelectedOperation<'a> {
813         select(&mut self.handles)
814     }
815 
816     /// Blocks for a limited time until one of the operations becomes ready and selects it.
817     ///
818     /// If an operation becomes ready, it is selected and returned. If multiple operations are
819     /// ready at the same time, a random one among them is selected. If none of the operations
820     /// become ready for the specified duration, an error is returned.
821     ///
822     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
823     /// even when it will simply return an error because the channel is disconnected.
824     ///
825     /// The selected operation must be completed with [`SelectedOperation::send`]
826     /// or [`SelectedOperation::recv`].
827     ///
828     /// # Examples
829     ///
830     /// ```
831     /// use std::thread;
832     /// use std::time::Duration;
833     /// use crossbeam_channel::{unbounded, Select};
834     ///
835     /// let (s1, r1) = unbounded();
836     /// let (s2, r2) = unbounded();
837     ///
838     /// thread::spawn(move || {
839     ///     thread::sleep(Duration::from_secs(1));
840     ///     s1.send(10).unwrap();
841     /// });
842     /// thread::spawn(move || s2.send(20).unwrap());
843     ///
844     /// let mut sel = Select::new();
845     /// let oper1 = sel.recv(&r1);
846     /// let oper2 = sel.recv(&r2);
847     ///
848     /// // The second operation will be selected because it becomes ready first.
849     /// let oper = sel.select_timeout(Duration::from_millis(500));
850     /// match oper {
851     ///     Err(_) => panic!("should not have timed out"),
852     ///     Ok(oper) => match oper.index() {
853     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
854     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
855     ///         _ => unreachable!(),
856     ///     }
857     /// }
858     /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>859     pub fn select_timeout(
860         &mut self,
861         timeout: Duration,
862     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
863         select_timeout(&mut self.handles, timeout)
864     }
865 
866     /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
867     ///
868     /// If an operation becomes ready, it is selected and returned. If multiple operations are
869     /// ready at the same time, a random one among them is selected. If none of the operations
870     /// become ready before the given deadline, an error is returned.
871     ///
872     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
873     /// even when it will simply return an error because the channel is disconnected.
874     ///
875     /// The selected operation must be completed with [`SelectedOperation::send`]
876     /// or [`SelectedOperation::recv`].
877     ///
878     /// # Examples
879     ///
880     /// ```
881     /// use std::thread;
882     /// use std::time::{Instant, Duration};
883     /// use crossbeam_channel::{unbounded, Select};
884     ///
885     /// let (s1, r1) = unbounded();
886     /// let (s2, r2) = unbounded();
887     ///
888     /// thread::spawn(move || {
889     ///     thread::sleep(Duration::from_secs(1));
890     ///     s1.send(10).unwrap();
891     /// });
892     /// thread::spawn(move || s2.send(20).unwrap());
893     ///
894     /// let mut sel = Select::new();
895     /// let oper1 = sel.recv(&r1);
896     /// let oper2 = sel.recv(&r2);
897     ///
898     /// let deadline = Instant::now() + Duration::from_millis(500);
899     ///
900     /// // The second operation will be selected because it becomes ready first.
901     /// let oper = sel.select_deadline(deadline);
902     /// match oper {
903     ///     Err(_) => panic!("should not have timed out"),
904     ///     Ok(oper) => match oper.index() {
905     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
906     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
907     ///         _ => unreachable!(),
908     ///     }
909     /// }
910     /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>911     pub fn select_deadline(
912         &mut self,
913         deadline: Instant,
914     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
915         select_deadline(&mut self.handles, deadline)
916     }
917 
918     /// Attempts to find a ready operation without blocking.
919     ///
920     /// If an operation is ready, its index is returned. If multiple operations are ready at the
921     /// same time, a random one among them is chosen. If none of the operations are ready, an error
922     /// is returned.
923     ///
924     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
925     /// even when it will simply return an error because the channel is disconnected.
926     ///
927     /// Note that this method might return with success spuriously, so it's a good idea to always
928     /// double check if the operation is really ready.
929     ///
930     /// # Examples
931     ///
932     /// ```
933     /// use crossbeam_channel::{unbounded, Select};
934     ///
935     /// let (s1, r1) = unbounded();
936     /// let (s2, r2) = unbounded();
937     ///
938     /// s1.send(10).unwrap();
939     /// s2.send(20).unwrap();
940     ///
941     /// let mut sel = Select::new();
942     /// let oper1 = sel.recv(&r1);
943     /// let oper2 = sel.recv(&r2);
944     ///
945     /// // Both operations are initially ready, so a random one will be chosen.
946     /// match sel.try_ready() {
947     ///     Err(_) => panic!("both operations should be ready"),
948     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
949     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
950     ///     Ok(_) => unreachable!(),
951     /// }
952     /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>953     pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
954         match run_ready(&mut self.handles, Timeout::Now) {
955             None => Err(TryReadyError),
956             Some(index) => Ok(index),
957         }
958     }
959 
960     /// Blocks until one of the operations becomes ready.
961     ///
962     /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
963     /// the same time, a random one among them is chosen.
964     ///
965     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
966     /// even when it will simply return an error because the channel is disconnected.
967     ///
968     /// Note that this method might return with success spuriously, so it's a good idea to always
969     /// double check if the operation is really ready.
970     ///
971     /// # Panics
972     ///
973     /// Panics if no operations have been added to `Select`.
974     ///
975     /// # Examples
976     ///
977     /// ```
978     /// use std::thread;
979     /// use std::time::Duration;
980     /// use crossbeam_channel::{unbounded, Select};
981     ///
982     /// let (s1, r1) = unbounded();
983     /// let (s2, r2) = unbounded();
984     ///
985     /// thread::spawn(move || {
986     ///     thread::sleep(Duration::from_secs(1));
987     ///     s1.send(10).unwrap();
988     /// });
989     /// thread::spawn(move || s2.send(20).unwrap());
990     ///
991     /// let mut sel = Select::new();
992     /// let oper1 = sel.recv(&r1);
993     /// let oper2 = sel.recv(&r2);
994     ///
995     /// // The second operation will be selected because it becomes ready first.
996     /// match sel.ready() {
997     ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
998     ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
999     ///     _ => unreachable!(),
1000     /// }
1001     /// ```
ready(&mut self) -> usize1002     pub fn ready(&mut self) -> usize {
1003         if self.handles.is_empty() {
1004             panic!("no operations have been added to `Select`");
1005         }
1006 
1007         run_ready(&mut self.handles, Timeout::Never).unwrap()
1008     }
1009 
1010     /// Blocks for a limited time until one of the operations becomes ready.
1011     ///
1012     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1013     /// the same time, a random one among them is chosen. If none of the operations become ready
1014     /// for the specified duration, an error is returned.
1015     ///
1016     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1017     /// even when it will simply return an error because the channel is disconnected.
1018     ///
1019     /// Note that this method might return with success spuriously, so it's a good idea to double
1020     /// check if the operation is really ready.
1021     ///
1022     /// # Examples
1023     ///
1024     /// ```
1025     /// use std::thread;
1026     /// use std::time::Duration;
1027     /// use crossbeam_channel::{unbounded, Select};
1028     ///
1029     /// let (s1, r1) = unbounded();
1030     /// let (s2, r2) = unbounded();
1031     ///
1032     /// thread::spawn(move || {
1033     ///     thread::sleep(Duration::from_secs(1));
1034     ///     s1.send(10).unwrap();
1035     /// });
1036     /// thread::spawn(move || s2.send(20).unwrap());
1037     ///
1038     /// let mut sel = Select::new();
1039     /// let oper1 = sel.recv(&r1);
1040     /// let oper2 = sel.recv(&r2);
1041     ///
1042     /// // The second operation will be selected because it becomes ready first.
1043     /// match sel.ready_timeout(Duration::from_millis(500)) {
1044     ///     Err(_) => panic!("should not have timed out"),
1045     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1046     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1047     ///     Ok(_) => unreachable!(),
1048     /// }
1049     /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1050     pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1051         match Instant::now().checked_add(timeout) {
1052             Some(deadline) => self.ready_deadline(deadline),
1053             None => Ok(self.ready()),
1054         }
1055     }
1056 
1057     /// Blocks until a given deadline, or until one of the operations becomes ready.
1058     ///
1059     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1060     /// the same time, a random one among them is chosen. If none of the operations become ready
1061     /// before the deadline, an error is returned.
1062     ///
1063     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1064     /// even when it will simply return an error because the channel is disconnected.
1065     ///
1066     /// Note that this method might return with success spuriously, so it's a good idea to double
1067     /// check if the operation is really ready.
1068     ///
1069     /// # Examples
1070     ///
1071     /// ```
1072     /// use std::thread;
1073     /// use std::time::{Duration, Instant};
1074     /// use crossbeam_channel::{unbounded, Select};
1075     ///
1076     /// let deadline = Instant::now() + Duration::from_millis(500);
1077     ///
1078     /// let (s1, r1) = unbounded();
1079     /// let (s2, r2) = unbounded();
1080     ///
1081     /// thread::spawn(move || {
1082     ///     thread::sleep(Duration::from_secs(1));
1083     ///     s1.send(10).unwrap();
1084     /// });
1085     /// thread::spawn(move || s2.send(20).unwrap());
1086     ///
1087     /// let mut sel = Select::new();
1088     /// let oper1 = sel.recv(&r1);
1089     /// let oper2 = sel.recv(&r2);
1090     ///
1091     /// // The second operation will be selected because it becomes ready first.
1092     /// match sel.ready_deadline(deadline) {
1093     ///     Err(_) => panic!("should not have timed out"),
1094     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1095     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1096     ///     Ok(_) => unreachable!(),
1097     /// }
1098     /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1099     pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1100         match run_ready(&mut self.handles, Timeout::At(deadline)) {
1101             None => Err(ReadyTimeoutError),
1102             Some(index) => Ok(index),
1103         }
1104     }
1105 }
1106 
1107 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1108     fn clone(&self) -> Select<'a> {
1109         Select {
1110             handles: self.handles.clone(),
1111             next_index: self.next_index,
1112         }
1113     }
1114 }
1115 
1116 impl<'a> Default for Select<'a> {
default() -> Select<'a>1117     fn default() -> Select<'a> {
1118         Select::new()
1119     }
1120 }
1121 
1122 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1123     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1124         f.pad("Select { .. }")
1125     }
1126 }
1127 
1128 /// A selected operation that needs to be completed.
1129 ///
1130 /// To complete the operation, call [`send`] or [`recv`].
1131 ///
1132 /// # Panics
1133 ///
1134 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1135 /// `SelectedOperation` is dropped without completion, a panic occurs.
1136 ///
1137 /// [`send`]: SelectedOperation::send
1138 /// [`recv`]: SelectedOperation::recv
1139 #[must_use]
1140 pub struct SelectedOperation<'a> {
1141     /// Token needed to complete the operation.
1142     token: Token,
1143 
1144     /// The index of the selected operation.
1145     index: usize,
1146 
1147     /// The address of the selected `Sender` or `Receiver`.
1148     ptr: *const u8,
1149 
1150     /// Indicates that `Sender`s and `Receiver`s are borrowed.
1151     _marker: PhantomData<&'a ()>,
1152 }
1153 
1154 impl SelectedOperation<'_> {
1155     /// Returns the index of the selected operation.
1156     ///
1157     /// # Examples
1158     ///
1159     /// ```
1160     /// use crossbeam_channel::{bounded, Select};
1161     ///
1162     /// let (s1, r1) = bounded::<()>(0);
1163     /// let (s2, r2) = bounded::<()>(0);
1164     /// let (s3, r3) = bounded::<()>(1);
1165     ///
1166     /// let mut sel = Select::new();
1167     /// let oper1 = sel.send(&s1);
1168     /// let oper2 = sel.recv(&r2);
1169     /// let oper3 = sel.send(&s3);
1170     ///
1171     /// // Only the last operation is ready.
1172     /// let oper = sel.select();
1173     /// assert_eq!(oper.index(), 2);
1174     /// assert_eq!(oper.index(), oper3);
1175     ///
1176     /// // Complete the operation.
1177     /// oper.send(&s3, ()).unwrap();
1178     /// ```
index(&self) -> usize1179     pub fn index(&self) -> usize {
1180         self.index
1181     }
1182 
1183     /// Completes the send operation.
1184     ///
1185     /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1186     /// when the operation was added.
1187     ///
1188     /// # Panics
1189     ///
1190     /// Panics if an incorrect [`Sender`] reference is passed.
1191     ///
1192     /// # Examples
1193     ///
1194     /// ```
1195     /// use crossbeam_channel::{bounded, Select, SendError};
1196     ///
1197     /// let (s, r) = bounded::<i32>(0);
1198     /// drop(r);
1199     ///
1200     /// let mut sel = Select::new();
1201     /// let oper1 = sel.send(&s);
1202     ///
1203     /// let oper = sel.select();
1204     /// assert_eq!(oper.index(), oper1);
1205     /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1206     /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1207     pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1208         assert!(
1209             s as *const Sender<T> as *const u8 == self.ptr,
1210             "passed a sender that wasn't selected",
1211         );
1212         let res = unsafe { channel::write(s, &mut self.token, msg) };
1213         mem::forget(self);
1214         res.map_err(SendError)
1215     }
1216 
1217     /// Completes the receive operation.
1218     ///
1219     /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1220     /// when the operation was added.
1221     ///
1222     /// # Panics
1223     ///
1224     /// Panics if an incorrect [`Receiver`] reference is passed.
1225     ///
1226     /// # Examples
1227     ///
1228     /// ```
1229     /// use crossbeam_channel::{bounded, Select, RecvError};
1230     ///
1231     /// let (s, r) = bounded::<i32>(0);
1232     /// drop(s);
1233     ///
1234     /// let mut sel = Select::new();
1235     /// let oper1 = sel.recv(&r);
1236     ///
1237     /// let oper = sel.select();
1238     /// assert_eq!(oper.index(), oper1);
1239     /// assert_eq!(oper.recv(&r), Err(RecvError));
1240     /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1241     pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1242         assert!(
1243             r as *const Receiver<T> as *const u8 == self.ptr,
1244             "passed a receiver that wasn't selected",
1245         );
1246         let res = unsafe { channel::read(r, &mut self.token) };
1247         mem::forget(self);
1248         res.map_err(|_| RecvError)
1249     }
1250 }
1251 
1252 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1253     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1254         f.pad("SelectedOperation { .. }")
1255     }
1256 }
1257 
1258 impl Drop for SelectedOperation<'_> {
drop(&mut self)1259     fn drop(&mut self) {
1260         panic!("dropped `SelectedOperation` without completing the operation");
1261     }
1262 }
1263