• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Unbounded channel implemented as a linked list.
2 
3 use std::cell::UnsafeCell;
4 use std::marker::PhantomData;
5 use std::mem::MaybeUninit;
6 use std::ptr;
7 use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
8 use std::time::Instant;
9 
10 use crossbeam_utils::{Backoff, CachePadded};
11 
12 use crate::context::Context;
13 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
14 use crate::select::{Operation, SelectHandle, Selected, Token};
15 use crate::waker::SyncWaker;
16 
17 // TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
18 // following changes by @kleimkuhler:
19 //
20 // 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
21 // 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
22 
// Bits indicating the state of a slot (stored in `Slot::state`):
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
// One index per lap is sacrificed as an end-of-block sentinel, hence `LAP - 1`.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits of a head/tail index are reserved for metadata
// (the actual sequence number lives in the bits above `SHIFT`).
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
41 
/// A slot in a block.
struct Slot<T> {
    /// The message. Wrapped in `MaybeUninit` because the slot starts out
    /// uninitialized, and in `UnsafeCell` because a sender writes it while
    /// other threads may be concurrently inspecting `state`.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot: a combination of the `WRITE`/`READ`/`DESTROY` bits.
    state: AtomicUsize,
}
50 
impl<T> Slot<T> {
    /// A slot in its initial state: no message and all state bits clear.
    /// Used as the repeat element when initializing a block's slot array.
    const UNINIT: Self = Self {
        msg: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a message is written into the slot.
    ///
    /// Spins with exponential backoff until the `WRITE` bit appears. The
    /// `Acquire` load pairs with the sender's `Release` `fetch_or` so that the
    /// message contents are visible once the loop exits.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}
65 
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list; null until a sender installs it.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}
76 
impl<T> Block<T> {
    /// Creates an empty block with a null `next` pointer and all slots uninitialized.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set, spinning with backoff, and returns it.
    ///
    /// Only called when the next block is known to be (about to be) installed
    /// by some sender, so the loop is guaranteed to terminate eventually.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    ///
    /// # Safety
    ///
    /// `this` must point to a block originally allocated via `Box`, and the
    /// caller must hold the (logical) right to begin destruction — i.e. the
    /// block has been fully consumed. Any thread still reading a slot will
    /// observe the `DESTROY` bit and finish the destruction itself.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}
118 
/// A position in a channel (either the head or the tail).
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel. The lower `SHIFT` bits carry metadata
    /// (`MARK_BIT`); the sequence number lives in the upper bits.
    index: AtomicUsize,

    /// The block in the linked list that this position currently falls into.
    block: AtomicPtr<Block<T>>,
}
128 
/// The token type for the list flavor.
///
/// Filled in by `start_send`/`start_recv` and later consumed by
/// `write`/`read`. A null `block` signals a disconnected channel.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots (type-erased `*const Block<T>`).
    block: *const u8,

    /// The offset of the reserved slot within the block.
    offset: usize,
}
138 
139 impl Default for ListToken {
140     #[inline]
default() -> Self141     fn default() -> Self {
142         ListToken {
143             block: ptr::null(),
144             offset: 0,
145         }
146     }
147 }
148 
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel (where receivers read from).
    /// Cache-padded so head and tail don't share a cache line under contention.
    head: CachePadded<Position<T>>,

    /// The tail of the channel (where senders write to).
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
169 
170 impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    ///
    /// Blocks are allocated lazily: both head and tail start with a null
    /// block pointer and index 0; the first block is installed on first send.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }
186 
    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }
191 
    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }
