1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17 #include <unordered_set>
18
19 #include "hitrace.h"
20 #include "distributed_object_impl.h"
21 #include "distributed_objectstore_impl.h"
22 #include "objectstore_errors.h"
23 #include "softbus_adapter.h"
24 #include "string_utils.h"
25 #include "asset_change_timer.h"
26
27 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)28 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
29 : flatObjectStore_(flatObjectStore)
30 {
31 }
32
~DistributedObjectStoreImpl()33 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
34 {
35 delete flatObjectStore_;
36 }
37
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)38 DistributedObject *DistributedObjectStoreImpl::CacheObject(
39 const std::string &sessionId, FlatObjectStore *flatObjectStore)
40 {
41 DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
42 if (object == nullptr) {
43 return nullptr;
44 }
45 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
46 objects_.push_back(object);
47 return object;
48 }
49
RemoveCacheObject(const std::string & sessionId)50 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
51 {
52 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
53 auto iter = objects_.begin();
54 while (iter != objects_.end()) {
55 if ((*iter)->GetSessionId() == sessionId) {
56 delete *iter;
57 iter = objects_.erase(iter);
58 } else {
59 iter++;
60 }
61 }
62 return;
63 }
64
CreateObject(const std::string & sessionId)65 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
66 {
67 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
68 if (flatObjectStore_ == nullptr) {
69 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
70 return nullptr;
71 }
72
73 if (sessionId.empty()) {
74 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
75 return nullptr;
76 }
77
78 uint32_t status = flatObjectStore_->CreateObject(sessionId);
79 if (status != SUCCESS) {
80 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
81 return nullptr;
82 }
83 return CacheObject(sessionId, flatObjectStore_);
84 }
85
CreateObject(const std::string & sessionId,uint32_t & status)86 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
87 {
88 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
89 if (flatObjectStore_ == nullptr) {
90 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
91 status = ERR_NULL_OBJECTSTORE;
92 return nullptr;
93 }
94
95 if (sessionId.empty()) {
96 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
97 status = ERR_INVALID_ARGS;
98 return nullptr;
99 }
100
101 status = flatObjectStore_->CreateObject(sessionId);
102 if (status != SUCCESS) {
103 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
104 return nullptr;
105 }
106 return CacheObject(sessionId, flatObjectStore_);
107 }
108
DeleteObject(const std::string & sessionId)109 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
110 {
111 DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
112 if (flatObjectStore_ == nullptr) {
113 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
114 return ERR_NULL_OBJECTSTORE;
115 }
116 uint32_t status = flatObjectStore_->Delete(sessionId);
117 if (status != SUCCESS) {
118 LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
119 return status;
120 }
121 RemoveCacheObject(sessionId);
122 return SUCCESS;
123 }
124
Get(const std::string & sessionId,DistributedObject ** object)125 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
126 {
127 auto iter = objects_.begin();
128 while (iter != objects_.end()) {
129 if ((*iter)->GetSessionId() == sessionId) {
130 *object = *iter;
131 return SUCCESS;
132 }
133 iter++;
134 }
135 LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
136 return ERR_GET_OBJECT;
137 }
138
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)139 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
140 {
141 if (object == nullptr) {
142 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
143 return ERR_NULL_OBJECT;
144 }
145 if (flatObjectStore_ == nullptr) {
146 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
147 return ERR_NULL_OBJECTSTORE;
148 }
149 std::lock_guard<std::mutex> lock(watchersLock_);
150 if (watchers_.count(object) != 0) {
151 LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
152 return ERR_EXIST;
153 }
154 std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
155 watcherProxy->SetAssetChangeCallBack(
156 [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
157 AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
158 assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
159 });
160 uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
161 if (status != SUCCESS) {
162 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
163 return status;
164 }
165 watchers_.insert_or_assign(object, watcherProxy);
166 LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
167 return SUCCESS;
168 }
169
UnWatch(DistributedObject * object)170 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
171 {
172 if (object == nullptr) {
173 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
174 return ERR_NULL_OBJECT;
175 }
176 if (flatObjectStore_ == nullptr) {
177 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
178 return ERR_NULL_OBJECTSTORE;
179 }
180 uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
181 if (status != SUCCESS) {
182 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
183 return status;
184 }
185 std::lock_guard<std::mutex> lock(watchersLock_);
186 watchers_.erase(object);
187 LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
188 return SUCCESS;
189 }
190
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)191 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
192 {
193 if (flatObjectStore_ == nullptr) {
194 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
195 return ERR_NULL_OBJECTSTORE;
196 }
197 std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
198 return flatObjectStore_->SetStatusNotifier(watcherProxy);
199 }
200
NotifyCachedStatus(const std::string & sessionId)201 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
202 {
203 flatObjectStore_->CheckRetrieveCache(sessionId);
204 }
205
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)206 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
207 : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
208 {
209 }
210
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)211 void WatcherProxy::OnChanged(
212 const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
213 {
214 std::unordered_set<std::string> transferKeys;
215 std::vector<std::string> otherKeys;
216 for (const auto &str : changedData) {
217 if (!enableTransfer || str.find(ASSET_DOT) == std::string::npos) {
218 if (str != DEVICEID_KEY) {
219 otherKeys.push_back(str);
220 }
221 } else {
222 std::string assetKey;
223 if (FindChangedAssetKey(str, assetKey)) {
224 transferKeys.insert(assetKey);
225 }
226 }
227 }
228 if (!otherKeys.empty()) {
229 objectWatcher_->OnChanged(sessionId, otherKeys);
230 }
231 if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
232 for (auto &assetKey : transferKeys) {
233 assetChangeCallback_(sessionId, assetKey, objectWatcher_);
234 }
235 }
236 }
237
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)238 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
239 {
240 std::size_t dotPos = changedKey.find(ASSET_DOT);
241 if ((changedKey.size() > MODIFY_TIME_SUFFIX.length() && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
242 (changedKey.size() > SIZE_SUFFIX.length() && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
243 assetKey = changedKey.substr(0, dotPos);
244 return true;
245 }
246 return false;
247 }
248
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)249 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
250 {
251 assetChangeCallback_ = assetChangeCallback;
252 }
253
GetInstance(const std::string & bundleName)254 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
255 {
256 static std::mutex instLock_;
257 static DistributedObjectStore *instPtr = nullptr;
258 if (instPtr == nullptr) {
259 std::lock_guard<std::mutex> lock(instLock_);
260 if (instPtr == nullptr && !bundleName.empty()) {
261 LOG_INFO("new objectstore %{public}s", bundleName.c_str());
262 FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
263 if (flatObjectStore == nullptr) {
264 LOG_ERROR("no memory for FlatObjectStore malloc!");
265 return nullptr;
266 }
267 // Use instMemory to make sure this singleton not free before other object.
268 // This operation needn't to malloc memory, we needn't to check nullptr.
269 instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
270 if (instPtr == nullptr) {
271 delete flatObjectStore;
272 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
273 return nullptr;
274 }
275 }
276 }
277 return instPtr;
278 }
279
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)280 void StatusNotifierProxy::OnChanged(
281 const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
282 {
283 if (notifier != nullptr) {
284 notifier->OnChanged(sessionId, networkId, onlineStatus);
285 }
286 }
287
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)288 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> ¬ifier) : notifier(notifier)
289 {
290 }
291
~StatusNotifierProxy()292 StatusNotifierProxy::~StatusNotifierProxy()
293 {
294 LOG_ERROR("destroy");
295 notifier = nullptr;
296 }
297 } // namespace OHOS::ObjectStore
298