1 // Copyright 2022 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use super::super::{
6 named_pipes::{
7 PipeConnection, {self},
8 },
9 stream_channel::{BlockingMode, FramingMode},
10 CloseNotifier, Event, MultiProcessMutex, RawDescriptor, ReadNotifier, Result,
11 };
12 use crate::descriptor::AsRawDescriptor;
13 use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
14 use std::{cell::RefCell, io, sync::Arc};
15 use sync::Mutex;
16
17 impl From<FramingMode> for named_pipes::FramingMode {
from(framing_mode: FramingMode) -> Self18 fn from(framing_mode: FramingMode) -> Self {
19 match framing_mode {
20 FramingMode::Message => named_pipes::FramingMode::Message,
21 FramingMode::Byte => named_pipes::FramingMode::Byte,
22 }
23 }
24 }
25
26 impl From<BlockingMode> for named_pipes::BlockingMode {
from(blocking_mode: BlockingMode) -> Self27 fn from(blocking_mode: BlockingMode) -> Self {
28 match blocking_mode {
29 BlockingMode::Blocking => named_pipes::BlockingMode::Wait,
30 BlockingMode::Nonblocking => named_pipes::BlockingMode::NoWait,
31 }
32 }
33 }
34
/// Buffer size, in bytes, used for channels created through `StreamChannel::pair` (50 KiB).
pub const DEFAULT_BUFFER_SIZE: usize = 1024 * 50;
36
37 /// An abstraction over named pipes and unix socketpairs.
38 ///
39 /// The ReadNotifier will return an event handle that is set when data is in the channel.
40 ///
41 /// In message mode, single writes larger than
42 /// `crate::platform::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
43 ///
44 /// # Notes for maintainers
45 /// 1. This struct contains extremely subtle thread safety considerations.
46 /// 2. Serialization is not derived! New fields need to be added manually.
47 #[derive(Deserialize, Debug)]
48 pub struct StreamChannel {
49 pipe_conn: named_pipes::PipeConnection,
50 write_notify: Event,
51 read_notify: Event,
52 pipe_closed: Event,
53
54 // Held when reading on this end, to prevent additional writes from corrupting notification
55 // state.
56 remote_write_lock: MultiProcessMutex,
57
58 // Held when a write is made on this end, so that if the remote end is reading, we wait to
59 // write to avoid corrupting notification state.
60 local_write_lock: MultiProcessMutex,
61
62 // Held for the entire duration of a read. This enables the StreamChannel to be sync,
63 // ensuring there is no chance of concurrent reads creating a bad state in StreamChannel.
64 //
65 // In practice, there is no use-case for multiple threads actually contending over
66 // reading from a single pipe through StreamChannel, so this is mostly to provide a
67 // compiler guarantee while passing the StreamChannel to/from background executors.
68 //
69 // Note that this mutex does not work across processes, so the same StreamChannel end should
70 // NOT be concurrently used across process boundaries. (Odds are if you want to do this, it's
71 // not what you want. Wanting this means you want two readers on the *same end* of the pipe,
72 // which is not well defined behavior.)
73 #[serde(skip)]
74 #[serde(default = "create_read_lock")]
75 read_lock: Arc<Mutex<()>>,
76
77 // Serde only has an immutable reference. Because of that, we have to cheat to signal when this
78 // channel end has been serialized. Once serialized, we know that the current end MUST NOT
79 // signal the channel has been closed when it was dropped, because a copy of it was sent to
80 // another process. It is the copy's responsibility to close the pipe.
81 #[serde(skip)]
82 #[serde(default = "create_true_cell")]
83 is_channel_closed_on_drop: RefCell<bool>,
84
85 // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
86 // up to that size.
87 send_buffer_size: usize,
88 }
89
/// Builds a fresh, unshared read lock. Serde calls this to fill in the skipped `read_lock`
/// field when a `StreamChannel` is deserialized.
fn create_read_lock() -> Arc<Mutex<()>> {
    let unlocked = Mutex::new(());
    Arc::new(unlocked)
}
93
/// Builds a cell holding `true`: the initial "signal close on drop" state of a channel end.
fn create_true_cell() -> RefCell<bool> {
    let signal_close_on_drop = true;
    RefCell::new(signal_close_on_drop)
}
97
98 /// Serialize is manually implemented because we need to tell the local copy that a remote copy
99 /// exists, and to not send the close event. Our serialization is otherwise identical to what
100 /// derive would have generated.
101 impl Serialize for StreamChannel {
serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> where S: Serializer,102 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
103 where
104 S: Serializer,
105 {
106 let mut s = serializer.serialize_struct("StreamChannel", 7)?;
107 s.serialize_field("pipe_conn", &self.pipe_conn)?;
108 s.serialize_field("write_notify", &self.write_notify)?;
109 s.serialize_field("read_notify", &self.read_notify)?;
110 s.serialize_field("pipe_closed", &self.pipe_closed)?;
111 s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
112 s.serialize_field("local_write_lock", &self.local_write_lock)?;
113 s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
114 let ret = s.end();
115
116 // Because this end has been serialized, the serialized copy is now responsible for setting
117 // the close event.
118 if ret.is_ok() {
119 *self.is_channel_closed_on_drop.borrow_mut() = false;
120 }
121
122 ret
123 }
124 }
125
126 impl Drop for StreamChannel {
drop(&mut self)127 fn drop(&mut self) {
128 if *self.is_channel_closed_on_drop.borrow() {
129 if let Err(e) = self.pipe_closed.write(0) {
130 warn!("failed to notify on channel drop: {}", e);
131 }
132 }
133 }
134 }
135
136 impl StreamChannel {
set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()>137 pub fn set_nonblocking(&mut self, nonblocking: bool) -> io::Result<()> {
138 // Safe because the pipe is open.
139 if nonblocking {
140 self.pipe_conn
141 .set_blocking(&named_pipes::BlockingMode::NoWait)
142 } else {
143 self.pipe_conn
144 .set_blocking(&named_pipes::BlockingMode::Wait)
145 }
146 }
147
148 // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
149 // > 1 reader per end is not defined.
try_clone(&self) -> io::Result<Self>150 pub fn try_clone(&self) -> io::Result<Self> {
151 Ok(StreamChannel {
152 pipe_conn: self.pipe_conn.try_clone()?,
153 write_notify: self.write_notify.try_clone()?,
154 read_notify: self.read_notify.try_clone()?,
155 pipe_closed: self.pipe_closed.try_clone()?,
156 remote_write_lock: self.remote_write_lock.try_clone()?,
157 local_write_lock: self.local_write_lock.try_clone()?,
158 read_lock: self.read_lock.clone(),
159 is_channel_closed_on_drop: create_true_cell(),
160 send_buffer_size: self.send_buffer_size,
161 })
162 }
163
get_readable_byte_count(&self) -> io::Result<u32>164 fn get_readable_byte_count(&self) -> io::Result<u32> {
165 self.pipe_conn.get_available_byte_count().map_err(|e| {
166 error!("StreamChannel failed to get readable byte count: {}", e);
167 e
168 })
169 }
170
inner_read(&self, buf: &mut [u8]) -> io::Result<usize>171 pub(super) fn inner_read(&self, buf: &mut [u8]) -> io::Result<usize> {
172 // We ensure concurrent read safety by holding a lock for the duration of the method.
173 // (If multiple concurrent readers were permitted, the pipe could be emptied after we decide
174 // that the notifier should be set, leading to an invalid notified/readable state which
175 // could stall readers.)
176 let _read_lock = self.read_lock.lock();
177
178 let res = unsafe {
179 // Safe because no partial reads are possible, and the underlying code bounds the
180 // read by buf's size.
181 self.pipe_conn.read(buf)
182 };
183
184 // The entire goal of this complex section is to avoid the need for shared memory between
185 // each channel end to synchronize the notification state. It is very subtle, modify with
186 // care.
187 loop {
188 // No other thread is reading, so we can find out, without the write lock, whether or
189 // not we need to clear the read notifier. If we don't, then we don't even have to try
190 // acquiring the write lock. This avoids deadlocks where the pipe is full and the write
191 // side blocks on a writing with the lock held. If it looks like we do need to clear
192 // the notifier though, then we have to be sure, so we'll proceed to the next section.
193 let byte_count = self.get_readable_byte_count()?;
194 if byte_count > 0 {
195 // It's always safe to set the read notifier here because we know there is data in the
196 // pipe, and no one else could read it out from under us.
197 self.read_notify.write(0).map_err(|e| {
198 io::Error::new(
199 io::ErrorKind::Other,
200 format!("failed to write to read notifier: {:?}", e),
201 )
202 })?;
203
204 // Notifier state has been safely synced.
205 return res;
206 }
207
208 // At this point, there *may* be no data in the pipe, meaning we may want to clear the
209 // notifier. Instead of just trying to acquire the lock outright which could deadlock
210 // with the writing side, we'll try with a timeout. If it fails, we know that the other
211 // side is in the middle of a write, so there either will be data in the pipe soon (a),
212 // or there won't be and we have to clear a spurious notification (b).
213 //
214 // For (a), we can safely return from the read without needing the lock, so we just come
215 // around in the loop to check again, and the loop will exit quickly.
216 //
217 // For (b) we'll return to this point and acquire the lock, as we're just waiting for
218 // the spurious notification to arrive so we can clear it (that code path is very fast),
219 // and the loop will exit.
220 //
221 // If we successfully acquire the lock though, then we can go ahead and clear the
222 // notifier if the pipe is indeed empty, because we are assured that no writes are
223 // happening (we hold the lock). Here, we wait up to 1ms to acquire the lock because
224 // that's a decent balance between avoiding an unnecessary iteration, and minimizing
225 // latency.
226 if let Some(_write_lock) = self.remote_write_lock.try_lock(/* timeout_ms= */ 1) {
227 let byte_count = self.get_readable_byte_count()?;
228 if byte_count > 0 {
229 // Safe because no one else can be reading from the pipe.
230 self.read_notify.write(0).map_err(|e| {
231 io::Error::new(
232 io::ErrorKind::Other,
233 format!("failed to write to read notifier: {:?}", e),
234 )
235 })?;
236 } else {
237 // Safe because no other writes can be happening (_lock is held).
238 self.read_notify.reset().map_err(|e| {
239 io::Error::new(
240 io::ErrorKind::Other,
241 format!("failed to reset read notifier: {:?}", e),
242 )
243 })?;
244 }
245
246 // Notifier state has been safely synced.
247 return res;
248 }
249 }
250 }
251
252 /// Exists as a workaround for Tube which does not expect its transport to be mutable,
253 /// even though io::Write requires it.
write_immutable(&self, buf: &[u8]) -> io::Result<usize>254 pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
255 if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
256 && buf.len() > self.send_buffer_size
257 {
258 return Err(io::Error::new(
259 io::ErrorKind::Other,
260 format!(
261 "StreamChannel forbids message mode writes larger than the \
262 default buffer size of {}.",
263 self.send_buffer_size,
264 ),
265 ));
266 }
267
268 let _lock = self.local_write_lock.lock();
269 let res = self.pipe_conn.write(buf);
270
271 // We can always set the write notifier because we know that the reader is in one of the
272 // following states:
273 // 1) a read is running, and it consumes these bytes, so the notification is
274 // unnecessary. That's fine, because the reader will resync the notifier state once
275 // it finishes reading.
276 // 2) a read has completed and is blocked on the lock. The notification state is
277 // already correct, and the read's resync won't change that.
278 if res.is_ok() {
279 self.write_notify.write(0).map_err(|e| {
280 io::Error::new(
281 io::ErrorKind::Other,
282 format!("failed to write to read notifier: {:?}", e),
283 )
284 })?;
285 }
286
287 res
288 }
289
290 /// This only works with empty pipes. U.B. will result if used in any other scenario.
from_pipes( pipe_a: PipeConnection, pipe_b: PipeConnection, send_buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>291 pub fn from_pipes(
292 pipe_a: PipeConnection,
293 pipe_b: PipeConnection,
294 send_buffer_size: usize,
295 ) -> Result<(StreamChannel, StreamChannel)> {
296 let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
297 let pipe_closed = Event::new()?;
298
299 let write_lock_a = MultiProcessMutex::new()?;
300 let write_lock_b = MultiProcessMutex::new()?;
301
302 let sock_a = StreamChannel {
303 pipe_conn: pipe_a,
304 write_notify: notify_a_write.try_clone()?,
305 read_notify: notify_b_write.try_clone()?,
306 read_lock: Arc::new(Mutex::new(())),
307 local_write_lock: write_lock_a.try_clone()?,
308 remote_write_lock: write_lock_b.try_clone()?,
309 pipe_closed: pipe_closed.try_clone()?,
310 is_channel_closed_on_drop: create_true_cell(),
311 send_buffer_size,
312 };
313 let sock_b = StreamChannel {
314 pipe_conn: pipe_b,
315 write_notify: notify_b_write,
316 read_notify: notify_a_write,
317 read_lock: Arc::new(Mutex::new(())),
318 local_write_lock: write_lock_b,
319 remote_write_lock: write_lock_a,
320 pipe_closed,
321 is_channel_closed_on_drop: create_true_cell(),
322 send_buffer_size,
323 };
324 Ok((sock_a, sock_b))
325 }
326
327 /// Create a pair with a specific buffer size. Note that this is the only way to send messages
328 /// larger than the default named pipe buffer size.
pair_with_buffer_size( blocking_mode: BlockingMode, framing_mode: FramingMode, buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)>329 pub fn pair_with_buffer_size(
330 blocking_mode: BlockingMode,
331 framing_mode: FramingMode,
332 buffer_size: usize,
333 ) -> Result<(StreamChannel, StreamChannel)> {
334 let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
335 &named_pipes::FramingMode::from(framing_mode),
336 &named_pipes::BlockingMode::from(blocking_mode),
337 0,
338 buffer_size,
339 false,
340 )?;
341 Self::from_pipes(pipe_a, pipe_b, buffer_size)
342 }
343 /// Creates a cross platform channel pair.
344 /// On Windows the result is in the form (server, client).
pair( blocking_mode: BlockingMode, framing_mode: FramingMode, ) -> Result<(StreamChannel, StreamChannel)>345 pub fn pair(
346 blocking_mode: BlockingMode,
347 framing_mode: FramingMode,
348 ) -> Result<(StreamChannel, StreamChannel)> {
349 let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
350 &named_pipes::FramingMode::from(framing_mode),
351 &named_pipes::BlockingMode::from(blocking_mode),
352 0,
353 DEFAULT_BUFFER_SIZE,
354 false,
355 )?;
356 Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
357 }
358
359 /// Blocks until the pipe buffer is empty.
360 /// NOTE: that this will only work for server pipes on Windows.
flush_blocking(&self) -> io::Result<()>361 pub fn flush_blocking(&self) -> io::Result<()> {
362 self.pipe_conn.flush_data_blocking().map_err(|e| e)
363 }
364 }
365
366 impl io::Write for StreamChannel {
write(&mut self, buf: &[u8]) -> io::Result<usize>367 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
368 self.write_immutable(buf)
369 }
flush(&mut self) -> io::Result<()>370 fn flush(&mut self) -> io::Result<()> {
371 // There is no userspace buffering inside crosvm to flush for named pipes. We write
372 // directly to the named pipe using WriteFile.
373 Ok(())
374 }
375 }
376
377 impl AsRawDescriptor for &StreamChannel {
as_raw_descriptor(&self) -> RawDescriptor378 fn as_raw_descriptor(&self) -> RawDescriptor {
379 self.pipe_conn.as_raw_descriptor()
380 }
381 }
382
383 impl ReadNotifier for StreamChannel {
384 /// Returns a RawDescriptor that can be polled for reads using PollContext.
get_read_notifier(&self) -> &dyn AsRawDescriptor385 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
386 &self.read_notify
387 }
388 }
389
390 impl CloseNotifier for StreamChannel {
get_close_notifier(&self) -> &dyn AsRawDescriptor391 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
392 &self.pipe_closed
393 }
394 }
395
#[cfg(test)]
mod test {
    use super::{
        super::super::{EventContext, EventTrigger, PollToken, ReadNotifier},
        *,
    };
    use std::{
        io::{Read, Write},
        time::Duration,
    };

