1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbServiceProxy"
16 #include "rdb_service_proxy.h"
17
18 #include <thread>
19 #include "itypes_util.h"
20 #include "logger.h"
21 #include "result_set_proxy.h"
22 #include "sqlite_utils.h"
23
24 namespace OHOS::DistributedRdb {
25 using namespace OHOS::Rdb;
26 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
27 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
// Upper bound on the number of 1 ms polls ~RdbServiceProxy() performs while notifier_ is still referenced elsewhere.
constexpr int32_t MAX_RETRY = 100;

/*
 * IPC_SEND(code, reply, args...) is a statement expression that evaluates to an int32_t status.
 * It writes the interface token and the marshalled args into a fresh request parcel, sends it through
 * remote_ with the given command code, and reads the leading status out of reply.
 * RDB_ERROR is produced when remote_ is null, when the token or arguments cannot be written, when
 * SendRequest returns non-zero, or when the status cannot be read from reply (previously such an
 * unreadable reply was silently reported as RDB_OK).
 * Any payload that follows the status is left in reply for the caller to unmarshal.
 */
#define IPC_SEND(code, reply, ...)                                          \
({                                                                          \
    int32_t __status = RDB_OK;                                              \
    do {                                                                    \
        if (remote_ == nullptr) {                                           \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        MessageParcel request;                                              \
        if (!request.WriteInterfaceToken(GetDescriptor())) {                \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) {                 \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        MessageOption option;                                               \
        auto result = remote_->SendRequest((code), request, reply, option); \
        if (result != 0) {                                                  \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
                                                                            \
        if (!ITypesUtil::Unmarshal(reply, __status)) {                      \
            __status = RDB_ERROR;                                           \
        }                                                                   \
    } while (0);                                                            \
    __status;                                                               \
})
53
RdbServiceProxy(const sptr<IRemoteObject> & object)54 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
55 : IRemoteProxy<IRdbService>(object)
56 {
57 remote_ = Remote();
58 }
59
ObtainDistributedTableName(const RdbSyncerParam & param,const std::string & device,const std::string & table)60 std::string RdbServiceProxy::ObtainDistributedTableName(
61 const RdbSyncerParam ¶m, const std::string &device, const std::string &table)
62 {
63 MessageParcel reply;
64 int32_t status =
65 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_OBTAIN_TABLE), reply, param, device, table);
66 std::string distributedTableName;
67 if (status != RDB_OK || !ITypesUtil::Unmarshal(reply, distributedTableName)) {
68 LOG_ERROR("status:%{public}d, device:%{public}s, table:%{public}s", status,
69 SqliteUtils::Anonymous(device).c_str(), SqliteUtils::Anonymous(table).c_str());
70 }
71 return distributedTableName;
72 }
73
InitNotifier(const RdbSyncerParam & param)74 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m)
75 {
76 notifier_ = new (std::nothrow) RdbNotifierStub(
77 [this] (uint32_t seqNum, Details &&result) {
78 OnSyncComplete(seqNum, std::move(result));
79 },
80 [this] (std::string storeName, Details &&result) {
81 OnSyncComplete(storeName, std::move(result));
82 },
83 [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
84 OnDataChange(origin, primaries, std::move(changeInfo));
85 });
86 if (notifier_ == nullptr) {
87 LOG_ERROR("create notifier failed.");
88 return RDB_ERROR;
89 }
90
91 if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
92 notifier_ = nullptr;
93 LOG_ERROR("init notifier error.");
94 return RDB_ERROR;
95 }
96
97 return RDB_OK;
98 }
99
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)100 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m, sptr<IRemoteObject> notifier)
101 {
102 MessageParcel reply;
103 int32_t status =
104 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
105 if (status != RDB_OK) {
106 LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
107 }
108 return status;
109 }
110
GetSeqNum()111 uint32_t RdbServiceProxy::GetSeqNum()
112 {
113 uint32_t value = ++seqNum_;
114 if (value == 0) {
115 value = ++seqNum_;
116 }
117 return value;
118 }
119
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)120 std::pair<int32_t, Details> RdbServiceProxy::DoSync(
121 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates)
122 {
123 std::pair<int32_t, Details> result{ RDB_ERROR, {} };
124 MessageParcel reply;
125 auto &[status, details] = result;
126 status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
127 if (status != RDB_OK) {
128 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
129 SqliteUtils::Anonymous(param.storeName_).c_str());
130 return result;
131 }
132
133 if (!ITypesUtil::Unmarshal(reply, details)) {
134 LOG_ERROR("read result failed.");
135 status = RDB_ERROR;
136 return result;
137 }
138 return result;
139 }
140
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)141 int32_t RdbServiceProxy::DoSync(
142 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
143 {
144 auto [status, details] = DoSync(param, option, predicates);
145 if (status != RDB_OK) {
146 LOG_INFO("failed.");
147 return RDB_ERROR;
148 }
149 LOG_INFO("success.");
150 if (async != nullptr) {
151 async(std::move(details));
152 }
153 return RDB_OK;
154 }
155
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)156 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates)
157 {
158 MessageParcel reply;
159 int32_t status =
160 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
161 if (status != RDB_OK) {
162 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
163 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
164 }
165 return status;
166 }
167
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)168 int32_t RdbServiceProxy::DoAsync(
169 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &callback)
170 {
171 Option asyncOption = option;
172 if (callback != nullptr) {
173 asyncOption.seqNum = GetSeqNum();
174 if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
175 LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, insert callback failed", param.bundleName_.c_str(),
176 SqliteUtils::Anonymous(param.storeName_).c_str());
177 return RDB_ERROR;
178 }
179 }
180 LOG_INFO("bundleName:%{public}s, storeName:%{public}s, num=%{public}u, start DoAsync", param.bundleName_.c_str(),
181 SqliteUtils::Anonymous(param.storeName_).c_str(), asyncOption.seqNum);
182 if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
183 syncCallbacks_.Erase(asyncOption.seqNum);
184 return RDB_ERROR;
185 }
186 return RDB_OK;
187 }
188
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,const std::vector<Reference> & references,bool isRebuild,int32_t type)189 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam ¶m, const std::vector<std::string> &tables,
190 const std::vector<Reference> &references, bool isRebuild, int32_t type)
191 {
192 MessageParcel reply;
193 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param,
194 tables, references, type, isRebuild);
195 if (status != RDB_OK) {
196 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d", status,
197 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
198 }
199 return status;
200 }
201
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)202 int32_t RdbServiceProxy::Sync(
203 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
204 {
205 if (option.isAsync) {
206 return DoAsync(param, option, predicates, async);
207 }
208 return DoSync(param, option, predicates, async);
209 }
210
RemoveSuffix(const std::string & name)211 std::string RdbServiceProxy::RemoveSuffix(const std::string &name)
212 {
213 std::string suffix(".db");
214 auto pos = name.rfind(suffix);
215 if (pos == std::string::npos || pos < name.length() - suffix.length()) {
216 return name;
217 }
218 return { name, 0, pos };
219 }
220
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)221 int32_t RdbServiceProxy::Subscribe(
222 const RdbSyncerParam ¶m, const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
223 {
224 if (observer == nullptr) {
225 return RDB_ERROR;
226 }
227 if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
228 LOG_ERROR("subscribe mode invalid.");
229 return RDB_ERROR;
230 }
231 if (DoSubscribe(param, option) != RDB_OK) {
232 return RDB_ERROR;
233 }
234 auto name = RemoveSuffix(param.storeName_);
235 observers_.Compute(name, [observer, ¶m, &option](const auto &key, std::list<ObserverParam> &value) {
236 for (const auto &element : value) {
237 if (element.observer.lock() == observer) {
238 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(key).c_str());
239 return true;
240 }
241 }
242 value.push_back({ observer, param.bundleName_, option });
243 return true;
244 });
245 return RDB_OK;
246 }
247
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)248 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option)
249 {
250 MessageParcel reply;
251 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
252 if (status != RDB_OK) {
253 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
254 SqliteUtils::Anonymous(param.storeName_).c_str());
255 }
256 return status;
257 }
258
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)259 int32_t RdbServiceProxy::UnSubscribe(
260 const RdbSyncerParam ¶m, const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
261 {
262 if (observer == nullptr) {
263 LOG_ERROR("observer is null.");
264 return RDB_ERROR;
265 }
266 if (DoUnSubscribe(param, option) != RDB_OK) {
267 return RDB_ERROR;
268 }
269 auto name = RemoveSuffix(param.storeName_);
270 observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
271 LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
272 value.remove_if([observer](const ObserverParam ¶m) { return param.observer.lock() == observer; });
273 LOG_INFO("after remove size=%{public}d", static_cast<int>(value.size()));
274 return !(value.empty());
275 });
276 return RDB_OK;
277 }
278
DoUnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)279 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option)
280 {
281 MessageParcel reply;
282 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param, option);
283 if (status != RDB_OK) {
284 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
285 SqliteUtils::Anonymous(param.storeName_).c_str());
286 }
287 return status;
288 }
289
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs)290 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::RemoteQuery(
291 const RdbSyncerParam ¶m, const std::string &device, const std::string &sql,
292 const std::vector<std::string> &selectionArgs)
293 {
294 MessageParcel reply;
295 int32_t status = IPC_SEND(
296 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
297 if (status != RDB_OK) {
298 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s", status,
299 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
300 SqliteUtils::Anonymous(device).c_str());
301 return { status, nullptr };
302 }
303
304 sptr<IRemoteObject> remote = reply.ReadRemoteObject();
305 if (remote == nullptr) {
306 LOG_ERROR("read remote object is null.");
307 return { RDB_ERROR, nullptr };
308 }
309 sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
310 if (instance == nullptr) {
311 LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s",
312 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
313 SqliteUtils::Anonymous(device).c_str());
314 return { RDB_ERROR, nullptr };
315 }
316 return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [holder = instance](const auto *) {}) };
317 }
318
ExportObservers()319 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
320 {
321 return observers_;
322 }
323
ExportSyncObservers()324 RdbServiceProxy::SyncObservers RdbServiceProxy::ExportSyncObservers()
325 {
326 return syncObservers_;
327 }
328
ImportObservers(Observers & observers)329 void RdbServiceProxy::ImportObservers(Observers &observers)
330 {
331 observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
332 RdbSyncerParam syncerParam;
333 for (const auto ¶m : value) {
334 syncerParam.bundleName_ = param.bundleName;
335 syncerParam.storeName_ = key;
336 Subscribe(syncerParam, param.subscribeOption, param.observer.lock());
337 }
338 return false;
339 });
340 }
341
ImportSyncObservers(SyncObservers & syncObservers)342 void RdbServiceProxy::ImportSyncObservers(SyncObservers &syncObservers)
343 {
344 syncObservers.ForEach([this](const std::string &key, const std::list<SyncObserverParam> &value) {
345 RdbSyncerParam syncerParam;
346 for (const auto ¶m : value) {
347 syncerParam.bundleName_ = param.bundleName;
348 syncerParam.storeName_ = key;
349 RegisterAutoSyncCallback(syncerParam, param.syncObserver);
350 }
351 return false;
352 });
353 }
354
BeforeOpen(RdbSyncerParam & param)355 int32_t RdbServiceProxy::BeforeOpen(RdbSyncerParam ¶m)
356 {
357 MessageParcel reply;
358 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_BEFORE_OPEN), reply, param);
359 if (status != RDB_OK) {
360 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
361 SqliteUtils::Anonymous(param.storeName_).c_str());
362 return status;
363 }
364 if (!ITypesUtil::Unmarshal(reply, param)) {
365 LOG_ERROR("read result failed.");
366 status = RDB_ERROR;
367 }
368 return status;
369 }
370
AfterOpen(const RdbSyncerParam & param)371 int32_t RdbServiceProxy::AfterOpen(const RdbSyncerParam ¶m)
372 {
373 MessageParcel reply;
374 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_AFTER_OPEN), reply, param);
375 if (status != RDB_OK) {
376 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
377 SqliteUtils::Anonymous(param.storeName_).c_str());
378 }
379 return status;
380 }
381
ReportStatistic(const RdbSyncerParam & param,const RdbStatEvent & statEvent)382 int32_t RdbServiceProxy::ReportStatistic(const RdbSyncerParam ¶m, const RdbStatEvent &statEvent)
383 {
384 MessageParcel reply;
385 int32_t status =
386 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REPORT_STAT), reply, param, statEvent);
387 if (status != RDB_OK) {
388 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
389 SqliteUtils::Anonymous(param.storeName_).c_str());
390 }
391 return status;
392 }
393
Delete(const RdbSyncerParam & param)394 int32_t RdbServiceProxy::Delete(const RdbSyncerParam ¶m)
395 {
396 MessageParcel reply;
397 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
398 if (status != RDB_OK) {
399 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
400 SqliteUtils::Anonymous(param.storeName_).c_str());
401 }
402 return status;
403 }
404
QuerySharingResource(const RdbSyncerParam & param,const PredicatesMemo & predicates,const std::vector<std::string> & columns)405 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::QuerySharingResource(
406 const RdbSyncerParam ¶m, const PredicatesMemo &predicates, const std::vector<std::string> &columns)
407 {
408 MessageParcel reply;
409 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_QUERY_SHARING_RESOURCE), reply,
410 param, predicates, columns);
411 sptr<IRemoteObject> remote;
412 bool success = ITypesUtil::Unmarshal(reply, remote);
413 if (status != RDB_OK || !success || remote == nullptr) {
414 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, success:%{public}d, remote is "
415 "%{public}s nullptr",
416 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), success,
417 remote != nullptr ? "not" : "");
418 return { RDB_ERROR, {} };
419 }
420 sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
421 if (instance == nullptr) {
422 LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s",
423 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
424 return { RDB_ERROR, nullptr };
425 }
426 return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [instance](const auto *) {}) };
427 }
428
RegisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)429 int32_t RdbServiceProxy::RegisterAutoSyncCallback(
430 const RdbSyncerParam ¶m, std::shared_ptr<DetailProgressObserver> observer)
431 {
432 if (observer == nullptr || param.storeName_.empty()) {
433 LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
434 SqliteUtils::Anonymous(param.storeName_).c_str());
435 return RDB_ERROR;
436 }
437 int32_t status = RDB_OK;
438 auto name = RemoveSuffix(param.storeName_);
439 syncObservers_.Compute(name, [this, ¶m, &status, observer](const auto &store, auto &observers) {
440 for (const auto &element : observers) {
441 if (element.syncObserver.get() == observer.get()) {
442 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(store).c_str());
443 return true;
444 }
445 }
446 status = DoRegister(param);
447 if (status == RDB_OK) {
448 observers.push_back({ observer, param.bundleName_ });
449 }
450 return !observers.empty();
451 });
452 return status;
453 }
454
DoRegister(const RdbSyncerParam & param)455 int32_t RdbServiceProxy::DoRegister(const RdbSyncerParam ¶m)
456 {
457 MessageParcel reply;
458 int32_t status = IPC_SEND(
459 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
460 if (status != RDB_OK) {
461 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
462 SqliteUtils::Anonymous(param.storeName_).c_str());
463 }
464 return status;
465 }
466
UnregisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)467 int32_t RdbServiceProxy::UnregisterAutoSyncCallback(
468 const RdbSyncerParam ¶m, std::shared_ptr<DetailProgressObserver> observer)
469 {
470 if (observer == nullptr || param.storeName_.empty()) {
471 LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
472 SqliteUtils::Anonymous(param.storeName_).c_str());
473 return RDB_ERROR;
474 }
475 int32_t status = RDB_OK;
476 auto name = RemoveSuffix(param.storeName_);
477 syncObservers_.ComputeIfPresent(name, [this, ¶m, &status, observer](const auto &storeName, auto &observers) {
478 for (auto it = observers.begin(); it != observers.end();) {
479 if (it->syncObserver.get() != observer.get()) {
480 ++it;
481 continue;
482 }
483 status = DoUnRegister(param);
484 if (status == RDB_OK) {
485 it = observers.erase(it);
486 }
487 }
488 return !observers.empty();
489 });
490 return status;
491 }
492
DoUnRegister(const RdbSyncerParam & param)493 int32_t RdbServiceProxy::DoUnRegister(const RdbSyncerParam ¶m)
494 {
495 MessageParcel reply;
496 int32_t status = IPC_SEND(
497 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNREGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
498 if (status != RDB_OK) {
499 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
500 SqliteUtils::Anonymous(param.storeName_).c_str());
501 }
502 return status;
503 }
504
OnDataChange(const Origin & origin,const RdbServiceProxy::PrimaryFields & primaries,RdbServiceProxy::ChangeInfo && changeInfo)505 void RdbServiceProxy::OnDataChange(
506 const Origin &origin, const RdbServiceProxy::PrimaryFields &primaries, RdbServiceProxy::ChangeInfo &&changeInfo)
507 {
508 LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
509 SqliteUtils::Anonymous(origin.store).c_str(),
510 origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(), origin.dataType,
511 origin.origin);
512 auto name = RdbServiceProxy::RemoveSuffix(origin.store);
513 observers_.ComputeIfPresent(name, [&origin, &primaries, info = std::move(changeInfo)](
514 const auto &key, const std::list<ObserverParam> &value) mutable {
515 auto size = value.size();
516 for (const auto ¶ms : value) {
517 auto obs = params.observer.lock();
518 size--;
519 if (obs != nullptr) {
520 obs->OnChange(origin, primaries, size > 0 ? ChangeInfo(info) : std::move(info));
521 }
522 }
523 return !value.empty();
524 });
525 }
526
OnRemoteDeadSyncComplete()527 void RdbServiceProxy::OnRemoteDeadSyncComplete()
528 {
529 std::map<std::string, ProgressDetail> result;
530 result.insert(std::pair<std::string, ProgressDetail>("", {Progress::SYNC_FINISH, ProgressCode::UNKNOWN_ERROR}));
531 auto callbacks = std::move(syncCallbacks_);
532 callbacks.ForEach([&result](const auto &key, const AsyncDetail &callback) {
533 LOG_INFO("remote dead, compensate progress notification");
534 std::map<std::string, ProgressDetail> copyResult = result;
535 if (callback != nullptr) {
536 callback(std::move(result));
537 }
538 return false;
539 });
540 }
541
OnSyncComplete(uint32_t seqNum,Details && result)542 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
543 {
544 syncCallbacks_.ComputeIfPresent(seqNum, [&result](const auto &key, const AsyncDetail &callback) {
545 auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
546 LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
547 if (callback != nullptr) {
548 callback(std::move(result));
549 }
550 return !finished;
551 });
552 }
553
OnSyncComplete(const std::string & storeName,Details && result)554 void RdbServiceProxy::OnSyncComplete(const std::string &storeName, Details &&result)
555 {
556 syncObservers_.ComputeIfPresent(storeName, [&result](const auto &key, const auto &observers) {
557 LOG_DEBUG("Sync complete, storeName%{public}s, result size:%{public}zu", SqliteUtils::Anonymous(key).c_str(),
558 result.size());
559 for (const auto &it : observers) {
560 if (it.syncObserver != nullptr) {
561 it.syncObserver->ProgressNotification(result);
562 }
563 }
564 return true;
565 });
566 }
567
SetSearchable(const RdbSyncerParam & param,bool isSearchable)568 int32_t RdbServiceProxy::SetSearchable(const RdbSyncerParam ¶m, bool isSearchable)
569 {
570 MessageParcel reply;
571 int32_t status =
572 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_SEARCHABLE), reply, param, isSearchable);
573 if (status != RDB_OK) {
574 LOG_ERROR("RdbServiceProxy SetSearchable fail, status:%{public}d, "
575 "bundleName:%{public}s, storeName:%{public}s",
576 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
577 }
578 return status;
579 }
580
NotifyDataChange(const RdbSyncerParam & param,const RdbChangedData & rdbChangedData,const RdbNotifyConfig & rdbNotifyConfig)581 int32_t RdbServiceProxy::NotifyDataChange(
582 const RdbSyncerParam ¶m, const RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig)
583 {
584 MessageParcel reply;
585 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_NOTIFY_DATA_CHANGE), reply, param,
586 rdbChangedData, rdbNotifyConfig);
587 if (status != RDB_OK) {
588 LOG_ERROR("RdbServiceProxy NotifyDataChange fail, status:%{public}d, "
589 "bundleName:%{public}s, storeName:%{public}s",
590 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
591 }
592 return status;
593 }
594
Disable(const RdbSyncerParam & param)595 int32_t RdbServiceProxy::Disable(const RdbSyncerParam ¶m)
596 {
597 MessageParcel reply;
598 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DISABLE), reply, param);
599 if (status != RDB_OK) {
600 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
601 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
602 }
603 return status;
604 }
605
Enable(const RdbSyncerParam & param)606 int32_t RdbServiceProxy::Enable(const RdbSyncerParam ¶m)
607 {
608 MessageParcel reply;
609 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ENABLE), reply, param);
610 if (status != RDB_OK) {
611 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
612 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
613 }
614 return status;
615 }
616
GetPassword(const RdbSyncerParam & param,std::vector<std::vector<uint8_t>> & key)617 int32_t RdbServiceProxy::GetPassword(const RdbSyncerParam ¶m, std::vector<std::vector<uint8_t>> &key)
618 {
619 MessageParcel reply;
620 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_PASSWORD), reply, param);
621 if (status != RDB_OK) {
622 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
623 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
624 return status;
625 }
626 if (!ITypesUtil::Unmarshal(reply, key)) {
627 LOG_ERROR("unmarshal key failed.");
628 status = RDB_ERROR;
629 }
630 return status;
631 }
632
LockCloudContainer(const RdbSyncerParam & param)633 std::pair<int32_t, uint32_t> RdbServiceProxy::LockCloudContainer(const RdbSyncerParam ¶m)
634 {
635 MessageParcel reply;
636 uint32_t expiredTime = 0;
637 int32_t status = IPC_SEND(
638 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_LOCK_CLOUD_CONTAINER), reply, param, expiredTime);
639 if (status != RDB_OK) {
640 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
641 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
642 return { status, expiredTime };
643 }
644 if (!ITypesUtil::Unmarshal(reply, expiredTime)) {
645 LOG_ERROR("Unmarshal failed");
646 status = RDB_ERROR;
647 }
648 return { status, expiredTime };
649 }
650
UnlockCloudContainer(const RdbSyncerParam & param)651 int32_t RdbServiceProxy::UnlockCloudContainer(const RdbSyncerParam ¶m)
652 {
653 MessageParcel reply;
654 int32_t status =
655 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNLOCK_CLOUD_CONTAINER), reply, param);
656 if (status != RDB_OK) {
657 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
658 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
659 }
660 return status;
661 }
662
GetDebugInfo(const RdbSyncerParam & param,std::map<std::string,RdbDebugInfo> & debugInfo)663 int32_t RdbServiceProxy::GetDebugInfo(const RdbSyncerParam ¶m, std::map<std::string, RdbDebugInfo> &debugInfo)
664 {
665 MessageParcel reply;
666 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DEBUG_INFO), reply, param);
667 if (status != RDB_OK) {
668 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
669 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
670 }
671 if (!ITypesUtil::Unmarshal(reply, debugInfo)) {
672 LOG_ERROR("Unmarshal failed");
673 status = RDB_ERROR;
674 }
675 return status;
676 }
677
GetDfxInfo(const RdbSyncerParam & param,DistributedRdb::RdbDfxInfo & dfxInfo)678 int32_t RdbServiceProxy::GetDfxInfo(const RdbSyncerParam ¶m, DistributedRdb::RdbDfxInfo &dfxInfo)
679 {
680 MessageParcel reply;
681 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DFX_INFO), reply, param);
682 if (status != RDB_OK) {
683 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
684 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
685 }
686 if (!ITypesUtil::Unmarshal(reply, dfxInfo)) {
687 LOG_ERROR("Unmarshal failed, bundleName:%{public}s, storeName:%{public}s",
688 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
689 status = RDB_ERROR;
690 }
691 return status;
692 }
693
VerifyPromiseInfo(const RdbSyncerParam & param)694 int32_t RdbServiceProxy::VerifyPromiseInfo(const RdbSyncerParam ¶m)
695 {
696 MessageParcel reply;
697 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_VERIFY_PROMISE_INFO), reply, param);
698 if (status != RDB_OK) {
699 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
700 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
701 }
702 return status;
703 }
704
~RdbServiceProxy()705 RdbServiceProxy::~RdbServiceProxy()
706 {
707 int32_t retry = 0;
708 while (notifier_ != nullptr && notifier_->GetSptrRefCount() > 1 && retry < MAX_RETRY) {
709 retry++;
710 std::this_thread::sleep_for(std::chrono::milliseconds(1));
711 }
712 if (notifier_ != nullptr && notifier_->GetSptrRefCount() > 1) {
713 LOG_WARN("notifier_ has other in use:%{public}d!", notifier_->GetSptrRefCount());
714 }
715 }
716 } // namespace OHOS::DistributedRdb