196 
    /// Attempts to reserve a slot for sending a message.
    ///
    /// On success the token identifies the reserved block/offset; if the
    /// channel is disconnected the token's block is left null (which `write`
    /// then reports as an error). Note that both paths return `true` — an
    /// unbounded channel is never full, so reservation cannot fail.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another sender installed the first block; free ours by
                    // stashing it (it may still be used as `next_block` later).
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        // Skip the sentinel index at the block boundary.
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
279 
    /// Writes a message into the channel.
    ///
    /// Returns `Err(msg)` — handing the message back — if the token's block is
    /// null, i.e. the channel was disconnected during reservation.
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a successful `start_send` on this
    /// same channel, so that `block`/`offset` identify a slot reserved for
    /// this caller.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        // `Release` publishes the message before the `WRITE` bit becomes visible.
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }
298 
    /// Attempts to reserve a slot for receiving a message.
    ///
    /// Returns `false` only when the channel is empty and not disconnected.
    /// Otherwise returns `true`, with the token pointing at a reserved slot —
    /// or holding a null block if the channel is disconnected and drained.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                // Full fence so the subsequent relaxed tail load can't be
                // reordered before the head load above.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        // Skip the sentinel index at the block boundary.
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
383 
    /// Reads a message from the channel.
    ///
    /// Returns `Err(())` if the token's block is null (channel disconnected).
    ///
    /// # Safety
    ///
    /// `token` must have been filled in by a successful `start_recv` on this
    /// same channel, so the slot it points at is reserved for this caller.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        // The sender may not have finished writing yet — wait for the `WRITE` bit.
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }
408 
409     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>410     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
411         self.send(msg, None).map_err(|err| match err {
412             SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
413             SendTimeoutError::Timeout(_) => unreachable!(),
414         })
415     }
416 
    /// Sends a message into the channel.
    ///
    /// The deadline is unused because an unbounded channel never blocks on
    /// send; the only possible error is `Disconnected`, returning the message.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // `start_send` always returns `true` for the unbounded flavor.
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }
430 
431     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>432     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
433         let token = &mut Token::default();
434 
435         if self.start_recv(token) {
436             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
437         } else {
438             Err(TryRecvError::Empty)
439         }
440     }
441 
    /// Receives a message from the channel.
    ///
    /// Blocks until a message arrives, the channel is disconnected (and
    /// drained), or the optional `deadline` passes — in which case
    /// `RecvTimeoutError::Timeout` is returned. `None` means block forever.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times before parking.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now? (Re-check after
                // registering to avoid a lost wakeup.)
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }
493 
    /// Returns the current number of messages inside the channel.
    ///
    /// Loops until it observes a consistent (tail, head) pair — i.e. the tail
    /// index does not change between the two reads — then derives the count
    /// from the index difference, correcting for per-block sentinel indices.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower metadata bits (e.g. `MARK_BIT`).
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends (the sentinel slot).
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head
                // (one sentinel index per full lap is not a real message).
                return tail - head - tail / LAP;
            }
        }
    }
529 
    /// Returns the capacity of the channel: `None`, i.e. unbounded.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }
534 
535     /// Disconnects senders and wakes up all blocked receivers.
536     ///
537     /// Returns `true` if this call disconnected the channel.
disconnect_senders(&self) -> bool538     pub(crate) fn disconnect_senders(&self) -> bool {
539         let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
540 
541         if tail & MARK_BIT == 0 {
542             self.receivers.disconnect();
543             true
544         } else {
545             false
546         }
547     }
548 
549     /// Disconnects receivers.
550     ///
551     /// Returns `true` if this call disconnected the channel.
disconnect_receivers(&self) -> bool552     pub(crate) fn disconnect_receivers(&self) -> bool {
553         let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
554 
555         if tail & MARK_BIT == 0 {
556             // If receivers are dropped first, discard all messages to free
557             // memory eagerly.
558             self.discard_all_messages();
559             true
560         } else {
561             false
562         }
563     }
564 
    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped (the
    /// caller has just set `MARK_BIT` in the tail, so no new sends can start).
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
            // at boundary. We need to wait for the updates take affect otherwise there
            // can be memory leaks.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        // If we're going to be dropping messages we need to synchronize with initialization
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }
        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot (waiting for an in-flight
                    // sender to finish writing it first).
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // `offset == BLOCK_CAP`: block boundary sentinel.
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        // Publish the drained state: null block, head caught up to tail.
        head &= !MARK_BIT;
        self.head.block.store(ptr::null_mut(), Ordering::Release);
        self.head.index.store(head, Ordering::Release);
    }
628 
    /// Returns `true` if the channel is disconnected (i.e. `MARK_BIT` is set
    /// in the tail index).
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }
633 
    /// Returns `true` if the channel is empty.
    ///
    /// Note: head and tail are loaded separately (not an atomic snapshot), so
    /// the answer may already be stale by the time it is returned.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }
640 
    /// Returns `true` if the channel is full — always `false`, since the
    /// channel is unbounded.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
645 }
646 
impl<T> Drop for Channel<T> {
    /// Drops all remaining messages and deallocates every block still linked.
    ///
    /// `&mut self` guarantees exclusive access, so plain `get_mut` accessors
    /// replace atomic operations here and no slot-state synchronization is
    /// needed.
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower metadata bits (e.g. `MARK_BIT`).
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // `offset == BLOCK_CAP`: deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
683 
/// Receiver handle to a channel, borrowing the underlying `Channel`.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel, borrowing the underlying `Channel`.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
689 
impl<T> SelectHandle for Receiver<'_, T> {
    /// Tries to reserve a message slot for receiving; succeeds immediately or not at all.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    /// Receive operations have no inherent deadline.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    /// Registers this operation with the channel's waker, then reports
    /// readiness (re-checked after registration to avoid a lost wakeup).
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// Ready when a message is available or the channel is disconnected.
    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
725 
impl<T> SelectHandle for Sender<'_, T> {
    /// Tries to reserve a slot for sending; always succeeds for the unbounded flavor.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    /// Send operations have no inherent deadline.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    // Senders never block on an unbounded channel, so there is no waker to
    // register with — the operation is simply always ready.
    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    /// An unbounded channel is never full, so a send is always ready.
    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}
755