    /// Upper bound on how long a test waits for the read notifier.
    const EVENT_WAIT_TIME: Duration = Duration::from_secs(10);

    #[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    /// The read notifier must stay set for as long as unread bytes remain in the pipe.
    #[test]
    fn test_read_notifies_multiple_writes() {
        let (mut tx, mut rx) =
            StreamChannel::pair(BlockingMode::Blocking, FramingMode::Byte).unwrap();
        tx.write_all(&[1, 2]).unwrap();

        let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
            rx.get_read_notifier(),
            Token::ReceivedData,
        )])
        .unwrap();

        // Each pass waits for the notifier and then takes exactly one byte. The first pass
        // waits for the write to arrive and leaves one byte behind, so the notifier must still
        // be set on the second pass.
        for &expected in &[1u8, 2u8] {
            assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);

            let mut one_byte = [0u8; 1];
            let read_len = rx.read(&mut one_byte).unwrap();
            assert_eq!(read_len, 1);
            assert_eq!(one_byte[0], expected);
        }
    }

    /// A writer blocked on a full pipe must not stop the reader from making progress.
    #[test]
    fn test_blocked_writer_wont_deadlock() {
        const NUM_OPS: usize = 100;

        let (mut writer, mut reader) =
            StreamChannel::pair_with_buffer_size(BlockingMode::Blocking, FramingMode::Byte, 100)
                .unwrap();

        // The buffer size is 100 bytes. It seems that the buffer size must be exceeded by 100x
        // before a write blocks, so that is how much gets written here. This makes sense to a
        // degree because the docs suggest that some automatic expansion of a pipe's buffers is
        // supposed to be handled by the kernel.
        let writer_thread = std::thread::spawn(move || {
            let chunk = [0u8; 100];
            for _ in 0..NUM_OPS {
                let written = writer.write(&chunk).unwrap();
                assert_eq!(written, chunk.len());
            }
            // Hand the writer back so it stays alive until the thread is joined.
            writer
        });

        // The test passes if the reader can read (this used to deadlock).
        let mut chunk = [0u8; 100];
        for _ in 0..NUM_OPS {
            let read_len = reader.read(&mut chunk).unwrap();
            assert_eq!(read_len, chunk.len());
        }

        // Writer must exit cleanly.
        writer_thread.join().unwrap();
    }
}
470