//! A restricted channel to pass data from a signal handler.
//!
//! When trying to communicate data from a signal handler to the outside world, one can use an
//! atomic variable (as it doesn't lock, so it can be made async-signal-safe). But this won't work
//! for larger data.
//!
//! This module provides a channel that can be used for that purpose. It is used by certain
//! [exfiltrators][crate::iterator::exfiltrator], but can be used as a building block for custom
//! actions. In general, this is not a ready-made end-user API.
//!
//! # How does it work
//!
//! Each channel has a fixed number of slots and two queues (one for empty slots, one for full
//! slots). A signal handler takes a slot out of the empty one, fills it and passes it into the
//! full one. Outside of the signal handler, the consumer can take the value out of the full queue
//! and return the slot to the empty queue.
//!
//! The queues are implemented as bit-encoded indexes of the slots in the storage. The bits are
//! stored in an atomic variable.
//!
//! Note that the algorithm allows for a slot to be in neither queue (when it is being emptied or
//! filled).
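//!
//! For illustration, assuming the layout used by the code below (3 bits per entry, the head of the
//! queue in the least significant bits, `0` marking an unused position), a queue holding slot 2
//! followed by slot 4 evolves like this:
//!
//! ```text
//! start:       0b.._000_100_010   (head = slot 2, then slot 4)
//! dequeue():   returns 2, leaves  0b.._000_000_100
//! enqueue(5):  fills the first zero group, leaves 0b.._000_101_100
//! ```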
//!
//! # Fallible allocation of a slot
//!
//! It is apparent that allocation of a new slot can fail (there's nothing left in the empty
//! queue). In such a case, there's no way to send the new value out of the handler (there's no way
//! to safely wait for a slot to appear, because the handler can be blocking the very thread that
//! is responsible for emptying them). But that's considered acceptable ‒ even the kernel collates
//! the same kinds of signals together if they are not consumed by the application fast enough, and
//! the queues run out of free slots only when some slots are being filled, emptied or are already
//! full ‒ in other words, the application as a whole will still receive a signal of that kind,
//! just fewer instances of it.
//!
//! This assumes that separate signals don't share the same channel and that there's only one
//! reader (using multiple readers is still safe, but then it is possible for all the slots to sit
//! inside the readers, already emptied, in which case the above argument would not hold).

// TODO: Other sizes? Does anyone need more than 5 slots?

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU16, Ordering};

/// Number of slots in the channel.
const SLOTS: usize = 5;
/// Number of bits used to encode one slot index inside a queue.
const BITS: u16 = 3;
/// Mask covering a single encoded slot index.
const MASK: u16 = 0b111;

/// Extracts the `idx`-th encoded slot index from the packed queue value `n`.
fn get(n: u16, idx: u16) -> u16 {
    (n >> (BITS * idx)) & MASK
}

/// Returns `n` with the `idx`-th encoded slot index replaced by `v`.
fn set(n: u16, idx: u16, v: u16) -> u16 {
    let v = v << (BITS * idx);
    let mask = MASK << (BITS * idx);
    (n & !mask) | v
}

/// Pushes the slot index `val` to the back of the queue `q`.
///
/// There must be a free position in the queue; since exactly `SLOTS` indices are shared between
/// both queues, this always holds.
fn enqueue(q: &AtomicU16, val: u16) {
    let mut current = q.load(Ordering::Relaxed);
    loop {
        let empty = (0..SLOTS as u16)
            .find(|i| get(current, *i) == 0)
            .expect("No empty slot available");
        let modified = set(current, empty, val);
        match q.compare_exchange_weak(current, modified, Ordering::Release, Ordering::Relaxed) {
            Ok(_) => break,
            Err(changed) => current = changed, // And retry with the changed value
        }
    }
}

/// Pops the slot index from the front of the queue `q`, or returns `None` if the queue is empty.
fn dequeue(q: &AtomicU16) -> Option<u16> {
    let mut current = q.load(Ordering::Relaxed);
    loop {
        let val = current & MASK;
        // It's completely empty
        if val == 0 {
            break None;
        }
        let modified = current >> BITS;
        match q.compare_exchange_weak(current, modified, Ordering::Acquire, Ordering::Relaxed) {
            Ok(_) => break Some(val),
            Err(changed) => current = changed,
        }
    }
}

/// A restricted async-signal-safe channel
///
/// This is a bit like the usual channel used for inter-thread communication, but with several
/// restrictions:
///
/// * There's a limited number of slots (currently 5).
/// * There's no way to wait for a place in it or for a value. If a value is not available, `None`
///   is returned. If there's no space for a new value, the value is silently dropped.
///
/// In exchange for that, all the operations on the channel are async-signal-safe. That means it is
/// possible to use it to communicate between a signal handler and the rest of the world
/// (specifically, it's designed to send information from the handler to the rest of the
/// application). Dropping values when the channel is full is in line with the kernel collating
/// multiple instances of the same signal (you should not use the same channel for multiple
/// different signals).
///
/// Technically, this is an MPMC queue which preserves order, but it is mostly expected to be used
/// in MPSC mode (in theory, multiple threads can be executing a signal handler for the same signal
/// at the same time). The channel is not responsible for wakeups.
///
/// While the channel itself is async-signal-safe, you still need to make sure *creating* the
/// values is too (they should not contain anything that allocates, for example ‒ so no `String`s
/// inside, etc).
///
/// The code was *not* tuned for performance (signals are not expected to happen often).
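///
/// # Example
///
/// A minimal sketch of passing a value through the channel (the import path is assumed here;
/// adjust it to wherever this module is exposed in the crate):
///
/// ```rust,no_run
/// use signal_hook::low_level::channel::Channel;
///
/// let channel: Channel<u8> = Channel::new();
/// // In real use, `send` would typically be called from inside a signal handler.
/// channel.send(42);
/// assert_eq!(Some(42), channel.recv());
/// // The channel never blocks; an empty channel simply yields `None`.
/// assert_eq!(None, channel.recv());
/// ```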
pub struct Channel<T> {
    /// The slots holding the actual values.
    storage: [UnsafeCell<Option<T>>; SLOTS],
    /// Queue of (1-based) indices of slots that currently hold no value.
    empty: AtomicU16,
    /// Queue of (1-based) indices of slots holding a value ready to be received.
    full: AtomicU16,
}

impl<T> Channel<T> {
    /// Creates a new channel with nothing in it.
    pub fn new() -> Self {
        let storage = Default::default();
        let me = Self {
            storage,
            empty: AtomicU16::new(0),
            full: AtomicU16::new(0),
        };

        // Slot indices are 1-based, because 0 in a queue means an unused position.
        for i in 1..=SLOTS {
            enqueue(&me.empty, i as u16);
        }

        me
    }

    /// Inserts a value into the channel.
    ///
    /// If the value doesn't fit, it is silently dropped. Never blocks.
    pub fn send(&self, val: T) {
        if let Some(empty_idx) = dequeue(&self.empty) {
            // The queues use 1-based indices, the storage array is 0-based.
            unsafe { *self.storage[empty_idx as usize - 1].get() = Some(val) };
            enqueue(&self.full, empty_idx);
        }
    }

    /// Takes a value from the channel.
    ///
    /// Returns `None` if the channel is empty. Never blocks.
    pub fn recv(&self) -> Option<T> {
        dequeue(&self.full).map(|idx| {
            let result = unsafe { &mut *self.storage[idx as usize - 1].get() }
                .take()
                .expect("Full slot with nothing in it");
            enqueue(&self.empty, idx);
            result
        })
    }
}

impl<T> Default for Channel<T> {
    fn default() -> Self {
        Self::new()
    }
}

unsafe impl<T: Send> Send for Channel<T> {}

// Yes, really `T: Send` is enough for `Sync`. Having a reference to the Channel allows sending
// `T`s by value, but never getting a reference to a `T` stored inside it.
unsafe impl<T: Send> Sync for Channel<T> {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;

    use super::*;

    #[test]
    fn new_empty() {
        let channel = Channel::<usize>::new();
        assert!(channel.recv().is_none());
        assert!(channel.recv().is_none());
    }

    #[test]
    fn pass_value() {
        let channel = Channel::new();
        channel.send(42);
        assert_eq!(42, channel.recv().unwrap());
        assert!(channel.recv().is_none());
    }

    #[test]
    fn multiple() {
        let channel = Channel::new();
        for i in 0..1000 {
            channel.send(i);
            assert_eq!(i, channel.recv().unwrap());
            assert!(channel.recv().is_none());
        }
    }

    #[test]
    fn overflow() {
        let channel = Channel::new();
        for i in 0..10 {
            channel.send(i);
        }
        for i in 0..5 {
            assert_eq!(i, channel.recv().unwrap());
        }
        assert!(channel.recv().is_none());
    }
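
    // A small extra check of the packed-queue helpers themselves ‒ a sketch, assuming `enqueue`
    // and `dequeue` stay private to the parent module and are reachable through `use super::*`.
    #[test]
    fn raw_queue_order() {
        use std::sync::atomic::AtomicU16;

        let q = AtomicU16::new(0);
        assert_eq!(None, dequeue(&q));
        // Indices come back out in FIFO order.
        enqueue(&q, 2);
        enqueue(&q, 4);
        assert_eq!(Some(2), dequeue(&q));
        enqueue(&q, 1);
        assert_eq!(Some(4), dequeue(&q));
        assert_eq!(Some(1), dequeue(&q));
        assert_eq!(None, dequeue(&q));
    }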

    #[test]
    fn multi_thread() {
        let channel = Arc::new(Channel::<usize>::new());

        let sender = thread::spawn({
            let channel = Arc::clone(&channel);
            move || {
                for i in 0..4 {
                    channel.send(i);
                }
            }
        });

        let mut results = Vec::new();
        while results.len() < 4 {
            results.extend(channel.recv());
        }

        assert_eq!(vec![0, 1, 2, 3], results);

        sender.join().unwrap();
    }
}