1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::fs::File;
6 use std::io::{self, BufRead, BufReader, Cursor, Read, Write};
7 use std::mem::size_of;
8 use std::os::unix::fs::FileExt;
9 use std::os::unix::io::AsRawFd;
10 use std::sync::Arc;
11
12 use crate::filesystem::{FileSystem, ZeroCopyReader, ZeroCopyWriter};
13 use crate::server::{Mapper, Reader, Server, Writer};
14 use crate::sys;
15 use crate::{Error, Result};
16
/// Read side of the /dev/fuse endpoint.
struct DevFuseReader {
    /// Buffered handle on /dev/fuse. The buffer is meant to be large enough to hold one complete
    /// FUSE read transaction.
    reader: BufReader<File>,
}

impl DevFuseReader {
    /// Wraps an already-buffered /dev/fuse handle.
    pub fn new(reader: BufReader<File>) -> Self {
        Self { reader }
    }

    /// Discards every byte currently held in the read buffer.
    fn drain(&mut self) {
        let leftover = self.reader.buffer().len();
        self.reader.consume(leftover);
    }
}
32
33 impl Read for DevFuseReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>34 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
35 self.reader.read(buf)
36 }
37 }
38
// Empty impl: `DevFuseReader` defines nothing of its own for `Reader` (the trait is imported
// from `crate::server`).
impl Reader for DevFuseReader {}
40
41 impl ZeroCopyReader for DevFuseReader {
read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>42 fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
43 let buf = self.reader.fill_buf()?;
44 let end = std::cmp::min(count, buf.len());
45 let written = f.write_at(&buf[..end], off)?;
46 self.reader.consume(written);
47 Ok(written)
48 }
49 }
50
/// Write side of the /dev/fuse endpoint.
struct DevFuseWriter {
    /// Handle on /dev/fuse used for writing.
    dev_fuse: File,

    /// Staging buffer that lets the header and the data be produced out of order and then
    /// flushed in one go. The cursor around it tracks the current write position.
    write_buf: Cursor<Vec<u8>>,
}

impl DevFuseWriter {
    /// Builds a writer from a /dev/fuse handle and a staging buffer.
    ///
    /// The cursor is expected to sit at position 0; this is only checked in debug builds.
    pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
        debug_assert_eq!(write_buf.position(), 0);

        Self { dev_fuse, write_buf }
    }
}
70
71 impl Write for DevFuseWriter {
write(&mut self, buf: &[u8]) -> io::Result<usize>72 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
73 self.write_buf.write(buf)
74 }
75
flush(&mut self) -> io::Result<()>76 fn flush(&mut self) -> io::Result<()> {
77 self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
78 self.write_buf.set_position(0);
79 self.write_buf.get_mut().clear();
80 Ok(())
81 }
82 }
83
84 impl Writer for DevFuseWriter {
85 type ClosureWriter = Self;
86
write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize> where F: Fn(&mut Self) -> io::Result<usize>,87 fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
88 where
89 F: Fn(&mut Self) -> io::Result<usize>,
90 {
91 // Restore the cursor for idempotent.
92 let original = self.write_buf.position();
93 self.write_buf.set_position(offset as u64);
94 let r = f(self);
95 self.write_buf.set_position(original);
96 r
97 }
98
has_sufficient_buffer(&self, size: u32) -> bool99 fn has_sufficient_buffer(&self, size: u32) -> bool {
100 (self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
101 }
102 }
103
104 impl ZeroCopyWriter for DevFuseWriter {
write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize>105 fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
106 let pos = self.write_buf.position() as usize;
107 let end = pos + count;
108 let buf = self.write_buf.get_mut();
109
110 let old_end = buf.len();
111 buf.resize(end, 0);
112 let read = f.read_at(&mut buf[pos..end], off)?;
113
114 let new_end = pos + read;
115 debug_assert!(new_end >= old_end);
116 buf.truncate(new_end);
117 self.write_buf.set_position(new_end as u64);
118 Ok(read)
119 }
120 }
121
/// Mapper used with the /dev/fuse endpoint. It carries no state.
struct DevFuseMapper;

impl DevFuseMapper {
    /// Creates the mapper.
    fn new() -> Self {
        DevFuseMapper
    }
}
129
130 impl Mapper for DevFuseMapper {
map( &self, _mem_offset: u64, _size: usize, _fd: &dyn AsRawFd, _file_offset: u64, _prot: u32, ) -> io::Result<()>131 fn map(
132 &self,
133 _mem_offset: u64,
134 _size: usize,
135 _fd: &dyn AsRawFd,
136 _file_offset: u64,
137 _prot: u32,
138 ) -> io::Result<()> {
139 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
140 }
141
unmap(&self, _offset: u64, _size: u64) -> io::Result<()>142 fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
143 Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
144 }
145 }
146
147 /// Start the FUSE message handling loop. Returns when an error happens.
148 ///
149 /// # Arguments
150 ///
151 /// * `dev_fuse` - A `File` object of /dev/fuse
152 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
153 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
154 /// enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
155 ///
156 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, fs: F, ) -> Result<()>157 pub fn start_message_loop<F: FileSystem + Sync>(
158 dev_fuse: File,
159 input_buffer_size: u32,
160 output_buffer_size: u32,
161 fs: F,
162 ) -> Result<()> {
163 let server = Server::new(fs);
164 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
165 }
166
do_start_message_loop<F: FileSystem + Sync>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, server: &Server<F>, ) -> Result<()>167 fn do_start_message_loop<F: FileSystem + Sync>(
168 dev_fuse: File,
169 input_buffer_size: u32,
170 output_buffer_size: u32,
171 server: &Server<F>,
172 ) -> Result<()> {
173 let mut dev_fuse_reader = {
174 let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
175 let buf_reader = BufReader::with_capacity(
176 input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
177 rfile,
178 );
179 DevFuseReader::new(buf_reader)
180 };
181 let mut dev_fuse_writer = {
182 let wfile = dev_fuse;
183 let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
184 DevFuseWriter::new(wfile, write_buf)
185 };
186 let dev_fuse_mapper = DevFuseMapper::new();
187 loop {
188 server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
189
190 // Since we're reusing the buffer to avoid repeated allocation, drain the possible
191 // residual from the buffer.
192 dev_fuse_reader.drain();
193 }
194 }
195
196 // TODO: Remove worker and this namespace from public
197 pub mod internal {
198 use super::*;
199 use crossbeam_utils::thread;
200
201 /// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
202 ///
203 /// # Arguments
204 ///
205 /// * `dev_fuse` - A `File` object of /dev/fuse
206 /// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
207 /// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
208 ///
209 /// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
start_message_loop_mt<F: FileSystem + Sync + Send>( dev_fuse: File, input_buffer_size: u32, output_buffer_size: u32, thread_numbers: usize, fs: F, ) -> Result<()>210 pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
211 dev_fuse: File,
212 input_buffer_size: u32,
213 output_buffer_size: u32,
214 thread_numbers: usize,
215 fs: F,
216 ) -> Result<()> {
217 let result = thread::scope(|s| {
218 let server = Arc::new(Server::new(fs));
219 for _ in 0..thread_numbers {
220 let dev_fuse = dev_fuse
221 .try_clone()
222 .map_err(Error::EndpointSetup)
223 .expect("Failed to clone /dev/fuse FD");
224 let server = server.clone();
225 s.spawn(move |_| {
226 do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
227 });
228 }
229 });
230
231 unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
232 }
233 }
234