1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <unordered_set>
17
18 #include "hitrace.h"
19 #include "distributed_object_impl.h"
20 #include "distributed_objectstore_impl.h"
21 #include "objectstore_errors.h"
22 #include "string_utils.h"
23 #include "asset_change_timer.h"
24 #include "object_radar_reporter.h"
25
26 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)27 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
28 : flatObjectStore_(flatObjectStore)
29 {
30 }
31
~DistributedObjectStoreImpl()32 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
33 {
34 delete flatObjectStore_;
35 }
36
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)37 DistributedObject *DistributedObjectStoreImpl::CacheObject(
38 const std::string &sessionId, FlatObjectStore *flatObjectStore)
39 {
40 DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
41 if (object == nullptr) {
42 return nullptr;
43 }
44 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
45 objects_.push_back(object);
46 return object;
47 }
48
RemoveCacheObject(const std::string & sessionId)49 void DistributedObjectStoreImpl::RemoveCacheObject(const std::string &sessionId)
50 {
51 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
52 auto iter = objects_.begin();
53 while (iter != objects_.end()) {
54 if ((*iter)->GetSessionId() == sessionId) {
55 delete *iter;
56 iter = objects_.erase(iter);
57 } else {
58 iter++;
59 }
60 }
61 return;
62 }
63
CreateObject(const std::string & sessionId)64 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
65 {
66 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
67 if (flatObjectStore_ == nullptr) {
68 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
69 return nullptr;
70 }
71
72 if (sessionId.empty()) {
73 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
74 return nullptr;
75 }
76
77 uint32_t status = flatObjectStore_->CreateObject(sessionId);
78 if (status != SUCCESS) {
79 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
80 return nullptr;
81 }
82 return CacheObject(sessionId, flatObjectStore_);
83 }
84
CreateObject(const std::string & sessionId,uint32_t & status)85 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId, uint32_t &status)
86 {
87 DataObjectHiTrace trace("DistributedObjectStoreImpl::CreateObject");
88 if (flatObjectStore_ == nullptr) {
89 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
90 status = ERR_NULL_OBJECTSTORE;
91 return nullptr;
92 }
93
94 if (sessionId.empty()) {
95 LOG_ERROR("DistributedObjectStoreImpl::CreateObject Invalid sessionId");
96 status = ERR_INVALID_ARGS;
97 return nullptr;
98 }
99
100 status = flatObjectStore_->CreateObject(sessionId);
101 if (status != SUCCESS) {
102 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
103 return nullptr;
104 }
105 return CacheObject(sessionId, flatObjectStore_);
106 }
107
DeleteObject(const std::string & sessionId)108 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
109 {
110 DataObjectHiTrace trace("DistributedObjectStoreImpl::DeleteObject");
111 if (flatObjectStore_ == nullptr) {
112 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
113 return ERR_NULL_OBJECTSTORE;
114 }
115 uint32_t status = flatObjectStore_->Delete(sessionId);
116 if (status != SUCCESS) {
117 LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
118 return status;
119 }
120 RemoveCacheObject(sessionId);
121 return SUCCESS;
122 }
123
Get(const std::string & sessionId,DistributedObject ** object)124 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject **object)
125 {
126 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
127 auto iter = objects_.begin();
128 while (iter != objects_.end()) {
129 if ((*iter)->GetSessionId() == sessionId) {
130 *object = *iter;
131 return SUCCESS;
132 }
133 iter++;
134 }
135 LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
136 return ERR_GET_OBJECT;
137 }
138
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)139 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
140 {
141 if (object == nullptr) {
142 LOG_ERROR("Watch sync object err ");
143 return ERR_NULL_OBJECT;
144 }
145 if (flatObjectStore_ == nullptr) {
146 LOG_ERROR("Watch sync flatObjectStore err ");
147 return ERR_NULL_OBJECTSTORE;
148 }
149 std::lock_guard<std::mutex> lock(watchersLock_);
150 if (watchers_.count(object) != 0) {
151 LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
152 return ERR_EXIST;
153 }
154 std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
155 watcherProxy->SetAssetChangeCallBack(
156 [=](const std::string &sessionId, const std::string &assetKey, std::shared_ptr<ObjectWatcher> objectWatcher) {
157 AssetChangeTimer *assetChangeTimer = AssetChangeTimer::GetInstance(flatObjectStore_);
158 assetChangeTimer->OnAssetChanged(sessionId, assetKey, objectWatcher);
159 });
160 uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
161 if (status != SUCCESS) {
162 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
163 return status;
164 }
165 watchers_.insert_or_assign(object, watcherProxy);
166 LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
167 return SUCCESS;
168 }
169
UnWatch(DistributedObject * object)170 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
171 {
172 if (object == nullptr) {
173 LOG_ERROR("UnWatch sync object err ");
174 return ERR_NULL_OBJECT;
175 }
176 if (flatObjectStore_ == nullptr) {
177 LOG_ERROR("UnWatch sync flatObjectStore err ");
178 return ERR_NULL_OBJECTSTORE;
179 }
180 uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
181 if (status != SUCCESS) {
182 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
183 return status;
184 }
185 std::lock_guard<std::mutex> lock(watchersLock_);
186 watchers_.erase(object);
187 LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
188 return SUCCESS;
189 }
190
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)191 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
192 {
193 if (flatObjectStore_ == nullptr) {
194 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
195 return ERR_NULL_OBJECTSTORE;
196 }
197 std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
198 return flatObjectStore_->SetStatusNotifier(watcherProxy);
199 }
200
NotifyCachedStatus(const std::string & sessionId)201 void DistributedObjectStoreImpl::NotifyCachedStatus(const std::string &sessionId)
202 {
203 flatObjectStore_->CheckRetrieveCache(sessionId);
204 }
205
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)206 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
207 : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
208 {
209 }
210
OnChanged(const std::string & sessionId,const std::vector<std::string> & changedData,bool enableTransfer)211 void WatcherProxy::OnChanged(
212 const std::string &sessionId, const std::vector<std::string> &changedData, bool enableTransfer)
213 {
214 std::unordered_set<std::string> transferKeys;
215 std::vector<std::string> otherKeys;
216 for (const auto &str : changedData) {
217 if (str.find(ASSET_DOT) == std::string::npos) {
218 if (str != DEVICEID_KEY) {
219 otherKeys.push_back(str);
220 }
221 } else {
222 std::string assetKey;
223 if (FindChangedAssetKey(str, assetKey)) {
224 transferKeys.insert(assetKey);
225 }
226 }
227 }
228 if (!enableTransfer) {
229 otherKeys.insert(otherKeys.end(), transferKeys.begin(), transferKeys.end());
230 } else if (assetChangeCallback_ != nullptr && !transferKeys.empty()) {
231 for (auto &assetKey : transferKeys) {
232 assetChangeCallback_(sessionId, assetKey, objectWatcher_);
233 }
234 }
235 if (!otherKeys.empty()) {
236 objectWatcher_->OnChanged(sessionId, otherKeys);
237 }
238 }
239
FindChangedAssetKey(const std::string & changedKey,std::string & assetKey)240 bool WatcherProxy::FindChangedAssetKey(const std::string &changedKey, std::string &assetKey)
241 {
242 std::size_t dotPos = changedKey.find(ASSET_DOT);
243 if ((changedKey.size() > strlen(MODIFY_TIME_SUFFIX) && changedKey.substr(dotPos) == MODIFY_TIME_SUFFIX) ||
244 (changedKey.size() > strlen(SIZE_SUFFIX) && changedKey.substr(dotPos) == SIZE_SUFFIX)) {
245 assetKey = changedKey.substr(0, dotPos);
246 return true;
247 }
248 return false;
249 }
250
SetAssetChangeCallBack(const AssetChangeCallback & assetChangeCallback)251 void WatcherProxy::SetAssetChangeCallBack(const AssetChangeCallback &assetChangeCallback)
252 {
253 assetChangeCallback_ = assetChangeCallback;
254 }
255
GetInstance(const std::string & bundleName)256 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
257 {
258 static std::mutex instLock_;
259 static DistributedObjectStore *instPtr = nullptr;
260 if (instPtr == nullptr) {
261 std::lock_guard<std::mutex> lock(instLock_);
262 if (instPtr == nullptr && !bundleName.empty()) {
263 RadarReporter::ReportStateStart(std::string(__FUNCTION__), CREATE, INIT_STORE, IDLE, START, bundleName);
264 LOG_INFO("new objectstore %{public}s", bundleName.c_str());
265 FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
266 if (flatObjectStore == nullptr) {
267 LOG_ERROR("no memory for FlatObjectStore malloc!");
268 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
269 RADAR_FAILED, NO_MEMORY, FINISHED);
270 return nullptr;
271 }
272 // Use instMemory to make sure this singleton not free before other object.
273 // This operation needn't to malloc memory, we needn't to check nullptr.
274 instPtr = new (std::nothrow) DistributedObjectStoreImpl(flatObjectStore);
275 if (instPtr == nullptr) {
276 delete flatObjectStore;
277 LOG_ERROR("no memory for DistributedObjectStoreImpl malloc!");
278 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, INIT_STORE,
279 RADAR_FAILED, NO_MEMORY, FINISHED);
280 return nullptr;
281 }
282 RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, INIT_STORE, RADAR_SUCCESS);
283 }
284 }
285 return instPtr;
286 }
287
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)288 void StatusNotifierProxy::OnChanged(
289 const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
290 {
291 if (notifier != nullptr) {
292 notifier->OnChanged(sessionId, networkId, onlineStatus);
293 }
294 }
295
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)296 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> ¬ifier) : notifier(notifier)
297 {
298 }
299
~StatusNotifierProxy()300 StatusNotifierProxy::~StatusNotifierProxy()
301 {
302 LOG_ERROR("destroy");
303 notifier = nullptr;
304 }
305 } // namespace OHOS::ObjectStore
306