• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <unordered_set>
17 
18 #include "hitrace.h"
19 #include "distributed_object_impl.h"
20 #include "distributed_objectstore_impl.h"
21 #include "objectstore_errors.h"
22 #include "string_utils.h"
23 #include "asset_change_timer.h"
24 #include "object_radar_reporter.h"
25 
26 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)27 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
28     : flatObjectStore_(flatObjectStore)
29 {
30 }
31 
~DistributedObjectStoreImpl()32 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
33 {
34     delete flatObjectStore_;
35 }
36 
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)37 DistributedObject *DistributedObjectStoreImpl::CacheObject(
38     const std::string &sessionId, FlatObjectStore *flatObjectStore)
39 {
40     DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
41     if (object == nullptr) {
42         return nullptr;
43     }
44     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
45     objects_.push_back(object);
46     return object;
47 }
48 
RemoveCacheObject(const std::string & sessionId)49 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
50 {
51     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
52     auto iter = objects_.begin();
53     while (iter != objects_.end()) {
54         if ((*iter)->GetSessionId() == sessionId) {
55             delete *iter;
56             iter = objects_.erase(iter);
57         } else {
58             iter++;
59         }
60     }
61     return;
62 }
63 
CreateObject(const std::string & sessionId)64 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
65 {
66     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
67     if (flatObjectStore_ == nullptr) {
68         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
69         return nullptr;
70     }
71 
72     if (sessionId.empty()) {
73         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
74         return nullptr;
75     }
76 
77     uint32_t status = flatObjectStore_->CreateObject(sessionId);
78     if (status != SUCCESS) {
79         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
80         return nullptr;
81     }
82     return CacheObject(sessionId, flatObjectStore_);
83 }
84 
CreateObject(const std::string & sessionId,uint32_t & status)85 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
86 {
87     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
88     if (flatObjectStore_ == nullptr) {
89         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
90         status = ERR_NULL_OBJECTSTORE;
91         return nullptr;
92     }
93 
94     if (sessionId.empty()) {
95         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
96         status = ERR_INVALID_ARGS;
97         return nullptr;
98     }
99 
100     status = flatObjectStore_->CreateObject(sessionId);
101     if (status != SUCCESS) {
102         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
103         return nullptr;
104     }
105     return CacheObject(sessionId, flatObjectStore_);
106 }
107 
DeleteObject(const std::string & sessionId)108 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
109 {
110     DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
111     if (flatObjectStore_ == nullptr) {
112         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
113         return ERR_NULL_OBJECTSTORE;
114     }
115     uint32_t status = flatObjectStore_->Delete(sessionId);
116     if (status != SUCCESS) {
117         LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
118         return status;
119     }
120     RemoveCacheObject(sessionId);
121     return SUCCESS;
122 }
123 
Get(const std::string & sessionId,DistributedObject ** object)124 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
125 {
126     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
127     auto iter = objects_.begin();
128     while (iter != objects_.end()) {
129         if ((*iter)->GetSessionId() == sessionId) {
130             *object = *iter;
131             return SUCCESS;
132         }
133         iter++;
134     }
135     LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
136     return ERR_GET_OBJECT;
137 }
138 
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)139 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
140 {
141     if (object == nullptr) {
142         LOG_ERROR("Watch sync object err ");
143         return ERR_NULL_OBJECT;
144     }
145     if (flatObjectStore_ == nullptr) {
146         LOG_ERROR("Watch sync flatObjectStore err ");
147         return ERR_NULL_OBJECTSTORE;
148     }
149     std::lock_guard<std::mutex> lock(watchersLock_);
150     if (watchers_.count(object) != 0) {
151         LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
152         return ERR_EXIST;
153     }
154     std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
155     watcherProxy->SetAssetChangeCallBack(
156         [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
157             AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
158             assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
159         });
160     uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
161     if (status != SUCCESS) {
162         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
163         return status;
164     }
165     watchers_.insert_or_assign(object, watcherProxy);
166     LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
167     return SUCCESS;
168 }
169 
UnWatch(DistributedObject * object)170 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
171 {
172     if (object == nullptr) {
173         LOG_ERROR("UnWatch sync object err ");
174         return ERR_NULL_OBJECT;
175     }
176     if (flatObjectStore_ == nullptr) {
177         LOG_ERROR("UnWatch sync flatObjectStore err ");
178         return ERR_NULL_OBJECTSTORE;
179     }
180     uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
181     if (status != SUCCESS) {
182         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
183         return status;
184     }
185     std::lock_guard<std::mutex> lock(watchersLock_);
186     watchers_.erase(object);
187     LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
188     return SUCCESS;
189 }
190 
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)191 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
192 {
193     if (flatObjectStore_ == nullptr) {
194         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
195         return ERR_NULL_OBJECTSTORE;
196     }
197     std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
198     return flatObjectStore_->SetStatusNotifier(watcherProxy);
199 }
200 
SetProgressNotifier(std::shared_ptr<ProgressNotifier> notifier)201 uint32_t DistributedObjectStoreImpl::SetProgressNotifier(std::shared_ptr<ProgressNotifier> notifier)
202 {
203     if (flatObjectStore_ == nullptr) {
204         LOG_ERROR("flatObjectStore_ is nullptr");
205         return ERR_NULL_OBJECTSTORE;
206     }
207     std::shared_ptr<ProgressNotifierProxy> watcherProxy = std::make_shared<ProgressNotifierProxy>(notifier);
208     return flatObjectStore_->SetProgressNotifier(watcherProxy);
209 }
210 
NotifyCachedStatus(const std::string & sessionId)211 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
212 {
213     flatObjectStore_->CheckRetrieveCache(sessionId);
214 }
215 
NotifyProgressStatus(const std::string & sessionId)216 void DistributedObjectStoreImpl::NotifyProgressStatus(const std::string &sessionId)
217 {
218     flatObjectStore_->CheckProgressCache(sessionId);
219 }
220 
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)221 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
222     : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
223 {
224 }
225 
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)226 void WatcherProxy::OnChanged(
227     const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
228 {
229     std::unordered_set<std::string> transferKeys;
230     std::vector<std::string> otherKeys;
231     for (const auto &str : changedData) {
232         if (str.find(ASSET_DOT) == std::string::npos) {
233             if (str != DEVICEID_KEY) {
234                 otherKeys.push_back(str);
235             }
236         } else {
237             std::string assetKey;
238             if (FindChangedAssetKey(str, assetKey)) {
239                 transferKeys.insert(assetKey);
240             }
241         }
242     }
243     if (!enableTransfer) {
244         otherKeys.insert(otherKeys.end(), transferKeys.begin(), transferKeys.end());
245     } else if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
246         for (auto &assetKey : transferKeys) {
247             assetChangeCallback_(sessionId, assetKey, objectWatcher_);
248         }
249     }
250     if (!otherKeys.empty()) {
251         objectWatcher_->OnChanged(sessionId, otherKeys);
252     }
253 }
254 
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)255 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
256 {
257     std::size_t dotPos = changedKey.find(ASSET_DOT);
258     if ((changedKey.size() > strlen(MODIFY_TIME_SUFFIX) && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
259             (changedKey.size() > strlen(SIZE_SUFFIX) && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
260         assetKey = changedKey.substr(0, dotPos);
261         return true;
262     }
263     return false;
264 }
265 
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)266 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
267 {
268     assetChangeCallback_ = assetChangeCallback;
269 }
270 
GetInstance(const std::string & bundleName)271 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
272 {
273     static std::mutex instLock_;
274     static DistributedObjectStore *instPtr = nullptr;
275     if (instPtr == nullptr) {
276         std::lock_guard<std::mutex> lock(instLock_);
277         if (instPtr == nullptr && !bundleName.empty()) {
278             RadarReporter::ReportStateStart(std::string(__FUNCTION__), CREATE, INIT_STORE, IDLE, START, bundleName);
279             LOG_INFO("new objectstore %{public}s", bundleName.c_str());
280             FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
281             if (flatObjectStore == nullptr) {
282                 LOG_ERROR("no memory for FlatObjectStore malloc!");
283                 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
284                     RADAR_FAILED, NO_MEMORY, FINISHED);
285                 return nullptr;
286             }
287             // Use instMemory to make sure this singleton not free before other object.
288             // This operation needn't to malloc memory, we needn't to check nullptr.
289             instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
290             if (instPtr == nullptr) {
291                 delete flatObjectStore;
292                 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
293                 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
294                     RADAR_FAILED, NO_MEMORY, FINISHED);
295                 return nullptr;
296             }
297             RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, INIT_STORE, RADAR_SUCCESS);
298         }
299     }
300     return instPtr;
301 }
302 
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)303 void StatusNotifierProxy::OnChanged(
304     const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
305 {
306     if (notifier != nullptr) {
307         notifier->OnChanged(sessionId, networkId, onlineStatus);
308     }
309 }
310 
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)311 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> &notifier) : notifier(notifier)
312 {
313 }
314 
~StatusNotifierProxy()315 StatusNotifierProxy::~StatusNotifierProxy()
316 {
317     LOG_ERROR("destroy");
318     notifier = nullptr;
319 }
320 
OnChanged(const std::string & sessionId,int32_t progress)321 void ProgressNotifierProxy::OnChanged(const std::string &sessionId, int32_t progress)
322 {
323     if (notifier != nullptr) {
324         notifier->OnChanged(sessionId, progress);
325     }
326 }
327 
ProgressNotifierProxy(const std::shared_ptr<ProgressNotifier> & notifier)328 ProgressNotifierProxy::ProgressNotifierProxy(const std::shared_ptr<ProgressNotifier> &notifier) : notifier(notifier)
329 {
330 }
331 
~ProgressNotifierProxy()332 ProgressNotifierProxy::~ProgressNotifierProxy()
333 {
334     notifier = nullptr;
335 }
336 } // namespace OHOS::ObjectStore
337