1 //! Unbounded channel implemented as a linked list. 2 3 use std::cell::UnsafeCell; 4 use std::marker::PhantomData; 5 use std::mem::MaybeUninit; 6 use std::ptr; 7 use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; 8 use std::time::Instant; 9 10 use crossbeam_utils::{Backoff, CachePadded}; 11 12 use crate::context::Context; 13 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; 14 use crate::select::{Operation, SelectHandle, Selected, Token}; 15 use crate::waker::SyncWaker; 16 17 // TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the 18 // following changes by @kleimkuhler: 19 // 20 // 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100 21 // 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101 22 23 // Bits indicating the state of a slot: 24 // * If a message has been written into the slot, `WRITE` is set. 25 // * If a message has been read from the slot, `READ` is set. 26 // * If the block is being destroyed, `DESTROY` is set. 27 const WRITE: usize = 1; 28 const READ: usize = 2; 29 const DESTROY: usize = 4; 30 31 // Each block covers one "lap" of indices. 32 const LAP: usize = 32; 33 // The maximum number of messages a block can hold. 34 const BLOCK_CAP: usize = LAP - 1; 35 // How many lower bits are reserved for metadata. 36 const SHIFT: usize = 1; 37 // Has two different purposes: 38 // * If set in head, indicates that the block is not the last one. 39 // * If set in tail, indicates that the channel is disconnected. 40 const MARK_BIT: usize = 1; 41 42 /// A slot in a block. 43 struct Slot<T> { 44 /// The message. 45 msg: UnsafeCell<MaybeUninit<T>>, 46 47 /// The state of the slot. 48 state: AtomicUsize, 49 } 50 51 impl<T> Slot<T> { 52 const UNINIT: Self = Self { 53 msg: UnsafeCell::new(MaybeUninit::uninit()), 54 state: AtomicUsize::new(0), 55 }; 56 57 /// Waits until a message is written into the slot. wait_write(&self)58 fn wait_write(&self) { 59 let backoff = Backoff::new(); 60 while self.state.load(Ordering::Acquire) & WRITE == 0 { 61 backoff.snooze(); 62 } 63 } 64 } 65 66 /// A block in a linked list. 67 /// 68 /// Each block in the list can hold up to `BLOCK_CAP` messages. 69 struct Block<T> { 70 /// The next block in the linked list. 71 next: AtomicPtr<Block<T>>, 72 73 /// Slots for messages. 74 slots: [Slot<T>; BLOCK_CAP], 75 } 76 77 impl<T> Block<T> { 78 /// Creates an empty block. new() -> Block<T>79 fn new() -> Block<T> { 80 Self { 81 next: AtomicPtr::new(ptr::null_mut()), 82 slots: [Slot::UNINIT; BLOCK_CAP], 83 } 84 } 85 86 /// Waits until the next pointer is set. wait_next(&self) -> *mut Block<T>87 fn wait_next(&self) -> *mut Block<T> { 88 let backoff = Backoff::new(); 89 loop { 90 let next = self.next.load(Ordering::Acquire); 91 if !next.is_null() { 92 return next; 93 } 94 backoff.snooze(); 95 } 96 } 97 98 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. destroy(this: *mut Block<T>, start: usize)99 unsafe fn destroy(this: *mut Block<T>, start: usize) { 100 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has 101 // begun destruction of the block. 102 for i in start..BLOCK_CAP - 1 { 103 let slot = (*this).slots.get_unchecked(i); 104 105 // Mark the `DESTROY` bit if a thread is still using the slot. 106 if slot.state.load(Ordering::Acquire) & READ == 0 107 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 108 { 109 // If a thread is still using the slot, it will continue destruction of the block. 110 return; 111 } 112 } 113 114 // No thread is using the block, now it is safe to destroy it. 115 drop(Box::from_raw(this)); 116 } 117 } 118 119 /// A position in a channel. 120 #[derive(Debug)] 121 struct Position<T> { 122 /// The index in the channel. 123 index: AtomicUsize, 124 125 /// The block in the linked list. 126 block: AtomicPtr<Block<T>>, 127 } 128 129 /// The token type for the list flavor. 130 #[derive(Debug)] 131 pub(crate) struct ListToken { 132 /// The block of slots. 133 block: *const u8, 134 135 /// The offset into the block. 136 offset: usize, 137 } 138 139 impl Default for ListToken { 140 #[inline] default() -> Self141 fn default() -> Self { 142 ListToken { 143 block: ptr::null(), 144 offset: 0, 145 } 146 } 147 } 148 149 /// Unbounded channel implemented as a linked list. 150 /// 151 /// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are 152 /// represented as numbers of type `usize` and wrap on overflow. 153 /// 154 /// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and 155 /// improve cache efficiency. 156 pub(crate) struct Channel<T> { 157 /// The head of the channel. 158 head: CachePadded<Position<T>>, 159 160 /// The tail of the channel. 161 tail: CachePadded<Position<T>>, 162 163 /// Receivers waiting while the channel is empty and not disconnected. 164 receivers: SyncWaker, 165 166 /// Indicates that dropping a `Channel<T>` may drop messages of type `T`. 167 _marker: PhantomData<T>, 168 } 169 170 impl<T> Channel<T> { 171 /// Creates a new unbounded channel. new() -> Self172 pub(crate) fn new() -> Self { 173 Channel { 174 head: CachePadded::new(Position { 175 block: AtomicPtr::new(ptr::null_mut()), 176 index: AtomicUsize::new(0), 177 }), 178 tail: CachePadded::new(Position { 179 block: AtomicPtr::new(ptr::null_mut()), 180 index: AtomicUsize::new(0), 181 }), 182 receivers: SyncWaker::new(), 183 _marker: PhantomData, 184 } 185 } 186 187 /// Returns a receiver handle to the channel. receiver(&self) -> Receiver<'_, T>188 pub(crate) fn receiver(&self) -> Receiver<'_, T> { 189 Receiver(self) 190 } 191 192 /// Returns a sender handle to the channel. sender(&self) -> Sender<'_, T>193 pub(crate) fn sender(&self) -> Sender<'_, T> { 194 Sender(self) 195 } 196 197 /// Attempts to reserve a slot for sending a message. start_send(&self, token: &mut Token) -> bool198 fn start_send(&self, token: &mut Token) -> bool { 199 let backoff = Backoff::new(); 200 let mut tail = self.tail.index.load(Ordering::Acquire); 201 let mut block = self.tail.block.load(Ordering::Acquire); 202 let mut next_block = None; 203 204 loop { 205 // Check if the channel is disconnected. 206 if tail & MARK_BIT != 0 { 207 token.list.block = ptr::null(); 208 return true; 209 } 210 211 // Calculate the offset of the index into the block. 212 let offset = (tail >> SHIFT) % LAP; 213 214 // If we reached the end of the block, wait until the next one is installed. 215 if offset == BLOCK_CAP { 216 backoff.snooze(); 217 tail = self.tail.index.load(Ordering::Acquire); 218 block = self.tail.block.load(Ordering::Acquire); 219 continue; 220 } 221 222 // If we're going to have to install the next block, allocate it in advance in order to 223 // make the wait for other threads as short as possible. 224 if offset + 1 == BLOCK_CAP && next_block.is_none() { 225 next_block = Some(Box::new(Block::<T>::new())); 226 } 227 228 // If this is the first message to be sent into the channel, we need to allocate the 229 // first block and install it. 230 if block.is_null() { 231 let new = Box::into_raw(Box::new(Block::<T>::new())); 232 233 if self 234 .tail 235 .block 236 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) 237 .is_ok() 238 { 239 self.head.block.store(new, Ordering::Release); 240 block = new; 241 } else { 242 next_block = unsafe { Some(Box::from_raw(new)) }; 243 tail = self.tail.index.load(Ordering::Acquire); 244 block = self.tail.block.load(Ordering::Acquire); 245 continue; 246 } 247 } 248 249 let new_tail = tail + (1 << SHIFT); 250 251 // Try advancing the tail forward. 252 match self.tail.index.compare_exchange_weak( 253 tail, 254 new_tail, 255 Ordering::SeqCst, 256 Ordering::Acquire, 257 ) { 258 Ok(_) => unsafe { 259 // If we've reached the end of the block, install the next one. 260 if offset + 1 == BLOCK_CAP { 261 let next_block = Box::into_raw(next_block.unwrap()); 262 self.tail.block.store(next_block, Ordering::Release); 263 self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); 264 (*block).next.store(next_block, Ordering::Release); 265 } 266 267 token.list.block = block as *const u8; 268 token.list.offset = offset; 269 return true; 270 }, 271 Err(t) => { 272 tail = t; 273 block = self.tail.block.load(Ordering::Acquire); 274 backoff.spin(); 275 } 276 } 277 } 278 } 279 280 /// Writes a message into the channel. write(&self, token: &mut Token, msg: T) -> Result<(), T>281 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { 282 // If there is no slot, the channel is disconnected. 283 if token.list.block.is_null() { 284 return Err(msg); 285 } 286 287 // Write the message into the slot. 288 let block = token.list.block.cast::<Block<T>>(); 289 let offset = token.list.offset; 290 let slot = (*block).slots.get_unchecked(offset); 291 slot.msg.get().write(MaybeUninit::new(msg)); 292 slot.state.fetch_or(WRITE, Ordering::Release); 293 294 // Wake a sleeping receiver. 295 self.receivers.notify(); 296 Ok(()) 297 } 298 299 /// Attempts to reserve a slot for receiving a message. start_recv(&self, token: &mut Token) -> bool300 fn start_recv(&self, token: &mut Token) -> bool { 301 let backoff = Backoff::new(); 302 let mut head = self.head.index.load(Ordering::Acquire); 303 let mut block = self.head.block.load(Ordering::Acquire); 304 305 loop { 306 // Calculate the offset of the index into the block. 307 let offset = (head >> SHIFT) % LAP; 308 309 // If we reached the end of the block, wait until the next one is installed. 310 if offset == BLOCK_CAP { 311 backoff.snooze(); 312 head = self.head.index.load(Ordering::Acquire); 313 block = self.head.block.load(Ordering::Acquire); 314 continue; 315 } 316 317 let mut new_head = head + (1 << SHIFT); 318 319 if new_head & MARK_BIT == 0 { 320 atomic::fence(Ordering::SeqCst); 321 let tail = self.tail.index.load(Ordering::Relaxed); 322 323 // If the tail equals the head, that means the channel is empty. 324 if head >> SHIFT == tail >> SHIFT { 325 // If the channel is disconnected... 326 if tail & MARK_BIT != 0 { 327 // ...then receive an error. 328 token.list.block = ptr::null(); 329 return true; 330 } else { 331 // Otherwise, the receive operation is not ready. 332 return false; 333 } 334 } 335 336 // If head and tail are not in the same block, set `MARK_BIT` in head. 337 if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { 338 new_head |= MARK_BIT; 339 } 340 } 341 342 // The block can be null here only if the first message is being sent into the channel. 343 // In that case, just wait until it gets initialized. 344 if block.is_null() { 345 backoff.snooze(); 346 head = self.head.index.load(Ordering::Acquire); 347 block = self.head.block.load(Ordering::Acquire); 348 continue; 349 } 350 351 // Try moving the head index forward. 352 match self.head.index.compare_exchange_weak( 353 head, 354 new_head, 355 Ordering::SeqCst, 356 Ordering::Acquire, 357 ) { 358 Ok(_) => unsafe { 359 // If we've reached the end of the block, move to the next one. 360 if offset + 1 == BLOCK_CAP { 361 let next = (*block).wait_next(); 362 let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); 363 if !(*next).next.load(Ordering::Relaxed).is_null() { 364 next_index |= MARK_BIT; 365 } 366 367 self.head.block.store(next, Ordering::Release); 368 self.head.index.store(next_index, Ordering::Release); 369 } 370 371 token.list.block = block as *const u8; 372 token.list.offset = offset; 373 return true; 374 }, 375 Err(h) => { 376 head = h; 377 block = self.head.block.load(Ordering::Acquire); 378 backoff.spin(); 379 } 380 } 381 } 382 } 383 384 /// Reads a message from the channel. read(&self, token: &mut Token) -> Result<T, ()>385 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> { 386 if token.list.block.is_null() { 387 // The channel is disconnected. 388 return Err(()); 389 } 390 391 // Read the message. 392 let block = token.list.block as *mut Block<T>; 393 let offset = token.list.offset; 394 let slot = (*block).slots.get_unchecked(offset); 395 slot.wait_write(); 396 let msg = slot.msg.get().read().assume_init(); 397 398 // Destroy the block if we've reached the end, or if another thread wanted to destroy but 399 // couldn't because we were busy reading from the slot. 400 if offset + 1 == BLOCK_CAP { 401 Block::destroy(block, 0); 402 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { 403 Block::destroy(block, offset + 1); 404 } 405 406 Ok(msg) 407 } 408 409 /// Attempts to send a message into the channel. try_send(&self, msg: T) -> Result<(), TrySendError<T>>410 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { 411 self.send(msg, None).map_err(|err| match err { 412 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), 413 SendTimeoutError::Timeout(_) => unreachable!(), 414 }) 415 } 416 417 /// Sends a message into the channel. send( &self, msg: T, _deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>418 pub(crate) fn send( 419 &self, 420 msg: T, 421 _deadline: Option<Instant>, 422 ) -> Result<(), SendTimeoutError<T>> { 423 let token = &mut Token::default(); 424 assert!(self.start_send(token)); 425 unsafe { 426 self.write(token, msg) 427 .map_err(SendTimeoutError::Disconnected) 428 } 429 } 430 431 /// Attempts to receive a message without blocking. try_recv(&self) -> Result<T, TryRecvError>432 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> { 433 let token = &mut Token::default(); 434 435 if self.start_recv(token) { 436 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } 437 } else { 438 Err(TryRecvError::Empty) 439 } 440 } 441 442 /// Receives a message from the channel. recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>443 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> { 444 let token = &mut Token::default(); 445 loop { 446 // Try receiving a message several times. 447 let backoff = Backoff::new(); 448 loop { 449 if self.start_recv(token) { 450 unsafe { 451 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); 452 } 453 } 454 455 if backoff.is_completed() { 456 break; 457 } else { 458 backoff.snooze(); 459 } 460 } 461 462 if let Some(d) = deadline { 463 if Instant::now() >= d { 464 return Err(RecvTimeoutError::Timeout); 465 } 466 } 467 468 // Prepare for blocking until a sender wakes us up. 469 Context::with(|cx| { 470 let oper = Operation::hook(token); 471 self.receivers.register(oper, cx); 472 473 // Has the channel become ready just now? 474 if !self.is_empty() || self.is_disconnected() { 475 let _ = cx.try_select(Selected::Aborted); 476 } 477 478 // Block the current thread. 479 let sel = cx.wait_until(deadline); 480 481 match sel { 482 Selected::Waiting => unreachable!(), 483 Selected::Aborted | Selected::Disconnected => { 484 self.receivers.unregister(oper).unwrap(); 485 // If the channel was disconnected, we still have to check for remaining 486 // messages. 487 } 488 Selected::Operation(_) => {} 489 } 490 }); 491 } 492 } 493 494 /// Returns the current number of messages inside the channel. len(&self) -> usize495 pub(crate) fn len(&self) -> usize { 496 loop { 497 // Load the tail index, then load the head index. 498 let mut tail = self.tail.index.load(Ordering::SeqCst); 499 let mut head = self.head.index.load(Ordering::SeqCst); 500 501 // If the tail index didn't change, we've got consistent indices to work with. 502 if self.tail.index.load(Ordering::SeqCst) == tail { 503 // Erase the lower bits. 504 tail &= !((1 << SHIFT) - 1); 505 head &= !((1 << SHIFT) - 1); 506 507 // Fix up indices if they fall onto block ends. 508 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { 509 tail = tail.wrapping_add(1 << SHIFT); 510 } 511 if (head >> SHIFT) & (LAP - 1) == LAP - 1 { 512 head = head.wrapping_add(1 << SHIFT); 513 } 514 515 // Rotate indices so that head falls into the first block. 516 let lap = (head >> SHIFT) / LAP; 517 tail = tail.wrapping_sub((lap * LAP) << SHIFT); 518 head = head.wrapping_sub((lap * LAP) << SHIFT); 519 520 // Remove the lower bits. 521 tail >>= SHIFT; 522 head >>= SHIFT; 523 524 // Return the difference minus the number of blocks between tail and head. 525 return tail - head - tail / LAP; 526 } 527 } 528 } 529 530 /// Returns the capacity of the channel. capacity(&self) -> Option<usize>531 pub(crate) fn capacity(&self) -> Option<usize> { 532 None 533 } 534 535 /// Disconnects senders and wakes up all blocked receivers. 536 /// 537 /// Returns `true` if this call disconnected the channel. disconnect_senders(&self) -> bool538 pub(crate) fn disconnect_senders(&self) -> bool { 539 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); 540 541 if tail & MARK_BIT == 0 { 542 self.receivers.disconnect(); 543 true 544 } else { 545 false 546 } 547 } 548 549 /// Disconnects receivers. 550 /// 551 /// Returns `true` if this call disconnected the channel. disconnect_receivers(&self) -> bool552 pub(crate) fn disconnect_receivers(&self) -> bool { 553 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); 554 555 if tail & MARK_BIT == 0 { 556 // If receivers are dropped first, discard all messages to free 557 // memory eagerly. 558 self.discard_all_messages(); 559 true 560 } else { 561 false 562 } 563 } 564 565 /// Discards all messages. 566 /// 567 /// This method should only be called when all receivers are dropped. discard_all_messages(&self)568 fn discard_all_messages(&self) { 569 let backoff = Backoff::new(); 570 let mut tail = self.tail.index.load(Ordering::Acquire); 571 loop { 572 let offset = (tail >> SHIFT) % LAP; 573 if offset != BLOCK_CAP { 574 break; 575 } 576 577 // New updates to tail will be rejected by MARK_BIT and aborted unless it's 578 // at boundary. We need to wait for the updates take affect otherwise there 579 // can be memory leaks. 580 backoff.snooze(); 581 tail = self.tail.index.load(Ordering::Acquire); 582 } 583 584 let mut head = self.head.index.load(Ordering::Acquire); 585 let mut block = self.head.block.load(Ordering::Acquire); 586 587 // If we're going to be dropping messages we need to synchronize with initialization 588 if head >> SHIFT != tail >> SHIFT { 589 // The block can be null here only if a sender is in the process of initializing the 590 // channel while another sender managed to send a message by inserting it into the 591 // semi-initialized channel and advanced the tail. 592 // In that case, just wait until it gets initialized. 593 while block.is_null() { 594 backoff.snooze(); 595 block = self.head.block.load(Ordering::Acquire); 596 } 597 } 598 unsafe { 599 // Drop all messages between head and tail and deallocate the heap-allocated blocks. 600 while head >> SHIFT != tail >> SHIFT { 601 let offset = (head >> SHIFT) % LAP; 602 603 if offset < BLOCK_CAP { 604 // Drop the message in the slot. 605 let slot = (*block).slots.get_unchecked(offset); 606 slot.wait_write(); 607 (*slot.msg.get()).assume_init_drop(); 608 } else { 609 (*block).wait_next(); 610 // Deallocate the block and move to the next one. 611 let next = (*block).next.load(Ordering::Acquire); 612 drop(Box::from_raw(block)); 613 block = next; 614 } 615 616 head = head.wrapping_add(1 << SHIFT); 617 } 618 619 // Deallocate the last remaining block. 620 if !block.is_null() { 621 drop(Box::from_raw(block)); 622 } 623 } 624 head &= !MARK_BIT; 625 self.head.block.store(ptr::null_mut(), Ordering::Release); 626 self.head.index.store(head, Ordering::Release); 627 } 628 629 /// Returns `true` if the channel is disconnected. is_disconnected(&self) -> bool630 pub(crate) fn is_disconnected(&self) -> bool { 631 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 632 } 633 634 /// Returns `true` if the channel is empty. is_empty(&self) -> bool635 pub(crate) fn is_empty(&self) -> bool { 636 let head = self.head.index.load(Ordering::SeqCst); 637 let tail = self.tail.index.load(Ordering::SeqCst); 638 head >> SHIFT == tail >> SHIFT 639 } 640 641 /// Returns `true` if the channel is full. is_full(&self) -> bool642 pub(crate) fn is_full(&self) -> bool { 643 false 644 } 645 } 646 647 impl<T> Drop for Channel<T> { drop(&mut self)648 fn drop(&mut self) { 649 let mut head = *self.head.index.get_mut(); 650 let mut tail = *self.tail.index.get_mut(); 651 let mut block = *self.head.block.get_mut(); 652 653 // Erase the lower bits. 654 head &= !((1 << SHIFT) - 1); 655 tail &= !((1 << SHIFT) - 1); 656 657 unsafe { 658 // Drop all messages between head and tail and deallocate the heap-allocated blocks. 659 while head != tail { 660 let offset = (head >> SHIFT) % LAP; 661 662 if offset < BLOCK_CAP { 663 // Drop the message in the slot. 664 let slot = (*block).slots.get_unchecked(offset); 665 (*slot.msg.get()).assume_init_drop(); 666 } else { 667 // Deallocate the block and move to the next one. 668 let next = *(*block).next.get_mut(); 669 drop(Box::from_raw(block)); 670 block = next; 671 } 672 673 head = head.wrapping_add(1 << SHIFT); 674 } 675 676 // Deallocate the last remaining block. 677 if !block.is_null() { 678 drop(Box::from_raw(block)); 679 } 680 } 681 } 682 } 683 684 /// Receiver handle to a channel. 685 pub(crate) struct Receiver<'a, T>(&'a Channel<T>); 686 687 /// Sender handle to a channel. 688 pub(crate) struct Sender<'a, T>(&'a Channel<T>); 689 690 impl<T> SelectHandle for Receiver<'_, T> { try_select(&self, token: &mut Token) -> bool691 fn try_select(&self, token: &mut Token) -> bool { 692 self.0.start_recv(token) 693 } 694 deadline(&self) -> Option<Instant>695 fn deadline(&self) -> Option<Instant> { 696 None 697 } 698 register(&self, oper: Operation, cx: &Context) -> bool699 fn register(&self, oper: Operation, cx: &Context) -> bool { 700 self.0.receivers.register(oper, cx); 701 self.is_ready() 702 } 703 unregister(&self, oper: Operation)704 fn unregister(&self, oper: Operation) { 705 self.0.receivers.unregister(oper); 706 } 707 accept(&self, token: &mut Token, _cx: &Context) -> bool708 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 709 self.try_select(token) 710 } 711 is_ready(&self) -> bool712 fn is_ready(&self) -> bool { 713 !self.0.is_empty() || self.0.is_disconnected() 714 } 715 watch(&self, oper: Operation, cx: &Context) -> bool716 fn watch(&self, oper: Operation, cx: &Context) -> bool { 717 self.0.receivers.watch(oper, cx); 718 self.is_ready() 719 } 720 unwatch(&self, oper: Operation)721 fn unwatch(&self, oper: Operation) { 722 self.0.receivers.unwatch(oper); 723 } 724 } 725 726 impl<T> SelectHandle for Sender<'_, T> { try_select(&self, token: &mut Token) -> bool727 fn try_select(&self, token: &mut Token) -> bool { 728 self.0.start_send(token) 729 } 730 deadline(&self) -> Option<Instant>731 fn deadline(&self) -> Option<Instant> { 732 None 733 } 734 register(&self, _oper: Operation, _cx: &Context) -> bool735 fn register(&self, _oper: Operation, _cx: &Context) -> bool { 736 self.is_ready() 737 } 738 unregister(&self, _oper: Operation)739 fn unregister(&self, _oper: Operation) {} 740 accept(&self, token: &mut Token, _cx: &Context) -> bool741 fn accept(&self, token: &mut Token, _cx: &Context) -> bool { 742 self.try_select(token) 743 } 744 is_ready(&self) -> bool745 fn is_ready(&self) -> bool { 746 true 747 } 748 watch(&self, _oper: Operation, _cx: &Context) -> bool749 fn watch(&self, _oper: Operation, _cx: &Context) -> bool { 750 self.is_ready() 751 } 752 unwatch(&self, _oper: Operation)753 fn unwatch(&self, _oper: Operation) {} 754 } 755