1 /* 2 * Copyright (c) 2016-present, Facebook, Inc. 3 * All rights reserved. 4 * 5 * This source code is licensed under both the BSD-style license (found in the 6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7 * in the COPYING file in the root directory of this source tree). 8 */ 9 #pragma once 10 11 #include "ErrorHolder.h" 12 #include "Logging.h" 13 #include "Options.h" 14 #include "utils/Buffer.h" 15 #include "utils/Range.h" 16 #include "utils/ResourcePool.h" 17 #include "utils/ThreadPool.h" 18 #include "utils/WorkQueue.h" 19 #define ZSTD_STATIC_LINKING_ONLY 20 #include "zstd.h" 21 #undef ZSTD_STATIC_LINKING_ONLY 22 23 #include <cstddef> 24 #include <cstdint> 25 #include <memory> 26 27 namespace pzstd { 28 /** 29 * Runs pzstd with `options` and returns the number of bytes written. 30 * An error occurred if `errorHandler.hasError()`. 31 * 32 * @param options The pzstd options to use for (de)compression 33 * @returns 0 upon success and non-zero on failure. 34 */ 35 int pzstdMain(const Options& options); 36 37 class SharedState { 38 public: SharedState(const Options & options)39 SharedState(const Options& options) : log(options.verbosity) { 40 if (!options.decompress) { 41 auto parameters = options.determineParameters(); 42 cStreamPool.reset(new ResourcePool<ZSTD_CStream>{ 43 [this, parameters]() -> ZSTD_CStream* { 44 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_CStream"); 45 auto zcs = ZSTD_createCStream(); 46 if (zcs) { 47 auto err = ZSTD_initCStream_advanced( 48 zcs, nullptr, 0, parameters, 0); 49 if (ZSTD_isError(err)) { 50 ZSTD_freeCStream(zcs); 51 return nullptr; 52 } 53 } 54 return zcs; 55 }, 56 [](ZSTD_CStream *zcs) { 57 ZSTD_freeCStream(zcs); 58 }}); 59 } else { 60 dStreamPool.reset(new ResourcePool<ZSTD_DStream>{ 61 [this]() -> ZSTD_DStream* { 62 this->log(kLogVerbose, "%s\n", "Creating new ZSTD_DStream"); 63 auto zds = ZSTD_createDStream(); 64 if (zds) { 65 auto err = ZSTD_initDStream(zds); 66 if (ZSTD_isError(err)) { 67 ZSTD_freeDStream(zds); 68 return nullptr; 69 } 70 } 71 return zds; 72 }, 73 [](ZSTD_DStream *zds) { 74 ZSTD_freeDStream(zds); 75 }}); 76 } 77 } 78 ~SharedState()79 ~SharedState() { 80 // The resource pools have references to this, so destroy them first. 81 cStreamPool.reset(); 82 dStreamPool.reset(); 83 } 84 85 Logger log; 86 ErrorHolder errorHolder; 87 std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool; 88 std::unique_ptr<ResourcePool<ZSTD_DStream>> dStreamPool; 89 }; 90 91 /** 92 * Streams input from `fd`, breaks input up into chunks, and compresses each 93 * chunk independently. Output of each chunk gets streamed to a queue, and 94 * the output queues get put into `chunks` in order. 95 * 96 * @param state The shared state 97 * @param chunks Each compression jobs output queue gets `pushed()` here 98 * as soon as it is available 99 * @param executor The thread pool to run compression jobs in 100 * @param fd The input file descriptor 101 * @param size The size of the input file if known, 0 otherwise 102 * @param numThreads The number of threads in the thread pool 103 * @param parameters The zstd parameters to use for compression 104 * @returns The number of bytes read from the file 105 */ 106 std::uint64_t asyncCompressChunks( 107 SharedState& state, 108 WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks, 109 ThreadPool& executor, 110 FILE* fd, 111 std::uintmax_t size, 112 std::size_t numThreads, 113 ZSTD_parameters parameters); 114 115 /** 116 * Streams input from `fd`. If pzstd headers are available it breaks the input 117 * up into independent frames. It sends each frame to an independent 118 * decompression job. Output of each frame gets streamed to a queue, and 119 * the output queues get put into `frames` in order. 120 * 121 * @param state The shared state 122 * @param frames Each decompression jobs output queue gets `pushed()` here 123 * as soon as it is available 124 * @param executor The thread pool to run compression jobs in 125 * @param fd The input file descriptor 126 * @returns The number of bytes read from the file 127 */ 128 std::uint64_t asyncDecompressFrames( 129 SharedState& state, 130 WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames, 131 ThreadPool& executor, 132 FILE* fd); 133 134 /** 135 * Streams input in from each queue in `outs` in order, and writes the data to 136 * `outputFd`. 137 * 138 * @param state The shared state 139 * @param outs A queue of output queues, one for each 140 * (de)compression job. 141 * @param outputFd The file descriptor to write to 142 * @param decompress Are we decompressing? 143 * @returns The number of bytes written 144 */ 145 std::uint64_t writeFile( 146 SharedState& state, 147 WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs, 148 FILE* outputFd, 149 bool decompress); 150 } 151