• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <thread>
17 #include <unordered_set>
18 
19 #include "hitrace.h"
20 #include "distributed_object_impl.h"
21 #include "distributed_objectstore_impl.h"
22 #include "objectstore_errors.h"
23 #include "softbus_adapter.h"
24 #include "string_utils.h"
25 #include "asset_change_timer.h"
26 
27 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)28 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
29     : flatObjectStore_(flatObjectStore)
30 {
31 }
32 
~DistributedObjectStoreImpl()33 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
34 {
35     delete flatObjectStore_;
36 }
37 
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)38 DistributedObject *DistributedObjectStoreImpl::CacheObject(
39     const std::string &sessionId, FlatObjectStore *flatObjectStore)
40 {
41     DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
42     if (object == nullptr) {
43         return nullptr;
44     }
45     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
46     objects_.push_back(object);
47     return object;
48 }
49 
RemoveCacheObject(const std::string & sessionId)50 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
51 {
52     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
53     auto iter = objects_.begin();
54     while (iter != objects_.end()) {
55         if ((*iter)->GetSessionId() == sessionId) {
56             delete *iter;
57             iter = objects_.erase(iter);
58         } else {
59             iter++;
60         }
61     }
62     return;
63 }
64 
CreateObject(const std::string & sessionId)65 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
66 {
67     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
68     if (flatObjectStore_ == nullptr) {
69         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
70         return nullptr;
71     }
72 
73     if (sessionId.empty()) {
74         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
75         return nullptr;
76     }
77 
78     uint32_t status = flatObjectStore_->CreateObject(sessionId);
79     if (status != SUCCESS) {
80         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
81         return nullptr;
82     }
83     return CacheObject(sessionId, flatObjectStore_);
84 }
85 
CreateObject(const std::string & sessionId,uint32_t & status)86 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
87 {
88     DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
89     if (flatObjectStore_ == nullptr) {
90         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
91         status = ERR_NULL_OBJECTSTORE;
92         return nullptr;
93     }
94 
95     if (sessionId.empty()) {
96         LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
97         status = ERR_INVALID_ARGS;
98         return nullptr;
99     }
100 
101     status = flatObjectStore_->CreateObject(sessionId);
102     if (status != SUCCESS) {
103         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
104         return nullptr;
105     }
106     return CacheObject(sessionId, flatObjectStore_);
107 }
108 
DeleteObject(const std::string & sessionId)109 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
110 {
111     DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
112     if (flatObjectStore_ == nullptr) {
113         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
114         return ERR_NULL_OBJECTSTORE;
115     }
116     uint32_t status = flatObjectStore_->Delete(sessionId);
117     if (status != SUCCESS) {
118         LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
119         return status;
120     }
121     RemoveCacheObject(sessionId);
122     return SUCCESS;
123 }
124 
Get(const std::string & sessionId,DistributedObject ** object)125 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
126 {
127     auto iter = objects_.begin();
128     while (iter != objects_.end()) {
129         if ((*iter)->GetSessionId() == sessionId) {
130             *object = *iter;
131             return SUCCESS;
132         }
133         iter++;
134     }
135     LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
136     return ERR_GET_OBJECT;
137 }
138 
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)139 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
140 {
141     if (object == nullptr) {
142         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
143         return ERR_NULL_OBJECT;
144     }
145     if (flatObjectStore_ == nullptr) {
146         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
147         return ERR_NULL_OBJECTSTORE;
148     }
149     std::lock_guard<std::mutex> lock(watchersLock_);
150     if (watchers_.count(object) != 0) {
151         LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
152         return ERR_EXIST;
153     }
154     std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
155     watcherProxy->SetAssetChangeCallBack(
156         [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
157             AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
158             assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
159         });
160     uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
161     if (status != SUCCESS) {
162         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
163         return status;
164     }
165     watchers_.insert_or_assign(object, watcherProxy);
166     LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
167     return SUCCESS;
168 }
169 
UnWatch(DistributedObject * object)170 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
171 {
172     if (object == nullptr) {
173         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
174         return ERR_NULL_OBJECT;
175     }
176     if (flatObjectStore_ == nullptr) {
177         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
178         return ERR_NULL_OBJECTSTORE;
179     }
180     uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
181     if (status != SUCCESS) {
182         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
183         return status;
184     }
185     std::lock_guard<std::mutex> lock(watchersLock_);
186     watchers_.erase(object);
187     LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
188     return SUCCESS;
189 }
190 
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)191 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
192 {
193     if (flatObjectStore_ == nullptr) {
194         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
195         return ERR_NULL_OBJECTSTORE;
196     }
197     std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
198     return flatObjectStore_->SetStatusNotifier(watcherProxy);
199 }
200 
NotifyCachedStatus(const std::string & sessionId)201 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
202 {
203     flatObjectStore_->CheckRetrieveCache(sessionId);
204 }
205 
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)206 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
207     : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
208 {
209 }
210 
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)211 void WatcherProxy::OnChanged(
212     const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
213 {
214     std::unordered_set<std::string> transferKeys;
215     std::vector<std::string> otherKeys;
216     for (const auto &str : changedData) {
217         if (!enableTransfer || str.find(ASSET_DOT) == std::string::npos) {
218             if (str != DEVICEID_KEY) {
219                 otherKeys.push_back(str);
220             }
221         } else {
222             std::string assetKey;
223             if (FindChangedAssetKey(str, assetKey)) {
224                 transferKeys.insert(assetKey);
225             }
226         }
227     }
228     if (!otherKeys.empty()) {
229         objectWatcher_->OnChanged(sessionId, otherKeys);
230     }
231     if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
232         for (auto &assetKey : transferKeys) {
233             assetChangeCallback_(sessionId, assetKey, objectWatcher_);
234         }
235     }
236 }
237 
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)238 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
239 {
240     std::size_t dotPos = changedKey.find(ASSET_DOT);
241     if ((changedKey.size() > MODIFY_TIME_SUFFIX.length() && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
242             (changedKey.size() > SIZE_SUFFIX.length() && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
243         assetKey = changedKey.substr(0, dotPos);
244         return true;
245     }
246     return false;
247 }
248 
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)249 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
250 {
251     assetChangeCallback_ = assetChangeCallback;
252 }
253 
GetInstance(const std::string & bundleName)254 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
255 {
256     static std::mutex instLock_;
257     static DistributedObjectStore *instPtr = nullptr;
258     if (instPtr == nullptr) {
259         std::lock_guard<std::mutex> lock(instLock_);
260         if (instPtr == nullptr && !bundleName.empty()) {
261             LOG_INFO("new objectstore %{public}s", bundleName.c_str());
262             FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
263             if (flatObjectStore == nullptr) {
264                 LOG_ERROR("no memory for FlatObjectStore malloc!");
265                 return nullptr;
266             }
267             // Use instMemory to make sure this singleton not free before other object.
268             // This operation needn't to malloc memory, we needn't to check nullptr.
269             instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
270             if (instPtr == nullptr) {
271                 delete flatObjectStore;
272                 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
273                 return nullptr;
274             }
275         }
276     }
277     return instPtr;
278 }
279 
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)280 void StatusNotifierProxy::OnChanged(
281     const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
282 {
283     if (notifier != nullptr) {
284         notifier->OnChanged(sessionId, networkId, onlineStatus);
285     }
286 }
287 
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)288 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> &notifier) : notifier(notifier)
289 {
290 }
291 
~StatusNotifierProxy()292 StatusNotifierProxy::~StatusNotifierProxy()
293 {
294     LOG_ERROR("destroy");
295     notifier = nullptr;
296 }
297 } // namespace OHOS::ObjectStore
298