• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::cell::UnsafeCell;
2 use crate::loom::future::AtomicWaker;
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::runtime::park::CachedParkThread;
6 use crate::sync::mpsc::error::TryRecvError;
7 use crate::sync::mpsc::{bounded, list, unbounded};
8 use crate::sync::notify::Notify;
9 use crate::util::cacheline::CachePadded;
10 
11 use std::fmt;
12 use std::process;
13 use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
14 use std::task::Poll::{Pending, Ready};
15 use std::task::{Context, Poll};
16 
17 /// Channel sender.
18 pub(crate) struct Tx<T, S> {
19     inner: Arc<Chan<T, S>>,
20 }
21 
22 impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result23     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
24         fmt.debug_struct("Tx").field("inner", &self.inner).finish()
25     }
26 }
27 
28 /// Channel receiver.
29 pub(crate) struct Rx<T, S: Semaphore> {
30     inner: Arc<Chan<T, S>>,
31 }
32 
33 impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result34     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
35         fmt.debug_struct("Rx").field("inner", &self.inner).finish()
36     }
37 }
38 
/// Capacity-tracking strategy plugged into a channel.
///
/// The channel code in this file only relies on the four operations below;
/// how permits are actually counted is left to the implementor.
pub(crate) trait Semaphore {
    /// Returns `true` when the semaphore reports no outstanding permits.
    ///
    /// The receiver consults this to decide whether a closed channel has
    /// been fully drained.
    fn is_idle(&self) -> bool;

    /// Hands one permit back to the semaphore.
    ///
    /// Called by the receiver once for every value it pops off the channel.
    fn add_permit(&self);

    /// Marks the semaphore as closed.
    ///
    /// Called when the receiving half closes the channel.
    fn close(&self);

