• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::fs::File;
6 use std::io::{self, BufRead, BufReader, Cursor, Read, Write};
7 use std::mem::size_of;
8 use std::os::unix::fs::FileExt;
9 use std::os::unix::io::AsRawFd;
10 use std::sync::Arc;
11 
12 use crate::filesystem::{FileSystem, ZeroCopyReader, ZeroCopyWriter};
13 use crate::server::{Mapper, Reader, Server, Writer};
14 use crate::sys;
15 use crate::{Error, Result};
16 
/// Read half of the /dev/fuse endpoint used by the message loop.
struct DevFuseReader {
    // Buffered handle on /dev/fuse. The buffer has to be big enough to hold one complete FUSE
    // read transaction.
    reader: BufReader<File>,
}
22 
23 impl DevFuseReader {
new(reader: BufReader<File>) -> Self24     pub fn new(reader: BufReader<File>) -> Self {
25         DevFuseReader { reader }
26     }
27 
drain(&mut self)28     fn drain(&mut self) {
29         self.reader.consume(self.reader.buffer().len());
30     }
31 }
32 
33 impl Read for DevFuseReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>34     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
35         self.reader.read(buf)
36     }
37 }
38 
39 impl Reader for DevFuseReader {}
40 
41 impl ZeroCopyReader for DevFuseReader {
read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>42     fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
43         let buf = self.reader.fill_buf()?;
44         let end = std::cmp::min(count, buf.len());
45         let written = f.write_at(&buf[..end], off)?;
46         self.reader.consume(written);
47         Ok(written)
48     }
49 }
50 
/// Write half of the /dev/fuse endpoint used by the message loop.
struct DevFuseWriter {
    // Handle on /dev/fuse used for sending data.
    dev_fuse: File,

    // Staging area for outgoing data. Header and payload may be produced out of order and are
    // then flushed together; the cursor keeps track of the current write position.
    write_buf: Cursor<Vec<u8>>,
}
59 
60 impl DevFuseWriter {
new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self61     pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
62         debug_assert_eq!(write_buf.position(), 0);
63 
64         DevFuseWriter {
65             dev_fuse,
66             write_buf,
67         }
68     }
69 }
70 
71 impl Write for DevFuseWriter {
write(&mut self, buf: &[u8]) -> io::Result<usize>72     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
73         self.write_buf.write(buf)
74     }
75 
flush(&mut self) -> io::Result<()>76     fn flush(&mut self) -> io::Result<()> {
77         self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
78         self.write_buf.set_position(0);
79         self.write_buf.get_mut().clear();
80         Ok(())
81     }
82 }
83 
84 impl Writer for DevFuseWriter {
85     type ClosureWriter = Self;
86 
write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize> where F: Fn(&mut Self) -> io::Result<usize>,87     fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
88     where
89         F: Fn(&mut Self) -> io::Result<usize>,
90     {
91         // Restore the cursor for idempotent.
92         let original = self.write_buf.position();
93         self.write_buf.set_position(offset as u64);
94         let r = f(self);
95         self.write_buf.set_position(original);
96         r
97     }
98 
has_sufficient_buffer(&self, size: u32) -> bool99     fn has_sufficient_buffer(&self, size: u32) -> bool {
100         (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
101     }
102 }
103 
104 impl ZeroCopyWriter for DevFuseWriter {
write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>105     fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
106         let pos = self.write_buf.position() as usize;
107         let end = pos + count;
108         let buf = self.write_buf.get_mut();
109 
110         let old_end = buf.len();
111         buf.resize(end, 0);
112         let read = f.read_at(&mut buf[pos..end], off)?;
113 
114         let new_end = pos + read;
115         debug_assert!(new_end >= old_end);
116         buf.truncate(new_end);
117         self.write_buf.set_position(new_end as u64);
118         Ok(read)
119     }
120 }
121 
/// Stateless `Mapper` placeholder for the /dev/fuse transport.
struct DevFuseMapper;
123 
124 impl DevFuseMapper {
new() -> Self125     fn new() -> Self {
126         Self {}
127     }
128 }
129 
130 impl Mapper for DevFuseMapper {
map( &self, _mem_offset: u64, _size: usize, _fd: &dyn AsRawFd, _file_offset: u64, _prot: u32, ) -> io::Result<()>131     fn map(
132         &self,
133         _mem_offset: u64,
134         _size: usize,
135         _fd: &dyn AsRawFd,
136         _file_offset: u64,
137         _prot: u32,
138     ) -> io::Result<()> {
139         Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
140     }
141 
unmap(&self, _offset: u64, _size: u64) -> io::Result<()>142     fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
143         Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
144     }
145 }
146 
147 /// Start the FUSE message handling loop. Returns when an error happens.
148 ///
149 /// # Arguments
150 ///
151 /// * `dev_fuse` - A `File` object of /dev/fuse
152 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
153 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
154 ///                          enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
155 ///
156 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, fs: F, ) -> Result<()>157 pub fn start_message_loop<F: FileSystem + Sync>(
158     dev_fuse: File,
159     input_buffer_size: u32,
160     output_buffer_size: u32,
161     fs: F,
162 ) -> Result<()> {
163     let server = Server::new(fs);
164     do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
165 }
166 
do_start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, server: &Server<F>, ) -> Result<()>167 fn do_start_message_loop<F: FileSystem + Sync>(
168     dev_fuse: File,
169     input_buffer_size: u32,
170     output_buffer_size: u32,
171     server: &Server<F>,
172 ) -> Result<()> {
173     let mut dev_fuse_reader = {
174         let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
175         let buf_reader = BufReader::with_capacity(
176             input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
177             rfile,
178         );
179         DevFuseReader::new(buf_reader)
180     };
181     let mut dev_fuse_writer = {
182         let wfile = dev_fuse;
183         let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
184         DevFuseWriter::new(wfile, write_buf)
185     };
186     let dev_fuse_mapper = DevFuseMapper::new();
187     loop {
188         server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
189 
190         // Since we're reusing the buffer to avoid repeated allocation, drain the possible
191         // residual from the buffer.
192         dev_fuse_reader.drain();
193     }
194 }
195 
196 // TODO: Remove worker and this namespace from public
197 pub mod internal {
198     use super::*;
199     use crossbeam_utils::thread;
200 
201     /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
202     ///
203     /// # Arguments
204     ///
205     /// * `dev_fuse` - A `File` object of /dev/fuse
206     /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
207     /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
208     ///
209     /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop_mt<F: FileSystem + Sync + Send>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, thread_numbers: usize, fs: F, ) -> Result<()>210     pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
211         dev_fuse: File,
212         input_buffer_size: u32,
213         output_buffer_size: u32,
214         thread_numbers: usize,
215         fs: F,
216     ) -> Result<()> {
217         let result = thread::scope(|s| {
218             let server = Arc::new(Server::new(fs));
219             for _ in 0..thread_numbers {
220                 let dev_fuse = dev_fuse
221                     .try_clone()
222                     .map_err(Error::EndpointSetup)
223                     .expect("Failed to clone /dev/fuse FD");
224                 let server = server.clone();
225                 s.spawn(move |_| {
226                     do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
227                 });
228             }
229         });
230 
231         unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
232     }
233 }
234