1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "stream_update.h"
17
18 #include "log/log.h"
19 #include "package/package.h"
20 #include "package/pkg_manager.h"
21 #include "scope_guard.h"
22 #include "securec.h"
23 #include "updater/updater_const.h"
24 #include "utils.h"
25 #include <thread>
26 #include "updater/updater.h"
27 #include "slot_info/slot_info.h"
28
29 namespace OHOS {
30 namespace SysInstaller {
31 using namespace Updater;
32
// Size in bytes (50 KiB) of the chunk buffers used by this file; first argument to ringBuffer_.Init().
constexpr uint32_t BUFFER_SIZE{50U * 1024U};
// Second argument passed to ringBuffer_.Init() in Start().
constexpr uint16_t MAX_RING_BUFFER_NUM{2};
// Constructor argument for Updater::BinChunkUpdate: twice BUFFER_SIZE.
constexpr uint32_t MAX_UPDATER_BUFFER_SIZE{2U * BUFFER_SIZE};
36
GetInstance()37 StreamInstallProcesser &StreamInstallProcesser::GetInstance()
38 {
39 static StreamInstallProcesser instance;
40 return instance;
41 }
42
Start()43 int32_t StreamInstallProcesser::Start()
44 {
45 LOG(INFO) << "StreamInstallProcesser Start";
46
47 // 处理头部数据的时候设置参数,用于ab流式升级
48 if (SetUpdateSuffixParam() != UPDATE_SUCCESS) {
49 LOG(ERROR) << "SetUpdateSuffixParam failed";
50 return -1;
51 }
52
53 if (!ringBuffer_.Init(BUFFER_SIZE, MAX_RING_BUFFER_NUM)) {
54 LOG(ERROR) << "StreamInstallProcesser Start Init ringBuffer_ failed";
55 return -1;
56 }
57
58 binChunkUpdate_ = std::make_unique<Updater::BinChunkUpdate>(MAX_UPDATER_BUFFER_SIZE);
59 isExitThread_ = false;
60 isRunning_ = true;
61 pComsumeThread_ = new (std::nothrow) std::thread([this] { this->ThreadExecuteFunc(); });
62 if (pComsumeThread_ == nullptr) {
63 LOG(ERROR) << "StreamInstallProcesser Start new pComsumeThread_ failed";
64 return -1;
65 }
66 UpdateResult(UpdateStatus::UPDATE_STATE_INIT, 0, "");
67 return 0;
68 }
69
Stop()70 void StreamInstallProcesser::Stop()
71 {
72 LOG(INFO) << "StreamInstallProcesser Stop enter";
73 if (!isRunning_) {
74 LOG(WARNING) << "Action not running";
75 return;
76 }
77
78 isRunning_ = false;
79 isExitThread_ = true;
80 ringBuffer_.Stop();
81 if (pComsumeThread_ != nullptr) {
82 pComsumeThread_->join();
83 delete pComsumeThread_;
84 pComsumeThread_ = nullptr;
85 }
86 ThreadExitProc();
87 LOG(INFO) << "StreamInstallProcesser Stop leave";
88 return;
89 }
90
IsRunning()91 bool StreamInstallProcesser::IsRunning()
92 {
93 return isRunning_;
94 }
95
ThreadExecuteFunc()96 void StreamInstallProcesser::ThreadExecuteFunc()
97 {
98 LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc enter";
99 while (!isExitThread_) {
100 uint8_t buffer[BUFFER_SIZE]{0};
101 uint32_t len = 0;
102 uint32_t dealLen = 0;
103 if (!ringBuffer_.Pop(buffer, sizeof(buffer), len)) break;
104 LOG(INFO) << "start bun chunk update ,len = " << len;
105 UpdateResultCode ret = binChunkUpdate_->StartBinChunkUpdate(buffer, len, dealLen);
106 if (STREAM_UPDATE_SUCCESS == ret) {
107 LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_SUCCESS";
108 UpdateResult(UpdateStatus::UPDATE_STATE_ONGOING, dealLen, "");
109 } else if (STREAM_UPDATE_FAILURE == ret) {
110 LOG(ERROR) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_FAILURE";
111 UpdateResult(UpdateStatus::UPDATE_STATE_FAILED, dealLen, "");
112 break;
113 } else if (STREAM_UPDATE_COMPLETE == ret) {
114 LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_COMPLETE";
115 UpdateResult(UpdateStatus::UPDATE_STATE_SUCCESSFUL, dealLen, "");
116 // 升级完成,切换分区
117 SetActiveSlot();
118 break;
119 }
120 }
121 }
122
ThreadExitProc()123 void StreamInstallProcesser::ThreadExitProc()
124 {
125 LOG(INFO) << "StreamInstallProcesser ThreadExitProc enter";
126 isRunning_ = false;
127 isExitThread_ = true;
128 ringBuffer_.Stop();
129 ringBuffer_.Reset();
130 }
131
ProcessStreamData(const uint8_t * buffer,uint32_t size)132 int32_t StreamInstallProcesser::ProcessStreamData(const uint8_t *buffer, uint32_t size)
133 {
134 uint8_t tmpBuff[BUFFER_SIZE]{0};
135 errno_t ret = memcpy_s(tmpBuff, BUFFER_SIZE, buffer, size);
136 if (ret != 0) {
137 LOG(ERROR) << "ProcessStreamData memcpy_s failed: " << ret;
138 return -1;
139 }
140 return ringBuffer_.Push(tmpBuff, size) ? 0 : -1;
141 }
142
UpdateResult(UpdateStatus updateStatus,int dealLen,const std::string & resultMsg)143 void StreamInstallProcesser::UpdateResult(UpdateStatus updateStatus, int dealLen, const std::string &resultMsg)
144 {
145 if (statusManager_ == nullptr) {
146 LOG(ERROR) << "statusManager_ nullptr";
147 return;
148 }
149 statusManager_->UpdateCallback(updateStatus, dealLen, resultMsg);
150 }
151 } // namespace SysInstaller
152 } // namespace OHOS
153