• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Bounded channel based on a preallocated array.
2 //!
3 //! This flavor has a fixed, positive capacity.
4 //!
5 //! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6 //!
7 //! Source:
8 //!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9 //!   - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
10 
11 use std::cell::UnsafeCell;
12 use std::marker::PhantomData;
13 use std::mem::MaybeUninit;
14 use std::ptr;
15 use std::sync::atomic::{self, AtomicUsize, Ordering};
16 use std::time::Instant;
17 
18 use crossbeam_utils::{Backoff, CachePadded};
19 
20 use crate::context::Context;
21 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22 use crate::select::{Operation, SelectHandle, Selected, Token};
23 use crate::waker::SyncWaker;
24 
25 /// A slot in a channel.
26 struct Slot<T> {
27     /// The current stamp.
28     stamp: AtomicUsize,
29 
30     /// The message in this slot.
31     msg: UnsafeCell<MaybeUninit<T>>,
32 }
33 
34 /// The token type for the array flavor.
35 #[derive(Debug)]
36 pub struct ArrayToken {
37     /// Slot to read from or write to.
38     slot: *const u8,
39 
40     /// Stamp to store into the slot after reading or writing.
41     stamp: usize,
42 }
43 
44 impl Default for ArrayToken {
45     #[inline]
default() -> Self46     fn default() -> Self {
47         ArrayToken {
48             slot: ptr::null(),
49             stamp: 0,
50         }
51     }
52 }
53 
54 /// Bounded channel based on a preallocated array.
55 pub(crate) struct Channel<T> {
56     /// The head of the channel.
57     ///
58     /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
59     /// packed into a single `usize`. The lower bits represent the index, while the upper bits
60     /// represent the lap. The mark bit in the head is always zero.
61     ///
62     /// Messages are popped from the head of the channel.
63     head: CachePadded<AtomicUsize>,
64 
65     /// The tail of the channel.
66     ///
67     /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
68     /// packed into a single `usize`. The lower bits represent the index, while the upper bits
69     /// represent the lap. The mark bit indicates that the channel is disconnected.
70     ///
71     /// Messages are pushed into the tail of the channel.
72     tail: CachePadded<AtomicUsize>,
73 
74     /// The buffer holding slots.
75     buffer: *mut Slot<T>,
76 
77     /// The channel capacity.
78     cap: usize,
79 
80     /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
81     one_lap: usize,
82 
83     /// If this bit is set in the tail, that means the channel is disconnected.
84     mark_bit: usize,
85 
86     /// Senders waiting while the channel is full.
87     senders: SyncWaker,
88 
89     /// Receivers waiting while the channel is empty and not disconnected.
90     receivers: SyncWaker,
91 
92     /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
93     _marker: PhantomData<T>,
94 }
95 
96 impl<T> Channel<T> {
97     /// Creates a bounded channel of capacity `cap`.
with_capacity(cap: usize) -> Self98     pub(crate) fn with_capacity(cap: usize) -> Self {
99         assert!(cap > 0, "capacity must be positive");
100 
101         // Compute constants `mark_bit` and `one_lap`.
102         let mark_bit = (cap + 1).next_power_of_two();
103         let one_lap = mark_bit * 2;
104 
105         // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
106         let head = 0;
107         // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
108         let tail = 0;
109 
110         // Allocate a buffer of `cap` slots initialized
111         // with stamps.
112         let buffer = {
113             let boxed: Box<[Slot<T>]> = (0..cap)
114                 .map(|i| {
115                     // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
116                     Slot {
117                         stamp: AtomicUsize::new(i),
118                         msg: UnsafeCell::new(MaybeUninit::uninit()),
119                     }
120                 })
121                 .collect();
122             Box::into_raw(boxed) as *mut Slot<T>
123         };
124 
125         Channel {
126             buffer,
127             cap,
128             one_lap,
129             mark_bit,
130             head: CachePadded::new(AtomicUsize::new(head)),
131             tail: CachePadded::new(AtomicUsize::new(tail)),
132             senders: SyncWaker::new(),
133             receivers: SyncWaker::new(),
134             _marker: PhantomData,
135         }
136     }
137 
138     /// Returns a receiver handle to the channel.
receiver(&self) -> Receiver<'_, T>139     pub(crate) fn receiver(&self) -> Receiver<'_, T> {
140         Receiver(self)
141     }
142 
143     /// Returns a sender handle to the channel.
sender(&self) -> Sender<'_, T>144     pub(crate) fn sender(&self) -> Sender<'_, T> {
145         Sender(self)
146     }
147 
148     /// Attempts to reserve a slot for sending a message.
start_send(&self, token: &mut Token) -> bool149     fn start_send(&self, token: &mut Token) -> bool {
150         let backoff = Backoff::new();
151         let mut tail = self.tail.load(Ordering::Relaxed);
152 
153         loop {
154             // Check if the channel is disconnected.
155             if tail & self.mark_bit != 0 {
156                 token.array.slot = ptr::null();
157                 token.array.stamp = 0;
158                 return true;
159             }
160 
161             // Deconstruct the tail.
162             let index = tail & (self.mark_bit - 1);
163             let lap = tail & !(self.one_lap - 1);
164 
165             // Inspect the corresponding slot.
166             let slot = unsafe { &*self.buffer.add(index) };
167             let stamp = slot.stamp.load(Ordering::Acquire);
168 
169             // If the tail and the stamp match, we may attempt to push.
170             if tail == stamp {
171                 let new_tail = if index + 1 < self.cap {
172                     // Same lap, incremented index.
173                     // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
174                     tail + 1
175                 } else {
176                     // One lap forward, index wraps around to zero.
177                     // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
178                     lap.wrapping_add(self.one_lap)
179                 };
180 
181                 // Try moving the tail.
182                 match self.tail.compare_exchange_weak(
183                     tail,
184                     new_tail,
185                     Ordering::SeqCst,
186                     Ordering::Relaxed,
187                 ) {
188                     Ok(_) => {
189                         // Prepare the token for the follow-up call to `write`.
190                         token.array.slot = slot as *const Slot<T> as *const u8;
191                         token.array.stamp = tail + 1;
192                         return true;
193                     }
194                     Err(t) => {
195                         tail = t;
196                         backoff.spin();
197                     }
198                 }
199             } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
200                 atomic::fence(Ordering::SeqCst);
201                 let head = self.head.load(Ordering::Relaxed);
202 
203                 // If the head lags one lap behind the tail as well...
204                 if head.wrapping_add(self.one_lap) == tail {
205                     // ...then the channel is full.
206                     return false;
207                 }
208 
209                 backoff.spin();
210                 tail = self.tail.load(Ordering::Relaxed);
211             } else {
212                 // Snooze because we need to wait for the stamp to get updated.
213                 backoff.snooze();
214                 tail = self.tail.load(Ordering::Relaxed);
215             }
216         }
217     }
218 
219     /// Writes a message into the channel.
write(&self, token: &mut Token, msg: T) -> Result<(), T>220     pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
221         // If there is no slot, the channel is disconnected.
222         if token.array.slot.is_null() {
223             return Err(msg);
224         }
225 
226         let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
227 
228         // Write the message into the slot and update the stamp.
229         slot.msg.get().write(MaybeUninit::new(msg));
230         slot.stamp.store(token.array.stamp, Ordering::Release);
231 
232         // Wake a sleeping receiver.
233         self.receivers.notify();
234         Ok(())
235     }
236 
237     /// Attempts to reserve a slot for receiving a message.
start_recv(&self, token: &mut Token) -> bool238     fn start_recv(&self, token: &mut Token) -> bool {
239         let backoff = Backoff::new();
240         let mut head = self.head.load(Ordering::Relaxed);
241 
242         loop {
243             // Deconstruct the head.
244             let index = head & (self.mark_bit - 1);
245             let lap = head & !(self.one_lap - 1);
246 
247             // Inspect the corresponding slot.
248             let slot = unsafe { &*self.buffer.add(index) };
249             let stamp = slot.stamp.load(Ordering::Acquire);
250 
251             // If the the stamp is ahead of the head by 1, we may attempt to pop.
252             if head + 1 == stamp {
253                 let new = if index + 1 < self.cap {
254                     // Same lap, incremented index.
255                     // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
256                     head + 1
257                 } else {
258                     // One lap forward, index wraps around to zero.
259                     // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
260                     lap.wrapping_add(self.one_lap)
261                 };
262 
263                 // Try moving the head.
264                 match self.head.compare_exchange_weak(
265                     head,
266                     new,
267                     Ordering::SeqCst,
268                     Ordering::Relaxed,
269                 ) {
270                     Ok(_) => {
271                         // Prepare the token for the follow-up call to `read`.
272                         token.array.slot = slot as *const Slot<T> as *const u8;
273                         token.array.stamp = head.wrapping_add(self.one_lap);
274                         return true;
275                     }
276                     Err(h) => {
277                         head = h;
278                         backoff.spin();
279                     }
280                 }
281             } else if stamp == head {
282                 atomic::fence(Ordering::SeqCst);
283                 let tail = self.tail.load(Ordering::Relaxed);
284 
285                 // If the tail equals the head, that means the channel is empty.
286                 if (tail & !self.mark_bit) == head {
287                     // If the channel is disconnected...
288                     if tail & self.mark_bit != 0 {
289                         // ...then receive an error.
290                         token.array.slot = ptr::null();
291                         token.array.stamp = 0;
292                         return true;
293                     } else {
294                         // Otherwise, the receive operation is not ready.
295                         return false;
296                     }
297                 }
298 
299                 backoff.spin();
300                 head = self.head.load(Ordering::Relaxed);
301             } else {
302                 // Snooze because we need to wait for the stamp to get updated.
303                 backoff.snooze();
304                 head = self.head.load(Ordering::Relaxed);
305             }
306         }
307     }
308 
309     /// Reads a message from the channel.
read(&self, token: &mut Token) -> Result<T, ()>310     pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
311         if token.array.slot.is_null() {
312             // The channel is disconnected.
313             return Err(());
314         }
315 
316         let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
317 
318         // Read the message from the slot and update the stamp.
319         let msg = slot.msg.get().read().assume_init();
320         slot.stamp.store(token.array.stamp, Ordering::Release);
321 
322         // Wake a sleeping sender.
323         self.senders.notify();
324         Ok(msg)
325     }
326 
327     /// Attempts to send a message into the channel.
try_send(&self, msg: T) -> Result<(), TrySendError<T>>328     pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
329         let token = &mut Token::default();
330         if self.start_send(token) {
331             unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
332         } else {
333             Err(TrySendError::Full(msg))
334         }
335     }
336 
337     /// Sends a message into the channel.
send( &self, msg: T, deadline: Option<Instant>, ) -> Result<(), SendTimeoutError<T>>338     pub(crate) fn send(
339         &self,
340         msg: T,
341         deadline: Option<Instant>,
342     ) -> Result<(), SendTimeoutError<T>> {
343         let token = &mut Token::default();
344         loop {
345             // Try sending a message several times.
346             let backoff = Backoff::new();
347             loop {
348                 if self.start_send(token) {
349                     let res = unsafe { self.write(token, msg) };
350                     return res.map_err(SendTimeoutError::Disconnected);
351                 }
352 
353                 if backoff.is_completed() {
354                     break;
355                 } else {
356                     backoff.snooze();
357                 }
358             }
359 
360             if let Some(d) = deadline {
361                 if Instant::now() >= d {
362                     return Err(SendTimeoutError::Timeout(msg));
363                 }
364             }
365 
366             Context::with(|cx| {
367                 // Prepare for blocking until a receiver wakes us up.
368                 let oper = Operation::hook(token);
369                 self.senders.register(oper, cx);
370 
371                 // Has the channel become ready just now?
372                 if !self.is_full() || self.is_disconnected() {
373                     let _ = cx.try_select(Selected::Aborted);
374                 }
375 
376                 // Block the current thread.
377                 let sel = cx.wait_until(deadline);
378 
379                 match sel {
380                     Selected::Waiting => unreachable!(),
381                     Selected::Aborted | Selected::Disconnected => {
382                         self.senders.unregister(oper).unwrap();
383                     }
384                     Selected::Operation(_) => {}
385                 }
386             });
387         }
388     }
389 
390     /// Attempts to receive a message without blocking.
try_recv(&self) -> Result<T, TryRecvError>391     pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
392         let token = &mut Token::default();
393 
394         if self.start_recv(token) {
395             unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
396         } else {
397             Err(TryRecvError::Empty)
398         }
399     }
400 
401     /// Receives a message from the channel.
recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError>402     pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
403         let token = &mut Token::default();
404         loop {
405             // Try receiving a message several times.
406             let backoff = Backoff::new();
407             loop {
408                 if self.start_recv(token) {
409                     let res = unsafe { self.read(token) };
410                     return res.map_err(|_| RecvTimeoutError::Disconnected);
411                 }
412 
413                 if backoff.is_completed() {
414                     break;
415                 } else {
416                     backoff.snooze();
417                 }
418             }
419 
420             if let Some(d) = deadline {
421                 if Instant::now() >= d {
422                     return Err(RecvTimeoutError::Timeout);
423                 }
424             }
425 
426             Context::with(|cx| {
427                 // Prepare for blocking until a sender wakes us up.
428                 let oper = Operation::hook(token);
429                 self.receivers.register(oper, cx);
430 
431                 // Has the channel become ready just now?
432                 if !self.is_empty() || self.is_disconnected() {
433                     let _ = cx.try_select(Selected::Aborted);
434                 }
435 
436                 // Block the current thread.
437                 let sel = cx.wait_until(deadline);
438 
439                 match sel {
440                     Selected::Waiting => unreachable!(),
441                     Selected::Aborted | Selected::Disconnected => {
442                         self.receivers.unregister(oper).unwrap();
443                         // If the channel was disconnected, we still have to check for remaining
444                         // messages.
445                     }
446                     Selected::Operation(_) => {}
447                 }
448             });
449         }
450     }
451 
452     /// Returns the current number of messages inside the channel.
len(&self) -> usize453     pub(crate) fn len(&self) -> usize {
454         loop {
455             // Load the tail, then load the head.
456             let tail = self.tail.load(Ordering::SeqCst);
457             let head = self.head.load(Ordering::SeqCst);
458 
459             // If the tail didn't change, we've got consistent values to work with.
460             if self.tail.load(Ordering::SeqCst) == tail {
461                 let hix = head & (self.mark_bit - 1);
462                 let tix = tail & (self.mark_bit - 1);
463 
464                 return if hix < tix {
465                     tix - hix
466                 } else if hix > tix {
467                     self.cap - hix + tix
468                 } else if (tail & !self.mark_bit) == head {
469                     0
470                 } else {
471                     self.cap
472                 };
473             }
474         }
475     }
476 
477     /// Returns the capacity of the channel.
478     #[allow(clippy::unnecessary_wraps)] // This is intentional.
capacity(&self) -> Option<usize>479     pub(crate) fn capacity(&self) -> Option<usize> {
480         Some(self.cap)
481     }
482 
483     /// Disconnects the channel and wakes up all blocked senders and receivers.
484     ///
485     /// Returns `true` if this call disconnected the channel.
disconnect(&self) -> bool486     pub(crate) fn disconnect(&self) -> bool {
487         let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
488 
489         if tail & self.mark_bit == 0 {
490             self.senders.disconnect();
491             self.receivers.disconnect();
492             true
493         } else {
494             false
495         }
496     }
497 
498     /// Returns `true` if the channel is disconnected.
is_disconnected(&self) -> bool499     pub(crate) fn is_disconnected(&self) -> bool {
500         self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
501     }
502 
503     /// Returns `true` if the channel is empty.
is_empty(&self) -> bool504     pub(crate) fn is_empty(&self) -> bool {
505         let head = self.head.load(Ordering::SeqCst);
506         let tail = self.tail.load(Ordering::SeqCst);
507 
508         // Is the tail equal to the head?
509         //
510         // Note: If the head changes just before we load the tail, that means there was a moment
511         // when the channel was not empty, so it is safe to just return `false`.
512         (tail & !self.mark_bit) == head
513     }
514 
515     /// Returns `true` if the channel is full.
is_full(&self) -> bool516     pub(crate) fn is_full(&self) -> bool {
517         let tail = self.tail.load(Ordering::SeqCst);
518         let head = self.head.load(Ordering::SeqCst);
519 
520         // Is the head lagging one lap behind tail?
521         //
522         // Note: If the tail changes just before we load the head, that means there was a moment
523         // when the channel was not full, so it is safe to just return `false`.
524         head.wrapping_add(self.one_lap) == tail & !self.mark_bit
525     }
526 }
527 
528 impl<T> Drop for Channel<T> {
drop(&mut self)529     fn drop(&mut self) {
530         // Get the index of the head.
531         let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
532 
533         // Loop over all slots that hold a message and drop them.
534         for i in 0..self.len() {
535             // Compute the index of the next slot holding a message.
536             let index = if hix + i < self.cap {
537                 hix + i
538             } else {
539                 hix + i - self.cap
540             };
541 
542             unsafe {
543                 let p = {
544                     let slot = &mut *self.buffer.add(index);
545                     let msg = &mut *slot.msg.get();
546                     msg.as_mut_ptr()
547                 };
548                 p.drop_in_place();
549             }
550         }
551 
552         // Finally, deallocate the buffer, but don't run any destructors.
553         unsafe {
554             // Create a slice from the buffer to make
555             // a fat pointer. Then, use Box::from_raw
556             // to deallocate it.
557             let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
558             Box::from_raw(ptr);
559         }
560     }
561 }
562 
563 /// Receiver handle to a channel.
564 pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
565 
566 /// Sender handle to a channel.
567 pub(crate) struct Sender<'a, T>(&'a Channel<T>);
568 
569 impl<T> SelectHandle for Receiver<'_, T> {
try_select(&self, token: &mut Token) -> bool570     fn try_select(&self, token: &mut Token) -> bool {
571         self.0.start_recv(token)
572     }
573 
deadline(&self) -> Option<Instant>574     fn deadline(&self) -> Option<Instant> {
575         None
576     }
577 
register(&self, oper: Operation, cx: &Context) -> bool578     fn register(&self, oper: Operation, cx: &Context) -> bool {
579         self.0.receivers.register(oper, cx);
580         self.is_ready()
581     }
582 
unregister(&self, oper: Operation)583     fn unregister(&self, oper: Operation) {
584         self.0.receivers.unregister(oper);
585     }
586 
accept(&self, token: &mut Token, _cx: &Context) -> bool587     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
588         self.try_select(token)
589     }
590 
is_ready(&self) -> bool591     fn is_ready(&self) -> bool {
592         !self.0.is_empty() || self.0.is_disconnected()
593     }
594 
watch(&self, oper: Operation, cx: &Context) -> bool595     fn watch(&self, oper: Operation, cx: &Context) -> bool {
596         self.0.receivers.watch(oper, cx);
597         self.is_ready()
598     }
599 
unwatch(&self, oper: Operation)600     fn unwatch(&self, oper: Operation) {
601         self.0.receivers.unwatch(oper);
602     }
603 }
604 
605 impl<T> SelectHandle for Sender<'_, T> {
try_select(&self, token: &mut Token) -> bool606     fn try_select(&self, token: &mut Token) -> bool {
607         self.0.start_send(token)
608     }
609 
deadline(&self) -> Option<Instant>610     fn deadline(&self) -> Option<Instant> {
611         None
612     }
613 
register(&self, oper: Operation, cx: &Context) -> bool614     fn register(&self, oper: Operation, cx: &Context) -> bool {
615         self.0.senders.register(oper, cx);
616         self.is_ready()
617     }
618 
unregister(&self, oper: Operation)619     fn unregister(&self, oper: Operation) {
620         self.0.senders.unregister(oper);
621     }
622 
accept(&self, token: &mut Token, _cx: &Context) -> bool623     fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
624         self.try_select(token)
625     }
626 
is_ready(&self) -> bool627     fn is_ready(&self) -> bool {
628         !self.0.is_full() || self.0.is_disconnected()
629     }
630 
watch(&self, oper: Operation, cx: &Context) -> bool631     fn watch(&self, oper: Operation, cx: &Context) -> bool {
632         self.0.senders.watch(oper, cx);
633         self.is_ready()
634     }
635 
unwatch(&self, oper: Operation)636     fn unwatch(&self, oper: Operation) {
637         self.0.senders.unwatch(oper);
638     }
639 }
640