• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "stream_update.h"
17 
18 #include "log/log.h"
19 #include "package/package.h"
20 #include "package/pkg_manager.h"
21 #include "scope_guard.h"
22 #include "securec.h"
23 #include "updater/updater_const.h"
24 #include "utils.h"
25 #include <thread>
26 
27 namespace OHOS {
28 namespace SysInstaller {
29 using namespace Updater;
30 
31 constexpr uint32_t BUFFER_SIZE = 50 * 1024;
32 constexpr uint16_t MAX_RING_BUFFER_NUM = 2;
33 constexpr uint32_t MAX_UPDATER_BUFFER_SIZE = 2 * BUFFER_SIZE;
34 
GetInstance()35 StreamInstallProcesser &StreamInstallProcesser::GetInstance()
36 {
37     static StreamInstallProcesser instance;
38     return instance;
39 }
40 
Start()41 int32_t StreamInstallProcesser::Start()
42 {
43     LOG(INFO) << "StreamInstallProcesser Start";
44 
45     if (!ringBuffer_.Init(BUFFER_SIZE, MAX_RING_BUFFER_NUM)) {
46         LOG(ERROR) << "StreamInstallProcesser Start Init ringBuffer_ failed";
47         return -1;
48     }
49 
50     binChunkUpdate_ = std::make_unique<Updater::BinChunkUpdate>(MAX_UPDATER_BUFFER_SIZE);
51     isExitThread_ = false;
52     isRunning_ = true;
53     pComsumeThread_ = new (std::nothrow) std::thread([this] { this->ThreadExecuteFunc(); });
54     if (pComsumeThread_ == nullptr) {
55         LOG(ERROR) << "StreamInstallProcesser Start new pComsumeThread_ failed";
56         return -1;
57     }
58     UpdateResult(UPDATE_STATE_INIT, 0, "");
59     return 0;
60 }
61 
Stop()62 void StreamInstallProcesser::Stop()
63 {
64     LOG(INFO) << "StreamInstallProcesser Stop enter";
65     if (!isRunning_) {
66         LOG(WARNING) << "Action not running";
67         return;
68     }
69 
70     isRunning_ = false;
71     isExitThread_ = true;
72     ringBuffer_.Stop();
73     if (pComsumeThread_ != nullptr) {
74         pComsumeThread_->join();
75         delete pComsumeThread_;
76         pComsumeThread_ = nullptr;
77     }
78     ThreadExitProc();
79     LOG(INFO) << "StreamInstallProcesser Stop leave";
80     return;
81 }
82 
IsRunning()83 bool StreamInstallProcesser::IsRunning()
84 {
85     return isRunning_;
86 }
87 
ThreadExecuteFunc()88 void StreamInstallProcesser::ThreadExecuteFunc()
89 {
90     LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc enter";
91     while (!isExitThread_) {
92         uint8_t buffer[BUFFER_SIZE]{0};
93         uint32_t len = 0;
94         uint32_t dealLen = 0;
95         if (!ringBuffer_.Pop(buffer, sizeof(buffer), len)) break;
96         UpdateResultCode ret = binChunkUpdate_->StartBinChunkUpdate(buffer, len, dealLen);
97         if (STREAM_UPDATE_SUCCESS == ret) {
98             LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_SUCCESS";
99             UpdateResult(UPDATE_STATE_ONGOING, dealLen, "");
100         } else if (STREAM_UPDATE_FAILURE == ret) {
101             LOG(ERROR) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_FAILURE";
102             UpdateResult(UPDATE_STATE_FAILED, dealLen, "");
103             break;
104         } else if (STREAM_UPDATE_COMPLETE == ret) {
105             LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_COMPLETE";
106             UpdateResult(UPDATE_STATE_SUCCESSFUL, dealLen, "");
107             break;
108         }
109     }
110 }
111 
ThreadExitProc()112 void StreamInstallProcesser::ThreadExitProc()
113 {
114     LOG(INFO) << "StreamInstallProcesser ThreadExitProc enter";
115     isRunning_ = false;
116     isExitThread_ = true;
117     ringBuffer_.Stop();
118     ringBuffer_.Reset();
119 }
120 
ProcessStreamData(const uint8_t * buffer,size_t size)121 int32_t StreamInstallProcesser::ProcessStreamData(const uint8_t *buffer, size_t size)
122 {
123     uint8_t tmpBuff[BUFFER_SIZE]{0};
124     errno_t ret = memcpy_s(tmpBuff, BUFFER_SIZE, buffer, size);
125     if (ret != 0) {
126         LOG(ERROR) << "ProcessStreamData memcpy_s failed: " << ret;
127         return -1;
128     }
129     return ringBuffer_.Push(tmpBuff, size) ? 0 : -1;
130 }
131 
UpdateResult(UpdateStatus updateStatus,int dealLen,const std::string & resultMsg)132 void StreamInstallProcesser::UpdateResult(UpdateStatus updateStatus, int dealLen, const std::string &resultMsg)
133 {
134     if (statusManager_ == nullptr) {
135         LOG(ERROR) << "statusManager_ nullptr";
136         return;
137     }
138     statusManager_->UpdateCallback(updateStatus, dealLen, resultMsg);
139 }
140 } // namespace SysInstaller
141 } // namespace OHOS
142