1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <atomic>
#include <unordered_set>

#include "hitrace.h"
#include "distributed_object_impl.h"
#include "distributed_objectstore_impl.h"
#include "objectstore_errors.h"
#include "string_utils.h"
#include "asset_change_timer.h"
#include "object_radar_reporter.h"
25
26 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)27 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
28 : flatObjectStore_(flatObjectStore)
29 {
30 }
31
~DistributedObjectStoreImpl()32 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
33 {
34 delete flatObjectStore_;
35 }
36
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)37 DistributedObject *DistributedObjectStoreImpl::CacheObject(
38 const std::string &sessionId, FlatObjectStore *flatObjectStore)
39 {
40 DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
41 if (object == nullptr) {
42 return nullptr;
43 }
44 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
45 objects_.push_back(object);
46 return object;
47 }
48
RemoveCacheObject(const std::string & sessionId)49 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
50 {
51 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
52 auto iter = objects_.begin();
53 while (iter != objects_.end()) {
54 if ((*iter)->GetSessionId() == sessionId) {
55 delete *iter;
56 iter = objects_.erase(iter);
57 } else {
58 iter++;
59 }
60 }
61 return;
62 }
63
CreateObject(const std::string & sessionId)64 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
65 {
66 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
67 if (flatObjectStore_ == nullptr) {
68 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
69 return nullptr;
70 }
71
72 if (sessionId.empty()) {
73 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
74 return nullptr;
75 }
76
77 uint32_t status = flatObjectStore_->CreateObject(sessionId);
78 if (status != SUCCESS) {
79 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
80 return nullptr;
81 }
82 return CacheObject(sessionId, flatObjectStore_);
83 }
84
CreateObject(const std::string & sessionId,uint32_t & status)85 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
86 {
87 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
88 if (flatObjectStore_ == nullptr) {
89 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
90 status = ERR_NULL_OBJECTSTORE;
91 return nullptr;
92 }
93
94 if (sessionId.empty()) {
95 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
96 status = ERR_INVALID_ARGS;
97 return nullptr;
98 }
99
100 status = flatObjectStore_->CreateObject(sessionId);
101 if (status != SUCCESS) {
102 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
103 return nullptr;
104 }
105 return CacheObject(sessionId, flatObjectStore_);
106 }
107
DeleteObject(const std::string & sessionId)108 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
109 {
110 DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
111 if (flatObjectStore_ == nullptr) {
112 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
113 return ERR_NULL_OBJECTSTORE;
114 }
115 uint32_t status = flatObjectStore_->Delete(sessionId);
116 if (status != SUCCESS) {
117 LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
118 return status;
119 }
120 RemoveCacheObject(sessionId);
121 return SUCCESS;
122 }
123
Get(const std::string & sessionId,DistributedObject ** object)124 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
125 {
126 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
127 auto iter = objects_.begin();
128 while (iter != objects_.end()) {
129 if ((*iter)->GetSessionId() == sessionId) {
130 *object = *iter;
131 return SUCCESS;
132 }
133 iter++;
134 }
135 LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
136 return ERR_GET_OBJECT;
137 }
138
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)139 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
140 {
141 if (object == nullptr) {
142 LOG_ERROR("Watch sync object err ");
143 return ERR_NULL_OBJECT;
144 }
145 if (flatObjectStore_ == nullptr) {
146 LOG_ERROR("Watch sync flatObjectStore err ");
147 return ERR_NULL_OBJECTSTORE;
148 }
149 std::lock_guard<std::mutex> lock(watchersLock_);
150 if (watchers_.count(object) != 0) {
151 LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
152 return ERR_EXIST;
153 }
154 std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
155 watcherProxy->SetAssetChangeCallBack(
156 [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
157 AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
158 assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
159 });
160 uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
161 if (status != SUCCESS) {
162 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
163 return status;
164 }
165 watchers_.insert_or_assign(object, watcherProxy);
166 LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
167 return SUCCESS;
168 }
169
UnWatch(DistributedObject * object)170 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
171 {
172 if (object == nullptr) {
173 LOG_ERROR("UnWatch sync object err ");
174 return ERR_NULL_OBJECT;
175 }
176 if (flatObjectStore_ == nullptr) {
177 LOG_ERROR("UnWatch sync flatObjectStore err ");
178 return ERR_NULL_OBJECTSTORE;
179 }
180 uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
181 if (status != SUCCESS) {
182 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
183 return status;
184 }
185 std::lock_guard<std::mutex> lock(watchersLock_);
186 watchers_.erase(object);
187 LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
188 return SUCCESS;
189 }
190
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)191 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
192 {
193 if (flatObjectStore_ == nullptr) {
194 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
195 return ERR_NULL_OBJECTSTORE;
196 }
197 std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
198 return flatObjectStore_->SetStatusNotifier(watcherProxy);
199 }
200
SetProgressNotifier(std::shared_ptr<ProgressNotifier> notifier)201 uint32_t DistributedObjectStoreImpl::SetProgressNotifier(std::shared_ptr<ProgressNotifier> notifier)
202 {
203 if (flatObjectStore_ == nullptr) {
204 LOG_ERROR("flatObjectStore_ is nullptr");
205 return ERR_NULL_OBJECTSTORE;
206 }
207 std::shared_ptr<ProgressNotifierProxy> watcherProxy = std::make_shared<ProgressNotifierProxy>(notifier);
208 return flatObjectStore_->SetProgressNotifier(watcherProxy);
209 }
210
NotifyCachedStatus(const std::string & sessionId)211 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
212 {
213 flatObjectStore_->CheckRetrieveCache(sessionId);
214 }
215
NotifyProgressStatus(const std::string & sessionId)216 void DistributedObjectStoreImpl::NotifyProgressStatus(const std::string &sessionId)
217 {
218 flatObjectStore_->CheckProgressCache(sessionId);
219 }
220
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)221 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
222 : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
223 {
224 }
225
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)226 void WatcherProxy::OnChanged(
227 const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
228 {
229 std::unordered_set<std::string> transferKeys;
230 std::vector<std::string> otherKeys;
231 for (const auto &str : changedData) {
232 if (str.find(ASSET_DOT) == std::string::npos) {
233 if (str != DEVICEID_KEY) {
234 otherKeys.push_back(str);
235 }
236 } else {
237 std::string assetKey;
238 if (FindChangedAssetKey(str, assetKey)) {
239 transferKeys.insert(assetKey);
240 }
241 }
242 }
243 if (!enableTransfer) {
244 otherKeys.insert(otherKeys.end(), transferKeys.begin(), transferKeys.end());
245 } else if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
246 for (auto &assetKey : transferKeys) {
247 assetChangeCallback_(sessionId, assetKey, objectWatcher_);
248 }
249 }
250 if (!otherKeys.empty()) {
251 objectWatcher_->OnChanged(sessionId, otherKeys);
252 }
253 }
254
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)255 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
256 {
257 std::size_t dotPos = changedKey.find(ASSET_DOT);
258 if ((changedKey.size() > strlen(MODIFY_TIME_SUFFIX) && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
259 (changedKey.size() > strlen(SIZE_SUFFIX) && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
260 assetKey = changedKey.substr(0, dotPos);
261 return true;
262 }
263 return false;
264 }
265
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)266 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
267 {
268 assetChangeCallback_ = assetChangeCallback;
269 }
270
GetInstance(const std::string & bundleName)271 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
272 {
273 static std::mutex instLock_;
274 static DistributedObjectStore *instPtr = nullptr;
275 if (instPtr == nullptr) {
276 std::lock_guard<std::mutex> lock(instLock_);
277 if (instPtr == nullptr && !bundleName.empty()) {
278 RadarReporter::ReportStateStart(std::string(__FUNCTION__), CREATE, INIT_STORE, IDLE, START, bundleName);
279 LOG_INFO("new objectstore %{public}s", bundleName.c_str());
280 FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
281 if (flatObjectStore == nullptr) {
282 LOG_ERROR("no memory for FlatObjectStore malloc!");
283 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
284 RADAR_FAILED, NO_MEMORY, FINISHED);
285 return nullptr;
286 }
287 // Use instMemory to make sure this singleton not free before other object.
288 // This operation needn't to malloc memory, we needn't to check nullptr.
289 instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
290 if (instPtr == nullptr) {
291 delete flatObjectStore;
292 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
293 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
294 RADAR_FAILED, NO_MEMORY, FINISHED);
295 return nullptr;
296 }
297 RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, INIT_STORE, RADAR_SUCCESS);
298 }
299 }
300 return instPtr;
301 }
302
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)303 void StatusNotifierProxy::OnChanged(
304 const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
305 {
306 if (notifier != nullptr) {
307 notifier->OnChanged(sessionId, networkId, onlineStatus);
308 }
309 }
310
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)311 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> ¬ifier) : notifier(notifier)
312 {
313 }
314
~StatusNotifierProxy()315 StatusNotifierProxy::~StatusNotifierProxy()
316 {
317 LOG_ERROR("destroy");
318 notifier = nullptr;
319 }
320
OnChanged(const std::string & sessionId,int32_t progress)321 void ProgressNotifierProxy::OnChanged(const std::string &sessionId, int32_t progress)
322 {
323 if (notifier != nullptr) {
324 notifier->OnChanged(sessionId, progress);
325 }
326 }
327
ProgressNotifierProxy(const std::shared_ptr<ProgressNotifier> & notifier)328 ProgressNotifierProxy::ProgressNotifierProxy(const std::shared_ptr<ProgressNotifier> ¬ifier) : notifier(notifier)
329 {
330 }
331
~ProgressNotifierProxy()332 ProgressNotifierProxy::~ProgressNotifierProxy()
333 {
334 notifier = nullptr;
335 }
336 } // namespace OHOS::ObjectStore
337