• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "stream_update.h"
17 
18 #include "log/log.h"
19 #include "package/package.h"
20 #include "package/pkg_manager.h"
21 #include "scope_guard.h"
22 #include "securec.h"
23 #include "updater/updater_const.h"
24 #include "utils.h"
25 #include <thread>
26 #include "updater/updater.h"
27 #include "slot_info/slot_info.h"
28 
29 namespace OHOS {
30 namespace SysInstaller {
31 using namespace Updater;
32 
33 constexpr uint32_t BUFFER_SIZE = 50 * 1024;
34 constexpr uint16_t MAX_RING_BUFFER_NUM = 2;
35 constexpr uint32_t MAX_UPDATER_BUFFER_SIZE = 2 * BUFFER_SIZE;
36 
GetInstance()37 StreamInstallProcesser &StreamInstallProcesser::GetInstance()
38 {
39     static StreamInstallProcesser instance;
40     return instance;
41 }
42 
Start()43 int32_t StreamInstallProcesser::Start()
44 {
45     LOG(INFO) << "StreamInstallProcesser Start";
46 
47     // 处理头部数据的时候设置参数,用于ab流式升级
48     if (SetUpdateSuffixParam() != UPDATE_SUCCESS) {
49         LOG(ERROR) << "SetUpdateSuffixParam failed";
50         return -1;
51     }
52 
53     if (!ringBuffer_.Init(BUFFER_SIZE, MAX_RING_BUFFER_NUM)) {
54         LOG(ERROR) << "StreamInstallProcesser Start Init ringBuffer_ failed";
55         return -1;
56     }
57 
58     binChunkUpdate_ = std::make_unique<Updater::BinChunkUpdate>(MAX_UPDATER_BUFFER_SIZE);
59     isExitThread_ = false;
60     isRunning_ = true;
61     pComsumeThread_ = new (std::nothrow) std::thread([this] { this->ThreadExecuteFunc(); });
62     if (pComsumeThread_ == nullptr) {
63         LOG(ERROR) << "StreamInstallProcesser Start new pComsumeThread_ failed";
64         return -1;
65     }
66     UpdateResult(UpdateStatus::UPDATE_STATE_INIT, 0, "");
67     return 0;
68 }
69 
Stop()70 void StreamInstallProcesser::Stop()
71 {
72     LOG(INFO) << "StreamInstallProcesser Stop enter";
73     if (!isRunning_) {
74         LOG(WARNING) << "Action not running";
75         return;
76     }
77 
78     isRunning_ = false;
79     isExitThread_ = true;
80     ringBuffer_.Stop();
81     if (pComsumeThread_ != nullptr) {
82         pComsumeThread_->join();
83         delete pComsumeThread_;
84         pComsumeThread_ = nullptr;
85     }
86     ThreadExitProc();
87     LOG(INFO) << "StreamInstallProcesser Stop leave";
88     return;
89 }
90 
IsRunning()91 bool StreamInstallProcesser::IsRunning()
92 {
93     return isRunning_;
94 }
95 
ThreadExecuteFunc()96 void StreamInstallProcesser::ThreadExecuteFunc()
97 {
98     LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc enter";
99     while (!isExitThread_) {
100         uint8_t buffer[BUFFER_SIZE]{0};
101         uint32_t len = 0;
102         uint32_t dealLen = 0;
103         if (!ringBuffer_.Pop(buffer, sizeof(buffer), len)) break;
104         LOG(INFO) << "start bun chunk update ,len = " << len;
105         UpdateResultCode ret = binChunkUpdate_->StartBinChunkUpdate(buffer, len, dealLen);
106         if (STREAM_UPDATE_SUCCESS == ret) {
107             LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_SUCCESS";
108             UpdateResult(UpdateStatus::UPDATE_STATE_ONGOING, dealLen, "");
109         } else if (STREAM_UPDATE_FAILURE == ret) {
110             LOG(ERROR) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_FAILURE";
111             UpdateResult(UpdateStatus::UPDATE_STATE_FAILED, dealLen, "");
112             break;
113         } else if (STREAM_UPDATE_COMPLETE == ret) {
114             LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_COMPLETE";
115             UpdateResult(UpdateStatus::UPDATE_STATE_SUCCESSFUL, dealLen, "");
116             // 升级完成,切换分区
117             SetActiveSlot();
118             break;
119         }
120     }
121 }
122 
ThreadExitProc()123 void StreamInstallProcesser::ThreadExitProc()
124 {
125     LOG(INFO) << "StreamInstallProcesser ThreadExitProc enter";
126     isRunning_ = false;
127     isExitThread_ = true;
128     ringBuffer_.Stop();
129     ringBuffer_.Reset();
130 }
131 
ProcessStreamData(const uint8_t * buffer,uint32_t size)132 int32_t StreamInstallProcesser::ProcessStreamData(const uint8_t *buffer, uint32_t size)
133 {
134     uint8_t tmpBuff[BUFFER_SIZE]{0};
135     errno_t ret = memcpy_s(tmpBuff, BUFFER_SIZE, buffer, size);
136     if (ret != 0) {
137         LOG(ERROR) << "ProcessStreamData memcpy_s failed: " << ret;
138         return -1;
139     }
140     return ringBuffer_.Push(tmpBuff, size) ? 0 : -1;
141 }
142 
UpdateResult(UpdateStatus updateStatus,int dealLen,const std::string & resultMsg)143 void StreamInstallProcesser::UpdateResult(UpdateStatus updateStatus, int dealLen, const std::string &resultMsg)
144 {
145     if (statusManager_ == nullptr) {
146         LOG(ERROR) << "statusManager_ nullptr";
147         return;
148     }
149     statusManager_->UpdateCallback(updateStatus, dealLen, resultMsg);
150 }
151 } // namespace SysInstaller
152 } // namespace OHOS
153