use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::runtime::park::CachedParkThread;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::{bounded, list, unbounded};
use crate::sync::notify::Notify;
use crate::util::cacheline::CachePadded;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender.
pub(crate) struct Tx<T, S> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx").field("inner", &self.inner).finish()
    }
}

/// Channel receiver.
pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
    }
}

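/// Abstraction over the channel's capacity tracking: the bounded flavor is
/// backed by a real semaphore, while the unbounded flavor only counts
/// in-flight messages (see the two impls at the bottom of this file).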
pub(crate) trait Semaphore {
    fn is_idle(&self) -> bool;

    fn add_permit(&self);

    fn close(&self);

    fn is_closed(&self) -> bool;
}

pub(super) struct Chan<T, S> {
    /// Handle to the push half of the lock-free list.
    tx: CachePadded<list::Tx<T>>,

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: CachePadded<AtomicWaker>,

    /// Notifies all tasks listening for the receiver being dropped.
    notify_rx_closed: Notify,

    /// Coordinates access to the channel's capacity.
    semaphore: S,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by `Rx` handle.
    rx_fields: UnsafeCell<RxFields<T>>,
}

impl<T, S> fmt::Debug for Chan<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Chan")
            .field("tx", &*self.tx)
            .field("semaphore", &self.semaphore)
            .field("rx_waker", &*self.rx_waker)
            .field("tx_count", &self.tx_count)
            .field("rx_fields", &"...")
            .finish()
    }
}

/// Fields only accessed by `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` is called.
    rx_closed: bool,
}

impl<T> fmt::Debug for RxFields<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("RxFields")
            .field("list", &self.list)
            .field("rx_closed", &self.rx_closed)
            .finish()
    }
}

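// Safety: `rx_fields` is an `UnsafeCell`, but it is only ever accessed by one
// party at a time: the single `Rx` handle while the channel is live, and
// `Chan::drop` once everything else is gone (see the field's doc comment).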
unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}

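/// Creates a new channel, returning the send and receive halves. Capacity
/// accounting is delegated to the supplied `semaphore` (how it is constructed
/// depends on the flavor; see the `bounded` and `unbounded` modules).
///
/// A minimal usage sketch of the send half; `recv` is polled with a task
/// context:
///
/// ```ignore
/// let (tx, rx) = channel::<u64, _>(semaphore);
/// tx.send(1); // pushes the value and wakes the receiver
/// ```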
pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
    let (tx, rx) = list::channel();

    let chan = Arc::new(Chan {
        notify_rx_closed: Notify::new(),
        tx: CachePadded::new(tx),
        semaphore,
        rx_waker: CachePadded::new(AtomicWaker::new()),
        tx_count: AtomicUsize::new(1),
        rx_fields: UnsafeCell::new(RxFields {
            list: rx,
            rx_closed: false,
        }),
    });

    (Tx::new(chan.clone()), Rx::new(chan))
}

// ===== impl Tx =====

impl<T, S> Tx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx { inner: chan }
    }

    pub(super) fn downgrade(&self) -> Arc<Chan<T, S>> {
        self.inner.clone()
    }

    // Returns the upgraded channel or None if the upgrade failed.
    pub(super) fn upgrade(chan: Arc<Chan<T, S>>) -> Option<Self> {
        let mut tx_count = chan.tx_count.load(Acquire);

        loop {
            if tx_count == 0 {
                // channel is closed
                return None;
            }

            match chan
                .tx_count
                .compare_exchange_weak(tx_count, tx_count + 1, AcqRel, Acquire)
            {
                Ok(_) => return Some(Tx { inner: chan }),
                Err(prev_count) => tx_count = prev_count,
            }
        }
    }

    pub(super) fn semaphore(&self) -> &S {
        &self.inner.semaphore
    }

    /// Send a message and notify the receiver.
    pub(crate) fn send(&self, value: T) {
        self.inner.send(value);
    }

    /// Wake the receive half.
    pub(crate) fn wake_rx(&self) {
        self.inner.rx_waker.wake();
    }

    /// Returns `true` if senders belong to the same channel.
    pub(crate) fn same_channel(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

impl<T, S: Semaphore> Tx<T, S> {
    pub(crate) fn is_closed(&self) -> bool {
        self.inner.semaphore.is_closed()
    }

    pub(crate) async fn closed(&self) {
        // In order to avoid a race condition, we first request a notification,
        // **then** check whether the semaphore is closed. If the semaphore is
        // closed the notification request is dropped.
        let notified = self.inner.notify_rx_closed.notified();

        if self.inner.semaphore.is_closed() {
            return;
        }
        notified.await;
    }
}

impl<T, S> Clone for Tx<T, S> {
    fn clone(&self) -> Tx<T, S> {
        // Using a Relaxed ordering here is sufficient as the caller holds a
        // strong ref to `self`, preventing a concurrent decrement to zero.
        self.inner.tx_count.fetch_add(1, Relaxed);

        Tx {
            inner: self.inner.clone(),
        }
    }
}

impl<T, S> Drop for Tx<T, S> {
    fn drop(&mut self) {
        // `AcqRel` on the decrement: `Release` publishes this handle's sends,
        // while `Acquire` lets the handle that observes zero see every other
        // handle's sends before it closes the list.
        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message
        self.inner.tx.close();

        // Notify the receiver
        self.wake_rx();
    }
}

// ===== impl Rx =====

impl<T, S: Semaphore> Rx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
        self.inner.notify_rx_closed.notify_waiters();
    }

    /// Receive the next value.
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read::*;

        ready!(crate::trace::trace_leaf(cx));

        // Keep track of task budget
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            self.inner.semaphore.add_permit();
                            coop.made_progress();
                            return Ready(Some(value));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            coop.made_progress();
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to read
            // and registering the task, so we have to check the channel a
            // second time here.
            try_recv!();

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                coop.made_progress();
                Ready(None)
            } else {
                Pending
            }
        })
    }

    /// Try to receive the next value.
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
        use super::list::TryPopResult;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.try_pop(&self.inner.tx) {
                        TryPopResult::Ok(value) => {
                            self.inner.semaphore.add_permit();
                            return Ok(value);
                        }
                        TryPopResult::Closed => return Err(TryRecvError::Disconnected),
                        TryPopResult::Empty => return Err(TryRecvError::Empty),
                        TryPopResult::Busy => {} // fall through
                    }
                };
            }

            try_recv!();

            // If a previous `poll_recv` call has set a waker, we wake it here.
            // This allows us to put our own CachedParkThread waker in the
            // AtomicWaker slot instead.
            //
            // This is not a spurious wakeup to `poll_recv` since we just got a
            // Busy from `try_pop`, which only happens if there are messages in
            // the queue.
            self.inner.rx_waker.wake();

            // Park the thread until the problematic send has completed.
            let mut park = CachedParkThread::new();
            let waker = park.waker().unwrap();
            loop {
                self.inner.rx_waker.register_by_ref(&waker);
                // It is possible that the problematic send has now completed,
                // so we have to check for messages again.
                try_recv!();
                park.park();
            }
        })
    }
}

impl<T, S: Semaphore> Drop for Rx<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        self.close();

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
                self.inner.semaphore.add_permit();
            }
        })
    }
}

// ===== impl Chan =====

impl<T, S> Chan<T, S> {
    fn send(&self, value: T) {
        // Push the value
        self.tx.push(value);

        // Notify the rx task
        self.rx_waker.wake();
    }
}

impl<T, S> Drop for Chan<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is Chan, and being
        // inside its own Drop means we're the last ones to touch it.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}

// ===== impl Semaphore for (::Semaphore, capacity) =====

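// For the bounded flavor the channel is idle exactly when every permit has
// been returned, i.e. when no messages are buffered or in flight.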
impl Semaphore for bounded::Semaphore {
    fn add_permit(&self) {
        self.semaphore.release(1)
    }

    fn is_idle(&self) -> bool {
        self.semaphore.available_permits() == self.bound
    }

    fn close(&self) {
        self.semaphore.close();
    }

    fn is_closed(&self) -> bool {
        self.semaphore.is_closed()
    }
}

// ===== impl Semaphore for AtomicUsize =====

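// The `usize` packs two fields: bit 0 is the "closed" flag and the remaining
// bits count in-flight messages. Receiving subtracts 2 here; the matching
// increment lives in the unbounded send path, outside this file.
//
// Worked example: after three sends the value is `3 << 1 = 0b110`; `close`
// sets it to `0b111`; three `add_permit` calls bring it back to `0b001`, at
// which point `is_idle` is true and `is_closed` still is.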
impl Semaphore for unbounded::Semaphore {
    fn add_permit(&self) {
        let prev = self.0.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // Something went wrong
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        self.0.load(Acquire) >> 1 == 0
    }

    fn close(&self) {
        self.0.fetch_or(1, Release);
    }

    fn is_closed(&self) -> bool {
        self.0.load(Acquire) & 1 == 1
    }
}