• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Meta Platforms, Inc. and affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9 #include "platform.h"   /* Large Files support, SET_BINARY_MODE */
10 #include "Pzstd.h"
11 #include "SkippableFrame.h"
12 #include "utils/FileSystem.h"
13 #include "utils/Portability.h"
14 #include "utils/Range.h"
15 #include "utils/ScopeGuard.h"
16 #include "utils/ThreadPool.h"
17 #include "utils/WorkQueue.h"
18 
19 #include <algorithm>
20 #include <chrono>
21 #include <cinttypes>
22 #include <cstddef>
23 #include <cstdio>
24 #include <memory>
25 #include <string>
26 
27 
28 namespace pzstd {
29 
namespace {
/// Name of the platform's null device. openOutputFile() never asks for
/// overwrite confirmation when this is the output file.
#if defined(_WIN32)
const std::string nullOutput{"nul"};
#else
const std::string nullOutput{"/dev/null"};
#endif
} // anonymous namespace

using std::size_t;
39 
fileSizeOrZero(const std::string & file)40 static std::uintmax_t fileSizeOrZero(const std::string &file) {
41   if (file == "-") {
42     return 0;
43   }
44   std::error_code ec;
45   auto size = file_size(file, ec);
46   if (ec) {
47     size = 0;
48   }
49   return size;
50 }
51 
handleOneInput(const Options & options,const std::string & inputFile,FILE * inputFd,const std::string & outputFile,FILE * outputFd,SharedState & state)52 static std::uint64_t handleOneInput(const Options &options,
53                              const std::string &inputFile,
54                              FILE* inputFd,
55                              const std::string &outputFile,
56                              FILE* outputFd,
57                              SharedState& state) {
58   auto inputSize = fileSizeOrZero(inputFile);
59   // WorkQueue outlives ThreadPool so in the case of error we are certain
60   // we don't accidentally try to call push() on it after it is destroyed
61   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
62   std::uint64_t bytesRead;
63   std::uint64_t bytesWritten;
64   {
65     // Initialize the (de)compression thread pool with numThreads
66     ThreadPool executor(options.numThreads);
67     // Run the reader thread on an extra thread
68     ThreadPool readExecutor(1);
69     if (!options.decompress) {
70       // Add a job that reads the input and starts all the compression jobs
71       readExecutor.add(
72           [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
73             bytesRead = asyncCompressChunks(
74                 state,
75                 outs,
76                 executor,
77                 inputFd,
78                 inputSize,
79                 options.numThreads,
80                 options.determineParameters());
81           });
82       // Start writing
83       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
84     } else {
85       // Add a job that reads the input and starts all the decompression jobs
86       readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
87         bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
88       });
89       // Start writing
90       bytesWritten = writeFile(state, outs, outputFd, options.decompress);
91     }
92   }
93   if (!state.errorHolder.hasError()) {
94     std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
95     std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
96     if (!options.decompress) {
97       double ratio = static_cast<double>(bytesWritten) /
98                      static_cast<double>(bytesRead + !bytesRead);
99       state.log(kLogInfo, "%-20s :%6.2f%%   (%6" PRIu64 " => %6" PRIu64
100                    " bytes, %s)\n",
101                    inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
102                    outputFileName.c_str());
103     } else {
104       state.log(kLogInfo, "%-20s: %" PRIu64 " bytes \n",
105                    inputFileName.c_str(),bytesWritten);
106     }
107   }
108   return bytesWritten;
109 }
110 
openInputFile(const std::string & inputFile,ErrorHolder & errorHolder)111 static FILE *openInputFile(const std::string &inputFile,
112                            ErrorHolder &errorHolder) {
113   if (inputFile == "-") {
114     SET_BINARY_MODE(stdin);
115     return stdin;
116   }
117   // Check if input file is a directory
118   {
119     std::error_code ec;
120     if (is_directory(inputFile, ec)) {
121       errorHolder.setError("Output file is a directory -- ignored");
122       return nullptr;
123     }
124   }
125   auto inputFd = std::fopen(inputFile.c_str(), "rb");
126   if (!errorHolder.check(inputFd != nullptr, "Failed to open input file")) {
127     return nullptr;
128   }
129   return inputFd;
130 }
131 
openOutputFile(const Options & options,const std::string & outputFile,SharedState & state)132 static FILE *openOutputFile(const Options &options,
133                             const std::string &outputFile,
134                             SharedState& state) {
135   if (outputFile == "-") {
136     SET_BINARY_MODE(stdout);
137     return stdout;
138   }
139   // Check if the output file exists and then open it
140   if (!options.overwrite && outputFile != nullOutput) {
141     auto outputFd = std::fopen(outputFile.c_str(), "rb");
142     if (outputFd != nullptr) {
143       std::fclose(outputFd);
144       if (!state.log.logsAt(kLogInfo)) {
145         state.errorHolder.setError("Output file exists");
146         return nullptr;
147       }
148       state.log(
149           kLogInfo,
150           "pzstd: %s already exists; do you wish to overwrite (y/n) ? ",
151           outputFile.c_str());
152       int c = getchar();
153       if (c != 'y' && c != 'Y') {
154         state.errorHolder.setError("Not overwritten");
155         return nullptr;
156       }
157     }
158   }
159   auto outputFd = std::fopen(outputFile.c_str(), "wb");
160   if (!state.errorHolder.check(
161           outputFd != nullptr, "Failed to open output file")) {
162     return nullptr;
163   }
164   return outputFd;
165 }
166 
pzstdMain(const Options & options)167 int pzstdMain(const Options &options) {
168   int returnCode = 0;
169   SharedState state(options);
170   for (const auto& input : options.inputFiles) {
171     // Setup the shared state
172     auto printErrorGuard = makeScopeGuard([&] {
173       if (state.errorHolder.hasError()) {
174         returnCode = 1;
175         state.log(kLogError, "pzstd: %s: %s.\n", input.c_str(),
176                   state.errorHolder.getError().c_str());
177       }
178     });
179     // Open the input file
180     auto inputFd = openInputFile(input, state.errorHolder);
181     if (inputFd == nullptr) {
182       continue;
183     }
184     auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
185     // Open the output file
186     auto outputFile = options.getOutputFile(input);
187     if (!state.errorHolder.check(outputFile != "",
188                            "Input file does not have extension .zst")) {
189       continue;
190     }
191     auto outputFd = openOutputFile(options, outputFile, state);
192     if (outputFd == nullptr) {
193       continue;
194     }
195     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
196     // (de)compress the file
197     handleOneInput(options, input, inputFd, outputFile, outputFd, state);
198     if (state.errorHolder.hasError()) {
199       continue;
200     }
201     // Delete the input file if necessary
202     if (!options.keepSource) {
203       // Be sure that we are done and have written everything before we delete
204       if (!state.errorHolder.check(std::fclose(inputFd) == 0,
205                              "Failed to close input file")) {
206         continue;
207       }
208       closeInputGuard.dismiss();
209       if (!state.errorHolder.check(std::fclose(outputFd) == 0,
210                              "Failed to close output file")) {
211         continue;
212       }
213       closeOutputGuard.dismiss();
214       if (std::remove(input.c_str()) != 0) {
215         state.errorHolder.setError("Failed to remove input file");
216         continue;
217       }
218     }
219   }
220   // Returns 1 if any of the files failed to (de)compress.
221   return returnCode;
222 }
223 
224 /// Construct a `ZSTD_inBuffer` that points to the data in `buffer`.
makeZstdInBuffer(const Buffer & buffer)225 static ZSTD_inBuffer makeZstdInBuffer(const Buffer& buffer) {
226   return ZSTD_inBuffer{buffer.data(), buffer.size(), 0};
227 }
228 
229 /**
230  * Advance `buffer` and `inBuffer` by the amount of data read, as indicated by
231  * `inBuffer.pos`.
232  */
advance(Buffer & buffer,ZSTD_inBuffer & inBuffer)233 void advance(Buffer& buffer, ZSTD_inBuffer& inBuffer) {
234   auto pos = inBuffer.pos;
235   inBuffer.src = static_cast<const unsigned char*>(inBuffer.src) + pos;
236   inBuffer.size -= pos;
237   inBuffer.pos = 0;
238   return buffer.advance(pos);
239 }
240 
241 /// Construct a `ZSTD_outBuffer` that points to the data in `buffer`.
makeZstdOutBuffer(Buffer & buffer)242 static ZSTD_outBuffer makeZstdOutBuffer(Buffer& buffer) {
243   return ZSTD_outBuffer{buffer.data(), buffer.size(), 0};
244 }
245 
246 /**
247  * Split `buffer` and advance `outBuffer` by the amount of data written, as
248  * indicated by `outBuffer.pos`.
249  */
split(Buffer & buffer,ZSTD_outBuffer & outBuffer)250 Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
251   auto pos = outBuffer.pos;
252   outBuffer.dst = static_cast<unsigned char*>(outBuffer.dst) + pos;
253   outBuffer.size -= pos;
254   outBuffer.pos = 0;
255   return buffer.splitAt(pos);
256 }
257 
258 /**
259  * Stream chunks of input from `in`, compress it, and stream it out to `out`.
260  *
261  * @param state        The shared state
262  * @param in           Queue that we `pop()` input buffers from
263  * @param out          Queue that we `push()` compressed output buffers to
264  * @param maxInputSize An upper bound on the size of the input
265  */
compress(SharedState & state,std::shared_ptr<BufferWorkQueue> in,std::shared_ptr<BufferWorkQueue> out,size_t maxInputSize)266 static void compress(
267     SharedState& state,
268     std::shared_ptr<BufferWorkQueue> in,
269     std::shared_ptr<BufferWorkQueue> out,
270     size_t maxInputSize) {
271   auto& errorHolder = state.errorHolder;
272   auto guard = makeScopeGuard([&] {
273     in->finish();
274     out->finish();
275   });
276   // Initialize the CCtx
277   auto ctx = state.cStreamPool->get();
278   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
279     return;
280   }
281   {
282     auto err = ZSTD_CCtx_reset(ctx.get(), ZSTD_reset_session_only);
283     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
284       return;
285     }
286   }
287 
288   // Allocate space for the result
289   auto outBuffer = Buffer(ZSTD_compressBound(maxInputSize));
290   auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
291   {
292     Buffer inBuffer;
293     // Read a buffer in from the input queue
294     while (in->pop(inBuffer) && !errorHolder.hasError()) {
295       auto zstdInBuffer = makeZstdInBuffer(inBuffer);
296       // Compress the whole buffer and send it to the output queue
297       while (!inBuffer.empty() && !errorHolder.hasError()) {
298         if (!errorHolder.check(
299                 !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
300           return;
301         }
302         // Compress
303         auto err =
304             ZSTD_compressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
305         if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
306           return;
307         }
308         // Split the compressed data off outBuffer and pass to the output queue
309         out->push(split(outBuffer, zstdOutBuffer));
310         // Forget about the data we already compressed
311         advance(inBuffer, zstdInBuffer);
312       }
313     }
314   }
315   // Write the epilog
316   size_t bytesLeft;
317   do {
318     if (!errorHolder.check(
319             !outBuffer.empty(), "ZSTD_compressBound() was too small")) {
320       return;
321     }
322     bytesLeft = ZSTD_endStream(ctx.get(), &zstdOutBuffer);
323     if (!errorHolder.check(
324             !ZSTD_isError(bytesLeft), ZSTD_getErrorName(bytesLeft))) {
325       return;
326     }
327     out->push(split(outBuffer, zstdOutBuffer));
328   } while (bytesLeft != 0 && !errorHolder.hasError());
329 }
330 
331 /**
332  * Calculates how large each independently compressed frame should be.
333  *
334  * @param size       The size of the source if known, 0 otherwise
335  * @param numThreads The number of threads available to run compression jobs on
336  * @param params     The zstd parameters to be used for compression
337  */
calculateStep(std::uintmax_t size,size_t numThreads,const ZSTD_parameters & params)338 static size_t calculateStep(
339     std::uintmax_t size,
340     size_t numThreads,
341     const ZSTD_parameters &params) {
342   (void)size;
343   (void)numThreads;
344   // Not validated to work correctly for window logs > 23.
345   // It will definitely fail if windowLog + 2 is >= 4GB because
346   // the skippable frame can only store sizes up to 4GB.
347   assert(params.cParams.windowLog <= 23);
348   return size_t{1} << (params.cParams.windowLog + 2);
349 }
350 
namespace {
/// What the reader should do next with a stream.
enum class FileStatus { Continue, Done, Error };

/// Classifies the stream `fd`: `Done` if its end-of-file indicator is set,
/// otherwise `Error` if its error indicator is set, otherwise `Continue`.
FileStatus fileStatus(FILE* fd) {
  const bool atEof = std::feof(fd) != 0;
  if (atEof) {
    return FileStatus::Done;
  }
  const bool failed = std::ferror(fd) != 0;
  return failed ? FileStatus::Error : FileStatus::Continue;
}
} // anonymous namespace
363 
364 /**
365  * Reads `size` data in chunks of `chunkSize` and puts it into `queue`.
366  * Will read less if an error or EOF occurs.
367  * Returns the status of the file after all of the reads have occurred.
368  */
369 static FileStatus
readData(BufferWorkQueue & queue,size_t chunkSize,size_t size,FILE * fd,std::uint64_t * totalBytesRead)370 readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
371          std::uint64_t *totalBytesRead) {
372   Buffer buffer(size);
373   while (!buffer.empty()) {
374     auto bytesRead =
375         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
376     *totalBytesRead += bytesRead;
377     queue.push(buffer.splitAt(bytesRead));
378     auto status = fileStatus(fd);
379     if (status != FileStatus::Continue) {
380       return status;
381     }
382   }
383   return FileStatus::Continue;
384 }
385 
asyncCompressChunks(SharedState & state,WorkQueue<std::shared_ptr<BufferWorkQueue>> & chunks,ThreadPool & executor,FILE * fd,std::uintmax_t size,size_t numThreads,ZSTD_parameters params)386 std::uint64_t asyncCompressChunks(
387     SharedState& state,
388     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
389     ThreadPool& executor,
390     FILE* fd,
391     std::uintmax_t size,
392     size_t numThreads,
393     ZSTD_parameters params) {
394   auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
395   std::uint64_t bytesRead = 0;
396 
397   // Break the input up into chunks of size `step` and compress each chunk
398   // independently.
399   size_t step = calculateStep(size, numThreads, params);
400   state.log(kLogDebug, "Chosen frame size: %zu\n", step);
401   auto status = FileStatus::Continue;
402   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
403     // Make a new input queue that we will put the chunk's input data into.
404     auto in = std::make_shared<BufferWorkQueue>();
405     auto inGuard = makeScopeGuard([&] { in->finish(); });
406     // Make a new output queue that compress will put the compressed data into.
407     auto out = std::make_shared<BufferWorkQueue>();
408     // Start compression in the thread pool
409     executor.add([&state, in, out, step] {
410       return compress(
411           state, std::move(in), std::move(out), step);
412     });
413     // Pass the output queue to the writer thread.
414     chunks.push(std::move(out));
415     state.log(kLogVerbose, "%s\n", "Starting a new frame");
416     // Fill the input queue for the compression job we just started
417     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
418   }
419   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
420   return bytesRead;
421 }
422 
423 /**
424  * Decompress a frame, whose data is streamed into `in`, and stream the output
425  * to `out`.
426  *
427  * @param state        The shared state
428  * @param in           Queue that we `pop()` input buffers from. It contains
429  *                      exactly one compressed frame.
430  * @param out          Queue that we `push()` decompressed output buffers to
431  */
decompress(SharedState & state,std::shared_ptr<BufferWorkQueue> in,std::shared_ptr<BufferWorkQueue> out)432 static void decompress(
433     SharedState& state,
434     std::shared_ptr<BufferWorkQueue> in,
435     std::shared_ptr<BufferWorkQueue> out) {
436   auto& errorHolder = state.errorHolder;
437   auto guard = makeScopeGuard([&] {
438     in->finish();
439     out->finish();
440   });
441   // Initialize the DCtx
442   auto ctx = state.dStreamPool->get();
443   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
444     return;
445   }
446   {
447     auto err = ZSTD_DCtx_reset(ctx.get(), ZSTD_reset_session_only);
448     if (!errorHolder.check(!ZSTD_isError(err), ZSTD_getErrorName(err))) {
449       return;
450     }
451   }
452 
453   const size_t outSize = ZSTD_DStreamOutSize();
454   Buffer inBuffer;
455   size_t returnCode = 0;
456   // Read a buffer in from the input queue
457   while (in->pop(inBuffer) && !errorHolder.hasError()) {
458     auto zstdInBuffer = makeZstdInBuffer(inBuffer);
459     // Decompress the whole buffer and send it to the output queue
460     while (!inBuffer.empty() && !errorHolder.hasError()) {
461       // Allocate a buffer with at least outSize bytes.
462       Buffer outBuffer(outSize);
463       auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
464       // Decompress
465       returnCode =
466           ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
467       if (!errorHolder.check(
468               !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
469         return;
470       }
471       // Pass the buffer with the decompressed data to the output queue
472       out->push(split(outBuffer, zstdOutBuffer));
473       // Advance past the input we already read
474       advance(inBuffer, zstdInBuffer);
475       if (returnCode == 0) {
476         // The frame is over, prepare to (maybe) start a new frame
477         ZSTD_initDStream(ctx.get());
478       }
479     }
480   }
481   if (!errorHolder.check(returnCode <= 1, "Incomplete block")) {
482     return;
483   }
484   // We've given ZSTD_decompressStream all of our data, but there may still
485   // be data to read.
486   while (returnCode == 1) {
487     // Allocate a buffer with at least outSize bytes.
488     Buffer outBuffer(outSize);
489     auto zstdOutBuffer = makeZstdOutBuffer(outBuffer);
490     // Pass in no input.
491     ZSTD_inBuffer zstdInBuffer{nullptr, 0, 0};
492     // Decompress
493     returnCode =
494         ZSTD_decompressStream(ctx.get(), &zstdOutBuffer, &zstdInBuffer);
495     if (!errorHolder.check(
496             !ZSTD_isError(returnCode), ZSTD_getErrorName(returnCode))) {
497       return;
498     }
499     // Pass the buffer with the decompressed data to the output queue
500     out->push(split(outBuffer, zstdOutBuffer));
501   }
502 }
503 
asyncDecompressFrames(SharedState & state,WorkQueue<std::shared_ptr<BufferWorkQueue>> & frames,ThreadPool & executor,FILE * fd)504 std::uint64_t asyncDecompressFrames(
505     SharedState& state,
506     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
507     ThreadPool& executor,
508     FILE* fd) {
509   auto framesGuard = makeScopeGuard([&] { frames.finish(); });
510   std::uint64_t totalBytesRead = 0;
511 
512   // Split the source up into its component frames.
513   // If we find our recognized skippable frame we know the next frames size
514   // which means that we can decompress each standard frame in independently.
515   // Otherwise, we will decompress using only one decompression task.
516   const size_t chunkSize = ZSTD_DStreamInSize();
517   auto status = FileStatus::Continue;
518   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
519     // Make a new input queue that we will put the frames's bytes into.
520     auto in = std::make_shared<BufferWorkQueue>();
521     auto inGuard = makeScopeGuard([&] { in->finish(); });
522     // Make a output queue that decompress will put the decompressed data into
523     auto out = std::make_shared<BufferWorkQueue>();
524 
525     size_t frameSize;
526     {
527       // Calculate the size of the next frame.
528       // frameSize is 0 if the frame info can't be decoded.
529       Buffer buffer(SkippableFrame::kSize);
530       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
531       totalBytesRead += bytesRead;
532       status = fileStatus(fd);
533       if (bytesRead == 0 && status != FileStatus::Continue) {
534         break;
535       }
536       buffer.subtract(buffer.size() - bytesRead);
537       frameSize = SkippableFrame::tryRead(buffer.range());
538       in->push(std::move(buffer));
539     }
540     if (frameSize == 0) {
541       // We hit a non SkippableFrame, so this will be the last job.
542       // Make sure that we don't use too much memory
543       in->setMaxSize(64);
544       out->setMaxSize(64);
545     }
546     // Start decompression in the thread pool
547     executor.add([&state, in, out] {
548       return decompress(state, std::move(in), std::move(out));
549     });
550     // Pass the output queue to the writer thread
551     frames.push(std::move(out));
552     if (frameSize == 0) {
553       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
554       // Pass the rest of the source to this decompression task
555       state.log(kLogVerbose, "%s\n",
556           "Input not in pzstd format, falling back to serial decompression");
557       while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
558         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
559       }
560       break;
561     }
562     state.log(kLogVerbose, "Decompressing a frame of size %zu", frameSize);
563     // Fill the input queue for the decompression job we just started
564     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
565   }
566   state.errorHolder.check(status != FileStatus::Error, "Error reading input");
567   return totalBytesRead;
568 }
569 
570 /// Write `data` to `fd`, returns true iff success.
writeData(ByteRange data,FILE * fd)571 static bool writeData(ByteRange data, FILE* fd) {
572   while (!data.empty()) {
573     data.advance(std::fwrite(data.begin(), 1, data.size(), fd));
574     if (std::ferror(fd)) {
575       return false;
576     }
577   }
578   return true;
579 }
580 
writeFile(SharedState & state,WorkQueue<std::shared_ptr<BufferWorkQueue>> & outs,FILE * outputFd,bool decompress)581 std::uint64_t writeFile(
582     SharedState& state,
583     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
584     FILE* outputFd,
585     bool decompress) {
586   auto& errorHolder = state.errorHolder;
587   auto outsFinishGuard = makeScopeGuard([&outs] { outs.finish(); });
588   auto lineClearGuard = makeScopeGuard([&state] {
589     state.log.clear(kLogInfo);
590   });
591   std::uint64_t bytesWritten = 0;
592   std::shared_ptr<BufferWorkQueue> out;
593   // Grab the output queue for each decompression job (in order).
594   while (outs.pop(out)) {
595     auto outFinishGuard = makeScopeGuard([&out] { out->finish(); });
596     if (errorHolder.hasError()) {
597       continue;
598     }
599     if (!decompress) {
600       // If we are compressing and want to write skippable frames we can't
601       // start writing before compression is done because we need to know the
602       // compressed size.
603       // Wait for the compressed size to be available and write skippable frame
604       assert(uint64_t(out->size()) < uint64_t(1) << 32);
605       SkippableFrame frame(uint32_t(out->size()));
606       if (!writeData(frame.data(), outputFd)) {
607         errorHolder.setError("Failed to write output");
608         return bytesWritten;
609       }
610       bytesWritten += frame.kSize;
611     }
612     // For each chunk of the frame: Pop it from the queue and write it
613     Buffer buffer;
614     while (out->pop(buffer) && !errorHolder.hasError()) {
615       if (!writeData(buffer.range(), outputFd)) {
616         errorHolder.setError("Failed to write output");
617         return bytesWritten;
618       }
619       bytesWritten += buffer.size();
620       state.log.update(kLogInfo, "Written: %u MB   ",
621                 static_cast<std::uint32_t>(bytesWritten >> 20));
622     }
623   }
624   return bytesWritten;
625 }
626 }
627