1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbServiceProxy"
16 #include "rdb_service_proxy.h"
17
18 #include "itypes_util.h"
19 #include "logger.h"
20 #include "result_set_proxy.h"
21 #include "sqlite_utils.h"
22
23 namespace OHOS::DistributedRdb {
24 using namespace OHOS::Rdb;
25 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
26 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
27
28 #define IPC_SEND(code, reply, ...) \
29 ({ \
30 int32_t __status = RDB_OK; \
31 do { \
32 MessageParcel request; \
33 if (!request.WriteInterfaceToken(GetDescriptor())) { \
34 __status = RDB_ERROR; \
35 break; \
36 } \
37 if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) { \
38 __status = RDB_ERROR; \
39 break; \
40 } \
41 MessageOption option; \
42 auto result = remote_->SendRequest((code), request, reply, option); \
43 if (result != 0) { \
44 __status = RDB_ERROR; \
45 break; \
46 } \
47 \
48 ITypesUtil::Unmarshal(reply, __status); \
49 } while (0); \
50 __status; \
51 })
52
RdbServiceProxy(const sptr<IRemoteObject> & object)53 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
54 : IRemoteProxy<IRdbService>(object)
55 {
56 remote_ = Remote();
57 }
58
ObtainDistributedTableName(const std::string & device,const std::string & table)59 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
60 {
61 return "";
62 }
63
InitNotifier(const RdbSyncerParam & param)64 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m)
65 {
66 notifier_ = new (std::nothrow) RdbNotifierStub(
67 [this] (uint32_t seqNum, Details &&result) {
68 OnSyncComplete(seqNum, std::move(result));
69 },
70 [this] (std::string storeName, Details &&result) {
71 OnSyncComplete(storeName, std::move(result));
72 },
73 [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
74 OnDataChange(origin, primaries, std::move(changeInfo));
75 });
76 if (notifier_ == nullptr) {
77 LOG_ERROR("create notifier failed.");
78 return RDB_ERROR;
79 }
80
81 if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
82 notifier_ = nullptr;
83 LOG_ERROR("init notifier error.");
84 return RDB_ERROR;
85 }
86
87 return RDB_OK;
88 }
89
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)90 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m, sptr<IRemoteObject> notifier)
91 {
92 MessageParcel reply;
93 int32_t status =
94 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
95 if (status != RDB_OK) {
96 LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
97 }
98 return status;
99 }
100
GetSeqNum()101 uint32_t RdbServiceProxy::GetSeqNum()
102 {
103 uint32_t value = ++seqNum_;
104 if (value == 0) {
105 value = ++seqNum_;
106 }
107 return value;
108 }
109
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)110 std::pair<int32_t, Details> RdbServiceProxy::DoSync(
111 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates)
112 {
113 std::pair<int32_t, Details> result{ RDB_ERROR, {} };
114 MessageParcel reply;
115 auto &[status, details] = result;
116 status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
117 if (status != RDB_OK) {
118 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
119 SqliteUtils::Anonymous(param.storeName_).c_str());
120 return result;
121 }
122
123 if (!ITypesUtil::Unmarshal(reply, details)) {
124 LOG_ERROR("read result failed.");
125 status = RDB_ERROR;
126 return result;
127 }
128 return result;
129 }
130
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)131 int32_t RdbServiceProxy::DoSync(
132 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
133 {
134 auto [status, details] = DoSync(param, option, predicates);
135 if (status != RDB_OK) {
136 LOG_INFO("failed.");
137 return RDB_ERROR;
138 }
139 LOG_INFO("success.");
140 if (async != nullptr) {
141 async(std::move(details));
142 }
143 return RDB_OK;
144 }
145
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)146 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates)
147 {
148 MessageParcel reply;
149 int32_t status =
150 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
151 if (status != RDB_OK) {
152 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
153 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
154 }
155 return status;
156 }
157
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)158 int32_t RdbServiceProxy::DoAsync(
159 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &callback)
160 {
161 Option asyncOption = option;
162 if (callback != nullptr) {
163 asyncOption.seqNum = GetSeqNum();
164 if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
165 LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, insert callback failed", param.bundleName_.c_str(),
166 SqliteUtils::Anonymous(param.storeName_).c_str());
167 return RDB_ERROR;
168 }
169 }
170 LOG_INFO("bundleName:%{public}s, storeName:%{public}s, num=%{public}u, start DoAsync", param.bundleName_.c_str(),
171 SqliteUtils::Anonymous(param.storeName_).c_str(), asyncOption.seqNum);
172 if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
173 syncCallbacks_.Erase(asyncOption.seqNum);
174 return RDB_ERROR;
175 }
176 return RDB_OK;
177 }
178
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,const std::vector<Reference> & references,bool isRebuild,int32_t type)179 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam ¶m, const std::vector<std::string> &tables,
180 const std::vector<Reference> &references, bool isRebuild, int32_t type)
181 {
182 MessageParcel reply;
183 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param,
184 tables, references, type, isRebuild);
185 if (status != RDB_OK) {
186 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d", status,
187 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
188 }
189 return status;
190 }
191
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)192 int32_t RdbServiceProxy::Sync(
193 const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
194 {
195 if (option.isAsync) {
196 return DoAsync(param, option, predicates, async);
197 }
198 return DoSync(param, option, predicates, async);
199 }
200
RemoveSuffix(const std::string & name)201 std::string RdbServiceProxy::RemoveSuffix(const std::string &name)
202 {
203 std::string suffix(".db");
204 auto pos = name.rfind(suffix);
205 if (pos == std::string::npos || pos < name.length() - suffix.length()) {
206 return name;
207 }
208 return { name, 0, pos };
209 }
210
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)211 int32_t RdbServiceProxy::Subscribe(
212 const RdbSyncerParam ¶m, const SubscribeOption &option, RdbStoreObserver *observer)
213 {
214 if (observer == nullptr) {
215 return RDB_ERROR;
216 }
217 if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
218 LOG_ERROR("subscribe mode invalid.");
219 return RDB_ERROR;
220 }
221 if (DoSubscribe(param, option) != RDB_OK) {
222 return RDB_ERROR;
223 }
224 auto name = RemoveSuffix(param.storeName_);
225 observers_.Compute(name, [observer, ¶m, &option](const auto &key, std::list<ObserverParam> &value) {
226 for (const auto &element : value) {
227 if (element.observer == observer) {
228 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(key).c_str());
229 return true;
230 }
231 }
232 value.push_back({ observer, param.bundleName_, option });
233 return true;
234 });
235 return RDB_OK;
236 }
237
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)238 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option)
239 {
240 MessageParcel reply;
241 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
242 if (status != RDB_OK) {
243 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
244 SqliteUtils::Anonymous(param.storeName_).c_str());
245 }
246 return status;
247 }
248
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)249 int32_t RdbServiceProxy::UnSubscribe(
250 const RdbSyncerParam ¶m, const SubscribeOption &option, RdbStoreObserver *observer)
251 {
252 if (observer == nullptr) {
253 LOG_ERROR("observer is null.");
254 return RDB_ERROR;
255 }
256 if (DoUnSubscribe(param, option) != RDB_OK) {
257 return RDB_ERROR;
258 }
259 auto name = RemoveSuffix(param.storeName_);
260 observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
261 LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
262 value.remove_if([observer](const ObserverParam ¶m) { return param.observer == observer; });
263 LOG_INFO("after remove size=%{public}d", static_cast<int>(value.size()));
264 return !(value.empty());
265 });
266 return RDB_OK;
267 }
268
DoUnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)269 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option)
270 {
271 MessageParcel reply;
272 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param, option);
273 if (status != RDB_OK) {
274 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
275 SqliteUtils::Anonymous(param.storeName_).c_str());
276 }
277 return status;
278 }
279
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs)280 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::RemoteQuery(
281 const RdbSyncerParam ¶m, const std::string &device, const std::string &sql,
282 const std::vector<std::string> &selectionArgs)
283 {
284 MessageParcel reply;
285 int32_t status = IPC_SEND(
286 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
287 if (status != RDB_OK) {
288 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s", status,
289 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
290 SqliteUtils::Anonymous(device).c_str());
291 return { status, nullptr };
292 }
293
294 sptr<IRemoteObject> remote = reply.ReadRemoteObject();
295 if (remote == nullptr) {
296 LOG_ERROR("read remote object is null.");
297 return { RDB_ERROR, nullptr };
298 }
299 sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
300 if (instance == nullptr) {
301 LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s",
302 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
303 SqliteUtils::Anonymous(device).c_str());
304 return { RDB_ERROR, nullptr };
305 }
306 return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [holder = instance](const auto *) {}) };
307 }
308
ExportObservers()309 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
310 {
311 return observers_;
312 }
313
ExportSyncObservers()314 RdbServiceProxy::SyncObservers RdbServiceProxy::ExportSyncObservers()
315 {
316 return syncObservers_;
317 }
318
ImportObservers(Observers & observers)319 void RdbServiceProxy::ImportObservers(Observers &observers)
320 {
321 observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
322 RdbSyncerParam syncerParam;
323 for (const auto ¶m : value) {
324 syncerParam.bundleName_ = param.bundleName;
325 syncerParam.storeName_ = key;
326 Subscribe(syncerParam, param.subscribeOption, param.observer);
327 }
328 return false;
329 });
330 }
331
ImportSyncObservers(SyncObservers & syncObservers)332 void RdbServiceProxy::ImportSyncObservers(SyncObservers &syncObservers)
333 {
334 syncObservers.ForEach([this](const std::string &key, const std::list<SyncObserverParam> &value) {
335 RdbSyncerParam syncerParam;
336 for (const auto ¶m : value) {
337 syncerParam.bundleName_ = param.bundleName;
338 syncerParam.storeName_ = key;
339 RegisterAutoSyncCallback(syncerParam, param.syncObserver);
340 }
341 return false;
342 });
343 }
344
BeforeOpen(RdbSyncerParam & param)345 int32_t RdbServiceProxy::BeforeOpen(RdbSyncerParam ¶m)
346 {
347 MessageParcel reply;
348 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_BEFORE_OPEN), reply, param);
349 if (status != RDB_OK) {
350 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
351 SqliteUtils::Anonymous(param.storeName_).c_str());
352 return status;
353 }
354 if (!ITypesUtil::Unmarshal(reply, param)) {
355 LOG_ERROR("read result failed.");
356 status = RDB_ERROR;
357 }
358 return status;
359 }
360
AfterOpen(const RdbSyncerParam & param)361 int32_t RdbServiceProxy::AfterOpen(const RdbSyncerParam ¶m)
362 {
363 MessageParcel reply;
364 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_AFTER_OPEN), reply, param);
365 if (status != RDB_OK) {
366 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
367 SqliteUtils::Anonymous(param.storeName_).c_str());
368 }
369 return status;
370 }
371
ReportStatistic(const RdbSyncerParam & param,const RdbStatEvent & statEvent)372 int32_t RdbServiceProxy::ReportStatistic(const RdbSyncerParam ¶m, const RdbStatEvent &statEvent)
373 {
374 MessageParcel reply;
375 int32_t status =
376 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REPORT_STAT), reply, param, statEvent);
377 if (status != RDB_OK) {
378 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
379 SqliteUtils::Anonymous(param.storeName_).c_str());
380 }
381 return status;
382 }
383
Delete(const RdbSyncerParam & param)384 int32_t RdbServiceProxy::Delete(const RdbSyncerParam ¶m)
385 {
386 MessageParcel reply;
387 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
388 if (status != RDB_OK) {
389 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
390 SqliteUtils::Anonymous(param.storeName_).c_str());
391 }
392 return status;
393 }
394
QuerySharingResource(const RdbSyncerParam & param,const PredicatesMemo & predicates,const std::vector<std::string> & columns)395 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::QuerySharingResource(
396 const RdbSyncerParam ¶m, const PredicatesMemo &predicates, const std::vector<std::string> &columns)
397 {
398 MessageParcel reply;
399 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_QUERY_SHARING_RESOURCE), reply,
400 param, predicates, columns);
401 sptr<IRemoteObject> remote;
402 bool success = ITypesUtil::Unmarshal(reply, remote);
403 if (status != RDB_OK || !success || remote == nullptr) {
404 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, success:%{public}d, remote is "
405 "%{public}s nullptr",
406 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), success,
407 remote != nullptr ? "not" : "");
408 return { RDB_ERROR, {} };
409 }
410 sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
411 if (instance == nullptr) {
412 LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s",
413 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
414 return { RDB_ERROR, nullptr };
415 }
416 return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [instance](const auto *) {}) };
417 }
418
RegisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)419 int32_t RdbServiceProxy::RegisterAutoSyncCallback(
420 const RdbSyncerParam ¶m, std::shared_ptr<DetailProgressObserver> observer)
421 {
422 if (observer == nullptr || param.storeName_.empty()) {
423 LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
424 SqliteUtils::Anonymous(param.storeName_).c_str());
425 return RDB_ERROR;
426 }
427 int32_t status = RDB_OK;
428 auto name = RemoveSuffix(param.storeName_);
429 syncObservers_.Compute(name, [this, ¶m, &status, observer](const auto &store, auto &observers) {
430 for (const auto &element : observers) {
431 if (element.syncObserver.get() == observer.get()) {
432 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(store).c_str());
433 return true;
434 }
435 }
436 status = DoRegister(param);
437 if (status == RDB_OK) {
438 observers.push_back({ observer, param.bundleName_ });
439 }
440 return !observers.empty();
441 });
442 return status;
443 }
444
DoRegister(const RdbSyncerParam & param)445 int32_t RdbServiceProxy::DoRegister(const RdbSyncerParam ¶m)
446 {
447 MessageParcel reply;
448 int32_t status = IPC_SEND(
449 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
450 if (status != RDB_OK) {
451 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
452 SqliteUtils::Anonymous(param.storeName_).c_str());
453 }
454 return status;
455 }
456
UnregisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)457 int32_t RdbServiceProxy::UnregisterAutoSyncCallback(
458 const RdbSyncerParam ¶m, std::shared_ptr<DetailProgressObserver> observer)
459 {
460 if (observer == nullptr || param.storeName_.empty()) {
461 LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
462 SqliteUtils::Anonymous(param.storeName_).c_str());
463 return RDB_ERROR;
464 }
465 int32_t status = RDB_OK;
466 auto name = RemoveSuffix(param.storeName_);
467 syncObservers_.ComputeIfPresent(name, [this, ¶m, &status, observer](const auto &storeName, auto &observers) {
468 for (auto it = observers.begin(); it != observers.end();) {
469 if (it->syncObserver.get() != observer.get()) {
470 ++it;
471 continue;
472 }
473 status = DoUnRegister(param);
474 if (status == RDB_OK) {
475 it = observers.erase(it);
476 }
477 }
478 return !observers.empty();
479 });
480 return status;
481 }
482
DoUnRegister(const RdbSyncerParam & param)483 int32_t RdbServiceProxy::DoUnRegister(const RdbSyncerParam ¶m)
484 {
485 MessageParcel reply;
486 int32_t status = IPC_SEND(
487 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNREGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
488 if (status != RDB_OK) {
489 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
490 SqliteUtils::Anonymous(param.storeName_).c_str());
491 }
492 return status;
493 }
494
OnDataChange(const Origin & origin,const RdbServiceProxy::PrimaryFields & primaries,RdbServiceProxy::ChangeInfo && changeInfo)495 void RdbServiceProxy::OnDataChange(
496 const Origin &origin, const RdbServiceProxy::PrimaryFields &primaries, RdbServiceProxy::ChangeInfo &&changeInfo)
497 {
498 LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
499 SqliteUtils::Anonymous(origin.store).c_str(),
500 origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(), origin.dataType,
501 origin.origin);
502 auto name = RdbServiceProxy::RemoveSuffix(origin.store);
503 observers_.ComputeIfPresent(name, [&origin, &primaries, info = std::move(changeInfo)](
504 const auto &key, const std::list<ObserverParam> &value) mutable {
505 auto size = value.size();
506 for (const auto ¶ms : value) {
507 params.observer->OnChange(origin, primaries, --size > 0 ? ChangeInfo(info) : std::move(info));
508 }
509 return !value.empty();
510 });
511 }
512
OnSyncComplete(uint32_t seqNum,Details && result)513 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
514 {
515 syncCallbacks_.ComputeIfPresent(seqNum, [&result](const auto &key, const AsyncDetail &callback) {
516 auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
517 LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
518 if (callback != nullptr) {
519 callback(std::move(result));
520 }
521 return !finished;
522 });
523 }
524
OnSyncComplete(const std::string & storeName,Details && result)525 void RdbServiceProxy::OnSyncComplete(const std::string &storeName, Details &&result)
526 {
527 syncObservers_.ComputeIfPresent(storeName, [&result](const auto &key, const auto &observers) {
528 LOG_DEBUG("Sync complete, storeName%{public}s, result size:%{public}zu", SqliteUtils::Anonymous(key).c_str(),
529 result.size());
530 for (const auto &it : observers) {
531 if (it.syncObserver != nullptr) {
532 it.syncObserver->ProgressNotification(result);
533 }
534 }
535 return true;
536 });
537 }
538
SetSearchable(const RdbSyncerParam & param,bool isSearchable)539 int32_t RdbServiceProxy::SetSearchable(const RdbSyncerParam ¶m, bool isSearchable)
540 {
541 MessageParcel reply;
542 int32_t status =
543 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_SEARCHABLE), reply, param, isSearchable);
544 if (status != RDB_OK) {
545 LOG_ERROR("RdbServiceProxy SetSearchable fail, status:%{public}d, "
546 "bundleName:%{public}s, storeName:%{public}s",
547 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
548 }
549 return status;
550 }
551
NotifyDataChange(const RdbSyncerParam & param,const RdbChangedData & rdbChangedData,const RdbNotifyConfig & rdbNotifyConfig)552 int32_t RdbServiceProxy::NotifyDataChange(
553 const RdbSyncerParam ¶m, const RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig)
554 {
555 MessageParcel reply;
556 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_NOTIFY_DATA_CHANGE), reply, param,
557 rdbChangedData, rdbNotifyConfig);
558 if (status != RDB_OK) {
559 LOG_ERROR("RdbServiceProxy NotifyDataChange fail, status:%{public}d, "
560 "bundleName:%{public}s, storeName:%{public}s",
561 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
562 }
563 return status;
564 }
565
Disable(const RdbSyncerParam & param)566 int32_t RdbServiceProxy::Disable(const RdbSyncerParam ¶m)
567 {
568 MessageParcel reply;
569 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DISABLE), reply, param);
570 if (status != RDB_OK) {
571 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
572 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
573 }
574 return status;
575 }
576
Enable(const RdbSyncerParam & param)577 int32_t RdbServiceProxy::Enable(const RdbSyncerParam ¶m)
578 {
579 MessageParcel reply;
580 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ENABLE), reply, param);
581 if (status != RDB_OK) {
582 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
583 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
584 }
585 return status;
586 }
587
GetPassword(const RdbSyncerParam & param,std::vector<std::vector<uint8_t>> & key)588 int32_t RdbServiceProxy::GetPassword(const RdbSyncerParam ¶m, std::vector<std::vector<uint8_t>> &key)
589 {
590 MessageParcel reply;
591 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_PASSWORD), reply, param);
592 if (status != RDB_OK) {
593 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
594 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
595 return status;
596 }
597 if (!ITypesUtil::Unmarshal(reply, key)) {
598 LOG_ERROR("unmarshal key failed.");
599 status = RDB_ERROR;
600 }
601 return status;
602 }
603
LockCloudContainer(const RdbSyncerParam & param)604 std::pair<int32_t, uint32_t> RdbServiceProxy::LockCloudContainer(const RdbSyncerParam ¶m)
605 {
606 MessageParcel reply;
607 uint32_t expiredTime = 0;
608 int32_t status = IPC_SEND(
609 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_LOCK_CLOUD_CONTAINER), reply, param, expiredTime);
610 if (status != RDB_OK) {
611 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
612 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
613 return { status, expiredTime };
614 }
615 if (!ITypesUtil::Unmarshal(reply, expiredTime)) {
616 LOG_ERROR("Unmarshal failed");
617 status = RDB_ERROR;
618 }
619 return { status, expiredTime };
620 }
621
UnlockCloudContainer(const RdbSyncerParam & param)622 int32_t RdbServiceProxy::UnlockCloudContainer(const RdbSyncerParam ¶m)
623 {
624 MessageParcel reply;
625 int32_t status =
626 IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNLOCK_CLOUD_CONTAINER), reply, param);
627 if (status != RDB_OK) {
628 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
629 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
630 }
631 return status;
632 }
633
GetDebugInfo(const RdbSyncerParam & param,std::map<std::string,RdbDebugInfo> & debugInfo)634 int32_t RdbServiceProxy::GetDebugInfo(const RdbSyncerParam ¶m, std::map<std::string, RdbDebugInfo> &debugInfo)
635 {
636 MessageParcel reply;
637 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DEBUG_INFO), reply, param);
638 if (status != RDB_OK) {
639 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
640 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
641 }
642 if (!ITypesUtil::Unmarshal(reply, debugInfo)) {
643 LOG_ERROR("Unmarshal failed");
644 status = RDB_ERROR;
645 }
646 return status;
647 }
648
GetDfxInfo(const RdbSyncerParam & param,DistributedRdb::RdbDfxInfo & dfxInfo)649 int32_t RdbServiceProxy::GetDfxInfo(const RdbSyncerParam ¶m, DistributedRdb::RdbDfxInfo &dfxInfo)
650 {
651 MessageParcel reply;
652 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DFX_INFO), reply, param);
653 if (status != RDB_OK) {
654 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
655 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
656 }
657 if (!ITypesUtil::Unmarshal(reply, dfxInfo)) {
658 LOG_ERROR("Unmarshal failed, bundleName:%{public}s, storeName:%{public}s",
659 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
660 status = RDB_ERROR;
661 }
662 return status;
663 }
664
VerifyPromiseInfo(const RdbSyncerParam & param)665 int32_t RdbServiceProxy::VerifyPromiseInfo(const RdbSyncerParam ¶m)
666 {
667 MessageParcel reply;
668 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_VERIFY_PROMISE_INFO), reply, param);
669 if (status != RDB_OK) {
670 LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
671 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
672 }
673 return status;
674 }
675 } // namespace OHOS::DistributedRdb