1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "stream_update.h"
17
18 #include "log/log.h"
19 #include "package/package.h"
20 #include "package/pkg_manager.h"
21 #include "scope_guard.h"
22 #include "securec.h"
23 #include "updater/updater_const.h"
24 #include "utils.h"
25 #include <thread>
26
27 namespace OHOS {
28 namespace SysInstaller {
29 using namespace Updater;
30
31 constexpr uint32_t BUFFER_SIZE = 50 * 1024;
32 constexpr uint16_t MAX_RING_BUFFER_NUM = 2;
33 constexpr uint32_t MAX_UPDATER_BUFFER_SIZE = 2 * BUFFER_SIZE;
34
GetInstance()35 StreamInstallProcesser &StreamInstallProcesser::GetInstance()
36 {
37 static StreamInstallProcesser instance;
38 return instance;
39 }
40
Start()41 int32_t StreamInstallProcesser::Start()
42 {
43 LOG(INFO) << "StreamInstallProcesser Start";
44
45 if (!ringBuffer_.Init(BUFFER_SIZE, MAX_RING_BUFFER_NUM)) {
46 LOG(ERROR) << "StreamInstallProcesser Start Init ringBuffer_ failed";
47 return -1;
48 }
49
50 binChunkUpdate_ = std::make_unique<Updater::BinChunkUpdate>(MAX_UPDATER_BUFFER_SIZE);
51 isExitThread_ = false;
52 isRunning_ = true;
53 pComsumeThread_ = new (std::nothrow) std::thread([this] { this->ThreadExecuteFunc(); });
54 if (pComsumeThread_ == nullptr) {
55 LOG(ERROR) << "StreamInstallProcesser Start new pComsumeThread_ failed";
56 return -1;
57 }
58 UpdateResult(UPDATE_STATE_INIT, 0, "");
59 return 0;
60 }
61
Stop()62 void StreamInstallProcesser::Stop()
63 {
64 LOG(INFO) << "StreamInstallProcesser Stop enter";
65 if (!isRunning_) {
66 LOG(WARNING) << "Action not running";
67 return;
68 }
69
70 isRunning_ = false;
71 isExitThread_ = true;
72 ringBuffer_.Stop();
73 if (pComsumeThread_ != nullptr) {
74 pComsumeThread_->join();
75 delete pComsumeThread_;
76 pComsumeThread_ = nullptr;
77 }
78 ThreadExitProc();
79 LOG(INFO) << "StreamInstallProcesser Stop leave";
80 return;
81 }
82
IsRunning()83 bool StreamInstallProcesser::IsRunning()
84 {
85 return isRunning_;
86 }
87
ThreadExecuteFunc()88 void StreamInstallProcesser::ThreadExecuteFunc()
89 {
90 LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc enter";
91 while (!isExitThread_) {
92 uint8_t buffer[BUFFER_SIZE]{0};
93 uint32_t len = 0;
94 uint32_t dealLen = 0;
95 if (!ringBuffer_.Pop(buffer, sizeof(buffer), len)) break;
96 UpdateResultCode ret = binChunkUpdate_->StartBinChunkUpdate(buffer, len, dealLen);
97 if (STREAM_UPDATE_SUCCESS == ret) {
98 LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_SUCCESS";
99 UpdateResult(UPDATE_STATE_ONGOING, dealLen, "");
100 } else if (STREAM_UPDATE_FAILURE == ret) {
101 LOG(ERROR) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_FAILURE";
102 UpdateResult(UPDATE_STATE_FAILED, dealLen, "");
103 break;
104 } else if (STREAM_UPDATE_COMPLETE == ret) {
105 LOG(INFO) << "StreamInstallProcesser ThreadExecuteFunc STREM_UPDATE_COMPLETE";
106 UpdateResult(UPDATE_STATE_SUCCESSFUL, dealLen, "");
107 break;
108 }
109 }
110 }
111
ThreadExitProc()112 void StreamInstallProcesser::ThreadExitProc()
113 {
114 LOG(INFO) << "StreamInstallProcesser ThreadExitProc enter";
115 isRunning_ = false;
116 isExitThread_ = true;
117 ringBuffer_.Stop();
118 ringBuffer_.Reset();
119 }
120
ProcessStreamData(const uint8_t * buffer,size_t size)121 int32_t StreamInstallProcesser::ProcessStreamData(const uint8_t *buffer, size_t size)
122 {
123 uint8_t tmpBuff[BUFFER_SIZE]{0};
124 errno_t ret = memcpy_s(tmpBuff, BUFFER_SIZE, buffer, size);
125 if (ret != 0) {
126 LOG(ERROR) << "ProcessStreamData memcpy_s failed: " << ret;
127 return -1;
128 }
129 return ringBuffer_.Push(tmpBuff, size) ? 0 : -1;
130 }
131
UpdateResult(UpdateStatus updateStatus,int dealLen,const std::string & resultMsg)132 void StreamInstallProcesser::UpdateResult(UpdateStatus updateStatus, int dealLen, const std::string &resultMsg)
133 {
134 if (statusManager_ == nullptr) {
135 LOG(ERROR) << "statusManager_ nullptr";
136 return;
137 }
138 statusManager_->UpdateCallback(updateStatus, dealLen, resultMsg);
139 }
140 } // namespace SysInstaller
141 } // namespace OHOS
142