• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <thread>
17 
18 #include "distributed_object_impl.h"
19 #include "distributed_objectstore_impl.h"
20 #include "objectstore_errors.h"
21 #include "softbus_adapter.h"
22 #include "string_utils.h"
23 
24 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)25 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
26     : flatObjectStore_(flatObjectStore)
27 {
28 }
29 
~DistributedObjectStoreImpl()30 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
31 {
32     delete flatObjectStore_;
33 }
34 
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)35 DistributedObject *DistributedObjectStoreImpl::CacheObject(
36     const std::string &sessionId, FlatObjectStore *flatObjectStore)
37 {
38     DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
39     if (object == nullptr) {
40         return nullptr;
41     }
42     std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
43     objects_.push_back(object);
44     return object;
45 }
46 
CreateObject(const std::string & sessionId)47 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
48 {
49     if (flatObjectStore_ == nullptr) {
50         LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
51         return nullptr;
52     }
53     uint32_t status = flatObjectStore_->CreateObject(sessionId);
54     if (status != SUCCESS) {
55         LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
56         return nullptr;
57     }
58     return CacheObject(sessionId, flatObjectStore_);
59 }
60 
DeleteObject(const std::string & sessionId)61 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
62 {
63     if (flatObjectStore_ == nullptr) {
64         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
65         return ERR_NULL_OBJECTSTORE;
66     }
67     uint32_t status = flatObjectStore_->Delete(sessionId);
68     if (status != SUCCESS) {
69         LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
70         return status;
71     }
72     return SUCCESS;
73 }
74 
Get(const std::string & sessionId,DistributedObject * object)75 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject *object)
76 {
77     auto iter = objects_.begin();
78     while (iter != objects_.end()) {
79         if ((*iter)->GetSessionId() == sessionId) {
80             object = *iter;
81             return SUCCESS;
82         }
83         iter++;
84     }
85     LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
86     return ERR_GET_OBJECT;
87 }
88 
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)89 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
90 {
91     if (object == nullptr) {
92         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
93         return ERR_NULL_OBJECT;
94     }
95     if (flatObjectStore_ == nullptr) {
96         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
97         return ERR_NULL_OBJECTSTORE;
98     }
99     if (watchers_.count(object) != 0) {
100         LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
101         return ERR_EXIST;
102     }
103     std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
104     uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
105     if (status != SUCCESS) {
106         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
107         return status;
108     }
109     watchers_.insert_or_assign(object, watcherProxy);
110     LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
111     return SUCCESS;
112 }
113 
UnWatch(DistributedObject * object)114 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
115 {
116     if (object == nullptr) {
117         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
118         return ERR_NULL_OBJECT;
119     }
120     if (flatObjectStore_ == nullptr) {
121         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
122         return ERR_NULL_OBJECTSTORE;
123     }
124     uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
125     if (status != SUCCESS) {
126         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
127         return status;
128     }
129     watchers_.erase(object);
130     LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
131     return SUCCESS;
132 }
133 
TriggerSync()134 void DistributedObjectStoreImpl::TriggerSync()
135 {
136 }
137 
TriggerRestore(std::function<void ()> notifier)138 void DistributedObjectStoreImpl::TriggerRestore(std::function<void()> notifier)
139 {
140     std::thread th = std::thread([=]() {
141         bool isFinished;
142         int16_t i;
143         constexpr static int16_t MAX_RETRY_SIZE = 5000;
144         std::map<std::string, SyncStatus> syncStatus;
145         for (auto &item : objects_) {
146             syncStatus[item->GetSessionId()] = SYNC_START;
147         }
148         for (i = 0; i < MAX_RETRY_SIZE; i++) {
149             {
150                 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
151                 for (auto &item : objects_) {
152                     if (syncStatus[item->GetSessionId()] != SYNC_SUCCESS
153                         && syncStatus[item->GetSessionId()] != SYNCING) {
154                         auto onComplete = [this, item, &syncStatus](
155                                               const std::map<std::string, DistributedDB::DBStatus> &devices) {
156                             LOG_INFO("%{public}s pull data", item->GetSessionId().c_str());
157                             std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
158                             SyncStatus result = SYNC_SUCCESS;
159                             for (auto device : devices) {
160                                 if (device.second != DistributedDB::OK) {
161                                     result = SYNC_FAIL;
162                                     LOG_ERROR("%{public}s pull data fail %{public}d in device %{public}s",
163                                         item->GetSessionId().c_str(), device.second,
164                                         SoftBusAdapter::GetInstance()->ToNodeID(device.first).c_str());
165                                 }
166                             }
167                             LOG_INFO("%{public}s pull data success", item->GetSessionId().c_str());
168                             syncStatus[item->GetSessionId()] = result;
169                         };
170                         LOG_INFO("start sync %{public}s", item->GetSessionId().c_str());
171                         uint32_t result = flatObjectStore_->SyncAllData(item->GetSessionId(), onComplete);
172                         if (result == SUCCESS) {
173                             syncStatus[item->GetSessionId()] = SYNCING;
174                         } else if (result == ERR_SINGLE_DEVICE) {
175                             // single device, do not retry
176                             syncStatus[item->GetSessionId()] = SYNC_SUCCESS;
177                         }
178                     }
179                 }
180             }
181 
182             isFinished = true;
183             for (auto &item : syncStatus) {
184                 if (item.second != SYNC_SUCCESS) {
185                     LOG_INFO("%{public}s not ready", item.first.c_str());
186                     isFinished = false;
187                     break;
188                 }
189             }
190             if (!isFinished) {
191                 std::this_thread::sleep_for(std::chrono::milliseconds(100));
192             }
193         }
194         LOG_WARN("restore result");
195         notifier();
196         LOG_WARN("notify end");
197     });
198     th.detach();
199     return;
200 }
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)201 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
202 {
203     if (flatObjectStore_ == nullptr) {
204         LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
205         return ERR_NULL_OBJECTSTORE;
206     }
207     std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
208     uint32_t status = flatObjectStore_->SetStatusNotifier(watcherProxy);
209     if (status != SUCCESS) {
210         LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
211     }
212     return status;
213 }
214 
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)215 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
216     : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
217 {
218 }
219 
OnChanged(const std::string & sessionid,const std::vector<std::string> & changedData)220 void WatcherProxy::OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData)
221 {
222     objectWatcher_->OnChanged(sessionid, changedData);
223 }
224 
GetInstance(const std::string & bundleName)225 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
226 {
227     static char instMemory[sizeof(DistributedObjectStoreImpl)];
228     static std::mutex instLock_;
229     static DistributedObjectStore *instPtr = nullptr;
230     if (instPtr == nullptr) {
231         std::lock_guard<std::mutex> lock(instLock_);
232         if (instPtr == nullptr && !bundleName.empty()) {
233             LOG_INFO("new objectstore %{public}s", bundleName.c_str());
234             FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
235             if (flatObjectStore == nullptr) {
236                 LOG_ERROR("no memory for FlatObjectStore malloc!");
237                 return nullptr;
238             }
239             // Use instMemory to make sure this singleton not free before other object.
240             // This operation needn't to malloc memory, we needn't to check nullptr.
241             instPtr = new (instMemory) DistributedObjectStoreImpl(flatObjectStore);
242         }
243     }
244     return instPtr;
245 }
246 
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)247 void StatusNotifierProxy::OnChanged(
248     const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
249 {
250     if (notifier != nullptr) {
251         notifier->OnChanged(sessionId, networkId, onlineStatus);
252     }
253 }
254 
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)255 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> &notifier) : notifier(notifier)
256 {
257 }
258 
~StatusNotifierProxy()259 StatusNotifierProxy::~StatusNotifierProxy()
260 {
261     LOG_ERROR("destroy");
262     notifier = nullptr;
263 }
264 } // namespace OHOS::ObjectStore
265