use crate::loom::cell::UnsafeCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
use crate::park::thread::CachedParkThread;
use crate::park::Park;
use crate::sync::mpsc::error::TryRecvError;
use crate::sync::mpsc::list;
use crate::sync::notify::Notify;

use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender.
pub(crate) struct Tx<T, S> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: fmt::Debug> fmt::Debug for Tx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Tx").field("inner", &self.inner).finish()
    }
}

/// Channel receiver.
pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

impl<T, S: Semaphore + fmt::Debug> fmt::Debug for Rx<T, S> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Rx").field("inner", &self.inner).finish()
    }
}

pub(crate) trait Semaphore {
    fn is_idle(&self) -> bool;

    fn add_permit(&self);

    fn close(&self);

    fn is_closed(&self) -> bool;
}
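
// Two `Semaphore` implementations live at the bottom of this file: the
// bounded channel pairs the batch semaphore with its capacity, and the
// unbounded channel packs a closed bit and a message count into a single
// `AtomicUsize`.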

struct Chan<T, S> {
    /// Notifies all tasks listening for the receiver being dropped.
    notify_rx_closed: Notify,

    /// Handle to the push half of the lock-free list.
    tx: list::Tx<T>,

    /// Coordinates access to the channel's capacity.
    semaphore: S,

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: AtomicWaker,

    /// Tracks the number of outstanding sender handles.
    ///
    /// When this drops to zero, the send half of the channel is closed.
    tx_count: AtomicUsize,

    /// Only accessed by the `Rx` handle.
    rx_fields: UnsafeCell<RxFields<T>>,
}

impl<T, S> fmt::Debug for Chan<T, S>
where
    S: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Chan")
            .field("tx", &self.tx)
            .field("semaphore", &self.semaphore)
            .field("rx_waker", &self.rx_waker)
            .field("tx_count", &self.tx_count)
            .field("rx_fields", &"...")
            .finish()
    }
}

/// Fields only accessed by the `Rx` handle.
struct RxFields<T> {
    /// Channel receiver. This field is only accessed by the `Receiver` type.
    list: list::Rx<T>,

    /// `true` if `Rx::close` has been called.
    rx_closed: bool,
}

impl<T> fmt::Debug for RxFields<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("RxFields")
            .field("list", &self.list)
            .field("rx_closed", &self.rx_closed)
            .finish()
    }
}

unsafe impl<T: Send, S: Send> Send for Chan<T, S> {}
unsafe impl<T: Send, S: Sync> Sync for Chan<T, S> {}

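/// Creates a sender/receiver pair on top of the given semaphore
/// implementation. The semaphore determines the flavor: the bounded channel
/// uses `(batch_semaphore::Semaphore, usize)` and the unbounded channel uses
/// a bare `AtomicUsize` (see the `Semaphore` impls at the bottom of this
/// file).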
pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
    let (tx, rx) = list::channel();

    let chan = Arc::new(Chan {
        notify_rx_closed: Notify::new(),
        tx,
        semaphore,
        rx_waker: AtomicWaker::new(),
        tx_count: AtomicUsize::new(1),
        rx_fields: UnsafeCell::new(RxFields {
            list: rx,
            rx_closed: false,
        }),
    });

    (Tx::new(chan.clone()), Rx::new(chan))
}

// ===== impl Tx =====

impl<T, S> Tx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Tx<T, S> {
        Tx { inner: chan }
    }

    pub(super) fn semaphore(&self) -> &S {
        &self.inner.semaphore
    }

    /// Send a message and notify the receiver.
    pub(crate) fn send(&self, value: T) {
        self.inner.send(value);
    }

    /// Wake the receive half.
    pub(crate) fn wake_rx(&self) {
        self.inner.rx_waker.wake();
    }

    /// Returns `true` if the senders belong to the same channel.
    pub(crate) fn same_channel(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

impl<T, S: Semaphore> Tx<T, S> {
    pub(crate) fn is_closed(&self) -> bool {
        self.inner.semaphore.is_closed()
    }

    pub(crate) async fn closed(&self) {
        // In order to avoid a race condition, we first request a notification,
        // **then** check whether the semaphore is closed. If the semaphore is
        // already closed, the notification request is simply dropped.
        let notified = self.inner.notify_rx_closed.notified();

        if self.inner.semaphore.is_closed() {
            return;
        }
        notified.await;
    }
}
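
// Requesting the notification before checking `is_closed` closes the race
// where the receiver is dropped between the check and the `await`:
//
//   tx task:  is_closed()  -> false
//   rx task:  close() -> notify_waiters()   (no waiter registered yet!)
//   tx task:  notified().await              (would hang forever)
//
// With the order used above, either the check observes the close, or the
// `Notified` future is already registered and `notify_waiters` wakes it.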

impl<T, S> Clone for Tx<T, S> {
    fn clone(&self) -> Tx<T, S> {
        // Using a Relaxed ordering here is sufficient as the caller holds a
        // strong ref to `self`, preventing a concurrent decrement to zero.
        self.inner.tx_count.fetch_add(1, Relaxed);

        Tx {
            inner: self.inner.clone(),
        }
    }
}

impl<T, S> Drop for Tx<T, S> {
    fn drop(&mut self) {
        if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
            return;
        }

        // Close the list, which sends a `Close` message.
        self.inner.tx.close();

        // Notify the receiver.
        self.wake_rx();
    }
}
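
// The `AcqRel` on `tx_count` mirrors `Arc`'s reference counting: the release
// half publishes this handle's sends to whichever sender performs the final
// decrement, and the acquire half makes every other handle's sends visible to
// the thread that closes the list.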

// ===== impl Rx =====

impl<T, S: Semaphore> Rx<T, S> {
    fn new(chan: Arc<Chan<T, S>>) -> Rx<T, S> {
        Rx { inner: chan }
    }

    pub(crate) fn close(&mut self) {
        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            if rx_fields.rx_closed {
                return;
            }

            rx_fields.rx_closed = true;
        });

        self.inner.semaphore.close();
        self.inner.notify_rx_closed.notify_waiters();
    }

    /// Receive the next value.
    pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        use super::block::Read::*;

        // Keep track of the task budget.
        let coop = ready!(crate::coop::poll_proceed(cx));

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.pop(&self.inner.tx) {
                        Some(Value(value)) => {
                            self.inner.semaphore.add_permit();
                            coop.made_progress();
                            return Ready(Some(value));
                        }
                        Some(Closed) => {
                            // TODO: This check may not be required as it most
                            // likely can only return `true` at this point. A
                            // channel is closed when all tx handles are
                            // dropped. Dropping a tx handle releases memory,
                            // which ensures that if dropping the tx handle is
                            // visible, then all messages sent are also visible.
                            assert!(self.inner.semaphore.is_idle());
                            coop.made_progress();
                            return Ready(None);
                        }
                        None => {} // fall through
                    }
                };
            }

            try_recv!();

            self.inner.rx_waker.register_by_ref(cx.waker());

            // It is possible that a value was pushed between attempting to
            // read and registering the task, so we have to check the channel
            // a second time here.
            try_recv!();

            if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
                coop.made_progress();
                Ready(None)
            } else {
                Pending
            }
        })
    }

    /// Try to receive the next value.
    pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
        use super::list::TryPopResult;

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            macro_rules! try_recv {
                () => {
                    match rx_fields.list.try_pop(&self.inner.tx) {
                        TryPopResult::Ok(value) => {
                            self.inner.semaphore.add_permit();
                            return Ok(value);
                        }
                        TryPopResult::Closed => return Err(TryRecvError::Disconnected),
                        TryPopResult::Empty => return Err(TryRecvError::Empty),
                        TryPopResult::Busy => {} // fall through
                    }
                };
            }

            try_recv!();

            // If a previous `poll_recv` call has set a waker, we wake it here.
            // This allows us to put our own `CachedParkThread` waker in the
            // `AtomicWaker` slot instead.
            //
            // This is not a spurious wakeup to `poll_recv` since we just got a
            // `Busy` from `try_pop`, which only happens if there are messages
            // in the queue.
            self.inner.rx_waker.wake();

            // Park the thread until the problematic send has completed.
            let mut park = CachedParkThread::new();
            let waker = park.unpark().into_waker();
            loop {
                self.inner.rx_waker.register_by_ref(&waker);
                // It is possible that the problematic send has now completed,
                // so we have to check for messages again.
                try_recv!();
                park.park().expect("park failed");
            }
        })
    }
}
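
// `TryPopResult::Busy` means the channel is not empty, but the sender that
// claimed the next slot has not finished writing its value yet. Rather than
// spin, `try_recv` parks the current thread with a `CachedParkThread` waker
// and retries; the pending send is already in progress, so the wait ends once
// that send completes.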

impl<T, S: Semaphore> Drop for Rx<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        self.close();

        self.inner.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
                self.inner.semaphore.add_permit();
            }
        })
    }
}

// ===== impl Chan =====

impl<T, S> Chan<T, S> {
    fn send(&self, value: T) {
        // Push the value.
        self.tx.push(value);

        // Notify the rx task.
        self.rx_waker.wake();
    }
}

impl<T, S> Drop for Chan<T, S> {
    fn drop(&mut self) {
        use super::block::Read::Value;

        // Safety: the only owner of the rx fields is `Chan`, and being
        // inside its own `Drop` impl means we are the last to touch them.
        self.rx_fields.with_mut(|rx_fields_ptr| {
            let rx_fields = unsafe { &mut *rx_fields_ptr };

            while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
            unsafe { rx_fields.list.free_blocks() };
        });
    }
}

// ===== impl Semaphore for (::Semaphore, capacity) =====

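// Bounded flavor: `self.0` holds the channel's free capacity as permits and
// `self.1` is the total capacity. Each value popped by the receiver releases
// one permit back, so `is_idle` (all permits available) means no messages are
// in flight.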
impl Semaphore for (crate::sync::batch_semaphore::Semaphore, usize) {
    fn add_permit(&self) {
        self.0.release(1)
    }

    fn is_idle(&self) -> bool {
        self.0.available_permits() == self.1
    }

    fn close(&self) {
        self.0.close();
    }

    fn is_closed(&self) -> bool {
        self.0.is_closed()
    }
}

// ===== impl Semaphore for AtomicUsize =====

use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;

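// Unbounded flavor: the usize packs two fields. Bit 0 is the closed flag and
// the remaining bits count in-flight messages, so a count of `n` is stored as
// `n << 1` (the send half is assumed to bump the value by 2 per message; that
// happens outside this file). For example, after three sends the value is 6;
// popping one message (`add_permit`, which subtracts 2) leaves 4; `close`
// then sets it to 5, for which `is_closed` is true and `is_idle` is false.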
impl Semaphore for AtomicUsize {
    fn add_permit(&self) {
        let prev = self.fetch_sub(2, Release);

        if prev >> 1 == 0 {
            // Something went wrong: the message count underflowed, which
            // indicates a bug somewhere, so abort the process.
            process::abort();
        }
    }

    fn is_idle(&self) -> bool {
        self.load(Acquire) >> 1 == 0
    }

    fn close(&self) {
        self.fetch_or(1, Release);
    }

    fn is_closed(&self) -> bool {
        self.load(Acquire) & 1 == 1
    }
}
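
// A minimal smoke-test sketch (an addition, assuming the non-loom std shims
// for the atomics): it drives the unbounded flavor through `send` and
// `try_recv`. The `fetch_add(2, Release)` mirrors what the unbounded send
// path is assumed to do before pushing, since bit 0 is the closed flag and
// one message therefore counts as 2.
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn unbounded_send_then_try_recv() {
        let (tx, mut rx) = channel::<i32, AtomicUsize>(AtomicUsize::new(0));

        // Record one in-flight message, then push the value.
        tx.semaphore().fetch_add(2, Release);
        tx.send(7);

        // The value comes back out, and the channel is empty afterwards.
        assert_eq!(rx.try_recv().unwrap(), 7);
        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
    }
}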