1 //! Interface to the select mechanism.
2
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_utils::Backoff;
9
10 use crate::channel::{self, Receiver, Sender};
11 use crate::context::Context;
12 use crate::err::{ReadyTimeoutError, TryReadyError};
13 use crate::err::{RecvError, SendError};
14 use crate::err::{SelectTimeoutError, TrySelectError};
15 use crate::flavors;
16 use crate::utils;
17
18 /// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19 /// `read` or `write`.
20 ///
21 /// Each field contains data associated with a specific channel flavor.
22 // This is a private API that is used by the select macro.
23 #[derive(Debug, Default)]
24 pub struct Token {
25 pub at: flavors::at::AtToken,
26 pub array: flavors::array::ArrayToken,
27 pub list: flavors::list::ListToken,
28 pub never: flavors::never::NeverToken,
29 pub tick: flavors::tick::TickToken,
30 pub zero: flavors::zero::ZeroToken,
31 }
32
33 /// Identifier associated with an operation by a specific thread on a specific channel.
34 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
35 pub struct Operation(usize);
36
37 impl Operation {
38 /// Creates an operation identifier from a mutable reference.
39 ///
40 /// This function essentially just turns the address of the reference into a number. The
41 /// reference should point to a variable that is specific to the thread and the operation,
42 /// and is alive for the entire duration of select or blocking operation.
43 #[inline]
hook<T>(r: &mut T) -> Operation44 pub fn hook<T>(r: &mut T) -> Operation {
45 let val = r as *mut T as usize;
46 // Make sure that the pointer address doesn't equal the numerical representation of
47 // `Selected::{Waiting, Aborted, Disconnected}`.
48 assert!(val > 2);
49 Operation(val)
50 }
51 }
52
53 /// Current state of a select or a blocking operation.
54 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
55 pub enum Selected {
56 /// Still waiting for an operation.
57 Waiting,
58
59 /// The attempt to block the current thread has been aborted.
60 Aborted,
61
62 /// An operation became ready because a channel is disconnected.
63 Disconnected,
64
65 /// An operation became ready because a message can be sent or received.
66 Operation(Operation),
67 }
68
69 impl From<usize> for Selected {
70 #[inline]
from(val: usize) -> Selected71 fn from(val: usize) -> Selected {
72 match val {
73 0 => Selected::Waiting,
74 1 => Selected::Aborted,
75 2 => Selected::Disconnected,
76 oper => Selected::Operation(Operation(oper)),
77 }
78 }
79 }
80
81 impl Into<usize> for Selected {
82 #[inline]
into(self) -> usize83 fn into(self) -> usize {
84 match self {
85 Selected::Waiting => 0,
86 Selected::Aborted => 1,
87 Selected::Disconnected => 2,
88 Selected::Operation(Operation(val)) => val,
89 }
90 }
91 }
92
93 /// A receiver or a sender that can participate in select.
94 ///
95 /// This is a handle that assists select in executing an operation, registration, deciding on the
96 /// appropriate deadline for blocking, etc.
97 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
98 pub trait SelectHandle {
99 /// Attempts to select an operation and returns `true` on success.
try_select(&self, token: &mut Token) -> bool100 fn try_select(&self, token: &mut Token) -> bool;
101
102 /// Returns a deadline for an operation, if there is one.
deadline(&self) -> Option<Instant>103 fn deadline(&self) -> Option<Instant>;
104
105 /// Registers an operation for execution and returns `true` if it is now ready.
register(&self, oper: Operation, cx: &Context) -> bool106 fn register(&self, oper: Operation, cx: &Context) -> bool;
107
108 /// Unregisters an operation for execution.
unregister(&self, oper: Operation)109 fn unregister(&self, oper: Operation);
110
111 /// Attempts to select an operation the thread got woken up for and returns `true` on success.
accept(&self, token: &mut Token, cx: &Context) -> bool112 fn accept(&self, token: &mut Token, cx: &Context) -> bool;
113
114 /// Returns `true` if an operation can be executed without blocking.
is_ready(&self) -> bool115 fn is_ready(&self) -> bool;
116
117 /// Registers an operation for readiness notification and returns `true` if it is now ready.
watch(&self, oper: Operation, cx: &Context) -> bool118 fn watch(&self, oper: Operation, cx: &Context) -> bool;
119
120 /// Unregisters an operation for readiness notification.
unwatch(&self, oper: Operation)121 fn unwatch(&self, oper: Operation);
122 }
123
124 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool125 fn try_select(&self, token: &mut Token) -> bool {
126 (**self).try_select(token)
127 }
128
deadline(&self) -> Option<Instant>129 fn deadline(&self) -> Option<Instant> {
130 (**self).deadline()
131 }
132
register(&self, oper: Operation, cx: &Context) -> bool133 fn register(&self, oper: Operation, cx: &Context) -> bool {
134 (**self).register(oper, cx)
135 }
136
unregister(&self, oper: Operation)137 fn unregister(&self, oper: Operation) {
138 (**self).unregister(oper);
139 }
140
accept(&self, token: &mut Token, cx: &Context) -> bool141 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
142 (**self).accept(token, cx)
143 }
144
is_ready(&self) -> bool145 fn is_ready(&self) -> bool {
146 (**self).is_ready()
147 }
148
watch(&self, oper: Operation, cx: &Context) -> bool149 fn watch(&self, oper: Operation, cx: &Context) -> bool {
150 (**self).watch(oper, cx)
151 }
152
unwatch(&self, oper: Operation)153 fn unwatch(&self, oper: Operation) {
154 (**self).unwatch(oper)
155 }
156 }
157
158 /// Determines when a select operation should time out.
159 #[derive(Clone, Copy, Eq, PartialEq)]
160 enum Timeout {
161 /// No blocking.
162 Now,
163
164 /// Block forever.
165 Never,
166
167 /// Time out after the time instant.
168 At(Instant),
169 }
170
171 /// Runs until one of the operations is selected, potentially blocking the current thread.
172 ///
173 /// Successful receive operations will have to be followed up by `channel::read()` and successful
174 /// send operations by `channel::write()`.
run_select( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<(Token, usize, *const u8)>175 fn run_select(
176 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
177 timeout: Timeout,
178 ) -> Option<(Token, usize, *const u8)> {
179 if handles.is_empty() {
180 // Wait until the timeout and return.
181 match timeout {
182 Timeout::Now => return None,
183 Timeout::Never => {
184 utils::sleep_until(None);
185 unreachable!();
186 }
187 Timeout::At(when) => {
188 utils::sleep_until(Some(when));
189 return None;
190 }
191 }
192 }
193
194 // Shuffle the operations for fairness.
195 utils::shuffle(handles);
196
197 // Create a token, which serves as a temporary variable that gets initialized in this function
198 // and is later used by a call to `channel::read()` or `channel::write()` that completes the
199 // selected operation.
200 let mut token = Token::default();
201
202 // Try selecting one of the operations without blocking.
203 for &(handle, i, ptr) in handles.iter() {
204 if handle.try_select(&mut token) {
205 return Some((token, i, ptr));
206 }
207 }
208
209 loop {
210 // Prepare for blocking.
211 let res = Context::with(|cx| {
212 let mut sel = Selected::Waiting;
213 let mut registered_count = 0;
214 let mut index_ready = None;
215
216 if let Timeout::Now = timeout {
217 cx.try_select(Selected::Aborted).unwrap();
218 }
219
220 // Register all operations.
221 for (handle, i, _) in handles.iter_mut() {
222 registered_count += 1;
223
224 // If registration returns `false`, that means the operation has just become ready.
225 if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
226 // Try aborting select.
227 sel = match cx.try_select(Selected::Aborted) {
228 Ok(()) => {
229 index_ready = Some(*i);
230 Selected::Aborted
231 }
232 Err(s) => s,
233 };
234 break;
235 }
236
237 // If another thread has already selected one of the operations, stop registration.
238 sel = cx.selected();
239 if sel != Selected::Waiting {
240 break;
241 }
242 }
243
244 if sel == Selected::Waiting {
245 // Check with each operation for how long we're allowed to block, and compute the
246 // earliest deadline.
247 let mut deadline: Option<Instant> = match timeout {
248 Timeout::Now => return None,
249 Timeout::Never => None,
250 Timeout::At(when) => Some(when),
251 };
252 for &(handle, _, _) in handles.iter() {
253 if let Some(x) = handle.deadline() {
254 deadline = deadline.map(|y| x.min(y)).or(Some(x));
255 }
256 }
257
258 // Block the current thread.
259 sel = cx.wait_until(deadline);
260 }
261
262 // Unregister all registered operations.
263 for (handle, _, _) in handles.iter_mut().take(registered_count) {
264 handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
265 }
266
267 match sel {
268 Selected::Waiting => unreachable!(),
269 Selected::Aborted => {
270 // If an operation became ready during registration, try selecting it.
271 if let Some(index_ready) = index_ready {
272 for &(handle, i, ptr) in handles.iter() {
273 if i == index_ready && handle.try_select(&mut token) {
274 return Some((i, ptr));
275 }
276 }
277 }
278 }
279 Selected::Disconnected => {}
280 Selected::Operation(_) => {
281 // Find the selected operation.
282 for (handle, i, ptr) in handles.iter_mut() {
283 // Is this the selected operation?
284 if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
285 {
286 // Try selecting this operation.
287 if handle.accept(&mut token, cx) {
288 return Some((*i, *ptr));
289 }
290 }
291 }
292 }
293 }
294
295 None
296 });
297
298 // Return if an operation was selected.
299 if let Some((i, ptr)) = res {
300 return Some((token, i, ptr));
301 }
302
303 // Try selecting one of the operations without blocking.
304 for &(handle, i, ptr) in handles.iter() {
305 if handle.try_select(&mut token) {
306 return Some((token, i, ptr));
307 }
308 }
309
310 match timeout {
311 Timeout::Now => return None,
312 Timeout::Never => {}
313 Timeout::At(when) => {
314 if Instant::now() >= when {
315 return None;
316 }
317 }
318 }
319 }
320 }
321
322 /// Runs until one of the operations becomes ready, potentially blocking the current thread.
run_ready( handles: &mut [(&dyn SelectHandle, usize, *const u8)], timeout: Timeout, ) -> Option<usize>323 fn run_ready(
324 handles: &mut [(&dyn SelectHandle, usize, *const u8)],
325 timeout: Timeout,
326 ) -> Option<usize> {
327 if handles.is_empty() {
328 // Wait until the timeout and return.
329 match timeout {
330 Timeout::Now => return None,
331 Timeout::Never => {
332 utils::sleep_until(None);
333 unreachable!();
334 }
335 Timeout::At(when) => {
336 utils::sleep_until(Some(when));
337 return None;
338 }
339 }
340 }
341
342 // Shuffle the operations for fairness.
343 utils::shuffle(handles);
344
345 loop {
346 let backoff = Backoff::new();
347 loop {
348 // Check operations for readiness.
349 for &(handle, i, _) in handles.iter() {
350 if handle.is_ready() {
351 return Some(i);
352 }
353 }
354
355 if backoff.is_completed() {
356 break;
357 } else {
358 backoff.snooze();
359 }
360 }
361
362 // Check for timeout.
363 match timeout {
364 Timeout::Now => return None,
365 Timeout::Never => {}
366 Timeout::At(when) => {
367 if Instant::now() >= when {
368 return None;
369 }
370 }
371 }
372
373 // Prepare for blocking.
374 let res = Context::with(|cx| {
375 let mut sel = Selected::Waiting;
376 let mut registered_count = 0;
377
378 // Begin watching all operations.
379 for (handle, _, _) in handles.iter_mut() {
380 registered_count += 1;
381 let oper = Operation::hook::<&dyn SelectHandle>(handle);
382
383 // If registration returns `false`, that means the operation has just become ready.
384 if handle.watch(oper, cx) {
385 sel = match cx.try_select(Selected::Operation(oper)) {
386 Ok(()) => Selected::Operation(oper),
387 Err(s) => s,
388 };
389 break;
390 }
391
392 // If another thread has already chosen one of the operations, stop registration.
393 sel = cx.selected();
394 if sel != Selected::Waiting {
395 break;
396 }
397 }
398
399 if sel == Selected::Waiting {
400 // Check with each operation for how long we're allowed to block, and compute the
401 // earliest deadline.
402 let mut deadline: Option<Instant> = match timeout {
403 Timeout::Now => unreachable!(),
404 Timeout::Never => None,
405 Timeout::At(when) => Some(when),
406 };
407 for &(handle, _, _) in handles.iter() {
408 if let Some(x) = handle.deadline() {
409 deadline = deadline.map(|y| x.min(y)).or(Some(x));
410 }
411 }
412
413 // Block the current thread.
414 sel = cx.wait_until(deadline);
415 }
416
417 // Unwatch all operations.
418 for (handle, _, _) in handles.iter_mut().take(registered_count) {
419 handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
420 }
421
422 match sel {
423 Selected::Waiting => unreachable!(),
424 Selected::Aborted => {}
425 Selected::Disconnected => {}
426 Selected::Operation(_) => {
427 for (handle, i, _) in handles.iter_mut() {
428 let oper = Operation::hook::<&dyn SelectHandle>(handle);
429 if sel == Selected::Operation(oper) {
430 return Some(*i);
431 }
432 }
433 }
434 }
435
436 None
437 });
438
439 // Return if an operation became ready.
440 if res.is_some() {
441 return res;
442 }
443 }
444 }
445
446 /// Attempts to select one of the operations without blocking.
447 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
448 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> Result<SelectedOperation<'a>, TrySelectError>449 pub fn try_select<'a>(
450 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
451 ) -> Result<SelectedOperation<'a>, TrySelectError> {
452 match run_select(handles, Timeout::Now) {
453 None => Err(TrySelectError),
454 Some((token, index, ptr)) => Ok(SelectedOperation {
455 token,
456 index,
457 ptr,
458 _marker: PhantomData,
459 }),
460 }
461 }
462
463 /// Blocks until one of the operations becomes ready and selects it.
464 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
465 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], ) -> SelectedOperation<'a>466 pub fn select<'a>(
467 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
468 ) -> SelectedOperation<'a> {
469 if handles.is_empty() {
470 panic!("no operations have been added to `Select`");
471 }
472
473 let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
474 SelectedOperation {
475 token,
476 index,
477 ptr,
478 _marker: PhantomData,
479 }
480 }
481
482 /// Blocks for a limited time until one of the operations becomes ready and selects it.
483 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
484 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>485 pub fn select_timeout<'a>(
486 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
487 timeout: Duration,
488 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
489 select_deadline(handles, Instant::now() + timeout)
490 }
491
492 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
493 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>494 pub(crate) fn select_deadline<'a>(
495 handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
496 deadline: Instant,
497 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
498 match run_select(handles, Timeout::At(deadline)) {
499 None => Err(SelectTimeoutError),
500 Some((token, index, ptr)) => Ok(SelectedOperation {
501 token,
502 index,
503 ptr,
504 _marker: PhantomData,
505 }),
506 }
507 }
508
509 /// Selects from a set of channel operations.
510 ///
511 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
512 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
513 /// among them is selected.
514 ///
515 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
516 /// when it will simply return an error because the channel is disconnected.
517 ///
518 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
519 /// dynamically created list of channel operations.
520 ///
521 /// Once a list of operations has been built with `Select`, there are two different ways of
522 /// proceeding:
523 ///
524 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
525 /// the returned selected operation has already begun and **must** be completed. If we don't
526 /// complete it, a panic will occur.
527 ///
528 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
529 /// successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
530 /// possible for another thread to make the operation not ready just before we try executing it,
531 /// so it's wise to use a retry loop. However, note that these methods might return with success
532 /// spuriously, so it's a good idea to always double check if the operation is really ready.
533 ///
534 /// # Examples
535 ///
536 /// Use [`select`] to receive a message from a list of receivers:
537 ///
538 /// ```
539 /// use crossbeam_channel::{Receiver, RecvError, Select};
540 ///
541 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
542 /// // Build a list of operations.
543 /// let mut sel = Select::new();
544 /// for r in rs {
545 /// sel.recv(r);
546 /// }
547 ///
548 /// // Complete the selected operation.
549 /// let oper = sel.select();
550 /// let index = oper.index();
551 /// oper.recv(&rs[index])
552 /// }
553 /// ```
554 ///
555 /// Use [`ready`] to receive a message from a list of receivers:
556 ///
557 /// ```
558 /// use crossbeam_channel::{Receiver, RecvError, Select};
559 ///
560 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
561 /// // Build a list of operations.
562 /// let mut sel = Select::new();
563 /// for r in rs {
564 /// sel.recv(r);
565 /// }
566 ///
567 /// loop {
568 /// // Wait until a receive operation becomes ready and try executing it.
569 /// let index = sel.ready();
570 /// let res = rs[index].try_recv();
571 ///
572 /// // If the operation turns out not to be ready, retry.
573 /// if let Err(e) = res {
574 /// if e.is_empty() {
575 /// continue;
576 /// }
577 /// }
578 ///
579 /// // Success!
580 /// return res.map_err(|_| RecvError);
581 /// }
582 /// }
583 /// ```
584 ///
585 /// [`try_select`]: Select::try_select
586 /// [`select`]: Select::select
587 /// [`select_timeout`]: Select::select_timeout
588 /// [`try_ready`]: Select::try_ready
589 /// [`ready`]: Select::ready
590 /// [`ready_timeout`]: Select::ready_timeout
591 pub struct Select<'a> {
592 /// A list of senders and receivers participating in selection.
593 handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
594
595 /// The next index to assign to an operation.
596 next_index: usize,
597 }
598
599 unsafe impl Send for Select<'_> {}
600 unsafe impl Sync for Select<'_> {}
601
602 impl<'a> Select<'a> {
603 /// Creates an empty list of channel operations for selection.
604 ///
605 /// # Examples
606 ///
607 /// ```
608 /// use crossbeam_channel::Select;
609 ///
610 /// let mut sel = Select::new();
611 ///
612 /// // The list of operations is empty, which means no operation can be selected.
613 /// assert!(sel.try_select().is_err());
614 /// ```
new() -> Select<'a>615 pub fn new() -> Select<'a> {
616 Select {
617 handles: Vec::with_capacity(4),
618 next_index: 0,
619 }
620 }
621
622 /// Adds a send operation.
623 ///
624 /// Returns the index of the added operation.
625 ///
626 /// # Examples
627 ///
628 /// ```
629 /// use crossbeam_channel::{unbounded, Select};
630 ///
631 /// let (s, r) = unbounded::<i32>();
632 ///
633 /// let mut sel = Select::new();
634 /// let index = sel.send(&s);
635 /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize636 pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
637 let i = self.next_index;
638 let ptr = s as *const Sender<_> as *const u8;
639 self.handles.push((s, i, ptr));
640 self.next_index += 1;
641 i
642 }
643
644 /// Adds a receive operation.
645 ///
646 /// Returns the index of the added operation.
647 ///
648 /// # Examples
649 ///
650 /// ```
651 /// use crossbeam_channel::{unbounded, Select};
652 ///
653 /// let (s, r) = unbounded::<i32>();
654 ///
655 /// let mut sel = Select::new();
656 /// let index = sel.recv(&r);
657 /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize658 pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
659 let i = self.next_index;
660 let ptr = r as *const Receiver<_> as *const u8;
661 self.handles.push((r, i, ptr));
662 self.next_index += 1;
663 i
664 }
665
666 /// Removes a previously added operation.
667 ///
668 /// This is useful when an operation is selected because the channel got disconnected and we
669 /// want to try again to select a different operation instead.
670 ///
671 /// If new operations are added after removing some, the indices of removed operations will not
672 /// be reused.
673 ///
674 /// # Panics
675 ///
676 /// An attempt to remove a non-existing or already removed operation will panic.
677 ///
678 /// # Examples
679 ///
680 /// ```
681 /// use crossbeam_channel::{unbounded, Select};
682 ///
683 /// let (s1, r1) = unbounded::<i32>();
684 /// let (_, r2) = unbounded::<i32>();
685 ///
686 /// let mut sel = Select::new();
687 /// let oper1 = sel.recv(&r1);
688 /// let oper2 = sel.recv(&r2);
689 ///
690 /// // Both operations are initially ready, so a random one will be executed.
691 /// let oper = sel.select();
692 /// assert_eq!(oper.index(), oper2);
693 /// assert!(oper.recv(&r2).is_err());
694 /// sel.remove(oper2);
695 ///
696 /// s1.send(10).unwrap();
697 ///
698 /// let oper = sel.select();
699 /// assert_eq!(oper.index(), oper1);
700 /// assert_eq!(oper.recv(&r1), Ok(10));
701 /// ```
remove(&mut self, index: usize)702 pub fn remove(&mut self, index: usize) {
703 assert!(
704 index < self.next_index,
705 "index out of bounds; {} >= {}",
706 index,
707 self.next_index,
708 );
709
710 let i = self
711 .handles
712 .iter()
713 .enumerate()
714 .find(|(_, (_, i, _))| *i == index)
715 .expect("no operation with this index")
716 .0;
717
718 self.handles.swap_remove(i);
719 }
720
721 /// Attempts to select one of the operations without blocking.
722 ///
723 /// If an operation is ready, it is selected and returned. If multiple operations are ready at
724 /// the same time, a random one among them is selected. If none of the operations are ready, an
725 /// error is returned.
726 ///
727 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
728 /// even when it will simply return an error because the channel is disconnected.
729 ///
730 /// The selected operation must be completed with [`SelectedOperation::send`]
731 /// or [`SelectedOperation::recv`].
732 ///
733 /// # Examples
734 ///
735 /// ```
736 /// use crossbeam_channel::{unbounded, Select};
737 ///
738 /// let (s1, r1) = unbounded();
739 /// let (s2, r2) = unbounded();
740 ///
741 /// s1.send(10).unwrap();
742 /// s2.send(20).unwrap();
743 ///
744 /// let mut sel = Select::new();
745 /// let oper1 = sel.recv(&r1);
746 /// let oper2 = sel.recv(&r2);
747 ///
748 /// // Both operations are initially ready, so a random one will be executed.
749 /// let oper = sel.try_select();
750 /// match oper {
751 /// Err(_) => panic!("both operations should be ready"),
752 /// Ok(oper) => match oper.index() {
753 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
754 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
755 /// _ => unreachable!(),
756 /// }
757 /// }
758 /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>759 pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
760 try_select(&mut self.handles)
761 }
762
763 /// Blocks until one of the operations becomes ready and selects it.
764 ///
765 /// Once an operation becomes ready, it is selected and returned. If multiple operations are
766 /// ready at the same time, a random one among them is selected.
767 ///
768 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
769 /// even when it will simply return an error because the channel is disconnected.
770 ///
771 /// The selected operation must be completed with [`SelectedOperation::send`]
772 /// or [`SelectedOperation::recv`].
773 ///
774 /// # Panics
775 ///
776 /// Panics if no operations have been added to `Select`.
777 ///
778 /// # Examples
779 ///
780 /// ```
781 /// use std::thread;
782 /// use std::time::Duration;
783 /// use crossbeam_channel::{unbounded, Select};
784 ///
785 /// let (s1, r1) = unbounded();
786 /// let (s2, r2) = unbounded();
787 ///
788 /// thread::spawn(move || {
789 /// thread::sleep(Duration::from_secs(1));
790 /// s1.send(10).unwrap();
791 /// });
792 /// thread::spawn(move || s2.send(20).unwrap());
793 ///
794 /// let mut sel = Select::new();
795 /// let oper1 = sel.recv(&r1);
796 /// let oper2 = sel.recv(&r2);
797 ///
798 /// // The second operation will be selected because it becomes ready first.
799 /// let oper = sel.select();
800 /// match oper.index() {
801 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
802 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
803 /// _ => unreachable!(),
804 /// }
805 /// ```
select(&mut self) -> SelectedOperation<'a>806 pub fn select(&mut self) -> SelectedOperation<'a> {
807 select(&mut self.handles)
808 }
809
810 /// Blocks for a limited time until one of the operations becomes ready and selects it.
811 ///
812 /// If an operation becomes ready, it is selected and returned. If multiple operations are
813 /// ready at the same time, a random one among them is selected. If none of the operations
814 /// become ready for the specified duration, an error is returned.
815 ///
816 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
817 /// even when it will simply return an error because the channel is disconnected.
818 ///
819 /// The selected operation must be completed with [`SelectedOperation::send`]
820 /// or [`SelectedOperation::recv`].
821 ///
822 /// # Examples
823 ///
824 /// ```
825 /// use std::thread;
826 /// use std::time::Duration;
827 /// use crossbeam_channel::{unbounded, Select};
828 ///
829 /// let (s1, r1) = unbounded();
830 /// let (s2, r2) = unbounded();
831 ///
832 /// thread::spawn(move || {
833 /// thread::sleep(Duration::from_secs(1));
834 /// s1.send(10).unwrap();
835 /// });
836 /// thread::spawn(move || s2.send(20).unwrap());
837 ///
838 /// let mut sel = Select::new();
839 /// let oper1 = sel.recv(&r1);
840 /// let oper2 = sel.recv(&r2);
841 ///
842 /// // The second operation will be selected because it becomes ready first.
843 /// let oper = sel.select_timeout(Duration::from_millis(500));
844 /// match oper {
845 /// Err(_) => panic!("should not have timed out"),
846 /// Ok(oper) => match oper.index() {
847 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
848 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
849 /// _ => unreachable!(),
850 /// }
851 /// }
852 /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>853 pub fn select_timeout(
854 &mut self,
855 timeout: Duration,
856 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
857 select_timeout(&mut self.handles, timeout)
858 }
859
860 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
861 ///
862 /// If an operation becomes ready, it is selected and returned. If multiple operations are
863 /// ready at the same time, a random one among them is selected. If none of the operations
864 /// become ready before the given deadline, an error is returned.
865 ///
866 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
867 /// even when it will simply return an error because the channel is disconnected.
868 ///
869 /// The selected operation must be completed with [`SelectedOperation::send`]
870 /// or [`SelectedOperation::recv`].
871 ///
872 /// # Examples
873 ///
874 /// ```
875 /// use std::thread;
876 /// use std::time::{Instant, Duration};
877 /// use crossbeam_channel::{unbounded, Select};
878 ///
879 /// let (s1, r1) = unbounded();
880 /// let (s2, r2) = unbounded();
881 ///
882 /// thread::spawn(move || {
883 /// thread::sleep(Duration::from_secs(1));
884 /// s1.send(10).unwrap();
885 /// });
886 /// thread::spawn(move || s2.send(20).unwrap());
887 ///
888 /// let mut sel = Select::new();
889 /// let oper1 = sel.recv(&r1);
890 /// let oper2 = sel.recv(&r2);
891 ///
892 /// let deadline = Instant::now() + Duration::from_millis(500);
893 ///
894 /// // The second operation will be selected because it becomes ready first.
895 /// let oper = sel.select_deadline(deadline);
896 /// match oper {
897 /// Err(_) => panic!("should not have timed out"),
898 /// Ok(oper) => match oper.index() {
899 /// i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
900 /// i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
901 /// _ => unreachable!(),
902 /// }
903 /// }
904 /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>905 pub fn select_deadline(
906 &mut self,
907 deadline: Instant,
908 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
909 select_deadline(&mut self.handles, deadline)
910 }
911
912 /// Attempts to find a ready operation without blocking.
913 ///
914 /// If an operation is ready, its index is returned. If multiple operations are ready at the
915 /// same time, a random one among them is chosen. If none of the operations are ready, an error
916 /// is returned.
917 ///
918 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
919 /// even when it will simply return an error because the channel is disconnected.
920 ///
921 /// Note that this method might return with success spuriously, so it's a good idea to always
922 /// double check if the operation is really ready.
923 ///
924 /// # Examples
925 ///
926 /// ```
927 /// use crossbeam_channel::{unbounded, Select};
928 ///
929 /// let (s1, r1) = unbounded();
930 /// let (s2, r2) = unbounded();
931 ///
932 /// s1.send(10).unwrap();
933 /// s2.send(20).unwrap();
934 ///
935 /// let mut sel = Select::new();
936 /// let oper1 = sel.recv(&r1);
937 /// let oper2 = sel.recv(&r2);
938 ///
939 /// // Both operations are initially ready, so a random one will be chosen.
940 /// match sel.try_ready() {
941 /// Err(_) => panic!("both operations should be ready"),
942 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
943 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
944 /// Ok(_) => unreachable!(),
945 /// }
946 /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>947 pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
948 match run_ready(&mut self.handles, Timeout::Now) {
949 None => Err(TryReadyError),
950 Some(index) => Ok(index),
951 }
952 }
953
954 /// Blocks until one of the operations becomes ready.
955 ///
956 /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
957 /// the same time, a random one among them is chosen.
958 ///
959 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
960 /// even when it will simply return an error because the channel is disconnected.
961 ///
962 /// Note that this method might return with success spuriously, so it's a good idea to always
963 /// double check if the operation is really ready.
964 ///
965 /// # Panics
966 ///
967 /// Panics if no operations have been added to `Select`.
968 ///
969 /// # Examples
970 ///
971 /// ```
972 /// use std::thread;
973 /// use std::time::Duration;
974 /// use crossbeam_channel::{unbounded, Select};
975 ///
976 /// let (s1, r1) = unbounded();
977 /// let (s2, r2) = unbounded();
978 ///
979 /// thread::spawn(move || {
980 /// thread::sleep(Duration::from_secs(1));
981 /// s1.send(10).unwrap();
982 /// });
983 /// thread::spawn(move || s2.send(20).unwrap());
984 ///
985 /// let mut sel = Select::new();
986 /// let oper1 = sel.recv(&r1);
987 /// let oper2 = sel.recv(&r2);
988 ///
989 /// // The second operation will be selected because it becomes ready first.
990 /// match sel.ready() {
991 /// i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
992 /// i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
993 /// _ => unreachable!(),
994 /// }
995 /// ```
ready(&mut self) -> usize996 pub fn ready(&mut self) -> usize {
997 if self.handles.is_empty() {
998 panic!("no operations have been added to `Select`");
999 }
1000
1001 run_ready(&mut self.handles, Timeout::Never).unwrap()
1002 }
1003
1004 /// Blocks for a limited time until one of the operations becomes ready.
1005 ///
1006 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1007 /// the same time, a random one among them is chosen. If none of the operations become ready
1008 /// for the specified duration, an error is returned.
1009 ///
1010 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1011 /// even when it will simply return an error because the channel is disconnected.
1012 ///
1013 /// Note that this method might return with success spuriously, so it's a good idea to double
1014 /// check if the operation is really ready.
1015 ///
1016 /// # Examples
1017 ///
1018 /// ```
1019 /// use std::thread;
1020 /// use std::time::Duration;
1021 /// use crossbeam_channel::{unbounded, Select};
1022 ///
1023 /// let (s1, r1) = unbounded();
1024 /// let (s2, r2) = unbounded();
1025 ///
1026 /// thread::spawn(move || {
1027 /// thread::sleep(Duration::from_secs(1));
1028 /// s1.send(10).unwrap();
1029 /// });
1030 /// thread::spawn(move || s2.send(20).unwrap());
1031 ///
1032 /// let mut sel = Select::new();
1033 /// let oper1 = sel.recv(&r1);
1034 /// let oper2 = sel.recv(&r2);
1035 ///
1036 /// // The second operation will be selected because it becomes ready first.
1037 /// match sel.ready_timeout(Duration::from_millis(500)) {
1038 /// Err(_) => panic!("should not have timed out"),
1039 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1040 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1041 /// Ok(_) => unreachable!(),
1042 /// }
1043 /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1044 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1045 self.ready_deadline(Instant::now() + timeout)
1046 }
1047
1048 /// Blocks until a given deadline, or until one of the operations becomes ready.
1049 ///
1050 /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1051 /// the same time, a random one among them is chosen. If none of the operations become ready
1052 /// before the deadline, an error is returned.
1053 ///
1054 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1055 /// even when it will simply return an error because the channel is disconnected.
1056 ///
1057 /// Note that this method might return with success spuriously, so it's a good idea to double
1058 /// check if the operation is really ready.
1059 ///
1060 /// # Examples
1061 ///
1062 /// ```
1063 /// use std::thread;
1064 /// use std::time::{Duration, Instant};
1065 /// use crossbeam_channel::{unbounded, Select};
1066 ///
1067 /// let deadline = Instant::now() + Duration::from_millis(500);
1068 ///
1069 /// let (s1, r1) = unbounded();
1070 /// let (s2, r2) = unbounded();
1071 ///
1072 /// thread::spawn(move || {
1073 /// thread::sleep(Duration::from_secs(1));
1074 /// s1.send(10).unwrap();
1075 /// });
1076 /// thread::spawn(move || s2.send(20).unwrap());
1077 ///
1078 /// let mut sel = Select::new();
1079 /// let oper1 = sel.recv(&r1);
1080 /// let oper2 = sel.recv(&r2);
1081 ///
1082 /// // The second operation will be selected because it becomes ready first.
1083 /// match sel.ready_deadline(deadline) {
1084 /// Err(_) => panic!("should not have timed out"),
1085 /// Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1086 /// Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1087 /// Ok(_) => unreachable!(),
1088 /// }
1089 /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1090 pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1091 match run_ready(&mut self.handles, Timeout::At(deadline)) {
1092 None => Err(ReadyTimeoutError),
1093 Some(index) => Ok(index),
1094 }
1095 }
1096 }
1097
1098 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1099 fn clone(&self) -> Select<'a> {
1100 Select {
1101 handles: self.handles.clone(),
1102 next_index: self.next_index,
1103 }
1104 }
1105 }
1106
1107 impl<'a> Default for Select<'a> {
default() -> Select<'a>1108 fn default() -> Select<'a> {
1109 Select::new()
1110 }
1111 }
1112
1113 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1115 f.pad("Select { .. }")
1116 }
1117 }
1118
1119 /// A selected operation that needs to be completed.
1120 ///
1121 /// To complete the operation, call [`send`] or [`recv`].
1122 ///
1123 /// # Panics
1124 ///
1125 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1126 /// `SelectedOperation` is dropped without completion, a panic occurs.
1127 ///
1128 /// [`send`]: SelectedOperation::send
1129 /// [`recv`]: SelectedOperation::recv
1130 #[must_use]
1131 pub struct SelectedOperation<'a> {
1132 /// Token needed to complete the operation.
1133 token: Token,
1134
1135 /// The index of the selected operation.
1136 index: usize,
1137
1138 /// The address of the selected `Sender` or `Receiver`.
1139 ptr: *const u8,
1140
1141 /// Indicates that `Sender`s and `Receiver`s are borrowed.
1142 _marker: PhantomData<&'a ()>,
1143 }
1144
1145 impl SelectedOperation<'_> {
1146 /// Returns the index of the selected operation.
1147 ///
1148 /// # Examples
1149 ///
1150 /// ```
1151 /// use crossbeam_channel::{bounded, Select};
1152 ///
1153 /// let (s1, r1) = bounded::<()>(0);
1154 /// let (s2, r2) = bounded::<()>(0);
1155 /// let (s3, r3) = bounded::<()>(1);
1156 ///
1157 /// let mut sel = Select::new();
1158 /// let oper1 = sel.send(&s1);
1159 /// let oper2 = sel.recv(&r2);
1160 /// let oper3 = sel.send(&s3);
1161 ///
1162 /// // Only the last operation is ready.
1163 /// let oper = sel.select();
1164 /// assert_eq!(oper.index(), 2);
1165 /// assert_eq!(oper.index(), oper3);
1166 ///
1167 /// // Complete the operation.
1168 /// oper.send(&s3, ()).unwrap();
1169 /// ```
index(&self) -> usize1170 pub fn index(&self) -> usize {
1171 self.index
1172 }
1173
1174 /// Completes the send operation.
1175 ///
1176 /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1177 /// when the operation was added.
1178 ///
1179 /// # Panics
1180 ///
1181 /// Panics if an incorrect [`Sender`] reference is passed.
1182 ///
1183 /// # Examples
1184 ///
1185 /// ```
1186 /// use crossbeam_channel::{bounded, Select, SendError};
1187 ///
1188 /// let (s, r) = bounded::<i32>(0);
1189 /// drop(r);
1190 ///
1191 /// let mut sel = Select::new();
1192 /// let oper1 = sel.send(&s);
1193 ///
1194 /// let oper = sel.select();
1195 /// assert_eq!(oper.index(), oper1);
1196 /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1197 /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1198 pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1199 assert!(
1200 s as *const Sender<T> as *const u8 == self.ptr,
1201 "passed a sender that wasn't selected",
1202 );
1203 let res = unsafe { channel::write(s, &mut self.token, msg) };
1204 mem::forget(self);
1205 res.map_err(SendError)
1206 }
1207
1208 /// Completes the receive operation.
1209 ///
1210 /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1211 /// when the operation was added.
1212 ///
1213 /// # Panics
1214 ///
1215 /// Panics if an incorrect [`Receiver`] reference is passed.
1216 ///
1217 /// # Examples
1218 ///
1219 /// ```
1220 /// use crossbeam_channel::{bounded, Select, RecvError};
1221 ///
1222 /// let (s, r) = bounded::<i32>(0);
1223 /// drop(s);
1224 ///
1225 /// let mut sel = Select::new();
1226 /// let oper1 = sel.recv(&r);
1227 ///
1228 /// let oper = sel.select();
1229 /// assert_eq!(oper.index(), oper1);
1230 /// assert_eq!(oper.recv(&r), Err(RecvError));
1231 /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1232 pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1233 assert!(
1234 r as *const Receiver<T> as *const u8 == self.ptr,
1235 "passed a receiver that wasn't selected",
1236 );
1237 let res = unsafe { channel::read(r, &mut self.token) };
1238 mem::forget(self);
1239 res.map_err(|_| RecvError)
1240 }
1241 }
1242
1243 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1245 f.pad("SelectedOperation { .. }")
1246 }
1247 }
1248
1249 impl Drop for SelectedOperation<'_> {
drop(&mut self)1250 fn drop(&mut self) {
1251 panic!("dropped `SelectedOperation` without completing the operation");
1252 }
1253 }
1254