1 use std::io;
2 use std::io::prelude::*;
3 use std::mem;
4
5 use crate::{Compress, Decompress, DecompressError, FlushCompress, FlushDecompress, Status};
6
7 #[derive(Debug)]
8 pub struct Writer<W: Write, D: Ops> {
9 obj: Option<W>,
10 pub data: D,
11 buf: Vec<u8>,
12 }
13
14 pub trait Ops {
15 type Flush: Flush;
total_in(&self) -> u6416 fn total_in(&self) -> u64;
total_out(&self) -> u6417 fn total_out(&self) -> u64;
run( &mut self, input: &[u8], output: &mut [u8], flush: Self::Flush, ) -> Result<Status, DecompressError>18 fn run(
19 &mut self,
20 input: &[u8],
21 output: &mut [u8],
22 flush: Self::Flush,
23 ) -> Result<Status, DecompressError>;
run_vec( &mut self, input: &[u8], output: &mut Vec<u8>, flush: Self::Flush, ) -> Result<Status, DecompressError>24 fn run_vec(
25 &mut self,
26 input: &[u8],
27 output: &mut Vec<u8>,
28 flush: Self::Flush,
29 ) -> Result<Status, DecompressError>;
30 }
31
32 impl Ops for Compress {
33 type Flush = FlushCompress;
total_in(&self) -> u6434 fn total_in(&self) -> u64 {
35 self.total_in()
36 }
total_out(&self) -> u6437 fn total_out(&self) -> u64 {
38 self.total_out()
39 }
run( &mut self, input: &[u8], output: &mut [u8], flush: FlushCompress, ) -> Result<Status, DecompressError>40 fn run(
41 &mut self,
42 input: &[u8],
43 output: &mut [u8],
44 flush: FlushCompress,
45 ) -> Result<Status, DecompressError> {
46 Ok(self.compress(input, output, flush).unwrap())
47 }
run_vec( &mut self, input: &[u8], output: &mut Vec<u8>, flush: FlushCompress, ) -> Result<Status, DecompressError>48 fn run_vec(
49 &mut self,
50 input: &[u8],
51 output: &mut Vec<u8>,
52 flush: FlushCompress,
53 ) -> Result<Status, DecompressError> {
54 Ok(self.compress_vec(input, output, flush).unwrap())
55 }
56 }
57
58 impl Ops for Decompress {
59 type Flush = FlushDecompress;
total_in(&self) -> u6460 fn total_in(&self) -> u64 {
61 self.total_in()
62 }
total_out(&self) -> u6463 fn total_out(&self) -> u64 {
64 self.total_out()
65 }
run( &mut self, input: &[u8], output: &mut [u8], flush: FlushDecompress, ) -> Result<Status, DecompressError>66 fn run(
67 &mut self,
68 input: &[u8],
69 output: &mut [u8],
70 flush: FlushDecompress,
71 ) -> Result<Status, DecompressError> {
72 self.decompress(input, output, flush)
73 }
run_vec( &mut self, input: &[u8], output: &mut Vec<u8>, flush: FlushDecompress, ) -> Result<Status, DecompressError>74 fn run_vec(
75 &mut self,
76 input: &[u8],
77 output: &mut Vec<u8>,
78 flush: FlushDecompress,
79 ) -> Result<Status, DecompressError> {
80 self.decompress_vec(input, output, flush)
81 }
82 }
83
84 pub trait Flush {
none() -> Self85 fn none() -> Self;
sync() -> Self86 fn sync() -> Self;
finish() -> Self87 fn finish() -> Self;
88 }
89
90 impl Flush for FlushCompress {
none() -> Self91 fn none() -> Self {
92 FlushCompress::None
93 }
94
sync() -> Self95 fn sync() -> Self {
96 FlushCompress::Sync
97 }
98
finish() -> Self99 fn finish() -> Self {
100 FlushCompress::Finish
101 }
102 }
103
104 impl Flush for FlushDecompress {
none() -> Self105 fn none() -> Self {
106 FlushDecompress::None
107 }
108
sync() -> Self109 fn sync() -> Self {
110 FlushDecompress::Sync
111 }
112
finish() -> Self113 fn finish() -> Self {
114 FlushDecompress::Finish
115 }
116 }
117
read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize> where R: BufRead, D: Ops,118 pub fn read<R, D>(obj: &mut R, data: &mut D, dst: &mut [u8]) -> io::Result<usize>
119 where
120 R: BufRead,
121 D: Ops,
122 {
123 loop {
124 let (read, consumed, ret, eof);
125 {
126 let input = obj.fill_buf()?;
127 eof = input.is_empty();
128 let before_out = data.total_out();
129 let before_in = data.total_in();
130 let flush = if eof {
131 D::Flush::finish()
132 } else {
133 D::Flush::none()
134 };
135 ret = data.run(input, dst, flush);
136 read = (data.total_out() - before_out) as usize;
137 consumed = (data.total_in() - before_in) as usize;
138 }
139 obj.consume(consumed);
140
141 match ret {
142 // If we haven't ready any data and we haven't hit EOF yet,
143 // then we need to keep asking for more data because if we
144 // return that 0 bytes of data have been read then it will
145 // be interpreted as EOF.
146 Ok(Status::Ok) | Ok(Status::BufError) if read == 0 && !eof && dst.len() > 0 => continue,
147 Ok(Status::Ok) | Ok(Status::BufError) | Ok(Status::StreamEnd) => return Ok(read),
148
149 Err(..) => {
150 return Err(io::Error::new(
151 io::ErrorKind::InvalidInput,
152 "corrupt deflate stream",
153 ))
154 }
155 }
156 }
157 }
158
159 impl<W: Write, D: Ops> Writer<W, D> {
new(w: W, d: D) -> Writer<W, D>160 pub fn new(w: W, d: D) -> Writer<W, D> {
161 Writer {
162 obj: Some(w),
163 data: d,
164 buf: Vec::with_capacity(32 * 1024),
165 }
166 }
167
finish(&mut self) -> io::Result<()>168 pub fn finish(&mut self) -> io::Result<()> {
169 loop {
170 self.dump()?;
171
172 let before = self.data.total_out();
173 self.data.run_vec(&[], &mut self.buf, D::Flush::finish())?;
174 if before == self.data.total_out() {
175 return Ok(());
176 }
177 }
178 }
179
replace(&mut self, w: W) -> W180 pub fn replace(&mut self, w: W) -> W {
181 self.buf.truncate(0);
182 mem::replace(self.get_mut(), w)
183 }
184
get_ref(&self) -> &W185 pub fn get_ref(&self) -> &W {
186 self.obj.as_ref().unwrap()
187 }
188
get_mut(&mut self) -> &mut W189 pub fn get_mut(&mut self) -> &mut W {
190 self.obj.as_mut().unwrap()
191 }
192
193 // Note that this should only be called if the outer object is just about
194 // to be consumed!
195 //
196 // (e.g. an implementation of `into_inner`)
take_inner(&mut self) -> W197 pub fn take_inner(&mut self) -> W {
198 self.obj.take().unwrap()
199 }
200
is_present(&self) -> bool201 pub fn is_present(&self) -> bool {
202 self.obj.is_some()
203 }
204
205 // Returns total written bytes and status of underlying codec
write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)>206 pub(crate) fn write_with_status(&mut self, buf: &[u8]) -> io::Result<(usize, Status)> {
207 // miniz isn't guaranteed to actually write any of the buffer provided,
208 // it may be in a flushing mode where it's just giving us data before
209 // we're actually giving it any data. We don't want to spuriously return
210 // `Ok(0)` when possible as it will cause calls to write_all() to fail.
211 // As a result we execute this in a loop to ensure that we try our
212 // darndest to write the data.
213 loop {
214 self.dump()?;
215
216 let before_in = self.data.total_in();
217 let ret = self.data.run_vec(buf, &mut self.buf, D::Flush::none());
218 let written = (self.data.total_in() - before_in) as usize;
219
220 let is_stream_end = match ret {
221 Ok(Status::StreamEnd) => true,
222 _ => false,
223 };
224
225 if buf.len() > 0 && written == 0 && ret.is_ok() && !is_stream_end {
226 continue;
227 }
228 return match ret {
229 Ok(st) => match st {
230 Status::Ok | Status::BufError | Status::StreamEnd => Ok((written, st)),
231 },
232 Err(..) => Err(io::Error::new(
233 io::ErrorKind::InvalidInput,
234 "corrupt deflate stream",
235 )),
236 };
237 }
238 }
239
dump(&mut self) -> io::Result<()>240 fn dump(&mut self) -> io::Result<()> {
241 // TODO: should manage this buffer not with `drain` but probably more of
242 // a deque-like strategy.
243 while self.buf.len() > 0 {
244 let n = self.obj.as_mut().unwrap().write(&self.buf)?;
245 if n == 0 {
246 return Err(io::ErrorKind::WriteZero.into());
247 }
248 self.buf.drain(..n);
249 }
250 Ok(())
251 }
252 }
253
254 impl<W: Write, D: Ops> Write for Writer<W, D> {
write(&mut self, buf: &[u8]) -> io::Result<usize>255 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
256 self.write_with_status(buf).map(|res| res.0)
257 }
258
flush(&mut self) -> io::Result<()>259 fn flush(&mut self) -> io::Result<()> {
260 self.data
261 .run_vec(&[], &mut self.buf, D::Flush::sync())
262 .unwrap();
263
264 // Unfortunately miniz doesn't actually tell us when we're done with
265 // pulling out all the data from the internal stream. To remedy this we
266 // have to continually ask the stream for more memory until it doesn't
267 // give us a chunk of memory the same size as our own internal buffer,
268 // at which point we assume it's reached the end.
269 loop {
270 self.dump()?;
271 let before = self.data.total_out();
272 self.data
273 .run_vec(&[], &mut self.buf, D::Flush::none())
274 .unwrap();
275 if before == self.data.total_out() {
276 break;
277 }
278 }
279
280 self.obj.as_mut().unwrap().flush()
281 }
282 }
283
284 impl<W: Write, D: Ops> Drop for Writer<W, D> {
drop(&mut self)285 fn drop(&mut self) {
286 if self.obj.is_some() {
287 let _ = self.finish();
288 }
289 }
290 }
291