1 //! A restricted channel to pass data from signal handler.
2 //!
3 //! When trying to communicate data from signal handler to the outside world, one can use an atomic
4 //! variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work for
5 //! larger data.
6 //!
7 //! This module provides a channel that can be used for that purpose. It is used by certain
8 //! [exfiltrators][crate::iterator::exfiltrator], but can be used as building block for custom
9 //! actions. In general, this is not a ready-made end-user API.
10 //!
11 //! # How does it work
12 //!
13 //! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
14 //! slots). A signal handler takes a slot out of the empty one, fills it and passes it into the
15 //! full one. Outside of signal handler, it can take the value out of the full queue and return the
16 //! slot to the empty queue.
17 //!
18 //! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
19 //! stored in an atomic variable.
20 //!
21 //! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
22 //! filled).
23 //!
24 //! # Fallible allocation of a slot
25 //!
26 //! It is apparent that allocation of a new slot can fail (there's nothing in the empty slot). In
27 //! such case, there's no way to send the new value out of the handler (there's no way to safely
28 //! wait for a slot to appear, because the handler can be blocking the thread that is responsible
29 //! for emptying them). But that's considered acceptable ‒ even the kernel collates the same kinds
30 //! of signals together if they are not consumed by application fast enough and there are no free
31 //! slots exactly because some are being filled, emptied or are full ‒ in particular, the whole
32 //! system will yield a signal.
33 //!
34 //! This assumes that separate signals don't share the same buffer and that there's only one reader
35 //! (using multiple readers is still safe, but it is possible that all slots would be inside the
36 //! readers, but already empty, so the above argument would not hold).
37
38 // TODO: Other sizes? Does anyone need more than 5 slots?
39
40 use std::cell::UnsafeCell;
41 use std::sync::atomic::{AtomicU16, Ordering};
42
43 const SLOTS: usize = 5;
44 const BITS: u16 = 3;
45 const MASK: u16 = 0b111;
46
get(n: u16, idx: u16) -> u1647 fn get(n: u16, idx: u16) -> u16 {
48 (n >> (BITS * idx)) & MASK
49 }
50
set(n: u16, idx: u16, v: u16) -> u1651 fn set(n: u16, idx: u16, v: u16) -> u16 {
52 let v = v << (BITS * idx);
53 let mask = MASK << (BITS * idx);
54 (n & !mask) | v
55 }
56
enqueue(q: &AtomicU16, val: u16)57 fn enqueue(q: &AtomicU16, val: u16) {
58 let mut current = q.load(Ordering::Relaxed);
59 loop {
60 let empty = (0..SLOTS as u16)
61 .find(|i| get(current, *i) == 0)
62 .expect("No empty slot available");
63 let modified = set(current, empty, val);
64 match q.compare_exchange_weak(current, modified, Ordering::Release, Ordering::Relaxed) {
65 Ok(_) => break,
66 Err(changed) => current = changed, // And retry with the changed value
67 }
68 }
69 }
70
dequeue(q: &AtomicU16) -> Option<u16>71 fn dequeue(q: &AtomicU16) -> Option<u16> {
72 let mut current = q.load(Ordering::Relaxed);
73 loop {
74 let val = current & MASK;
75 // It's completely empty
76 if val == 0 {
77 break None;
78 }
79 let modified = current >> BITS;
80 match q.compare_exchange_weak(current, modified, Ordering::Acquire, Ordering::Relaxed) {
81 Ok(_) => break Some(val),
82 Err(changed) => current = changed,
83 }
84 }
85 }
86
87 /// A restricted async-signal-safe channel
88 ///
89 /// This is a bit like the usual channel used for inter-thread communication, but with several
90 /// restrictions:
91 ///
92 /// * There's a limited number of slots (currently 5).
93 /// * There's no way to wait for a place in it or for a value. If value is not available, `None` is
94 /// returned. If there's no space for a value, the value is silently dropped.
95 ///
96 /// In exchange for that, all the operations on that channel are async-signal-safe. That means it
97 /// is possible to use it to communicate between a signal handler and the rest of the world with it
98 /// (specifically, it's designed to send information from the handler to the rest of the
99 /// application). The throwing out of values when full is in line with collating of the same type
100 /// in kernel (you should not use the same channel for multiple different signals).
101 ///
102 /// Technically, this is a MPMC queue which preserves order, but it is expected to be used in MPSC
103 /// mode mostly (in theory, multiple threads can be executing a signal handler for the same signal
104 /// at the same time). The channel is not responsible for wakeups.
105 ///
106 /// While the channel is async-signal-safe, you still need to make sure *creating* of the values is
107 /// too (it should not contain anything that allocates, for example ‒ so no `String`s inside, etc).
108 ///
109 /// The code was *not* tuned for performance (signals are not expected to happen often).
110 pub struct Channel<T> {
111 storage: [UnsafeCell<Option<T>>; SLOTS],
112 empty: AtomicU16,
113 full: AtomicU16,
114 }
115
116 impl<T> Channel<T> {
117 /// Creates a new channel with nothing in it.
new() -> Self118 pub fn new() -> Self {
119 let storage = Default::default();
120 let me = Self {
121 storage,
122 empty: AtomicU16::new(0),
123 full: AtomicU16::new(0),
124 };
125
126 for i in 1..SLOTS + 1 {
127 enqueue(&me.empty, i as u16);
128 }
129
130 me
131 }
132
133 /// Inserts a value into the channel.
134 ///
135 /// If the value doesn't fit, it is silently dropped. Never blocks.
send(&self, val: T)136 pub fn send(&self, val: T) {
137 if let Some(empty_idx) = dequeue(&self.empty) {
138 unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) };
139 enqueue(&self.full, empty_idx);
140 }
141 }
142
143 /// Takes a value from the channel.
144 ///
145 /// Or returns `None` if the channel is empty. Never blocks.
recv(&self) -> Option<T>146 pub fn recv(&self) -> Option<T> {
147 dequeue(&self.full).map(|idx| {
148 let result = unsafe { &mut *self.storage[idx as usize - 1].get() }
149 .take()
150 .expect("Full slot with nothing in it");
151 enqueue(&self.empty, idx);
152 result
153 })
154 }
155 }
156
157 impl<T> Default for Channel<T> {
default() -> Self158 fn default() -> Self {
159 Self::new()
160 }
161 }
162
163 unsafe impl<T: Send> Send for Channel<T> {}
164
165 // Yes, really Send -> Sync. Having a reference to Channel allows Sending Ts, but not having refs
166 // on them.
167 unsafe impl<T: Send> Sync for Channel<T> {}
168
169 #[cfg(test)]
170 mod tests {
171 use std::sync::Arc;
172 use std::thread;
173
174 use super::*;
175
176 #[test]
new_empty()177 fn new_empty() {
178 let channel = Channel::<usize>::new();
179 assert!(channel.recv().is_none());
180 assert!(channel.recv().is_none());
181 }
182
183 #[test]
pass_value()184 fn pass_value() {
185 let channel = Channel::new();
186 channel.send(42);
187 assert_eq!(42, channel.recv().unwrap());
188 assert!(channel.recv().is_none());
189 }
190
191 #[test]
multiple()192 fn multiple() {
193 let channel = Channel::new();
194 for i in 0..1000 {
195 channel.send(i);
196 assert_eq!(i, channel.recv().unwrap());
197 assert!(channel.recv().is_none());
198 }
199 }
200
201 #[test]
overflow()202 fn overflow() {
203 let channel = Channel::new();
204 for i in 0..10 {
205 channel.send(i);
206 }
207 for i in 0..5 {
208 assert_eq!(i, channel.recv().unwrap());
209 }
210 assert!(channel.recv().is_none());
211 }
212
213 #[test]
multi_thread()214 fn multi_thread() {
215 let channel = Arc::new(Channel::<usize>::new());
216
217 let sender = thread::spawn({
218 let channel = Arc::clone(&channel);
219 move || {
220 for i in 0..4 {
221 channel.send(i);
222 }
223 }
224 });
225
226 let mut results = Vec::new();
227 while results.len() < 4 {
228 results.extend(channel.recv());
229 }
230
231 assert_eq!(vec![0, 1, 2, 3], results);
232
233 sender.join().unwrap();
234 }
235 }
236