• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fs::File;
6 use std::io;
7 use std::io::BufRead;
8 use std::io::BufReader;
9 use std::io::Cursor;
10 use std::io::Read;
11 use std::io::Write;
12 use std::mem::size_of;
13 use std::os::unix::fs::FileExt;
14 use std::os::unix::io::AsRawFd;
15 use std::sync::Arc;
16 
17 use crate::filesystem::FileSystem;
18 use crate::filesystem::ZeroCopyReader;
19 use crate::filesystem::ZeroCopyWriter;
20 use crate::server::Mapper;
21 use crate::server::Reader;
22 use crate::server::Server;
23 use crate::server::Writer;
24 use crate::sys;
25 use crate::Error;
26 use crate::Result;
27 
/// Read side of a /dev/fuse endpoint.
struct DevFuseReader {
    // Buffered handle on /dev/fuse. The buffer is expected to be large enough to hold one
    // complete FUSE read transaction.
    reader: BufReader<File>,
}

impl DevFuseReader {
    /// Wraps an already-configured buffered /dev/fuse handle.
    pub fn new(reader: BufReader<File>) -> Self {
        Self { reader }
    }

    /// Discards every byte that is currently sitting in the internal buffer.
    fn drain(&mut self) {
        let pending = self.reader.buffer().len();
        self.reader.consume(pending);
    }
}

impl Read for DevFuseReader {
    /// Reads through the buffered handle; see [`BufReader`]'s `Read` implementation.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        Read::read(&mut self.reader, buf)
    }
}
49 
50 impl Reader for DevFuseReader {}
51 
52 impl ZeroCopyReader for DevFuseReader {
read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>53     fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
54         let buf = self.reader.fill_buf()?;
55         let end = std::cmp::min(count, buf.len());
56         let written = f.write_at(&buf[..end], off)?;
57         self.reader.consume(written);
58         Ok(written)
59     }
60 }
61 
/// Write side of a /dev/fuse endpoint.
struct DevFuseWriter {
    // Handle on /dev/fuse used for writing.
    dev_fuse: File,

    // Staging area for a message. Header and data may be produced out of order, so they are
    // collected here and sent together on `flush`. The cursor tracks the current write position.
    write_buf: Cursor<Vec<u8>>,
}

impl DevFuseWriter {
    /// Builds a writer from a /dev/fuse handle and a staging buffer whose cursor is expected to
    /// be at position 0 (checked in debug builds only).
    pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
        debug_assert_eq!(write_buf.position(), 0);

        Self {
            dev_fuse,
            write_buf,
        }
    }
}

impl Write for DevFuseWriter {
    /// Stages `buf` in the internal buffer at the cursor position; nothing reaches /dev/fuse yet.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Write::write(&mut self.write_buf, buf)
    }

    /// Sends the whole staged buffer to /dev/fuse, then empties it and rewinds the cursor.
    ///
    /// If the write fails, the staged bytes and cursor are left as they were.
    fn flush(&mut self) -> io::Result<()> {
        let staged = self.write_buf.get_ref();
        self.dev_fuse.write_all(staged)?;

        self.write_buf.get_mut().clear();
        self.write_buf.set_position(0);
        Ok(())
    }
}
94 
95 impl Writer for DevFuseWriter {
96     type ClosureWriter = Self;
97 
write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize> where F: Fn(&mut Self) -> io::Result<usize>,98     fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
99     where
100         F: Fn(&mut Self) -> io::Result<usize>,
101     {
102         // Restore the cursor for idempotent.
103         let original = self.write_buf.position();
104         self.write_buf.set_position(offset as u64);
105         let r = f(self);
106         self.write_buf.set_position(original);
107         r
108     }
109 
has_sufficient_buffer(&self, size: u32) -> bool110     fn has_sufficient_buffer(&self, size: u32) -> bool {
111         (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
112     }
113 }
114 
115 impl ZeroCopyWriter for DevFuseWriter {
write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>116     fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
117         let pos = self.write_buf.position() as usize;
118         let end = pos + count;
119         let buf = self.write_buf.get_mut();
120 
121         let old_end = buf.len();
122         buf.resize(end, 0);
123         let read = f.read_at(&mut buf[pos..end], off)?;
124 
125         let new_end = pos + read;
126         debug_assert!(new_end >= old_end);
127         buf.truncate(new_end);
128         self.write_buf.set_position(new_end as u64);
129         Ok(read)
130     }
131 }
132 
/// Stateless marker type used as the mapper for /dev/fuse endpoints.
struct DevFuseMapper;

