1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef PKG_STREAM_H 16 #define PKG_STREAM_H 17 #ifndef __WIN32 18 #include <sys/mman.h> 19 #endif 20 #include <atomic> 21 #include "pkg_manager.h" 22 #include "pkg_utils.h" 23 #include "ring_buffer/ring_buffer.h" 24 25 namespace Hpackage { 26 class PkgStreamImpl : public PkgStream { 27 public: PkgStreamImpl(PkgManager::PkgManagerPtr pkgManager,const std::string fileName)28 explicit PkgStreamImpl(PkgManager::PkgManagerPtr pkgManager, const std::string fileName) 29 : fileName_(fileName), refCount_(0), pkgManager_(pkgManager) {} 30 ~PkgStreamImpl()31 virtual ~PkgStreamImpl() {} 32 Read(PkgBuffer & data,size_t start,size_t needRead,size_t & readLen)33 int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override 34 { 35 UNUSED(data); 36 UNUSED(start); 37 UNUSED(readLen); 38 UNUSED(needRead); 39 return PKG_SUCCESS; 40 } 41 GetBuffer(PkgBuffer & buffer)42 int32_t GetBuffer(PkgBuffer &buffer) const override 43 { 44 buffer.length = 0; 45 buffer.buffer = nullptr; 46 return PKG_SUCCESS; 47 } 48 Write(const PkgBuffer & data,size_t size,size_t start)49 int32_t Write(const PkgBuffer &data, size_t size, size_t start) override 50 { 51 UNUSED(data); 52 UNUSED(size); 53 UNUSED(start); 54 return PKG_SUCCESS; 55 } 56 57 virtual int32_t Seek(long int offset, int whence) = 0; 58 Flush(size_t size)59 int32_t Flush(size_t size) override 60 { 61 UNUSED(size); 62 return PKG_SUCCESS; 63 } 64 65 const std::string GetFileName() const override; 66 GetStreamType()67 int32_t GetStreamType() const override 68 { 69 return PkgStreamType_Read; 70 }; 71 72 void AddRef() override; 73 74 void DelRef() override; 75 76 bool IsRef() const override; 77 78 static PkgStreamPtr ConvertPkgStream(PkgManager::StreamPtr stream); 79 80 protected: 81 void PostDecodeProgress(int type, size_t writeDataLen, const void *context) const; 82 std::string fileName_; 83 84 private: 85 std::atomic_int refCount_; 86 PkgManager::PkgManagerPtr pkgManager_ = nullptr; 87 }; 88 89 class FileStream : public PkgStreamImpl { 90 public: FileStream(PkgManager::PkgManagerPtr pkgManager,const std::string fileName,FILE * stream,int32_t streamType)91 FileStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, FILE *stream, int32_t streamType) 92 : PkgStreamImpl(pkgManager, fileName), stream_(stream), fileLength_(0), streamType_(streamType) {} 93 94 ~FileStream() override; 95 96 int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override; 97 98 int32_t Write(const PkgBuffer &data, size_t size, size_t start) override; 99 100 int32_t Seek(long int offset, int whence) override; 101 102 int32_t Flush(size_t size) override; 103 104 size_t GetFileLength() override; 105 GetStreamType()106 int32_t GetStreamType() const override 107 { 108 return streamType_; 109 } 110 111 private: 112 FILE *stream_; 113 size_t fileLength_; 114 int32_t streamType_; 115 std::recursive_mutex fileStreamLock_; 116 }; 117 118 class MemoryMapStream : public PkgStreamImpl { 119 public: 120 MemoryMapStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, const PkgBuffer &buffer, PkgStreamImpl(pkgManager,fileName)121 int32_t streamType = PkgStreamType_MemoryMap) : PkgStreamImpl(pkgManager, fileName), memMap_(buffer.buffer), 122 memSize_(buffer.length), currOffset_(0), streamType_(streamType) {} 123 ~MemoryMapStream() override; 124 125 int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override; 126 127 int32_t Write(const PkgBuffer &data, size_t size, size_t start) override; 128 129 int32_t Seek(long int offset, int whence) override; 130 GetStreamType()131 int32_t GetStreamType() const override 132 { 133 return streamType_; 134 } 135 GetFileLength()136 size_t GetFileLength() override 137 { 138 return memSize_; 139 } 140 Flush(size_t size)141 int32_t Flush(size_t size) override 142 { 143 if (size != memSize_) { 144 PKG_LOGE("Flush size %zu local size:%zu", size, memSize_); 145 } 146 if (streamType_ == PkgStreamType_MemoryMap) { 147 msync(static_cast<void *>(memMap_), memSize_, MS_ASYNC); 148 } 149 currOffset_ = size; 150 return PKG_SUCCESS; 151 } 152 GetBuffer(PkgBuffer & buffer)153 int32_t GetBuffer(PkgBuffer &buffer) const override 154 { 155 buffer.buffer = memMap_; 156 buffer.length = memSize_; 157 return PKG_SUCCESS; 158 } 159 160 private: 161 uint8_t *memMap_; 162 size_t memSize_; 163 size_t currOffset_; 164 int32_t streamType_; 165 }; 166 167 class ProcessorStream : public PkgStreamImpl { 168 public: ProcessorStream(PkgManager::PkgManagerPtr pkgManager,const std::string fileName,ExtractFileProcessor processor,const void * context)169 ProcessorStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, 170 ExtractFileProcessor processor, const void *context) 171 : PkgStreamImpl(pkgManager, fileName), processor_(processor), context_(context) {} 172 ~ProcessorStream()173 ~ProcessorStream() override {} 174 Read(PkgBuffer & data,size_t start,size_t needRead,size_t & readLen)175 int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override 176 { 177 UNUSED(data); 178 UNUSED(start); 179 UNUSED(readLen); 180 UNUSED(needRead); 181 return PKG_INVALID_STREAM; 182 } 183 Write(const PkgBuffer & data,size_t size,size_t start)184 int32_t Write(const PkgBuffer &data, size_t size, size_t start) override 185 { 186 if (processor_ == nullptr) { 187 PKG_LOGE("processor not exist"); 188 return PKG_INVALID_STREAM; 189 } 190 int ret = processor_(data, size, start, false, context_); 191 PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr); 192 return ret; 193 } 194 Seek(long int size,int whence)195 int32_t Seek(long int size, int whence) override 196 { 197 UNUSED(size); 198 UNUSED(whence); 199 return PKG_SUCCESS; 200 } 201 GetStreamType()202 int32_t GetStreamType() const override 203 { 204 return PkgStreamType_Process; 205 } 206 GetFileLength()207 size_t GetFileLength() override 208 { 209 return 0; 210 } 211 Flush(size_t size)212 int32_t Flush(size_t size) override 213 { 214 UNUSED(size); 215 if (processor_ == nullptr) { 216 PKG_LOGE("processor not exist"); 217 return PKG_INVALID_STREAM; 218 } 219 PkgBuffer data = {}; 220 return processor_(data, 0, 0, true, context_); 221 } 222 223 private: 224 ExtractFileProcessor processor_ = nullptr; 225 const void *context_; 226 }; 227 228 constexpr uint32_t MAX_FLOW_BUFFER_SIZE = 4 * 1024 * 1024; 229 230 class FlowDataStream : public Hpackage::PkgStreamImpl { 231 public: 232 FlowDataStream(Hpackage::PkgManager::PkgManagerPtr pkgManager, const std::string fileName, 233 const size_t fileSize, Updater::RingBuffer *buffer, int32_t streamType = PkgStreamImpl(pkgManager,fileName)234 PkgStreamType_FlowData) : PkgStreamImpl(pkgManager, fileName), fileLength_(fileSize), 235 ringBuf_(buffer), streamType_(streamType) {} ~FlowDataStream()236 ~FlowDataStream() override {} 237 238 int32_t Read(Hpackage::PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override; 239 240 int32_t Write(const Hpackage::PkgBuffer &data, size_t size, size_t start) override; 241 Seek(long int offset,int whence)242 int32_t Seek(long int offset, int whence) override 243 { 244 UNUSED(offset); 245 UNUSED(whence); 246 return Hpackage::PKG_INVALID_STREAM; 247 } 248 GetStreamType()249 int32_t GetStreamType() const override 250 { 251 return streamType_; 252 } 253 GetFileLength()254 size_t GetFileLength() override 255 { 256 return fileLength_; 257 } 258 Flush(size_t size)259 int32_t Flush(size_t size) override 260 { 261 UNUSED(size); 262 return Hpackage::PKG_INVALID_STREAM; 263 } 264 GetReadOffset()265 size_t GetReadOffset() const override 266 { 267 return readOffset_; 268 } 269 270 void Stop() override; 271 272 private: 273 int32_t ReadFromRingBuf(uint8_t *&buff, const uint32_t needLen, uint32_t &readLen); 274 275 size_t fileLength_ {}; 276 Updater::RingBuffer *ringBuf_ {}; 277 int32_t streamType_; 278 uint8_t buff_[MAX_FLOW_BUFFER_SIZE] = {0}; 279 uint32_t avail_ {}; 280 uint32_t bufOffset_ {}; 281 size_t readOffset_ {}; 282 size_t writeOffset_ {}; 283 }; 284 } // namespace Hpackage 285 #endif // PKG_STREAM_H 286