    /// Returns `true` once the semaphore has been closed.
    fn is_closed(&self) -> bool;
}
48 
49 pub(super) struct Chan<T, S> {
50     /// Handle to the push half of the lock-free list.
51     tx: CachePadded<list::Tx<T>>,
52 
53     /// Receiver waker. Notified when a value is pushed into the channel.
54     rx_waker: CachePadded<AtomicWaker>,
55 
56     /// Notifies all tasks listening for the receiver being dropped.
57     notify_rx_closed: Notify,
58 
59     /// Coordinates access to channel's capacity.
60     semaphore: S,
61 
62     /// Tracks the number of outstanding sender handles.
63     ///
64     /// When this drops to zero, the send half of the channel is closed.
65     tx_count: AtomicUsize,
66 
67     /// Only accessed by `Rx` handle.
68     rx_fields: UnsafeCell<RxFields<T>>,
69 }
70 
71 impl<T, S> fmt::Debug for Chan<T, S>
72 where
73     S: fmt::Debug,
74 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result75     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
76         fmt.debug_struct("Chan")
77             .field("tx", &*self.tx)
78             .field("semaphore", &self.semaphore)
79             .field("rx_waker", &*self.rx_waker)
80             .field("tx_count", &self.tx_count)
81             .field("rx_fields", &"...")
82             .finish()
83     }
84 }
85 
86 /// Fields only accessed by `Rx` handle.
87 struct RxFields<T> {
88     /// Channel receiver. This field is only accessed by the `Receiver` type.
89     list: list::Rx<T>,
90 
91     /// `true` if `Rx::close` is called.
92     rx_closed: bool,
93 }
94 
95 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result96     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
97         fmt.debug_struct("RxFields")
98             .field("list", &self.list)
99             .field("rx_closed", &self.rx_closed)
100             .finish()
101     }
102 }
103 
104 unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
105 unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
106 
channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)107 pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
108     let (tx, rx) = list::channel();
109 
110     let chan = Arc::new(Chan {
111         notify_rx_closed: Notify::new(),
112         tx: CachePadded::new(tx),
113         semaphore,
114         rx_waker: CachePadded::new(AtomicWaker::new()),
115         tx_count: AtomicUsize::new(1),
116         rx_fields: UnsafeCell::new(RxFields {
117             list: rx,
118             rx_closed: false,
119         }),
120     });
121 
122     (Tx::new(chan.clone()), Rx::new(chan))
123 }
124 
125 // ===== impl Tx =====
126 
127 impl<T, S> Tx<T, S> {
new(chan: Arc<Chan<T, S>>) -> Tx<T, S>128     fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
129         Tx { inner: chan }
130     }
131 
downgrade(&self) -> Arc<Chan<T, S>>132     pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
133         self.inner.clone()
134     }
135 
136     // Returns the upgraded channel or None if the upgrade failed.
upgrade(chan: Arc<Chan<T, S>>) -> Option<Self>137     pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
138         let mut tx_count = chan.tx_count.load(Acquire);
139 
140         loop {
141             if tx_count == 0 {
142                 // channel is closed
143                 return None;
144             }
145 
146             match chan
147                 .tx_count
148                 .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
149             {
150                 Ok(_) => return Some(Tx { inner: chan }),
151                 Err(prev_count) => tx_count = prev_count,
152             }
153         }
154     }
155 
semaphore(&self) -> &S156     pub(super) fn semaphore(&self) -> &S {
157         &self.inner.semaphore
158     }
159 
160     /// Send a message and notify the receiver.
send(&self, value: T)161     pub(crate) fn send(&self, value: T) {
162         self.inner.send(value);
163     }
164 
165     /// Wake the receive half
wake_rx(&self)166     pub(crate) fn wake_rx(&self) {
167         self.inner.rx_waker.wake();
168     }
169 
170     /// Returns `true` if senders belong to the same channel.
same_channel(&self, other: &Self) -> bool171     pub(crate) fn same_channel(&self, other: &Self) -> bool {
172         Arc::ptr_eq(&self.inner, &other.inner)
173     }
174 }
175 
176 impl<T, S: Semaphore> Tx<T, S> {
is_closed(&self) -> bool177     pub(crate) fn is_closed(&self) -> bool {
178         self.inner.semaphore.is_closed()
179     }
180 
closed(&self)181     pub(crate) async fn closed(&self) {
182         // In order to avoid a race condition, we first request a notification,
183         // **then** check whether the semaphore is closed. If the semaphore is
184         // closed the notification request is dropped.
185         let notified = self.inner.notify_rx_closed.notified();
186 
187         if self.inner.semaphore.is_closed() {
188             return;
189         }
190         notified.await;
191     }
192 }
193 
194 impl<T, S> Clone for Tx<T, S> {
clone(&self) -> Tx<T, S>195     fn clone(&self) -> Tx<T, S> {
196         // Using a Relaxed ordering here is sufficient as the caller holds a
197         // strong ref to `self`, preventing a concurrent decrement to zero.
198         self.inner.tx_count.fetch_add(1, Relaxed);
199 
200         Tx {
201             inner: self.inner.clone(),
202         }
203     }
204 }
205 
206 impl<T, S> Drop for Tx<T, S> {
drop(&mut self)207     fn drop(&mut self) {
208         if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
209             return;
210         }
211 
212         // Close the list, which sends a `Close` message
213         self.inner.tx.close();
214 
215         // Notify the receiver
216         self.wake_rx();
217     }
218 }
219 
220 // ===== impl Rx =====
221 
222 impl<T, S: Semaphore> Rx<T, S> {
new(chan: Arc<Chan<T, S>>) -> Rx<T, S>223     fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
224         Rx { inner: chan }
225     }
226 
close(&mut self)227     pub(crate) fn close(&mut self) {
228         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
229             let rx_fields = unsafe { &mut *rx_fields_ptr };
230 
231             if rx_fields.rx_closed {
232                 return;
233             }
234 
235             rx_fields.rx_closed = true;
236         });
237 
238         self.inner.semaphore.close();
239         self.inner.notify_rx_closed.notify_waiters();
240     }
241 
242     /// Receive the next value
recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>243     pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
244         use super::block::Read::*;
245 
246         ready!(crate::trace::trace_leaf(cx));
247 
248         // Keep track of task budget
249         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
250 
251         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
252             let rx_fields = unsafe { &mut *rx_fields_ptr };
253 
254             macro_rules! try_recv {
255                 () => {
256                     match rx_fields.list.pop(&self.inner.tx) {
257                         Some(Value(value)) => {
258                             self.inner.semaphore.add_permit();
259                             coop.made_progress();
260                             return Ready(Some(value));
261                         }
262                         Some(Closed) => {
263                             // TODO: This check may not be required as it most
264                             // likely can only return `true` at this point. A
265                             // channel is closed when all tx handles are
266                             // dropped. Dropping a tx handle releases memory,
267                             // which ensures that if dropping the tx handle is
268                             // visible, then all messages sent are also visible.
269                             assert!(self.inner.semaphore.is_idle());
270                             coop.made_progress();
271                             return Ready(None);
272                         }
273                         None => {} // fall through
274                     }
275                 };
276             }
277 
278             try_recv!();
279 
280             self.inner.rx_waker.register_by_ref(cx.waker());
281 
282             // It is possible that a value was pushed between attempting to read
283             // and registering the task, so we have to check the channel a
284             // second time here.
285             try_recv!();
286 
287             if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
288                 coop.made_progress();
289                 Ready(None)
290             } else {
291                 Pending
292             }
293         })
294     }
295 
296     /// Try to receive the next value.
try_recv(&mut self) -> Result<T, TryRecvError>297     pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
298         use super::list::TryPopResult;
299 
300         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
301             let rx_fields = unsafe { &mut *rx_fields_ptr };
302 
303             macro_rules! try_recv {
304                 () => {
305                     match rx_fields.list.try_pop(&self.inner.tx) {
306                         TryPopResult::Ok(value) => {
307                             self.inner.semaphore.add_permit();
308                             return Ok(value);
309                         }
310                         TryPopResult::Closed => return Err(TryRecvError::Disconnected),
311                         TryPopResult::Empty => return Err(TryRecvError::Empty),
312                         TryPopResult::Busy => {} // fall through
313                     }
314                 };
315             }
316 
317             try_recv!();
318 
319             // If a previous `poll_recv` call has set a waker, we wake it here.
320             // This allows us to put our own CachedParkThread waker in the
321             // AtomicWaker slot instead.
322             //
323             // This is not a spurious wakeup to `poll_recv` since we just got a
324             // Busy from `try_pop`, which only happens if there are messages in
325             // the queue.
326             self.inner.rx_waker.wake();
327 
328             // Park the thread until the problematic send has completed.
329             let mut park = CachedParkThread::new();
330             let waker = park.waker().unwrap();
331             loop {
332                 self.inner.rx_waker.register_by_ref(&waker);
333                 // It is possible that the problematic send has now completed,
334                 // so we have to check for messages again.
335                 try_recv!();
336                 park.park();
337             }
338         })
339     }
340 }
341 
342 impl<T, S: Semaphore> Drop for Rx<T, S> {
drop(&mut self)343     fn drop(&mut self) {
344         use super::block::Read::Value;
345 
346         self.close();
347 
348         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
349             let rx_fields = unsafe { &mut *rx_fields_ptr };
350 
351             while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
352                 self.inner.semaphore.add_permit();
353             }
354         })
355     }
356 }
357 
358 // ===== impl Chan =====
359 
360 impl<T, S> Chan<T, S> {
send(&self, value: T)361     fn send(&self, value: T) {
362         // Push the value
363         self.tx.push(value);
364 
365         // Notify the rx task
366         self.rx_waker.wake();
367     }
368 }
369 
370 impl<T, S> Drop for Chan<T, S> {
drop(&mut self)371     fn drop(&mut self) {
372         use super::block::Read::Value;
373 
374         // Safety: the only owner of the rx fields is Chan, and being
375         // inside its own Drop means we're the last ones to touch it.
376         self.rx_fields.with_mut(|rx_fields_ptr| {
377             let rx_fields = unsafe { &mut *rx_fields_ptr };
378 
379             while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
380             unsafe { rx_fields.list.free_blocks() };
381         });
382     }
383 }
384 
// ===== impl Semaphore for bounded::Semaphore =====
386 
387 impl Semaphore for bounded::Semaphore {
add_permit(&self)388     fn add_permit(&self) {
389         self.semaphore.release(1)
390     }
391 
is_idle(&self) -> bool392     fn is_idle(&self) -> bool {
393         self.semaphore.available_permits() == self.bound
394     }
395 
close(&self)396     fn close(&self) {
397         self.semaphore.close();
398     }
399 
is_closed(&self) -> bool400     fn is_closed(&self) -> bool {
401         self.semaphore.is_closed()
402     }
403 }
404 
// ===== impl Semaphore for unbounded::Semaphore =====
406 
407 impl Semaphore for unbounded::Semaphore {
add_permit(&self)408     fn add_permit(&self) {
409         let prev = self.0.fetch_sub(2, Release);
410 
411         if prev >> 1 == 0 {
412             // Something went wrong
413             process::abort();
414         }
415     }
416 
is_idle(&self) -> bool417     fn is_idle(&self) -> bool {
418         self.0.load(Acquire) >> 1 == 0
419     }
420 
close(&self)421     fn close(&self) {
422         self.0.fetch_or(1, Release);
423     }
424 
is_closed(&self) -> bool425     fn is_closed(&self) -> bool {
426         self.0.load(Acquire) & 1 == 1
427     }
428 }
429