impl DevFuseMapper {
    /// Creates the (field-less) mapper.
    fn new() -> Self {
        DevFuseMapper
    }
}
140 
141 impl Mapper for DevFuseMapper {
map( &self, _mem_offset: u64, _size: usize, _fd: &dyn AsRawFd, _file_offset: u64, _prot: u32, ) -> io::Result<()>142     fn map(
143         &self,
144         _mem_offset: u64,
145         _size: usize,
146         _fd: &dyn AsRawFd,
147         _file_offset: u64,
148         _prot: u32,
149     ) -> io::Result<()> {
150         Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
151     }
152 
unmap(&self, _offset: u64, _size: u64) -> io::Result<()>153     fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
154         Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
155     }
156 }
157 
158 /// Start the FUSE message handling loop. Returns when an error happens.
159 ///
160 /// # Arguments
161 ///
162 /// * `dev_fuse` - A `File` object of /dev/fuse
163 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
164 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
165 ///                          enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
166 ///
167 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, fs: F, ) -> Result<()>168 pub fn start_message_loop<F: FileSystem + Sync>(
169     dev_fuse: File,
170     input_buffer_size: u32,
171     output_buffer_size: u32,
172     fs: F,
173 ) -> Result<()> {
174     let server = Server::new(fs);
175     do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
176 }
177 
do_start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, server: &Server<F>, ) -> Result<()>178 fn do_start_message_loop<F: FileSystem + Sync>(
179     dev_fuse: File,
180     input_buffer_size: u32,
181     output_buffer_size: u32,
182     server: &Server<F>,
183 ) -> Result<()> {
184     let mut dev_fuse_reader = {
185         let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
186         let buf_reader = BufReader::with_capacity(
187             input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
188             rfile,
189         );
190         DevFuseReader::new(buf_reader)
191     };
192     let mut dev_fuse_writer = {
193         let wfile = dev_fuse;
194         let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
195         DevFuseWriter::new(wfile, write_buf)
196     };
197     let dev_fuse_mapper = DevFuseMapper::new();
198     loop {
199         server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
200 
201         // Since we're reusing the buffer to avoid repeated allocation, drain the possible
202         // residual from the buffer.
203         dev_fuse_reader.drain();
204     }
205 }
206 
207 // TODO: Remove worker and this namespace from public
208 pub mod internal {
209     use crossbeam_utils::thread;
210 
211     use super::*;
212 
213     /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
214     ///
215     /// # Arguments
216     ///
217     /// * `dev_fuse` - A `File` object of /dev/fuse
218     /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
219     /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
220     ///
221     /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop_mt<F: FileSystem + Sync + Send>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, thread_numbers: usize, fs: F, ) -> Result<()>222     pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
223         dev_fuse: File,
224         input_buffer_size: u32,
225         output_buffer_size: u32,
226         thread_numbers: usize,
227         fs: F,
228     ) -> Result<()> {
229         let result = thread::scope(|s| {
230             let server = Arc::new(Server::new(fs));
231             for _ in 0..thread_numbers {
232                 let dev_fuse = dev_fuse
233                     .try_clone()
234                     .map_err(Error::EndpointSetup)
235                     .expect("Failed to clone /dev/fuse FD");
236                 let server = server.clone();
237                 s.spawn(move |_| {
238                     do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
239                 });
240             }
241         });
242 
243         unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
244     }
245 }
246