• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::io;
2 use std::io::prelude::*;
3 use std::mem;
4 
5 use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status};
6 
7 #[derive(Debug)]
8 pub struct Writer<W: Write, D: Ops> {
9     obj: Option<W>,
10     pub data: D,
11     buf: Vec<u8>,
12 }
13 
14 pub trait Ops {
15     type Flush: Flush;
total_in(&self) -> u6416     fn total_in(&self) -> u64;
total_out(&self) -> u6417     fn total_out(&self) -> u64;
run( &mut self, input: &[u8], output: &mut [u8], flush: Self::Flush, ) -> Result<Status, DecompressError>18     fn run(
19         &mut self,
20         input: &[u8],
21         output: &mut [u8],
22         flush: Self::Flush,
23     ) -> Result<Status, DecompressError>;
run_vec( &mut self, input: &[u8], output: &mut Vec<u8>, flush: Self::Flush, ) -> Result<Status, DecompressError>24     fn run_vec(
25         &mut self,
26         input: &[u8],
27         output: &mut Vec<u8>,
28         flush: Self::Flush,
29     ) -> Result<Status, DecompressError>;
30 }
31 
32 impl Ops for Compress {
33     type Flush = FlushCompress;
total_in(&self) -> u6434     fn total_in(&self) -> u64 {
35         self.total_in()
36     }
total_out(&self) -> u6437     fn total_out(&self) -> u64 {
38         self.total_out()
39     }
run( &mut self, input: &[u8], output: &mut [u8], flush: FlushCompress, ) -> Result<Status, DecompressError>40     fn run(
41         &mut self,
42         input: &[u8],
43         output: &mut [u8],
44         flush: FlushCompress,
45     ) -> Result<Status, DecompressError> {
46         Ok(self.compress(input, output, flush).unwrap())
47     }
run_vec( &mut self, input: &[u8], output: &mut Vec<u8>, flush: FlushCompress, ) -> Result<Status, DecompressError>48     fn run_vec(
49         &mut self,
50         input: &[u8],
51         output: &mut Vec<u8>,
52         flush: FlushCompress,
53     ) -> Result<Status, DecompressError> {
54         Ok(self.compress_vec(input, output, flush).unwrap())
55     }
56 }
57 
58 impl Ops for Decompress {
59     type Flush = FlushDecompress;
total_in(&self) -> u6460     fn total_in(&self) -> u64 {
61         self.total_in()
62     }
total_out(&self) -> u6463     fn total_out(&self) -> u64 {
64         self.total_out()
65     }
run( &mut self, input: &[u8], output: &mut [u8], flush: FlushDecompress, ) -> Result<Status, DecompressError>66     fn run(
67         &mut self,
68         input: &[u8],
69         output: &mut [u8],
70         flush: FlushDecompress,
71     ) -> Result<Status, DecompressError> {
72         self.decompress(input, output, flush)
73     }
run_vec( &mut self, input: &[u8], output: &mut Vec<u8>, flush: FlushDecompress, ) -> Result<Status, DecompressError>74     fn run_vec(
75         &mut self,
76         input: &[u8],
77         output: &mut Vec<u8>,
78         flush: FlushDecompress,
79     ) -> Result<Status, DecompressError> {
80         self.decompress_vec(input, output, flush)
81     }
82 }
83 
84 pub trait Flush {
none() -> Self85     fn none() -> Self;
sync() -> Self86     fn sync() -> Self;
finish() -> Self87     fn finish() -> Self;
88 }
89 
90 impl Flush for FlushCompress {
none() -> Self91     fn none() -> Self {
92         FlushCompress::None
93     }
94 
sync() -> Self95     fn sync() -> Self {
96         FlushCompress::Sync
97     }
98 
finish() -> Self99     fn finish() -> Self {
100         FlushCompress::Finish
101     }
102 }
103 
104 impl Flush for FlushDecompress {
none() -> Self105     fn none() -> Self {
106         FlushDecompress::None
107     }
108 
sync() -> Self109     fn sync() -> Self {
110         FlushDecompress::Sync
111     }
112 
finish() -> Self113     fn finish() -> Self {
114         FlushDecompress::Finish
115     }
116 }
117 
read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> where R: BufRead, D: Ops,118 pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize>
119 where
120     R: BufRead,
121     D: Ops,
122 {
123     loop {
124         let (read, consumed, ret, eof);
125         {
126             let input = obj.fill_buf()?;
127             eof = input.is_empty();
128             let before_out = data.total_out();
129             let before_in = data.total_in();
130             let flush = if eof {
131                 D::Flush::finish()
132             } else {
133                 D::Flush::none()
134             };
135             ret = data.run(input, dst, flush);
136             read = (data.total_out() - before_out) as usize;
137             consumed = (data.total_in() - before_in) as usize;
138         }
139         obj.consume(consumed);
140 
141         match ret {
142             // If we haven't ready any data and we haven't hit EOF yet,
143             // then we need to keep asking for more data because if we
144             // return that 0 bytes of data have been read then it will
145             // be interpreted as EOF.
146             Ok(Status::Ok) | Ok(Status::BufError) if read == 0 && !eof && dst.len() > 0 => continue,
147             Ok(Status::Ok) | Ok(Status::BufError) | Ok(Status::StreamEnd) => return Ok(read),
148 
149             Err(..) => {
150                 return Err(io::Error::new(
151                     io::ErrorKind::InvalidInput,
152                     "corrupt deflate stream",
153                 ))
154             }
155         }
156     }
157 }
158 
159 impl<W: Write, D: Ops> Writer<W, D> {
new(w: W, d: D) -> Writer<W, D>160     pub fn new(w: W, d: D) -> Writer<W, D> {
161         Writer {
162             obj: Some(w),
163             data: d,
164             buf: Vec::with_capacity(32 * 1024),
165         }
166     }
167 
finish(&mut self) -> io::Result<()>168     pub fn finish(&mut self) -> io::Result<()> {
169         loop {
170             self.dump()?;
171 
172             let before = self.data.total_out();
173             self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?;
174             if before == self.data.total_out() {
175                 return Ok(());
176             }
177         }
178     }
179 
replace(&mut self, w: W) -> W180     pub fn replace(&mut self, w: W) -> W {
181         self.buf.truncate(0);
182         mem::replace(self.get_mut(), w)
183     }
184 
get_ref(&self) -> &W185     pub fn get_ref(&self) -> &W {
186         self.obj.as_ref().unwrap()
187     }
188 
get_mut(&mut self) -> &mut W189     pub fn get_mut(&mut self) -> &mut W {
190         self.obj.as_mut().unwrap()
191     }
192 
193     // Note that this should only be called if the outer object is just about
194     // to be consumed!
195     //
196     // (e.g. an implementation of `into_inner`)
take_inner(&mut self) -> W197     pub fn take_inner(&mut self) -> W {
198         self.obj.take().unwrap()
199     }
200 
is_present(&self) -> bool201     pub fn is_present(&self) -> bool {
202         self.obj.is_some()
203     }
204 
205     // Returns total written bytes and status of underlying codec
write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)>206     pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> {
207         // miniz isn't guaranteed to actually write any of the buffer provided,
208         // it may be in a flushing mode where it's just giving us data before
209         // we're actually giving it any data. We don't want to spuriously return
210         // `Ok(0)` when possible as it will cause calls to write_all() to fail.
211         // As a result we execute this in a loop to ensure that we try our
212         // darndest to write the data.
213         loop {
214             self.dump()?;
215 
216             let before_in = self.data.total_in();
217             let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none());
218             let written = (self.data.total_in() - before_in) as usize;
219 
220             let is_stream_end = match ret {
221                 Ok(Status::StreamEnd) => true,
222                 _ => false,
223             };
224 
225             if buf.len() > 0 && written == 0 && ret.is_ok() && !is_stream_end {
226                 continue;
227             }
228             return match ret {
229                 Ok(st) => match st {
230                     Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)),
231                 },
232                 Err(..) => Err(io::Error::new(
233                     io::ErrorKind::InvalidInput,
234                     "corrupt deflate stream",
235                 )),
236             };
237         }
238     }
239 
dump(&mut self) -> io::Result<()>240     fn dump(&mut self) -> io::Result<()> {
241         // TODO: should manage this buffer not with `drain` but probably more of
242         // a deque-like strategy.
243         while self.buf.len() > 0 {
244             let n = self.obj.as_mut().unwrap().write(&self.buf)?;
245             if n == 0 {
246                 return Err(io::ErrorKind::WriteZero.into());
247             }
248             self.buf.drain(..n);
249         }
250         Ok(())
251     }
252 }
253 
254 impl<W: Write, D: Ops> Write for Writer<W, D> {
write(&mut self, buf: &[u8]) -> io::Result<usize>255     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
256         self.write_with_status(buf).map(|res| res.0)
257     }
258 
flush(&mut self) -> io::Result<()>259     fn flush(&mut self) -> io::Result<()> {
260         self.data
261             .run_vec(&[], &mut self.buf, D::Flush::sync())
262             .unwrap();
263 
264         // Unfortunately miniz doesn't actually tell us when we're done with
265         // pulling out all the data from the internal stream. To remedy this we
266         // have to continually ask the stream for more memory until it doesn't
267         // give us a chunk of memory the same size as our own internal buffer,
268         // at which point we assume it's reached the end.
269         loop {
270             self.dump()?;
271             let before = self.data.total_out();
272             self.data
273                 .run_vec(&[], &mut self.buf, D::Flush::none())
274                 .unwrap();
275             if before == self.data.total_out() {
276                 break;
277             }
278         }
279 
280         self.obj.as_mut().unwrap().flush()
281     }
282 }
283 
284 impl<W: Write, D: Ops> Drop for Writer<W, D> {
drop(&mut self)285     fn drop(&mut self) {
286         if self.obj.is_some() {
287             let _ = self.finish();
288         }
289     }
290 }
291