1 //! Reference counter for channels.
2
3 use std::isize;
4 use std::ops;
5 use std::process;
6 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7
8 /// Reference counter internals.
9 struct Counter<C> {
10 /// The number of senders associated with the channel.
11 senders: AtomicUsize,
12
13 /// The number of receivers associated with the channel.
14 receivers: AtomicUsize,
15
16 /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
17 destroy: AtomicBool,
18
19 /// The internal channel.
20 chan: C,
21 }
22
23 /// Wraps a channel into the reference counter.
new<C>(chan: C) -> (Sender<C>, Receiver<C>)24 pub fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
25 let counter = Box::into_raw(Box::new(Counter {
26 senders: AtomicUsize::new(1),
27 receivers: AtomicUsize::new(1),
28 destroy: AtomicBool::new(false),
29 chan,
30 }));
31 let s = Sender { counter };
32 let r = Receiver { counter };
33 (s, r)
34 }
35
36 /// The sending side.
37 pub struct Sender<C> {
38 counter: *mut Counter<C>,
39 }
40
41 impl<C> Sender<C> {
42 /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>43 fn counter(&self) -> &Counter<C> {
44 unsafe { &*self.counter }
45 }
46
47 /// Acquires another sender reference.
acquire(&self) -> Sender<C>48 pub fn acquire(&self) -> Sender<C> {
49 let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
50
51 // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
52 // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
53 // just abort when the count becomes very large.
54 if count > isize::MAX as usize {
55 process::abort();
56 }
57
58 Sender {
59 counter: self.counter,
60 }
61 }
62
63 /// Releases the sender reference.
64 ///
65 /// Function `disconnect` will be called if this is the last sender reference.
release<F: FnOnce(&C) -> bool>(&self, disconnect: F)66 pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
67 if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
68 disconnect(&self.counter().chan);
69
70 if self.counter().destroy.swap(true, Ordering::AcqRel) {
71 drop(Box::from_raw(self.counter));
72 }
73 }
74 }
75 }
76
77 impl<C> ops::Deref for Sender<C> {
78 type Target = C;
79
deref(&self) -> &C80 fn deref(&self) -> &C {
81 &self.counter().chan
82 }
83 }
84
85 impl<C> PartialEq for Sender<C> {
eq(&self, other: &Sender<C>) -> bool86 fn eq(&self, other: &Sender<C>) -> bool {
87 self.counter == other.counter
88 }
89 }
90
91 /// The receiving side.
92 pub struct Receiver<C> {
93 counter: *mut Counter<C>,
94 }
95
96 impl<C> Receiver<C> {
97 /// Returns the internal `Counter`.
counter(&self) -> &Counter<C>98 fn counter(&self) -> &Counter<C> {
99 unsafe { &*self.counter }
100 }
101
102 /// Acquires another receiver reference.
acquire(&self) -> Receiver<C>103 pub fn acquire(&self) -> Receiver<C> {
104 let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
105
106 // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
107 // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
108 // just abort when the count becomes very large.
109 if count > isize::MAX as usize {
110 process::abort();
111 }
112
113 Receiver {
114 counter: self.counter,
115 }
116 }
117
118 /// Releases the receiver reference.
119 ///
120 /// Function `disconnect` will be called if this is the last receiver reference.
release<F: FnOnce(&C) -> bool>(&self, disconnect: F)121 pub unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
122 if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
123 disconnect(&self.counter().chan);
124
125 if self.counter().destroy.swap(true, Ordering::AcqRel) {
126 drop(Box::from_raw(self.counter));
127 }
128 }
129 }
130 }
131
132 impl<C> ops::Deref for Receiver<C> {
133 type Target = C;
134
deref(&self) -> &C135 fn deref(&self) -> &C {
136 &self.counter().chan
137 }
138 }
139
140 impl<C> PartialEq for Receiver<C> {
eq(&self, other: &Receiver<C>) -> bool141 fn eq(&self, other: &Receiver<C>) -> bool {
142 self.counter == other.counter
143 }
144 }
145