• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbServiceProxy"
16 #include "rdb_service_proxy.h"
17 
18 #include "itypes_util.h"
19 #include "logger.h"
20 #include "result_set_proxy.h"
21 #include "sqlite_utils.h"
22 
23 namespace OHOS::DistributedRdb {
24 using namespace OHOS::Rdb;
25 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
26 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
27 
/*
 * IPC_SEND(code, reply, ...)
 *
 * Statement-expression helper used by every proxy method in this file:
 *   1. writes the interface token returned by GetDescriptor() into a fresh request parcel,
 *   2. marshals the variadic arguments into that parcel,
 *   3. sends the request through remote_ with command `code`,
 *   4. reads the status that leads the reply parcel.
 *
 * Evaluates to the int32_t status read from `reply`, or RDB_ERROR when any step fails.
 * A reply from which no status can be read is reported as RDB_ERROR instead of being
 * mistaken for success.
 *
 * Note: the variadic arguments are expanded before the local MessageOption is declared,
 * so callers may pass their own variable named `option`.
 */
#define IPC_SEND(code, reply, ...)                                          \
({                                                                          \
    int32_t ipcStatus_ = RDB_ERROR;                                         \
    do {                                                                    \
        MessageParcel request;                                              \
        if (!request.WriteInterfaceToken(GetDescriptor())) {                \
            break;                                                          \
        }                                                                   \
        if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) {                 \
            break;                                                          \
        }                                                                   \
        MessageOption option;                                               \
        if (remote_->SendRequest((code), request, reply, option) != 0) {    \
            break;                                                          \
        }                                                                   \
        if (!ITypesUtil::Unmarshal(reply, ipcStatus_)) {                    \
            ipcStatus_ = RDB_ERROR;                                         \
        }                                                                   \
    } while (0);                                                            \
    ipcStatus_;                                                             \
})
52 
RdbServiceProxy(const sptr<IRemoteObject> & object)53 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
54     : IRemoteProxy<IRdbService>(object)
55 {
56     remote_ = Remote();
57 }
58 
ObtainDistributedTableName(const std::string & device,const std::string & table)59 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
60 {
61     return "";
62 }
63 
InitNotifier(const RdbSyncerParam & param)64 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param)
65 {
66     notifier_ = new (std::nothrow) RdbNotifierStub(
67         [this] (uint32_t seqNum, Details &&result) {
68             OnSyncComplete(seqNum, std::move(result));
69         },
70         [this] (std::string storeName, Details &&result) {
71             OnSyncComplete(storeName, std::move(result));
72         },
73         [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
74             OnDataChange(origin, primaries, std::move(changeInfo));
75         });
76     if (notifier_ == nullptr) {
77         LOG_ERROR("create notifier failed.");
78         return RDB_ERROR;
79     }
80 
81     if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
82         notifier_ = nullptr;
83         LOG_ERROR("init notifier error.");
84         return RDB_ERROR;
85     }
86 
87     return RDB_OK;
88 }
89 
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)90 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param, sptr<IRemoteObject> notifier)
91 {
92     MessageParcel reply;
93     int32_t status =
94         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
95     if (status != RDB_OK) {
96         LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
97     }
98     return status;
99 }
100 
GetSeqNum()101 uint32_t RdbServiceProxy::GetSeqNum()
102 {
103     uint32_t value = ++seqNum_;
104     if (value == 0) {
105         value = ++seqNum_;
106     }
107     return value;
108 }
109 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)110 std::pair<int32_t, Details> RdbServiceProxy::DoSync(
111     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates)
112 {
113     std::pair<int32_t, Details> result{ RDB_ERROR, {} };
114     MessageParcel reply;
115     auto &[status, details] = result;
116     status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
117     if (status != RDB_OK) {
118         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
119             SqliteUtils::Anonymous(param.storeName_).c_str());
120         return result;
121     }
122 
123     if (!ITypesUtil::Unmarshal(reply, details)) {
124         LOG_ERROR("read result failed.");
125         status = RDB_ERROR;
126         return result;
127     }
128     return result;
129 }
130 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)131 int32_t RdbServiceProxy::DoSync(
132     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
133 {
134     auto [status, details] = DoSync(param, option, predicates);
135     if (status != RDB_OK) {
136         LOG_INFO("failed.");
137         return RDB_ERROR;
138     }
139     LOG_INFO("success.");
140     if (async != nullptr) {
141         async(std::move(details));
142     }
143     return RDB_OK;
144 }
145 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)146 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates)
147 {
148     MessageParcel reply;
149     int32_t status =
150         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
151     if (status != RDB_OK) {
152         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
153             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
154     }
155     return status;
156 }
157 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)158 int32_t RdbServiceProxy::DoAsync(
159     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &callback)
160 {
161     Option asyncOption = option;
162     if (callback != nullptr) {
163         asyncOption.seqNum = GetSeqNum();
164         if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
165             LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, insert callback failed", param.bundleName_.c_str(),
166                 SqliteUtils::Anonymous(param.storeName_).c_str());
167             return RDB_ERROR;
168         }
169     }
170     LOG_INFO("bundleName:%{public}s, storeName:%{public}s, num=%{public}u, start DoAsync", param.bundleName_.c_str(),
171         SqliteUtils::Anonymous(param.storeName_).c_str(), asyncOption.seqNum);
172     if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
173         syncCallbacks_.Erase(asyncOption.seqNum);
174         return RDB_ERROR;
175     }
176     return RDB_OK;
177 }
178 
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,const std::vector<Reference> & references,bool isRebuild,int32_t type)179 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam &param, const std::vector<std::string> &tables,
180     const std::vector<Reference> &references, bool isRebuild, int32_t type)
181 {
182     MessageParcel reply;
183     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param,
184         tables, references, type, isRebuild);
185     if (status != RDB_OK) {
186         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d", status,
187             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
188     }
189     return status;
190 }
191 
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)192 int32_t RdbServiceProxy::Sync(
193     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
194 {
195     if (option.isAsync) {
196         return DoAsync(param, option, predicates, async);
197     }
198     return DoSync(param, option, predicates, async);
199 }
200 
RemoveSuffix(const std::string & name)201 std::string RdbServiceProxy::RemoveSuffix(const std::string &name)
202 {
203     std::string suffix(".db");
204     auto pos = name.rfind(suffix);
205     if (pos == std::string::npos || pos < name.length() - suffix.length()) {
206         return name;
207     }
208     return { name, 0, pos };
209 }
210 
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)211 int32_t RdbServiceProxy::Subscribe(
212     const RdbSyncerParam &param, const SubscribeOption &option, RdbStoreObserver *observer)
213 {
214     if (observer == nullptr) {
215         return RDB_ERROR;
216     }
217     if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
218         LOG_ERROR("subscribe mode invalid.");
219         return RDB_ERROR;
220     }
221     if (DoSubscribe(param, option) != RDB_OK) {
222         return RDB_ERROR;
223     }
224     auto name = RemoveSuffix(param.storeName_);
225     observers_.Compute(name, [observer, &param, &option](const auto &key, std::list<ObserverParam> &value) {
226         for (const auto &element : value) {
227             if (element.observer == observer) {
228                 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(key).c_str());
229                 return true;
230             }
231         }
232         value.push_back({ observer, param.bundleName_, option });
233         return true;
234     });
235     return RDB_OK;
236 }
237 
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)238 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
239 {
240     MessageParcel reply;
241     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
242     if (status != RDB_OK) {
243         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
244             SqliteUtils::Anonymous(param.storeName_).c_str());
245     }
246     return status;
247 }
248 
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)249 int32_t RdbServiceProxy::UnSubscribe(
250     const RdbSyncerParam &param, const SubscribeOption &option, RdbStoreObserver *observer)
251 {
252     if (observer == nullptr) {
253         LOG_ERROR("observer is null.");
254         return RDB_ERROR;
255     }
256     if (DoUnSubscribe(param, option) != RDB_OK) {
257         return RDB_ERROR;
258     }
259     auto name = RemoveSuffix(param.storeName_);
260     observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
261         LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
262         value.remove_if([observer](const ObserverParam &param) { return param.observer == observer; });
263         LOG_INFO("after  remove size=%{public}d", static_cast<int>(value.size()));
264         return !(value.empty());
265     });
266     return RDB_OK;
267 }
268 
DoUnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)269 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
270 {
271     MessageParcel reply;
272     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param, option);
273     if (status != RDB_OK) {
274         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
275             SqliteUtils::Anonymous(param.storeName_).c_str());
276     }
277     return status;
278 }
279 
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs)280 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::RemoteQuery(
281     const RdbSyncerParam &param, const std::string &device, const std::string &sql,
282     const std::vector<std::string> &selectionArgs)
283 {
284     MessageParcel reply;
285     int32_t status = IPC_SEND(
286         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
287     if (status != RDB_OK) {
288         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s", status,
289             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
290             SqliteUtils::Anonymous(device).c_str());
291         return { status, nullptr };
292     }
293 
294     sptr<IRemoteObject> remote = reply.ReadRemoteObject();
295     if (remote == nullptr) {
296         LOG_ERROR("read remote object is null.");
297         return { RDB_ERROR, nullptr };
298     }
299     sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
300     if (instance == nullptr) {
301         LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s",
302             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
303             SqliteUtils::Anonymous(device).c_str());
304         return { RDB_ERROR, nullptr };
305     }
306     return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [holder = instance](const auto *) {}) };
307 }
308 
ExportObservers()309 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
310 {
311     return observers_;
312 }
313 
ExportSyncObservers()314 RdbServiceProxy::SyncObservers RdbServiceProxy::ExportSyncObservers()
315 {
316     return syncObservers_;
317 }
318 
ImportObservers(Observers & observers)319 void RdbServiceProxy::ImportObservers(Observers &observers)
320 {
321     observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
322         RdbSyncerParam syncerParam;
323         for (const auto &param : value) {
324             syncerParam.bundleName_ = param.bundleName;
325             syncerParam.storeName_ = key;
326             Subscribe(syncerParam, param.subscribeOption, param.observer);
327         }
328         return false;
329     });
330 }
331 
ImportSyncObservers(SyncObservers & syncObservers)332 void RdbServiceProxy::ImportSyncObservers(SyncObservers &syncObservers)
333 {
334     syncObservers.ForEach([this](const std::string &key, const std::list<SyncObserverParam> &value) {
335         RdbSyncerParam syncerParam;
336         for (const auto &param : value) {
337             syncerParam.bundleName_ = param.bundleName;
338             syncerParam.storeName_ = key;
339             RegisterAutoSyncCallback(syncerParam, param.syncObserver);
340         }
341         return false;
342     });
343 }
344 
BeforeOpen(RdbSyncerParam & param)345 int32_t RdbServiceProxy::BeforeOpen(RdbSyncerParam &param)
346 {
347     MessageParcel reply;
348     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_BEFORE_OPEN), reply, param);
349     if (status != RDB_OK) {
350         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
351             SqliteUtils::Anonymous(param.storeName_).c_str());
352         return status;
353     }
354     if (!ITypesUtil::Unmarshal(reply, param)) {
355         LOG_ERROR("read result failed.");
356         status = RDB_ERROR;
357     }
358     return status;
359 }
360 
AfterOpen(const RdbSyncerParam & param)361 int32_t RdbServiceProxy::AfterOpen(const RdbSyncerParam &param)
362 {
363     MessageParcel reply;
364     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_AFTER_OPEN), reply, param);
365     if (status != RDB_OK) {
366         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
367             SqliteUtils::Anonymous(param.storeName_).c_str());
368     }
369     return status;
370 }
371 
ReportStatistic(const RdbSyncerParam & param,const RdbStatEvent & statEvent)372 int32_t RdbServiceProxy::ReportStatistic(const RdbSyncerParam &param, const RdbStatEvent &statEvent)
373 {
374     MessageParcel reply;
375     int32_t status =
376         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REPORT_STAT), reply, param, statEvent);
377     if (status != RDB_OK) {
378         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
379             SqliteUtils::Anonymous(param.storeName_).c_str());
380     }
381     return status;
382 }
383 
Delete(const RdbSyncerParam & param)384 int32_t RdbServiceProxy::Delete(const RdbSyncerParam &param)
385 {
386     MessageParcel reply;
387     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
388     if (status != RDB_OK) {
389         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
390             SqliteUtils::Anonymous(param.storeName_).c_str());
391     }
392     return status;
393 }
394 
QuerySharingResource(const RdbSyncerParam & param,const PredicatesMemo & predicates,const std::vector<std::string> & columns)395 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::QuerySharingResource(
396     const RdbSyncerParam &param, const PredicatesMemo &predicates, const std::vector<std::string> &columns)
397 {
398     MessageParcel reply;
399     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_QUERY_SHARING_RESOURCE), reply,
400         param, predicates, columns);
401     sptr<IRemoteObject> remote;
402     bool success = ITypesUtil::Unmarshal(reply, remote);
403     if (status != RDB_OK || !success || remote == nullptr) {
404         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, success:%{public}d, remote is "
405                   "%{public}s nullptr",
406             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), success,
407             remote != nullptr ? "not" : "");
408         return { RDB_ERROR, {} };
409     }
410     sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
411     if (instance == nullptr) {
412         LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s",
413             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
414         return { RDB_ERROR, nullptr };
415     }
416     return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [instance](const auto *) {}) };
417 }
418 
RegisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)419 int32_t RdbServiceProxy::RegisterAutoSyncCallback(
420     const RdbSyncerParam &param, std::shared_ptr<DetailProgressObserver> observer)
421 {
422     if (observer == nullptr || param.storeName_.empty()) {
423         LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
424             SqliteUtils::Anonymous(param.storeName_).c_str());
425         return RDB_ERROR;
426     }
427     int32_t status = RDB_OK;
428     auto name = RemoveSuffix(param.storeName_);
429     syncObservers_.Compute(name, [this, &param, &status, observer](const auto &store, auto &observers) {
430         for (const auto &element : observers) {
431             if (element.syncObserver.get() == observer.get()) {
432                 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(store).c_str());
433                 return true;
434             }
435         }
436         status = DoRegister(param);
437         if (status == RDB_OK) {
438             observers.push_back({ observer, param.bundleName_ });
439         }
440         return !observers.empty();
441     });
442     return status;
443 }
444 
DoRegister(const RdbSyncerParam & param)445 int32_t RdbServiceProxy::DoRegister(const RdbSyncerParam &param)
446 {
447     MessageParcel reply;
448     int32_t status = IPC_SEND(
449         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
450     if (status != RDB_OK) {
451         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
452             SqliteUtils::Anonymous(param.storeName_).c_str());
453     }
454     return status;
455 }
456 
UnregisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)457 int32_t RdbServiceProxy::UnregisterAutoSyncCallback(
458     const RdbSyncerParam &param, std::shared_ptr<DetailProgressObserver> observer)
459 {
460     if (observer == nullptr || param.storeName_.empty()) {
461         LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
462             SqliteUtils::Anonymous(param.storeName_).c_str());
463         return RDB_ERROR;
464     }
465     int32_t status = RDB_OK;
466     auto name = RemoveSuffix(param.storeName_);
467     syncObservers_.ComputeIfPresent(name, [this, &param, &status, observer](const auto &storeName, auto &observers) {
468         for (auto it = observers.begin(); it != observers.end();) {
469             if (it->syncObserver.get() != observer.get()) {
470                 ++it;
471                 continue;
472             }
473             status = DoUnRegister(param);
474             if (status == RDB_OK) {
475                 it = observers.erase(it);
476             }
477         }
478         return !observers.empty();
479     });
480     return status;
481 }
482 
DoUnRegister(const RdbSyncerParam & param)483 int32_t RdbServiceProxy::DoUnRegister(const RdbSyncerParam &param)
484 {
485     MessageParcel reply;
486     int32_t status = IPC_SEND(
487         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNREGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
488     if (status != RDB_OK) {
489         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
490             SqliteUtils::Anonymous(param.storeName_).c_str());
491     }
492     return status;
493 }
494 
OnDataChange(const Origin & origin,const RdbServiceProxy::PrimaryFields & primaries,RdbServiceProxy::ChangeInfo && changeInfo)495 void RdbServiceProxy::OnDataChange(
496     const Origin &origin, const RdbServiceProxy::PrimaryFields &primaries, RdbServiceProxy::ChangeInfo &&changeInfo)
497 {
498     LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
499         SqliteUtils::Anonymous(origin.store).c_str(),
500         origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(), origin.dataType,
501         origin.origin);
502     auto name = RdbServiceProxy::RemoveSuffix(origin.store);
503     observers_.ComputeIfPresent(name, [&origin, &primaries, info = std::move(changeInfo)](
504                                           const auto &key, const std::list<ObserverParam> &value) mutable {
505         auto size = value.size();
506         for (const auto &params : value) {
507             params.observer->OnChange(origin, primaries, --size > 0 ? ChangeInfo(info) : std::move(info));
508         }
509         return !value.empty();
510     });
511 }
512 
OnSyncComplete(uint32_t seqNum,Details && result)513 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
514 {
515     syncCallbacks_.ComputeIfPresent(seqNum, [&result](const auto &key, const AsyncDetail &callback) {
516         auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
517         LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
518         if (callback != nullptr) {
519             callback(std::move(result));
520         }
521         return !finished;
522     });
523 }
524 
OnSyncComplete(const std::string & storeName,Details && result)525 void RdbServiceProxy::OnSyncComplete(const std::string &storeName, Details &&result)
526 {
527     syncObservers_.ComputeIfPresent(storeName, [&result](const auto &key, const auto &observers) {
528         LOG_DEBUG("Sync complete, storeName%{public}s, result size:%{public}zu", SqliteUtils::Anonymous(key).c_str(),
529             result.size());
530         for (const auto &it : observers) {
531             if (it.syncObserver != nullptr) {
532                 it.syncObserver->ProgressNotification(result);
533             }
534         }
535         return true;
536     });
537 }
538 
SetSearchable(const RdbSyncerParam & param,bool isSearchable)539 int32_t RdbServiceProxy::SetSearchable(const RdbSyncerParam &param, bool isSearchable)
540 {
541     MessageParcel reply;
542     int32_t status =
543         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_SEARCHABLE), reply, param, isSearchable);
544     if (status != RDB_OK) {
545         LOG_ERROR("RdbServiceProxy SetSearchable fail, status:%{public}d, "
546                   "bundleName:%{public}s, storeName:%{public}s",
547             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
548     }
549     return status;
550 }
551 
NotifyDataChange(const RdbSyncerParam & param,const RdbChangedData & rdbChangedData,const RdbNotifyConfig & rdbNotifyConfig)552 int32_t RdbServiceProxy::NotifyDataChange(
553     const RdbSyncerParam &param, const RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig)
554 {
555     MessageParcel reply;
556     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_NOTIFY_DATA_CHANGE), reply, param,
557         rdbChangedData, rdbNotifyConfig);
558     if (status != RDB_OK) {
559         LOG_ERROR("RdbServiceProxy NotifyDataChange fail, status:%{public}d, "
560                   "bundleName:%{public}s, storeName:%{public}s",
561             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
562     }
563     return status;
564 }
565 
Disable(const RdbSyncerParam & param)566 int32_t RdbServiceProxy::Disable(const RdbSyncerParam &param)
567 {
568     MessageParcel reply;
569     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DISABLE), reply, param);
570     if (status != RDB_OK) {
571         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
572             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
573     }
574     return status;
575 }
576 
Enable(const RdbSyncerParam & param)577 int32_t RdbServiceProxy::Enable(const RdbSyncerParam &param)
578 {
579     MessageParcel reply;
580     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ENABLE), reply, param);
581     if (status != RDB_OK) {
582         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
583             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
584     }
585     return status;
586 }
587 
GetPassword(const RdbSyncerParam & param,std::vector<std::vector<uint8_t>> & key)588 int32_t RdbServiceProxy::GetPassword(const RdbSyncerParam &param, std::vector<std::vector<uint8_t>> &key)
589 {
590     MessageParcel reply;
591     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_PASSWORD), reply, param);
592     if (status != RDB_OK) {
593         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
594             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
595         return status;
596     }
597     if (!ITypesUtil::Unmarshal(reply, key)) {
598         LOG_ERROR("unmarshal key failed.");
599         status = RDB_ERROR;
600     }
601     return status;
602 }
603 
LockCloudContainer(const RdbSyncerParam & param)604 std::pair<int32_t, uint32_t> RdbServiceProxy::LockCloudContainer(const RdbSyncerParam &param)
605 {
606     MessageParcel reply;
607     uint32_t expiredTime = 0;
608     int32_t status = IPC_SEND(
609         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_LOCK_CLOUD_CONTAINER), reply, param, expiredTime);
610     if (status != RDB_OK) {
611         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
612             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
613         return { status, expiredTime };
614     }
615     if (!ITypesUtil::Unmarshal(reply, expiredTime)) {
616         LOG_ERROR("Unmarshal failed");
617         status = RDB_ERROR;
618     }
619     return { status, expiredTime };
620 }
621 
UnlockCloudContainer(const RdbSyncerParam & param)622 int32_t RdbServiceProxy::UnlockCloudContainer(const RdbSyncerParam &param)
623 {
624     MessageParcel reply;
625     int32_t status =
626         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNLOCK_CLOUD_CONTAINER), reply, param);
627     if (status != RDB_OK) {
628         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
629             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
630     }
631     return status;
632 }
633 
GetDebugInfo(const RdbSyncerParam & param,std::map<std::string,RdbDebugInfo> & debugInfo)634 int32_t RdbServiceProxy::GetDebugInfo(const RdbSyncerParam &param, std::map<std::string, RdbDebugInfo> &debugInfo)
635 {
636     MessageParcel reply;
637     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DEBUG_INFO), reply, param);
638     if (status != RDB_OK) {
639         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
640             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
641     }
642     if (!ITypesUtil::Unmarshal(reply, debugInfo)) {
643         LOG_ERROR("Unmarshal failed");
644         status = RDB_ERROR;
645     }
646     return status;
647 }
648 
GetDfxInfo(const RdbSyncerParam & param,DistributedRdb::RdbDfxInfo & dfxInfo)649 int32_t RdbServiceProxy::GetDfxInfo(const RdbSyncerParam &param, DistributedRdb::RdbDfxInfo &dfxInfo)
650 {
651     MessageParcel reply;
652     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DFX_INFO), reply, param);
653     if (status != RDB_OK) {
654         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
655             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
656     }
657     if (!ITypesUtil::Unmarshal(reply, dfxInfo)) {
658         LOG_ERROR("Unmarshal failed, bundleName:%{public}s, storeName:%{public}s",
659             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
660         status = RDB_ERROR;
661     }
662     return status;
663 }
664 
VerifyPromiseInfo(const RdbSyncerParam & param)665 int32_t RdbServiceProxy::VerifyPromiseInfo(const RdbSyncerParam &param)
666 {
667     MessageParcel reply;
668     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_VERIFY_PROMISE_INFO), reply, param);
669     if (status != RDB_OK) {
670         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
671             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
672     }
673     return status;
674 }
675 } // namespace OHOS::DistributedRdb