• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <unordered_set>
17 
18 #include "hitrace.h"
19 #include "distributed_object_impl.h"
20 #include "distributed_objectstore_impl.h"
21 #include "objectstore_errors.h"
22 #include "string_utils.h"
23 #include "asset_change_timer.h"
24 #include "object_radar_reporter.h"
25 
26 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)27 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
28     : flatObjectStore_(flatObjectStore)
29 {
30 }
31 
~DistributedObjectStoreImpl()32 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
33 {
34     delete flatObjectStore_;
35 }
36 
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)37 DistributedObject *DistributedObjectStoreImpl::CacheObject(
38     const std::string &sessionId, FlatObjectStore *flatObjectStore)
39 {
40     DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
41     if (object == nullptr) {
42         return nullptr;
43     }
44     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
45     objects_.push_back(object);
46     return object;
47 }
48 
RemoveCacheObject(const std::string & sessionId)49 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
50 {
51     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
52     auto iter = objects_.begin();
53     while (iter != objects_.end()) {
54         if ((*iter)->GetSessionId() == sessionId) {
55             delete *iter;
56             iter = objects_.erase(iter);
57         } else {
58             iter++;
59         }
60     }
61     return;
62 }
63 
CreateObject(const std::string & sessionId)64 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
65 {
66     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
67     if (flatObjectStore_ == nullptr) {
68         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
69         return nullptr;
70     }
71 
72     if (sessionId.empty()) {
73         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
74         return nullptr;
75     }
76 
77     uint32_t status = flatObjectStore_->CreateObject(sessionId);
78     if (status != SUCCESS) {
79         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
80         return nullptr;
81     }
82     return CacheObject(sessionId, flatObjectStore_);
83 }
84 
CreateObject(const std::string & sessionId,uint32_t & status)85 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
86 {
87     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
88     if (flatObjectStore_ == nullptr) {
89         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
90         status = ERR_NULL_OBJECTSTORE;
91         return nullptr;
92     }
93 
94     if (sessionId.empty()) {
95         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
96         status = ERR_INVALID_ARGS;
97         return nullptr;
98     }
99 
100     status = flatObjectStore_->CreateObject(sessionId);
101     if (status != SUCCESS) {
102         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
103         return nullptr;
104     }
105     return CacheObject(sessionId, flatObjectStore_);
106 }
107 
DeleteObject(const std::string & sessionId)108 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
109 {
110     DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
111     if (flatObjectStore_ == nullptr) {
112         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
113         return ERR_NULL_OBJECTSTORE;
114     }
115     uint32_t status = flatObjectStore_->Delete(sessionId);
116     if (status != SUCCESS) {
117         LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
118         return status;
119     }
120     RemoveCacheObject(sessionId);
121     return SUCCESS;
122 }
123 
Get(const std::string & sessionId,DistributedObject ** object)124 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
125 {
126     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
127     auto iter = objects_.begin();
128     while (iter != objects_.end()) {
129         if ((*iter)->GetSessionId() == sessionId) {
130             *object = *iter;
131             return SUCCESS;
132         }
133         iter++;
134     }
135     LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
136     return ERR_GET_OBJECT;
137 }
138 
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)139 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
140 {
141     if (object == nullptr) {
142         LOG_ERROR("Watch sync object err ");
143         return ERR_NULL_OBJECT;
144     }
145     if (flatObjectStore_ == nullptr) {
146         LOG_ERROR("Watch sync flatObjectStore err ");
147         return ERR_NULL_OBJECTSTORE;
148     }
149     std::lock_guard<std::mutex> lock(watchersLock_);
150     if (watchers_.count(object) != 0) {
151         LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
152         return ERR_EXIST;
153     }
154     std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
155     watcherProxy->SetAssetChangeCallBack(
156         [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
157             AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
158             assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
159         });
160     uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
161     if (status != SUCCESS) {
162         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
163         return status;
164     }
165     watchers_.insert_or_assign(object, watcherProxy);
166     LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
167     return SUCCESS;
168 }
169 
UnWatch(DistributedObject * object)170 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
171 {
172     if (object == nullptr) {
173         LOG_ERROR("UnWatch sync object err ");
174         return ERR_NULL_OBJECT;
175     }
176     if (flatObjectStore_ == nullptr) {
177         LOG_ERROR("UnWatch sync flatObjectStore err ");
178         return ERR_NULL_OBJECTSTORE;
179     }
180     uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
181     if (status != SUCCESS) {
182         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
183         return status;
184     }
185     std::lock_guard<std::mutex> lock(watchersLock_);
186     watchers_.erase(object);
187     LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
188     return SUCCESS;
189 }
190 
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)191 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
192 {
193     if (flatObjectStore_ == nullptr) {
194         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
195         return ERR_NULL_OBJECTSTORE;
196     }
197     std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
198     return flatObjectStore_->SetStatusNotifier(watcherProxy);
199 }
200 
NotifyCachedStatus(const std::string & sessionId)201 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
202 {
203     flatObjectStore_->CheckRetrieveCache(sessionId);
204 }
205 
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)206 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
207     : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
208 {
209 }
210 
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)211 void WatcherProxy::OnChanged(
212     const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
213 {
214     std::unordered_set<std::string> transferKeys;
215     std::vector<std::string> otherKeys;
216     for (const auto &str : changedData) {
217         if (str.find(ASSET_DOT) == std::string::npos) {
218             if (str != DEVICEID_KEY) {
219                 otherKeys.push_back(str);
220             }
221         } else {
222             std::string assetKey;
223             if (FindChangedAssetKey(str, assetKey)) {
224                 transferKeys.insert(assetKey);
225             }
226         }
227     }
228     if (!enableTransfer) {
229         otherKeys.insert(otherKeys.end(), transferKeys.begin(), transferKeys.end());
230     } else if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
231         for (auto &assetKey : transferKeys) {
232             assetChangeCallback_(sessionId, assetKey, objectWatcher_);
233         }
234     }
235     if (!otherKeys.empty()) {
236         objectWatcher_->OnChanged(sessionId, otherKeys);
237     }
238 }
239 
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)240 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
241 {
242     std::size_t dotPos = changedKey.find(ASSET_DOT);
243     if ((changedKey.size() > strlen(MODIFY_TIME_SUFFIX) && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
244             (changedKey.size() > strlen(SIZE_SUFFIX) && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
245         assetKey = changedKey.substr(0, dotPos);
246         return true;
247     }
248     return false;
249 }
250 
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)251 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
252 {
253     assetChangeCallback_ = assetChangeCallback;
254 }
255 
GetInstance(const std::string & bundleName)256 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
257 {
258     static std::mutex instLock_;
259     static DistributedObjectStore *instPtr = nullptr;
260     if (instPtr == nullptr) {
261         std::lock_guard<std::mutex> lock(instLock_);
262         if (instPtr == nullptr && !bundleName.empty()) {
263             RadarReporter::ReportStateStart(std::string(__FUNCTION__), CREATE, INIT_STORE, IDLE, START, bundleName);
264             LOG_INFO("new objectstore %{public}s", bundleName.c_str());
265             FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
266             if (flatObjectStore == nullptr) {
267                 LOG_ERROR("no memory for FlatObjectStore malloc!");
268                 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
269                     RADAR_FAILED, NO_MEMORY, FINISHED);
270                 return nullptr;
271             }
272             // Use instMemory to make sure this singleton not free before other object.
273             // This operation needn't to malloc memory, we needn't to check nullptr.
274             instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
275             if (instPtr == nullptr) {
276                 delete flatObjectStore;
277                 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
278                 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
279                     RADAR_FAILED, NO_MEMORY, FINISHED);
280                 return nullptr;
281             }
282             RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, INIT_STORE, RADAR_SUCCESS);
283         }
284     }
285     return instPtr;
286 }
287 
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)288 void StatusNotifierProxy::OnChanged(
289     const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
290 {
291     if (notifier != nullptr) {
292         notifier->OnChanged(sessionId, networkId, onlineStatus);
293     }
294 }
295 
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)296 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> &notifier) : notifier(notifier)
297 {
298 }
299 
~StatusNotifierProxy()300 StatusNotifierProxy::~StatusNotifierProxy()
301 {
302     LOG_ERROR("destroy");
303     notifier = nullptr;
304 }
305 } // namespace OHOS::ObjectStore
306