1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <thread>
17
18 #include "distributed_object_impl.h"
19 #include "distributed_objectstore_impl.h"
20 #include "objectstore_errors.h"
21 #include "softbus_adapter.h"
22 #include "string_utils.h"
23
24 namespace OHOS::ObjectStore {
DistributedObjectStoreImpl(FlatObjectStore * flatObjectStore)25 DistributedObjectStoreImpl::DistributedObjectStoreImpl(FlatObjectStore *flatObjectStore)
26 : flatObjectStore_(flatObjectStore)
27 {
28 }
29
~DistributedObjectStoreImpl()30 DistributedObjectStoreImpl::~DistributedObjectStoreImpl()
31 {
32 delete flatObjectStore_;
33 }
34
CacheObject(const std::string & sessionId,FlatObjectStore * flatObjectStore)35 DistributedObject *DistributedObjectStoreImpl::CacheObject(
36 const std::string &sessionId, FlatObjectStore *flatObjectStore)
37 {
38 DistributedObjectImpl *object = new (std::nothrow) DistributedObjectImpl(sessionId, flatObjectStore);
39 if (object == nullptr) {
40 return nullptr;
41 }
42 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
43 objects_.push_back(object);
44 return object;
45 }
46
CreateObject(const std::string & sessionId)47 DistributedObject *DistributedObjectStoreImpl::CreateObject(const std::string &sessionId)
48 {
49 if (flatObjectStore_ == nullptr) {
50 LOG_ERROR("DistributedObjectStoreImpl::CreateObject store not opened!");
51 return nullptr;
52 }
53 uint32_t status = flatObjectStore_->CreateObject(sessionId);
54 if (status != SUCCESS) {
55 LOG_ERROR("DistributedObjectStoreImpl::CreateObject CreateTable err %{public}d", status);
56 return nullptr;
57 }
58 return CacheObject(sessionId, flatObjectStore_);
59 }
60
DeleteObject(const std::string & sessionId)61 uint32_t DistributedObjectStoreImpl::DeleteObject(const std::string &sessionId)
62 {
63 if (flatObjectStore_ == nullptr) {
64 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
65 return ERR_NULL_OBJECTSTORE;
66 }
67 uint32_t status = flatObjectStore_->Delete(sessionId);
68 if (status != SUCCESS) {
69 LOG_ERROR("DistributedObjectStoreImpl::DeleteObject store delete err %{public}d", status);
70 return status;
71 }
72 return SUCCESS;
73 }
74
Get(const std::string & sessionId,DistributedObject * object)75 uint32_t DistributedObjectStoreImpl::Get(const std::string &sessionId, DistributedObject *object)
76 {
77 auto iter = objects_.begin();
78 while (iter != objects_.end()) {
79 if ((*iter)->GetSessionId() == sessionId) {
80 object = *iter;
81 return SUCCESS;
82 }
83 iter++;
84 }
85 LOG_ERROR("DistributedObjectStoreImpl::Get object err, no object");
86 return ERR_GET_OBJECT;
87 }
88
Watch(DistributedObject * object,std::shared_ptr<ObjectWatcher> watcher)89 uint32_t DistributedObjectStoreImpl::Watch(DistributedObject *object, std::shared_ptr<ObjectWatcher> watcher)
90 {
91 if (object == nullptr) {
92 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
93 return ERR_NULL_OBJECT;
94 }
95 if (flatObjectStore_ == nullptr) {
96 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
97 return ERR_NULL_OBJECTSTORE;
98 }
99 if (watchers_.count(object) != 0) {
100 LOG_ERROR("DistributedObjectStoreImpl::Watch already gets object");
101 return ERR_EXIST;
102 }
103 std::shared_ptr<WatcherProxy> watcherProxy = std::make_shared<WatcherProxy>(watcher, object->GetSessionId());
104 uint32_t status = flatObjectStore_->Watch(object->GetSessionId(), watcherProxy);
105 if (status != SUCCESS) {
106 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
107 return status;
108 }
109 watchers_.insert_or_assign(object, watcherProxy);
110 LOG_INFO("DistributedObjectStoreImpl:Watch object success.");
111 return SUCCESS;
112 }
113
UnWatch(DistributedObject * object)114 uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
115 {
116 if (object == nullptr) {
117 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
118 return ERR_NULL_OBJECT;
119 }
120 if (flatObjectStore_ == nullptr) {
121 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
122 return ERR_NULL_OBJECTSTORE;
123 }
124 uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
125 if (status != SUCCESS) {
126 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
127 return status;
128 }
129 watchers_.erase(object);
130 LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
131 return SUCCESS;
132 }
133
TriggerSync()134 void DistributedObjectStoreImpl::TriggerSync()
135 {
136 }
137
TriggerRestore(std::function<void ()> notifier)138 void DistributedObjectStoreImpl::TriggerRestore(std::function<void()> notifier)
139 {
140 std::thread th = std::thread([=]() {
141 bool isFinished;
142 int16_t i;
143 constexpr static int16_t MAX_RETRY_SIZE = 5000;
144 std::map<std::string, SyncStatus> syncStatus;
145 for (auto &item : objects_) {
146 syncStatus[item->GetSessionId()] = SYNC_START;
147 }
148 for (i = 0; i < MAX_RETRY_SIZE; i++) {
149 {
150 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
151 for (auto &item : objects_) {
152 if (syncStatus[item->GetSessionId()] != SYNC_SUCCESS
153 && syncStatus[item->GetSessionId()] != SYNCING) {
154 auto onComplete = [this, item, &syncStatus](
155 const std::map<std::string, DistributedDB::DBStatus> &devices) {
156 LOG_INFO("%{public}s pull data", item->GetSessionId().c_str());
157 std::unique_lock<std::shared_mutex> cacheLock(dataMutex_);
158 SyncStatus result = SYNC_SUCCESS;
159 for (auto device : devices) {
160 if (device.second != DistributedDB::OK) {
161 result = SYNC_FAIL;
162 LOG_ERROR("%{public}s pull data fail %{public}d in device %{public}s",
163 item->GetSessionId().c_str(), device.second,
164 SoftBusAdapter::GetInstance()->ToNodeID(device.first).c_str());
165 }
166 }
167 LOG_INFO("%{public}s pull data success", item->GetSessionId().c_str());
168 syncStatus[item->GetSessionId()] = result;
169 };
170 LOG_INFO("start sync %{public}s", item->GetSessionId().c_str());
171 uint32_t result = flatObjectStore_->SyncAllData(item->GetSessionId(), onComplete);
172 if (result == SUCCESS) {
173 syncStatus[item->GetSessionId()] = SYNCING;
174 } else if (result == ERR_SINGLE_DEVICE) {
175 // single device, do not retry
176 syncStatus[item->GetSessionId()] = SYNC_SUCCESS;
177 }
178 }
179 }
180 }
181
182 isFinished = true;
183 for (auto &item : syncStatus) {
184 if (item.second != SYNC_SUCCESS) {
185 LOG_INFO("%{public}s not ready", item.first.c_str());
186 isFinished = false;
187 break;
188 }
189 }
190 if (!isFinished) {
191 std::this_thread::sleep_for(std::chrono::milliseconds(100));
192 }
193 }
194 LOG_WARN("restore result");
195 notifier();
196 LOG_WARN("notify end");
197 });
198 th.detach();
199 return;
200 }
SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)201 uint32_t DistributedObjectStoreImpl::SetStatusNotifier(std::shared_ptr<StatusNotifier> notifier)
202 {
203 if (flatObjectStore_ == nullptr) {
204 LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
205 return ERR_NULL_OBJECTSTORE;
206 }
207 std::shared_ptr<StatusNotifierProxy> watcherProxy = std::make_shared<StatusNotifierProxy>(notifier);
208 uint32_t status = flatObjectStore_->SetStatusNotifier(watcherProxy);
209 if (status != SUCCESS) {
210 LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
211 }
212 return status;
213 }
214
WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher,const std::string & sessionId)215 WatcherProxy::WatcherProxy(const std::shared_ptr<ObjectWatcher> objectWatcher, const std::string &sessionId)
216 : FlatObjectWatcher(sessionId), objectWatcher_(objectWatcher)
217 {
218 }
219
OnChanged(const std::string & sessionid,const std::vector<std::string> & changedData)220 void WatcherProxy::OnChanged(const std::string &sessionid, const std::vector<std::string> &changedData)
221 {
222 objectWatcher_->OnChanged(sessionid, changedData);
223 }
224
GetInstance(const std::string & bundleName)225 DistributedObjectStore *DistributedObjectStore::GetInstance(const std::string &bundleName)
226 {
227 static char instMemory[sizeof(DistributedObjectStoreImpl)];
228 static std::mutex instLock_;
229 static DistributedObjectStore *instPtr = nullptr;
230 if (instPtr == nullptr) {
231 std::lock_guard<std::mutex> lock(instLock_);
232 if (instPtr == nullptr && !bundleName.empty()) {
233 LOG_INFO("new objectstore %{public}s", bundleName.c_str());
234 FlatObjectStore *flatObjectStore = new (std::nothrow) FlatObjectStore(bundleName);
235 if (flatObjectStore == nullptr) {
236 LOG_ERROR("no memory for FlatObjectStore malloc!");
237 return nullptr;
238 }
239 // Use instMemory to make sure this singleton not free before other object.
240 // This operation needn't to malloc memory, we needn't to check nullptr.
241 instPtr = new (instMemory) DistributedObjectStoreImpl(flatObjectStore);
242 }
243 }
244 return instPtr;
245 }
246
OnChanged(const std::string & sessionId,const std::string & networkId,const std::string & onlineStatus)247 void StatusNotifierProxy::OnChanged(
248 const std::string &sessionId, const std::string &networkId, const std::string &onlineStatus)
249 {
250 if (notifier != nullptr) {
251 notifier->OnChanged(sessionId, networkId, onlineStatus);
252 }
253 }
254
StatusNotifierProxy(const std::shared_ptr<StatusNotifier> & notifier)255 StatusNotifierProxy::StatusNotifierProxy(const std::shared_ptr<StatusNotifier> ¬ifier) : notifier(notifier)
256 {
257 }
258
~StatusNotifierProxy()259 StatusNotifierProxy::~StatusNotifierProxy()
260 {
261 LOG_ERROR("destroy");
262 notifier = nullptr;
263 }
264 } // namespace OHOS::ObjectStore
265