• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Interface to the select mechanism.
2 
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_utils::Backoff;
9 
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17 
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor. A fresh token is obtained
/// through `Token::default()`, which default-initializes every flavor's sub-token.
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
    // One sub-token per module under `flavors`.
    pub at: flavors::at::AtToken,
    pub array: flavors::array::ArrayToken,
    pub list: flavors::list::ListToken,
    pub never: flavors::never::NeverToken,
    pub tick: flavors::tick::TickToken,
    pub zero: flavors::zero::ZeroToken,
}
32 
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The wrapped number is the address of a per-operation variable, as produced by
/// `Operation::hook`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);
36 
37 impl Operation {
38     /// Creates an operation identifier from a mutable reference.
39     ///
40     /// This function essentially just turns the address of the reference into a number. The
41     /// reference should point to a variable that is specific to the thread and the operation,
42     /// and is alive for the entire duration of select or blocking operation.
43     #[inline]
hook<T>(r: &mut T) -> Operation44     pub fn hook<T>(r: &mut T) -> Operation {
45         let val = r as *mut T as usize;
46         // Make sure that the pointer address doesn't equal the numerical representation of
47         // `Selected::{Waiting, Aborted, Disconnected}`.
48         assert!(val > 2);
49         Operation(val)
50     }
51 }
52 
/// Current state of a select or a blocking operation.
///
/// The state can be converted to and from a `usize`: `Waiting`, `Aborted` and `Disconnected` map
/// to `0`, `1` and `2`, and any other number is treated as the identifier of an `Operation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
    /// Still waiting for an operation.
    Waiting,

    /// The attempt to block the current thread has been aborted.
    Aborted,

    /// An operation became ready because a channel is disconnected.
    Disconnected,

    /// An operation became ready because a message can be sent or received.
    Operation(Operation),
}
68 
69 impl From<usize> for Selected {
70     #[inline]
from(val: usize) -> Selected71     fn from(val: usize) -> Selected {
72         match val {
73             0 => Selected::Waiting,
74             1 => Selected::Aborted,
75             2 => Selected::Disconnected,
76             oper => Selected::Operation(Operation(oper)),
77         }
78     }
79 }
80 
81 impl Into<usize> for Selected {
82     #[inline]
into(self) -> usize83     fn into(self) -> usize {
84         match self {
85             Selected::Waiting => 0,
86             Selected::Aborted => 1,
87             Selected::Disconnected => 2,
88             Selected::Operation(Operation(val)) => val,
89         }
90     }
91 }
92 
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods come in two groups: `try_select`/`register`/`unregister`/`accept` are used when an
/// operation is going to be selected and executed, while `is_ready`/`watch`/`unwatch` are used
/// when the caller only waits for readiness.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    ///
    /// `oper` is the same identifier that was passed to `register`.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    ///
    /// `oper` is the same identifier that was passed to `watch`.
    fn unwatch(&self, oper: Operation);
}
123 
124 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool125     fn try_select(&self, token: &mut Token) -> bool {
126         (**self).try_select(token)
127     }
128 
deadline(&self) -> Option<Instant>129     fn deadline(&self) -> Option<Instant> {
130         (**self).deadline()
131     }
132 
register(&self, oper: Operation, cx: &Context) -> bool133     fn register(&self, oper: Operation, cx: &Context) -> bool {
134         (**self).register(oper, cx)
135     }
136 
unregister(&self, oper: Operation)137     fn unregister(&self, oper: Operation) {
138         (**self).unregister(oper);
139     }
140 
accept(&self, token: &mut Token, cx: &Context) -> bool141     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
142         (**self).accept(token, cx)
143     }
144 
is_ready(&self) -> bool145     fn is_ready(&self) -> bool {
146         (**self).is_ready()
147     }
148 
watch(&self, oper: Operation, cx: &Context) -> bool149     fn watch(&self, oper: Operation, cx: &Context) -> bool {
150         (**self).watch(oper, cx)
151     }
152 
unwatch(&self, oper: Operation)153     fn unwatch(&self, oper: Operation) {
154         (**self).unwatch(oper)
155     }
156 }
157 
/// Determines when a select operation should time out.
///
/// Passed to `run_select` and `run_ready` to tell them whether, and for how long, they may block.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    Now,

    /// Block forever.
    Never,

    /// Time out after the time instant.
    At(Instant),
}
170 
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// `handles` holds one `(handle, index, ptr)` entry per operation; the slice is shuffled in place
/// before selection starts. `timeout` decides whether the call must not block (`Timeout::Now`),
/// may block until an instant (`Timeout::At`), or may block indefinitely (`Timeout::Never`).
///
/// Returns the initialized token together with the `index` and `ptr` of the selected entry, or
/// `None` if no operation was selected before the timeout.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;
            let mut index_ready = None;

            // A non-blocking call marks the context as aborted up front, so the `cx.selected()`
            // check in the registration loop reports a non-`Waiting` state and the blocking
            // branch below is never entered.
            // NOTE(review): the `unwrap` assumes `try_select` cannot fail on the context handed
            // out by `Context::with` — confirm against `Context`.
            if let Timeout::Now = timeout {
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            for (handle, i, _) in handles.iter_mut() {
                // Counted before the attempt so that the entry being registered right now is
                // also covered by the unregistration loop further down.
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Give up once the timeout has been reached; otherwise go around and block again.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
321 
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// `handles` holds one `(handle, index, ptr)` entry per operation; the slice is shuffled in place
/// before polling starts. `timeout` decides whether the call must not block (`Timeout::Now`),
/// may block until an instant (`Timeout::At`), or may block indefinitely (`Timeout::Never`).
///
/// Returns the `index` of an entry that was reported ready, or `None` if the timeout was reached
/// first. Unlike `run_select`, no operation is selected or executed here.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    // Shuffle the operations for fairness.
    utils::shuffle(handles);

    loop {
        // Poll for readiness with a backoff before falling back to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            let mut registered_count = 0;

            // Begin watching all operations.
            for (handle, _, _) in handles.iter_mut() {
                // Counted before the attempt so that the entry being watched right now is also
                // covered by the unwatch loop further down.
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If registration returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` has already returned in the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Map the selected operation identifier back to the index of its entry.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
445 
446 /// Attempts to select one of the operations without blocking.
447 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
448 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>449 pub fn try_select<'a>(
450     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
451 ) -> Result<SelectedOperation<'a>, TrySelectError> {
452     match run_select(handles, Timeout::Now) {
453         None => Err(TrySelectError),
454         Some((token, index, ptr)) => Ok(SelectedOperation {
455             token,
456             index,
457             ptr,
458             _marker: PhantomData,
459         }),
460     }
461 }
462 
463 /// Blocks until one of the operations becomes ready and selects it.
464 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
465 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>466 pub fn select<'a>(
467     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
468 ) -> SelectedOperation<'a> {
469     if handles.is_empty() {
470         panic!("no operations have been added to `Select`");
471     }
472 
473     let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
474     SelectedOperation {
475         token,
476         index,
477         ptr,
478         _marker: PhantomData,
479     }
480 }
481 
482 /// Blocks for a limited time until one of the operations becomes ready and selects it.
483 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
484 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>485 pub fn select_timeout<'a>(
486     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
487     timeout: Duration,
488 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
489     select_deadline(handles, Instant::now() + timeout)
490 }
491 
492 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
493 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>494 pub(crate) fn select_deadline<'a>(
495     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
496     deadline: Instant,
497 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
498     match run_select(handles, Timeout::At(deadline)) {
499         None => Err(SelectTimeoutError),
500         Some((token, index, ptr)) => Ok(SelectedOperation {
501             token,
502             index,
503             ptr,
504             _marker: PhantomData,
505         }),
506     }
507 }
508 
509 /// Selects from a set of channel operations.
510 ///
511 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
512 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
513 /// among them is selected.
514 ///
515 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
516 /// when it will simply return an error because the channel is disconnected.
517 ///
518 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
519 /// dynamically created list of channel operations.
520 ///
521 /// Once a list of operations has been built with `Select`, there are two different ways of
522 /// proceeding:
523 ///
524 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
525 ///   the returned selected operation has already begun and **must** be completed. If we don't
526 ///   complete it, a panic will occur.
527 ///
528 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
529 ///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
530 ///   possible for another thread to make the operation not ready just before we try executing it,
531 ///   so it's wise to use a retry loop. However, note that these methods might return with success
532 ///   spuriously, so it's a good idea to always double check if the operation is really ready.
533 ///
534 /// # Examples
535 ///
536 /// Use [`select`] to receive a message from a list of receivers:
537 ///
538 /// ```
539 /// use crossbeam_channel::{Receiver, RecvError, Select};
540 ///
541 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
542 ///     // Build a list of operations.
543 ///     let mut sel = Select::new();
544 ///     for r in rs {
545 ///         sel.recv(r);
546 ///     }
547 ///
548 ///     // Complete the selected operation.
549 ///     let oper = sel.select();
550 ///     let index = oper.index();
551 ///     oper.recv(&rs[index])
552 /// }
553 /// ```
554 ///
555 /// Use [`ready`] to receive a message from a list of receivers:
556 ///
557 /// ```
558 /// use crossbeam_channel::{Receiver, RecvError, Select};
559 ///
560 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
561 ///     // Build a list of operations.
562 ///     let mut sel = Select::new();
563 ///     for r in rs {
564 ///         sel.recv(r);
565 ///     }
566 ///
567 ///     loop {
568 ///         // Wait until a receive operation becomes ready and try executing it.
569 ///         let index = sel.ready();
570 ///         let res = rs[index].try_recv();
571 ///
572 ///         // If the operation turns out not to be ready, retry.
573 ///         if let Err(e) = res {
574 ///             if e.is_empty() {
575 ///                 continue;
576 ///             }
577 ///         }
578 ///
579 ///         // Success!
580 ///         return res.map_err(|_| RecvError);
581 ///     }
582 /// }
583 /// ```
584 ///
585 /// [`try_select`]: Select::try_select
586 /// [`select`]: Select::select
587 /// [`select_timeout`]: Select::select_timeout
588 /// [`try_ready`]: Select::try_ready
589 /// [`ready`]: Select::ready
590 /// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the operation's handle, the index that was returned
    /// when the operation was added, and the address of the sender or receiver as an untyped
    /// pointer.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    ///
    /// Only ever incremented when an operation is added, so the indices of removed operations
    /// are not handed out again.
    next_index: usize,
}
598 
// The `*const u8` stored in each handle entry is what prevents these traits from being derived
// automatically.
// NOTE(review): presumably sound because that pointer is only kept as an address and is not
// dereferenced through `Select` — confirm against every use of the `ptr` field.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
601 
602 impl<'a> Select<'a> {
603     /// Creates an empty list of channel operations for selection.
604     ///
605     /// # Examples
606     ///
607     /// ```
608     /// use crossbeam_channel::Select;
609     ///
610     /// let mut sel = Select::new();
611     ///
612     /// // The list of operations is empty, which means no operation can be selected.
613     /// assert!(sel.try_select().is_err());
614     /// ```
new() -> Select<'a>615     pub fn new() -> Select<'a> {
616         Select {
617             handles: Vec::with_capacity(4),
618             next_index: 0,
619         }
620     }
621 
622     /// Adds a send operation.
623     ///
624     /// Returns the index of the added operation.
625     ///
626     /// # Examples
627     ///
628     /// ```
629     /// use crossbeam_channel::{unbounded, Select};
630     ///
631     /// let (s, r) = unbounded::<i32>();
632     ///
633     /// let mut sel = Select::new();
634     /// let index = sel.send(&s);
635     /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize636     pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
637         let i = self.next_index;
638         let ptr = s as *const Sender<_> as *const u8;
639         self.handles.push((s, i, ptr));
640         self.next_index += 1;
641         i
642     }
643 
644     /// Adds a receive operation.
645     ///
646     /// Returns the index of the added operation.
647     ///
648     /// # Examples
649     ///
650     /// ```
651     /// use crossbeam_channel::{unbounded, Select};
652     ///
653     /// let (s, r) = unbounded::<i32>();
654     ///
655     /// let mut sel = Select::new();
656     /// let index = sel.recv(&r);
657     /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize658     pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
659         let i = self.next_index;
660         let ptr = r as *const Receiver<_> as *const u8;
661         self.handles.push((r, i, ptr));
662         self.next_index += 1;
663         i
664     }
665 
666     /// Removes a previously added operation.
667     ///
668     /// This is useful when an operation is selected because the channel got disconnected and we
669     /// want to try again to select a different operation instead.
670     ///
671     /// If new operations are added after removing some, the indices of removed operations will not
672     /// be reused.
673     ///
674     /// # Panics
675     ///
676     /// An attempt to remove a non-existing or already removed operation will panic.
677     ///
678     /// # Examples
679     ///
680     /// ```
681     /// use crossbeam_channel::{unbounded, Select};
682     ///
683     /// let (s1, r1) = unbounded::<i32>();
684     /// let (_, r2) = unbounded::<i32>();
685     ///
686     /// let mut sel = Select::new();
687     /// let oper1 = sel.recv(&r1);
688     /// let oper2 = sel.recv(&r2);
689     ///
    /// // Only the second operation is ready (its channel is disconnected), so it will be selected.
691     /// let oper = sel.select();
692     /// assert_eq!(oper.index(), oper2);
693     /// assert!(oper.recv(&r2).is_err());
694     /// sel.remove(oper2);
695     ///
696     /// s1.send(10).unwrap();
697     ///
698     /// let oper = sel.select();
699     /// assert_eq!(oper.index(), oper1);
700     /// assert_eq!(oper.recv(&r1), Ok(10));
701     /// ```
remove(&mut self, index: usize)702     pub fn remove(&mut self, index: usize) {
703         assert!(
704             index < self.next_index,
705             "index out of bounds; {} >= {}",
706             index,
707             self.next_index,
708         );
709 
710         let i = self
711             .handles
712             .iter()
713             .enumerate()
714             .find(|(_, (_, i, _))| *i == index)
715             .expect("no operation with this index")
716             .0;
717 
718         self.handles.swap_remove(i);
719     }
720 
721     /// Attempts to select one of the operations without blocking.
722     ///
723     /// If an operation is ready, it is selected and returned. If multiple operations are ready at
724     /// the same time, a random one among them is selected. If none of the operations are ready, an
725     /// error is returned.
726     ///
727     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
728     /// even when it will simply return an error because the channel is disconnected.
729     ///
730     /// The selected operation must be completed with [`SelectedOperation::send`]
731     /// or [`SelectedOperation::recv`].
732     ///
733     /// # Examples
734     ///
735     /// ```
736     /// use crossbeam_channel::{unbounded, Select};
737     ///
738     /// let (s1, r1) = unbounded();
739     /// let (s2, r2) = unbounded();
740     ///
741     /// s1.send(10).unwrap();
742     /// s2.send(20).unwrap();
743     ///
744     /// let mut sel = Select::new();
745     /// let oper1 = sel.recv(&r1);
746     /// let oper2 = sel.recv(&r2);
747     ///
748     /// // Both operations are initially ready, so a random one will be executed.
749     /// let oper = sel.try_select();
750     /// match oper {
751     ///     Err(_) => panic!("both operations should be ready"),
752     ///     Ok(oper) => match oper.index() {
753     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
754     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
755     ///         _ => unreachable!(),
756     ///     }
757     /// }
758     /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>759     pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
760         try_select(&mut self.handles)
761     }
762 
763     /// Blocks until one of the operations becomes ready and selects it.
764     ///
765     /// Once an operation becomes ready, it is selected and returned. If multiple operations are
766     /// ready at the same time, a random one among them is selected.
767     ///
768     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
769     /// even when it will simply return an error because the channel is disconnected.
770     ///
771     /// The selected operation must be completed with [`SelectedOperation::send`]
772     /// or [`SelectedOperation::recv`].
773     ///
774     /// # Panics
775     ///
776     /// Panics if no operations have been added to `Select`.
777     ///
778     /// # Examples
779     ///
780     /// ```
781     /// use std::thread;
782     /// use std::time::Duration;
783     /// use crossbeam_channel::{unbounded, Select};
784     ///
785     /// let (s1, r1) = unbounded();
786     /// let (s2, r2) = unbounded();
787     ///
788     /// thread::spawn(move || {
789     ///     thread::sleep(Duration::from_secs(1));
790     ///     s1.send(10).unwrap();
791     /// });
792     /// thread::spawn(move || s2.send(20).unwrap());
793     ///
794     /// let mut sel = Select::new();
795     /// let oper1 = sel.recv(&r1);
796     /// let oper2 = sel.recv(&r2);
797     ///
798     /// // The second operation will be selected because it becomes ready first.
799     /// let oper = sel.select();
800     /// match oper.index() {
801     ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
802     ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
803     ///     _ => unreachable!(),
804     /// }
805     /// ```
select(&mut self) -> SelectedOperation<'a>806     pub fn select(&mut self) -> SelectedOperation<'a> {
807         select(&mut self.handles)
808     }
809 
810     /// Blocks for a limited time until one of the operations becomes ready and selects it.
811     ///
812     /// If an operation becomes ready, it is selected and returned. If multiple operations are
813     /// ready at the same time, a random one among them is selected. If none of the operations
814     /// become ready for the specified duration, an error is returned.
815     ///
816     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
817     /// even when it will simply return an error because the channel is disconnected.
818     ///
819     /// The selected operation must be completed with [`SelectedOperation::send`]
820     /// or [`SelectedOperation::recv`].
821     ///
822     /// # Examples
823     ///
824     /// ```
825     /// use std::thread;
826     /// use std::time::Duration;
827     /// use crossbeam_channel::{unbounded, Select};
828     ///
829     /// let (s1, r1) = unbounded();
830     /// let (s2, r2) = unbounded();
831     ///
832     /// thread::spawn(move || {
833     ///     thread::sleep(Duration::from_secs(1));
834     ///     s1.send(10).unwrap();
835     /// });
836     /// thread::spawn(move || s2.send(20).unwrap());
837     ///
838     /// let mut sel = Select::new();
839     /// let oper1 = sel.recv(&r1);
840     /// let oper2 = sel.recv(&r2);
841     ///
842     /// // The second operation will be selected because it becomes ready first.
843     /// let oper = sel.select_timeout(Duration::from_millis(500));
844     /// match oper {
845     ///     Err(_) => panic!("should not have timed out"),
846     ///     Ok(oper) => match oper.index() {
847     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
848     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
849     ///         _ => unreachable!(),
850     ///     }
851     /// }
852     /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>853     pub fn select_timeout(
854         &mut self,
855         timeout: Duration,
856     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
857         select_timeout(&mut self.handles, timeout)
858     }
859 
860     /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
861     ///
862     /// If an operation becomes ready, it is selected and returned. If multiple operations are
863     /// ready at the same time, a random one among them is selected. If none of the operations
864     /// become ready before the given deadline, an error is returned.
865     ///
866     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
867     /// even when it will simply return an error because the channel is disconnected.
868     ///
869     /// The selected operation must be completed with [`SelectedOperation::send`]
870     /// or [`SelectedOperation::recv`].
871     ///
872     /// # Examples
873     ///
874     /// ```
875     /// use std::thread;
876     /// use std::time::{Instant, Duration};
877     /// use crossbeam_channel::{unbounded, Select};
878     ///
879     /// let (s1, r1) = unbounded();
880     /// let (s2, r2) = unbounded();
881     ///
882     /// thread::spawn(move || {
883     ///     thread::sleep(Duration::from_secs(1));
884     ///     s1.send(10).unwrap();
885     /// });
886     /// thread::spawn(move || s2.send(20).unwrap());
887     ///
888     /// let mut sel = Select::new();
889     /// let oper1 = sel.recv(&r1);
890     /// let oper2 = sel.recv(&r2);
891     ///
892     /// let deadline = Instant::now() + Duration::from_millis(500);
893     ///
894     /// // The second operation will be selected because it becomes ready first.
895     /// let oper = sel.select_deadline(deadline);
896     /// match oper {
897     ///     Err(_) => panic!("should not have timed out"),
898     ///     Ok(oper) => match oper.index() {
899     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
900     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
901     ///         _ => unreachable!(),
902     ///     }
903     /// }
904     /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>905     pub fn select_deadline(
906         &mut self,
907         deadline: Instant,
908     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
909         select_deadline(&mut self.handles, deadline)
910     }
911 
912     /// Attempts to find a ready operation without blocking.
913     ///
914     /// If an operation is ready, its index is returned. If multiple operations are ready at the
915     /// same time, a random one among them is chosen. If none of the operations are ready, an error
916     /// is returned.
917     ///
918     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
919     /// even when it will simply return an error because the channel is disconnected.
920     ///
921     /// Note that this method might return with success spuriously, so it's a good idea to always
922     /// double check if the operation is really ready.
923     ///
924     /// # Examples
925     ///
926     /// ```
927     /// use crossbeam_channel::{unbounded, Select};
928     ///
929     /// let (s1, r1) = unbounded();
930     /// let (s2, r2) = unbounded();
931     ///
932     /// s1.send(10).unwrap();
933     /// s2.send(20).unwrap();
934     ///
935     /// let mut sel = Select::new();
936     /// let oper1 = sel.recv(&r1);
937     /// let oper2 = sel.recv(&r2);
938     ///
939     /// // Both operations are initially ready, so a random one will be chosen.
940     /// match sel.try_ready() {
941     ///     Err(_) => panic!("both operations should be ready"),
942     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
943     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
944     ///     Ok(_) => unreachable!(),
945     /// }
946     /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>947     pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
948         match run_ready(&mut self.handles, Timeout::Now) {
949             None => Err(TryReadyError),
950             Some(index) => Ok(index),
951         }
952     }
953 
954     /// Blocks until one of the operations becomes ready.
955     ///
956     /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
957     /// the same time, a random one among them is chosen.
958     ///
959     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
960     /// even when it will simply return an error because the channel is disconnected.
961     ///
962     /// Note that this method might return with success spuriously, so it's a good idea to always
963     /// double check if the operation is really ready.
964     ///
965     /// # Panics
966     ///
967     /// Panics if no operations have been added to `Select`.
968     ///
969     /// # Examples
970     ///
971     /// ```
972     /// use std::thread;
973     /// use std::time::Duration;
974     /// use crossbeam_channel::{unbounded, Select};
975     ///
976     /// let (s1, r1) = unbounded();
977     /// let (s2, r2) = unbounded();
978     ///
979     /// thread::spawn(move || {
980     ///     thread::sleep(Duration::from_secs(1));
981     ///     s1.send(10).unwrap();
982     /// });
983     /// thread::spawn(move || s2.send(20).unwrap());
984     ///
985     /// let mut sel = Select::new();
986     /// let oper1 = sel.recv(&r1);
987     /// let oper2 = sel.recv(&r2);
988     ///
989     /// // The second operation will be selected because it becomes ready first.
990     /// match sel.ready() {
991     ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
992     ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
993     ///     _ => unreachable!(),
994     /// }
995     /// ```
ready(&mut self) -> usize996     pub fn ready(&mut self) -> usize {
997         if self.handles.is_empty() {
998             panic!("no operations have been added to `Select`");
999         }
1000 
1001         run_ready(&mut self.handles, Timeout::Never).unwrap()
1002     }
1003 
1004     /// Blocks for a limited time until one of the operations becomes ready.
1005     ///
1006     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1007     /// the same time, a random one among them is chosen. If none of the operations become ready
1008     /// for the specified duration, an error is returned.
1009     ///
1010     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1011     /// even when it will simply return an error because the channel is disconnected.
1012     ///
1013     /// Note that this method might return with success spuriously, so it's a good idea to double
1014     /// check if the operation is really ready.
1015     ///
1016     /// # Examples
1017     ///
1018     /// ```
1019     /// use std::thread;
1020     /// use std::time::Duration;
1021     /// use crossbeam_channel::{unbounded, Select};
1022     ///
1023     /// let (s1, r1) = unbounded();
1024     /// let (s2, r2) = unbounded();
1025     ///
1026     /// thread::spawn(move || {
1027     ///     thread::sleep(Duration::from_secs(1));
1028     ///     s1.send(10).unwrap();
1029     /// });
1030     /// thread::spawn(move || s2.send(20).unwrap());
1031     ///
1032     /// let mut sel = Select::new();
1033     /// let oper1 = sel.recv(&r1);
1034     /// let oper2 = sel.recv(&r2);
1035     ///
1036     /// // The second operation will be selected because it becomes ready first.
1037     /// match sel.ready_timeout(Duration::from_millis(500)) {
1038     ///     Err(_) => panic!("should not have timed out"),
1039     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1040     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1041     ///     Ok(_) => unreachable!(),
1042     /// }
1043     /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1044     pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1045         self.ready_deadline(Instant::now() + timeout)
1046     }
1047 
1048     /// Blocks until a given deadline, or until one of the operations becomes ready.
1049     ///
1050     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1051     /// the same time, a random one among them is chosen. If none of the operations become ready
1052     /// before the deadline, an error is returned.
1053     ///
1054     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1055     /// even when it will simply return an error because the channel is disconnected.
1056     ///
1057     /// Note that this method might return with success spuriously, so it's a good idea to double
1058     /// check if the operation is really ready.
1059     ///
1060     /// # Examples
1061     ///
1062     /// ```
1063     /// use std::thread;
1064     /// use std::time::{Duration, Instant};
1065     /// use crossbeam_channel::{unbounded, Select};
1066     ///
1067     /// let deadline = Instant::now() + Duration::from_millis(500);
1068     ///
1069     /// let (s1, r1) = unbounded();
1070     /// let (s2, r2) = unbounded();
1071     ///
1072     /// thread::spawn(move || {
1073     ///     thread::sleep(Duration::from_secs(1));
1074     ///     s1.send(10).unwrap();
1075     /// });
1076     /// thread::spawn(move || s2.send(20).unwrap());
1077     ///
1078     /// let mut sel = Select::new();
1079     /// let oper1 = sel.recv(&r1);
1080     /// let oper2 = sel.recv(&r2);
1081     ///
1082     /// // The second operation will be selected because it becomes ready first.
1083     /// match sel.ready_deadline(deadline) {
1084     ///     Err(_) => panic!("should not have timed out"),
1085     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1086     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1087     ///     Ok(_) => unreachable!(),
1088     /// }
1089     /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1090     pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1091         match run_ready(&mut self.handles, Timeout::At(deadline)) {
1092             None => Err(ReadyTimeoutError),
1093             Some(index) => Ok(index),
1094         }
1095     }
1096 }
1097 
1098 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1099     fn clone(&self) -> Select<'a> {
1100         Select {
1101             handles: self.handles.clone(),
1102             next_index: self.next_index,
1103         }
1104     }
1105 }
1106 
1107 impl<'a> Default for Select<'a> {
default() -> Select<'a>1108     fn default() -> Select<'a> {
1109         Select::new()
1110     }
1111 }
1112 
1113 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1114     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1115         f.pad("Select { .. }")
1116     }
1117 }
1118 
1119 /// A selected operation that needs to be completed.
1120 ///
1121 /// To complete the operation, call [`send`] or [`recv`].
1122 ///
1123 /// # Panics
1124 ///
1125 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1126 /// `SelectedOperation` is dropped without completion, a panic occurs.
1127 ///
1128 /// [`send`]: SelectedOperation::send
1129 /// [`recv`]: SelectedOperation::recv
1130 #[must_use]
1131 pub struct SelectedOperation<'a> {
1132     /// Token needed to complete the operation.
1133     token: Token,
1134 
1135     /// The index of the selected operation.
1136     index: usize,
1137 
1138     /// The address of the selected `Sender` or `Receiver`.
1139     ptr: *const u8,
1140 
1141     /// Indicates that `Sender`s and `Receiver`s are borrowed.
1142     _marker: PhantomData<&'a ()>,
1143 }
1144 
1145 impl SelectedOperation<'_> {
1146     /// Returns the index of the selected operation.
1147     ///
1148     /// # Examples
1149     ///
1150     /// ```
1151     /// use crossbeam_channel::{bounded, Select};
1152     ///
1153     /// let (s1, r1) = bounded::<()>(0);
1154     /// let (s2, r2) = bounded::<()>(0);
1155     /// let (s3, r3) = bounded::<()>(1);
1156     ///
1157     /// let mut sel = Select::new();
1158     /// let oper1 = sel.send(&s1);
1159     /// let oper2 = sel.recv(&r2);
1160     /// let oper3 = sel.send(&s3);
1161     ///
1162     /// // Only the last operation is ready.
1163     /// let oper = sel.select();
1164     /// assert_eq!(oper.index(), 2);
1165     /// assert_eq!(oper.index(), oper3);
1166     ///
1167     /// // Complete the operation.
1168     /// oper.send(&s3, ()).unwrap();
1169     /// ```
index(&self) -> usize1170     pub fn index(&self) -> usize {
1171         self.index
1172     }
1173 
1174     /// Completes the send operation.
1175     ///
1176     /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1177     /// when the operation was added.
1178     ///
1179     /// # Panics
1180     ///
1181     /// Panics if an incorrect [`Sender`] reference is passed.
1182     ///
1183     /// # Examples
1184     ///
1185     /// ```
1186     /// use crossbeam_channel::{bounded, Select, SendError};
1187     ///
1188     /// let (s, r) = bounded::<i32>(0);
1189     /// drop(r);
1190     ///
1191     /// let mut sel = Select::new();
1192     /// let oper1 = sel.send(&s);
1193     ///
1194     /// let oper = sel.select();
1195     /// assert_eq!(oper.index(), oper1);
1196     /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1197     /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1198     pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1199         assert!(
1200             s as *const Sender<T> as *const u8 == self.ptr,
1201             "passed a sender that wasn't selected",
1202         );
1203         let res = unsafe { channel::write(s, &mut self.token, msg) };
1204         mem::forget(self);
1205         res.map_err(SendError)
1206     }
1207 
1208     /// Completes the receive operation.
1209     ///
1210     /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1211     /// when the operation was added.
1212     ///
1213     /// # Panics
1214     ///
1215     /// Panics if an incorrect [`Receiver`] reference is passed.
1216     ///
1217     /// # Examples
1218     ///
1219     /// ```
1220     /// use crossbeam_channel::{bounded, Select, RecvError};
1221     ///
1222     /// let (s, r) = bounded::<i32>(0);
1223     /// drop(s);
1224     ///
1225     /// let mut sel = Select::new();
1226     /// let oper1 = sel.recv(&r);
1227     ///
1228     /// let oper = sel.select();
1229     /// assert_eq!(oper.index(), oper1);
1230     /// assert_eq!(oper.recv(&r), Err(RecvError));
1231     /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1232     pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1233         assert!(
1234             r as *const Receiver<T> as *const u8 == self.ptr,
1235             "passed a receiver that wasn't selected",
1236         );
1237         let res = unsafe { channel::read(r, &mut self.token) };
1238         mem::forget(self);
1239         res.map_err(|_| RecvError)
1240     }
1241 }
1242 
1243 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1244     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1245         f.pad("SelectedOperation { .. }")
1246     }
1247 }
1248 
1249 impl Drop for SelectedOperation<'_> {
drop(&mut self)1250     fn drop(&mut self) {
1251         panic!("dropped `SelectedOperation` without completing the operation");
1252     }
1253 }
1254