1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fs::File;
6 use std::io;
7 use std::io::BufRead;
8 use std::io::BufReader;
9 use std::io::Cursor;
10 use std::io::Read;
11 use std::io::Write;
12 use std::mem::size_of;
13 use std::os::unix::fs::FileExt;
14 use std::os::unix::io::AsRawFd;
15 use std::sync::Arc;
16
17 use crate::filesystem::FileSystem;
18 use crate::filesystem::ZeroCopyReader;
19 use crate::filesystem::ZeroCopyWriter;
20 use crate::server::Mapper;
21 use crate::server::Reader;
22 use crate::server::Server;
23 use crate::server::Writer;
24 use crate::sys;
25 use crate::Error;
26 use crate::Result;
27
28 struct DevFuseReader {
29 // File representing /dev/fuse for reading, with sufficient buffer to accommodate a FUSE read
30 // transaction.
31 reader: BufReader<File>,
32 }
33
34 impl DevFuseReader {
new(reader: BufReader<File>) -> Self35 pub fn new(reader: BufReader<File>) -> Self {
36 DevFuseReader { reader }
37 }
38
drain(&mut self)39 fn drain(&mut self) {
40 self.reader.consume(self.reader.buffer().len());
41 }
42 }
43
44 impl Read for DevFuseReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>45 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
46 self.reader.read(buf)
47 }
48 }
49
50 impl Reader for DevFuseReader {}
51
52 impl ZeroCopyReader for DevFuseReader {
read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>53 fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
54 let buf = self.reader.fill_buf()?;
55 let end = std::cmp::min(count, buf.len());
56 let written = f.write_at(&buf[..end], off)?;
57 self.reader.consume(written);
58 Ok(written)
59 }
60 }
61
62 struct DevFuseWriter {
63 // File representing /dev/fuse for writing.
64 dev_fuse: File,
65
66 // An internal buffer to allow generating data and header out of order, such that they can be
67 // flushed at once. This is wrapped by a cursor for tracking the current written position.
68 write_buf: Cursor<Vec<u8>>,
69 }
70
71 impl DevFuseWriter {
new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self72 pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
73 debug_assert_eq!(write_buf.position(), 0);
74
75 DevFuseWriter {
76 dev_fuse,
77 write_buf,
78 }
79 }
80 }
81
82 impl Write for DevFuseWriter {
write(&mut self, buf: &[u8]) -> io::Result<usize>83 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
84 self.write_buf.write(buf)
85 }
86
flush(&mut self) -> io::Result<()>87 fn flush(&mut self) -> io::Result<()> {
88 self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
89 self.write_buf.set_position(0);
90 self.write_buf.get_mut().clear();
91 Ok(())
92 }
93 }
94
95 impl Writer for DevFuseWriter {
96 type ClosureWriter = Self;
97
write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize> where F: Fn(&mut Self) -> io::Result<usize>,98 fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
99 where
100 F: Fn(&mut Self) -> io::Result<usize>,
101 {
102 // Restore the cursor for idempotent.
103 let original = self.write_buf.position();
104 self.write_buf.set_position(offset as u64);
105 let r = f(self);
106 self.write_buf.set_position(original);
107 r
108 }
109
has_sufficient_buffer(&self, size: u32) -> bool110 fn has_sufficient_buffer(&self, size: u32) -> bool {
111 (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
112 }
113 }
114
115 impl ZeroCopyWriter for DevFuseWriter {
write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>116 fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
117 let pos = self.write_buf.position() as usize;
118 let end = pos + count;
119 let buf = self.write_buf.get_mut();
120
121 let old_end = buf.len();
122 buf.resize(end, 0);
123 let read = f.read_at(&mut buf[pos..end], off)?;
124
125 let new_end = pos + read;
126 debug_assert!(new_end >= old_end);
127 buf.truncate(new_end);
128 self.write_buf.set_position(new_end as u64);
129 Ok(read)
130 }
131 }
132
133 struct DevFuseMapper;
134
135 impl DevFuseMapper {
new() -> Self136 fn new() -> Self {
137 Self {}
138 }
139 }
140
141 impl Mapper for DevFuseMapper {
map( &self, _mem_offset: u64, _size: usize, _fd: &dyn AsRawFd, _file_offset: u64, _prot: u32, ) -> io::Result<()>142 fn map(
143 &self,
144 _mem_offset: u64,
145 _size: usize,
146 _fd: &dyn AsRawFd,
147 _file_offset: u64,
148 _prot: u32,
149 ) -> io::Result<()> {
150 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
151 }
152
unmap(&self, _offset: u64, _size: u64) -> io::Result<()>153 fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
154 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
155 }
156 }
157
158 /// Start the FUSE message handling loop. Returns when an error happens.
159 ///
160 /// # Arguments
161 ///
162 /// * `dev_fuse` - A `File` object of /dev/fuse
163 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
164 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
165 /// enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
166 ///
167 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, fs: F, ) -> Result<()>168 pub fn start_message_loop<F: FileSystem + Sync>(
169 dev_fuse: File,
170 input_buffer_size: u32,
171 output_buffer_size: u32,
172 fs: F,
173 ) -> Result<()> {
174 let server = Server::new(fs);
175 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
176 }
177
do_start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, server: &Server<F>, ) -> Result<()>178 fn do_start_message_loop<F: FileSystem + Sync>(
179 dev_fuse: File,
180 input_buffer_size: u32,
181 output_buffer_size: u32,
182 server: &Server<F>,
183 ) -> Result<()> {
184 let mut dev_fuse_reader = {
185 let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
186 let buf_reader = BufReader::with_capacity(
187 input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
188 rfile,
189 );
190 DevFuseReader::new(buf_reader)
191 };
192 let mut dev_fuse_writer = {
193 let wfile = dev_fuse;
194 let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
195 DevFuseWriter::new(wfile, write_buf)
196 };
197 let dev_fuse_mapper = DevFuseMapper::new();
198 loop {
199 server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
200
201 // Since we're reusing the buffer to avoid repeated allocation, drain the possible
202 // residual from the buffer.
203 dev_fuse_reader.drain();
204 }
205 }
206
207 // TODO: Remove worker and this namespace from public
208 pub mod internal {
209 use crossbeam_utils::thread;
210
211 use super::*;
212
213 /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
214 ///
215 /// # Arguments
216 ///
217 /// * `dev_fuse` - A `File` object of /dev/fuse
218 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
219 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
220 ///
221 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop_mt<F: FileSystem + Sync + Send>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, thread_numbers: usize, fs: F, ) -> Result<()>222 pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
223 dev_fuse: File,
224 input_buffer_size: u32,
225 output_buffer_size: u32,
226 thread_numbers: usize,
227 fs: F,
228 ) -> Result<()> {
229 let result = thread::scope(|s| {
230 let server = Arc::new(Server::new(fs));
231 for _ in 0..thread_numbers {
232 let dev_fuse = dev_fuse
233 .try_clone()
234 .map_err(Error::EndpointSetup)
235 .expect("Failed to clone /dev/fuse FD");
236 let server = server.clone();
237 s.spawn(move |_| {
238 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
239 });
240 }
241 });
242
243 unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
244 }
245 }
246