• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::cell::UnsafeCell;
2 use crate::loom::future::AtomicWaker;
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Arc;
5 use crate::park::thread::CachedParkThread;
6 use crate::park::Park;
7 use crate::sync::mpsc::error::TryRecvError;
8 use crate::sync::mpsc::list;
9 use crate::sync::notify::Notify;
10 
11 use std::fmt;
12 use std::process;
13 use std::sync::atomic::Ordering::{AcqRel, Relaxed};
14 use std::task::Poll::{Pending, Ready};
15 use std::task::{Context, Poll};
16 
17 /// Channel sender.
18 pub(crate) struct Tx<T, S> {
19     inner: Arc<Chan<T, S>>,
20 }
21 
22 impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result23     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
24         fmt.debug_struct("Tx").field("inner", &self.inner).finish()
25     }
26 }
27 
28 /// Channel receiver.
29 pub(crate) struct Rx<T, S: Semaphore> {
30     inner: Arc<Chan<T, S>>,
31 }
32 
33 impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result34     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
35         fmt.debug_struct("Rx").field("inner", &self.inner).finish()
36     }
37 }
38 
39 pub(crate) trait Semaphore {
is_idle(&self) -> bool40     fn is_idle(&self) -> bool;
41 
add_permit(&self)42     fn add_permit(&self);
43 
close(&self)44     fn close(&self);
45 
is_closed(&self) -> bool46     fn is_closed(&self) -> bool;
47 }
48 
49 struct Chan<T, S> {
50     /// Notifies all tasks listening for the receiver being dropped.
51     notify_rx_closed: Notify,
52 
53     /// Handle to the push half of the lock-free list.
54     tx: list::Tx<T>,
55 
56     /// Coordinates access to channel's capacity.
57     semaphore: S,
58 
59     /// Receiver waker. Notified when a value is pushed into the channel.
60     rx_waker: AtomicWaker,
61 
62     /// Tracks the number of outstanding sender handles.
63     ///
64     /// When this drops to zero, the send half of the channel is closed.
65     tx_count: AtomicUsize,
66 
67     /// Only accessed by `Rx` handle.
68     rx_fields: UnsafeCell<RxFields<T>>,
69 }
70 
71 impl<T, S> fmt::Debug for Chan<T, S>
72 where
73     S: fmt::Debug,
74 {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result75     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
76         fmt.debug_struct("Chan")
77             .field("tx", &self.tx)
78             .field("semaphore", &self.semaphore)
79             .field("rx_waker", &self.rx_waker)
80             .field("tx_count", &self.tx_count)
81             .field("rx_fields", &"...")
82             .finish()
83     }
84 }
85 
86 /// Fields only accessed by `Rx` handle.
87 struct RxFields<T> {
88     /// Channel receiver. This field is only accessed by the `Receiver` type.
89     list: list::Rx<T>,
90 
91     /// `true` if `Rx::close` is called.
92     rx_closed: bool,
93 }
94 
95 impl<T> fmt::Debug for RxFields<T> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result96     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
97         fmt.debug_struct("RxFields")
98             .field("list", &self.list)
99             .field("rx_closed", &self.rx_closed)
100             .finish()
101     }
102 }
103 
104 unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
105 unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}
106 
channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>)107 pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
108     let (tx, rx) = list::channel();
109 
110     let chan = Arc::new(Chan {
111         notify_rx_closed: Notify::new(),
112         tx,
113         semaphore,
114         rx_waker: AtomicWaker::new(),
115         tx_count: AtomicUsize::new(1),
116         rx_fields: UnsafeCell::new(RxFields {
117             list: rx,
118             rx_closed: false,
119         }),
120     });
121 
122     (Tx::new(chan.clone()), Rx::new(chan))
123 }
124 
125 // ===== impl Tx =====
126 
127 impl<T, S> Tx<T, S> {
new(chan: Arc<Chan<T, S>>) -> Tx<T, S>128     fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
129         Tx { inner: chan }
130     }
131 
semaphore(&self) -> &S132     pub(super) fn semaphore(&self) -> &S {
133         &self.inner.semaphore
134     }
135 
136     /// Send a message and notify the receiver.
send(&self, value: T)137     pub(crate) fn send(&self, value: T) {
138         self.inner.send(value);
139     }
140 
141     /// Wake the receive half
wake_rx(&self)142     pub(crate) fn wake_rx(&self) {
143         self.inner.rx_waker.wake();
144     }
145 
146     /// Returns `true` if senders belong to the same channel.
same_channel(&self, other: &Self) -> bool147     pub(crate) fn same_channel(&self, other: &Self) -> bool {
148         Arc::ptr_eq(&self.inner, &other.inner)
149     }
150 }
151 
152 impl<T, S: Semaphore> Tx<T, S> {
is_closed(&self) -> bool153     pub(crate) fn is_closed(&self) -> bool {
154         self.inner.semaphore.is_closed()
155     }
156 
closed(&self)157     pub(crate) async fn closed(&self) {
158         // In order to avoid a race condition, we first request a notification,
159         // **then** check whether the semaphore is closed. If the semaphore is
160         // closed the notification request is dropped.
161         let notified = self.inner.notify_rx_closed.notified();
162 
163         if self.inner.semaphore.is_closed() {
164             return;
165         }
166         notified.await;
167     }
168 }
169 
170 impl<T, S> Clone for Tx<T, S> {
clone(&self) -> Tx<T, S>171     fn clone(&self) -> Tx<T, S> {
172         // Using a Relaxed ordering here is sufficient as the caller holds a
173         // strong ref to `self`, preventing a concurrent decrement to zero.
174         self.inner.tx_count.fetch_add(1, Relaxed);
175 
176         Tx {
177             inner: self.inner.clone(),
178         }
179     }
180 }
181 
182 impl<T, S> Drop for Tx<T, S> {
drop(&mut self)183     fn drop(&mut self) {
184         if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
185             return;
186         }
187 
188         // Close the list, which sends a `Close` message
189         self.inner.tx.close();
190 
191         // Notify the receiver
192         self.wake_rx();
193     }
194 }
195 
196 // ===== impl Rx =====
197 
198 impl<T, S: Semaphore> Rx<T, S> {
new(chan: Arc<Chan<T, S>>) -> Rx<T, S>199     fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
200         Rx { inner: chan }
201     }
202 
close(&mut self)203     pub(crate) fn close(&mut self) {
204         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
205             let rx_fields = unsafe { &mut *rx_fields_ptr };
206 
207             if rx_fields.rx_closed {
208                 return;
209             }
210 
211             rx_fields.rx_closed = true;
212         });
213 
214         self.inner.semaphore.close();
215         self.inner.notify_rx_closed.notify_waiters();
216     }
217 
218     /// Receive the next value
recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>>219     pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
220         use super::block::Read::*;
221 
222         // Keep track of task budget
223         let coop = ready!(crate::coop::poll_proceed(cx));
224 
225         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
226             let rx_fields = unsafe { &mut *rx_fields_ptr };
227 
228             macro_rules! try_recv {
229                 () => {
230                     match rx_fields.list.pop(&self.inner.tx) {
231                         Some(Value(value)) => {
232                             self.inner.semaphore.add_permit();
233                             coop.made_progress();
234                             return Ready(Some(value));
235                         }
236                         Some(Closed) => {
237                             // TODO: This check may not be required as it most
238                             // likely can only return `true` at this point. A
239                             // channel is closed when all tx handles are
240                             // dropped. Dropping a tx handle releases memory,
241                             // which ensures that if dropping the tx handle is
242                             // visible, then all messages sent are also visible.
243                             assert!(self.inner.semaphore.is_idle());
244                             coop.made_progress();
245                             return Ready(None);
246                         }
247                         None => {} // fall through
248                     }
249                 };
250             }
251 
252             try_recv!();
253 
254             self.inner.rx_waker.register_by_ref(cx.waker());
255 
256             // It is possible that a value was pushed between attempting to read
257             // and registering the task, so we have to check the channel a
258             // second time here.
259             try_recv!();
260 
261             if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
262                 coop.made_progress();
263                 Ready(None)
264             } else {
265                 Pending
266             }
267         })
268     }
269 
270     /// Try to receive the next value.
try_recv(&mut self) -> Result<T, TryRecvError>271     pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
272         use super::list::TryPopResult;
273 
274         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
275             let rx_fields = unsafe { &mut *rx_fields_ptr };
276 
277             macro_rules! try_recv {
278                 () => {
279                     match rx_fields.list.try_pop(&self.inner.tx) {
280                         TryPopResult::Ok(value) => {
281                             self.inner.semaphore.add_permit();
282                             return Ok(value);
283                         }
284                         TryPopResult::Closed => return Err(TryRecvError::Disconnected),
285                         TryPopResult::Empty => return Err(TryRecvError::Empty),
286                         TryPopResult::Busy => {} // fall through
287                     }
288                 };
289             }
290 
291             try_recv!();
292 
293             // If a previous `poll_recv` call has set a waker, we wake it here.
294             // This allows us to put our own CachedParkThread waker in the
295             // AtomicWaker slot instead.
296             //
297             // This is not a spurious wakeup to `poll_recv` since we just got a
298             // Busy from `try_pop`, which only happens if there are messages in
299             // the queue.
300             self.inner.rx_waker.wake();
301 
302             // Park the thread until the problematic send has completed.
303             let mut park = CachedParkThread::new();
304             let waker = park.unpark().into_waker();
305             loop {
306                 self.inner.rx_waker.register_by_ref(&waker);
307                 // It is possible that the problematic send has now completed,
308                 // so we have to check for messages again.
309                 try_recv!();
310                 park.park().expect("park failed");
311             }
312         })
313     }
314 }
315 
316 impl<T, S: Semaphore> Drop for Rx<T, S> {
drop(&mut self)317     fn drop(&mut self) {
318         use super::block::Read::Value;
319 
320         self.close();
321 
322         self.inner.rx_fields.with_mut(|rx_fields_ptr| {
323             let rx_fields = unsafe { &mut *rx_fields_ptr };
324 
325             while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
326                 self.inner.semaphore.add_permit();
327             }
328         })
329     }
330 }
331 
332 // ===== impl Chan =====
333 
334 impl<T, S> Chan<T, S> {
send(&self, value: T)335     fn send(&self, value: T) {
336         // Push the value
337         self.tx.push(value);
338 
339         // Notify the rx task
340         self.rx_waker.wake();
341     }
342 }
343 
344 impl<T, S> Drop for Chan<T, S> {
drop(&mut self)345     fn drop(&mut self) {
346         use super::block::Read::Value;
347 
348         // Safety: the only owner of the rx fields is Chan, and eing
349         // inside its own Drop means we're the last ones to touch it.
350         self.rx_fields.with_mut(|rx_fields_ptr| {
351             let rx_fields = unsafe { &mut *rx_fields_ptr };
352 
353             while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
354             unsafe { rx_fields.list.free_blocks() };
355         });
356     }
357 }
358 
359 // ===== impl Semaphore for (::Semaphore, capacity) =====
360 
361 impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
add_permit(&self)362     fn add_permit(&self) {
363         self.0.release(1)
364     }
365 
is_idle(&self) -> bool366     fn is_idle(&self) -> bool {
367         self.0.available_permits() == self.1
368     }
369 
close(&self)370     fn close(&self) {
371         self.0.close();
372     }
373 
is_closed(&self) -> bool374     fn is_closed(&self) -> bool {
375         self.0.is_closed()
376     }
377 }
378 
379 // ===== impl Semaphore for AtomicUsize =====
380 
381 use std::sync::atomic::Ordering::{Acquire, Release};
382 use std::usize;
383 
384 impl Semaphore for AtomicUsize {
add_permit(&self)385     fn add_permit(&self) {
386         let prev = self.fetch_sub(2, Release);
387 
388         if prev >> 1 == 0 {
389             // Something went wrong
390             process::abort();
391         }
392     }
393 
is_idle(&self) -> bool394     fn is_idle(&self) -> bool {
395         self.load(Acquire) >> 1 == 0
396     }
397 
close(&self)398     fn close(&self) {
399         self.fetch_or(1, Release);
400     }
401 
is_closed(&self) -> bool402     fn is_closed(&self) -> bool {
403         self.load(Acquire) & 1 == 1
404     }
405 }
406