• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbServiceProxy"
16 #include "rdb_service_proxy.h"
17 
18 #include <thread>
19 #include "itypes_util.h"
20 #include "logger.h"
21 #include "result_set_proxy.h"
22 #include "sqlite_utils.h"
23 
24 namespace OHOS::DistributedRdb {
25 using namespace OHOS::Rdb;
26 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
27 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
28 constexpr int32_t MAX_RETRY = 100;
29 #define IPC_SEND(code, reply, ...)                                          \
30 ({                                                                          \
31     int32_t __status = RDB_OK;                                              \
32     do {                                                                    \
33         MessageParcel request;                                              \
34         if (!request.WriteInterfaceToken(GetDescriptor())) {                \
35             __status = RDB_ERROR;                                           \
36             break;                                                          \
37         }                                                                   \
38         if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) {                 \
39             __status = RDB_ERROR;                                           \
40             break;                                                          \
41         }                                                                   \
42         MessageOption option;                                               \
43         auto result = remote_->SendRequest((code), request, reply, option); \
44         if (result != 0) {                                                  \
45             __status = RDB_ERROR;                                           \
46             break;                                                          \
47         }                                                                   \
48                                                                             \
49         ITypesUtil::Unmarshal(reply, __status);                             \
50     } while (0);                                                            \
51     __status;                                                               \
52 })
53 
RdbServiceProxy(const sptr<IRemoteObject> & object)54 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
55     : IRemoteProxy<IRdbService>(object)
56 {
57     remote_ = Remote();
58 }
59 
ObtainDistributedTableName(const RdbSyncerParam & param,const std::string & device,const std::string & table)60 std::string RdbServiceProxy::ObtainDistributedTableName(
61     const RdbSyncerParam &param, const std::string &device, const std::string &table)
62 {
63     MessageParcel reply;
64     int32_t status =
65         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_OBTAIN_TABLE), reply, param, device, table);
66     std::string distributedTableName;
67     if (status != RDB_OK || !ITypesUtil::Unmarshal(reply, distributedTableName)) {
68         LOG_ERROR("status:%{public}d, device:%{public}s, table:%{public}s", status,
69             SqliteUtils::Anonymous(device).c_str(), SqliteUtils::Anonymous(table).c_str());
70     }
71     return distributedTableName;
72 }
73 
InitNotifier(const RdbSyncerParam & param)74 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param)
75 {
76     notifier_ = new (std::nothrow) RdbNotifierStub(
77         [this] (uint32_t seqNum, Details &&result) {
78             OnSyncComplete(seqNum, std::move(result));
79         },
80         [this] (std::string storeName, Details &&result) {
81             OnSyncComplete(storeName, std::move(result));
82         },
83         [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
84             OnDataChange(origin, primaries, std::move(changeInfo));
85         });
86     if (notifier_ == nullptr) {
87         LOG_ERROR("create notifier failed.");
88         return RDB_ERROR;
89     }
90 
91     if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
92         notifier_ = nullptr;
93         LOG_ERROR("init notifier error.");
94         return RDB_ERROR;
95     }
96 
97     return RDB_OK;
98 }
99 
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)100 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param, sptr<IRemoteObject> notifier)
101 {
102     MessageParcel reply;
103     int32_t status =
104         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
105     if (status != RDB_OK) {
106         LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
107     }
108     return status;
109 }
110 
GetSeqNum()111 uint32_t RdbServiceProxy::GetSeqNum()
112 {
113     uint32_t value = ++seqNum_;
114     if (value == 0) {
115         value = ++seqNum_;
116     }
117     return value;
118 }
119 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)120 std::pair<int32_t, Details> RdbServiceProxy::DoSync(
121     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates)
122 {
123     std::pair<int32_t, Details> result{ RDB_ERROR, {} };
124     MessageParcel reply;
125     auto &[status, details] = result;
126     status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
127     if (status != RDB_OK) {
128         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
129             SqliteUtils::Anonymous(param.storeName_).c_str());
130         return result;
131     }
132 
133     if (!ITypesUtil::Unmarshal(reply, details)) {
134         LOG_ERROR("read result failed.");
135         status = RDB_ERROR;
136         return result;
137     }
138     return result;
139 }
140 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)141 int32_t RdbServiceProxy::DoSync(
142     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
143 {
144     auto [status, details] = DoSync(param, option, predicates);
145     if (status != RDB_OK) {
146         LOG_INFO("failed.");
147         return RDB_ERROR;
148     }
149     LOG_INFO("success.");
150     if (async != nullptr) {
151         async(std::move(details));
152     }
153     return RDB_OK;
154 }
155 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)156 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates)
157 {
158     MessageParcel reply;
159     int32_t status =
160         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
161     if (status != RDB_OK) {
162         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
163             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
164     }
165     return status;
166 }
167 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)168 int32_t RdbServiceProxy::DoAsync(
169     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &callback)
170 {
171     Option asyncOption = option;
172     if (callback != nullptr) {
173         asyncOption.seqNum = GetSeqNum();
174         if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
175             LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, insert callback failed", param.bundleName_.c_str(),
176                 SqliteUtils::Anonymous(param.storeName_).c_str());
177             return RDB_ERROR;
178         }
179     }
180     LOG_INFO("bundleName:%{public}s, storeName:%{public}s, num=%{public}u, start DoAsync", param.bundleName_.c_str(),
181         SqliteUtils::Anonymous(param.storeName_).c_str(), asyncOption.seqNum);
182     if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
183         syncCallbacks_.Erase(asyncOption.seqNum);
184         return RDB_ERROR;
185     }
186     return RDB_OK;
187 }
188 
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,const std::vector<Reference> & references,bool isRebuild,int32_t type)189 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam &param, const std::vector<std::string> &tables,
190     const std::vector<Reference> &references, bool isRebuild, int32_t type)
191 {
192     MessageParcel reply;
193     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param,
194         tables, references, type, isRebuild);
195     if (status != RDB_OK) {
196         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d", status,
197             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
198     }
199     return status;
200 }
201 
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)202 int32_t RdbServiceProxy::Sync(
203     const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates, const AsyncDetail &async)
204 {
205     if (option.isAsync) {
206         return DoAsync(param, option, predicates, async);
207     }
208     return DoSync(param, option, predicates, async);
209 }
210 
RemoveSuffix(const std::string & name)211 std::string RdbServiceProxy::RemoveSuffix(const std::string &name)
212 {
213     std::string suffix(".db");
214     auto pos = name.rfind(suffix);
215     if (pos == std::string::npos || pos < name.length() - suffix.length()) {
216         return name;
217     }
218     return { name, 0, pos };
219 }
220 
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)221 int32_t RdbServiceProxy::Subscribe(
222     const RdbSyncerParam &param, const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
223 {
224     if (observer == nullptr) {
225         return RDB_ERROR;
226     }
227     if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
228         LOG_ERROR("subscribe mode invalid.");
229         return RDB_ERROR;
230     }
231     if (DoSubscribe(param, option) != RDB_OK) {
232         return RDB_ERROR;
233     }
234     auto name = RemoveSuffix(param.storeName_);
235     observers_.Compute(name, [observer, &param, &option](const auto &key, std::list<ObserverParam> &value) {
236         for (const auto &element : value) {
237             if (element.observer.lock() == observer) {
238                 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(key).c_str());
239                 return true;
240             }
241         }
242         value.push_back({ observer, param.bundleName_, option });
243         return true;
244     });
245     return RDB_OK;
246 }
247 
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)248 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
249 {
250     MessageParcel reply;
251     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
252     if (status != RDB_OK) {
253         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
254             SqliteUtils::Anonymous(param.storeName_).c_str());
255     }
256     return status;
257 }
258 
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)259 int32_t RdbServiceProxy::UnSubscribe(
260     const RdbSyncerParam &param, const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
261 {
262     if (observer == nullptr) {
263         LOG_ERROR("observer is null.");
264         return RDB_ERROR;
265     }
266     if (DoUnSubscribe(param, option) != RDB_OK) {
267         return RDB_ERROR;
268     }
269     auto name = RemoveSuffix(param.storeName_);
270     observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
271         LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
272         value.remove_if([observer](const ObserverParam &param) { return param.observer.lock() == observer; });
273         LOG_INFO("after  remove size=%{public}d", static_cast<int>(value.size()));
274         return !(value.empty());
275     });
276     return RDB_OK;
277 }
278 
DoUnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)279 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
280 {
281     MessageParcel reply;
282     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param, option);
283     if (status != RDB_OK) {
284         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
285             SqliteUtils::Anonymous(param.storeName_).c_str());
286     }
287     return status;
288 }
289 
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs)290 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::RemoteQuery(
291     const RdbSyncerParam &param, const std::string &device, const std::string &sql,
292     const std::vector<std::string> &selectionArgs)
293 {
294     MessageParcel reply;
295     int32_t status = IPC_SEND(
296         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
297     if (status != RDB_OK) {
298         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s", status,
299             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
300             SqliteUtils::Anonymous(device).c_str());
301         return { status, nullptr };
302     }
303 
304     sptr<IRemoteObject> remote = reply.ReadRemoteObject();
305     if (remote == nullptr) {
306         LOG_ERROR("read remote object is null.");
307         return { RDB_ERROR, nullptr };
308     }
309     sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
310     if (instance == nullptr) {
311         LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s",
312             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(),
313             SqliteUtils::Anonymous(device).c_str());
314         return { RDB_ERROR, nullptr };
315     }
316     return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [holder = instance](const auto *) {}) };
317 }
318 
ExportObservers()319 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
320 {
321     return observers_;
322 }
323 
ExportSyncObservers()324 RdbServiceProxy::SyncObservers RdbServiceProxy::ExportSyncObservers()
325 {
326     return syncObservers_;
327 }
328 
ImportObservers(Observers & observers)329 void RdbServiceProxy::ImportObservers(Observers &observers)
330 {
331     observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
332         RdbSyncerParam syncerParam;
333         for (const auto &param : value) {
334             syncerParam.bundleName_ = param.bundleName;
335             syncerParam.storeName_ = key;
336             Subscribe(syncerParam, param.subscribeOption, param.observer.lock());
337         }
338         return false;
339     });
340 }
341 
ImportSyncObservers(SyncObservers & syncObservers)342 void RdbServiceProxy::ImportSyncObservers(SyncObservers &syncObservers)
343 {
344     syncObservers.ForEach([this](const std::string &key, const std::list<SyncObserverParam> &value) {
345         RdbSyncerParam syncerParam;
346         for (const auto &param : value) {
347             syncerParam.bundleName_ = param.bundleName;
348             syncerParam.storeName_ = key;
349             RegisterAutoSyncCallback(syncerParam, param.syncObserver);
350         }
351         return false;
352     });
353 }
354 
BeforeOpen(RdbSyncerParam & param)355 int32_t RdbServiceProxy::BeforeOpen(RdbSyncerParam &param)
356 {
357     MessageParcel reply;
358     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_BEFORE_OPEN), reply, param);
359     if (status != RDB_OK) {
360         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
361             SqliteUtils::Anonymous(param.storeName_).c_str());
362         return status;
363     }
364     if (!ITypesUtil::Unmarshal(reply, param)) {
365         LOG_ERROR("read result failed.");
366         status = RDB_ERROR;
367     }
368     return status;
369 }
370 
AfterOpen(const RdbSyncerParam & param)371 int32_t RdbServiceProxy::AfterOpen(const RdbSyncerParam &param)
372 {
373     MessageParcel reply;
374     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_AFTER_OPEN), reply, param);
375     if (status != RDB_OK) {
376         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
377             SqliteUtils::Anonymous(param.storeName_).c_str());
378     }
379     return status;
380 }
381 
ReportStatistic(const RdbSyncerParam & param,const RdbStatEvent & statEvent)382 int32_t RdbServiceProxy::ReportStatistic(const RdbSyncerParam &param, const RdbStatEvent &statEvent)
383 {
384     MessageParcel reply;
385     int32_t status =
386         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REPORT_STAT), reply, param, statEvent);
387     if (status != RDB_OK) {
388         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
389             SqliteUtils::Anonymous(param.storeName_).c_str());
390     }
391     return status;
392 }
393 
Delete(const RdbSyncerParam & param)394 int32_t RdbServiceProxy::Delete(const RdbSyncerParam &param)
395 {
396     MessageParcel reply;
397     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
398     if (status != RDB_OK) {
399         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
400             SqliteUtils::Anonymous(param.storeName_).c_str());
401     }
402     return status;
403 }
404 
QuerySharingResource(const RdbSyncerParam & param,const PredicatesMemo & predicates,const std::vector<std::string> & columns)405 std::pair<int32_t, std::shared_ptr<RdbServiceProxy::ResultSet>> RdbServiceProxy::QuerySharingResource(
406     const RdbSyncerParam &param, const PredicatesMemo &predicates, const std::vector<std::string> &columns)
407 {
408     MessageParcel reply;
409     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_QUERY_SHARING_RESOURCE), reply,
410         param, predicates, columns);
411     sptr<IRemoteObject> remote;
412     bool success = ITypesUtil::Unmarshal(reply, remote);
413     if (status != RDB_OK || !success || remote == nullptr) {
414         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, success:%{public}d, remote is "
415                   "%{public}s nullptr",
416             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), success,
417             remote != nullptr ? "not" : "");
418         return { RDB_ERROR, {} };
419     }
420     sptr<NativeRdb::ResultSetProxy> instance = new(std::nothrow) NativeRdb::ResultSetProxy(remote);
421     if (instance == nullptr) {
422         LOG_ERROR("instance object is null.bundleName:%{public}s, storeName:%{public}s",
423             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
424         return { RDB_ERROR, nullptr };
425     }
426     return { RDB_OK, std::shared_ptr<ResultSet>(instance.GetRefPtr(), [instance](const auto *) {}) };
427 }
428 
RegisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)429 int32_t RdbServiceProxy::RegisterAutoSyncCallback(
430     const RdbSyncerParam &param, std::shared_ptr<DetailProgressObserver> observer)
431 {
432     if (observer == nullptr || param.storeName_.empty()) {
433         LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
434             SqliteUtils::Anonymous(param.storeName_).c_str());
435         return RDB_ERROR;
436     }
437     int32_t status = RDB_OK;
438     auto name = RemoveSuffix(param.storeName_);
439     syncObservers_.Compute(name, [this, &param, &status, observer](const auto &store, auto &observers) {
440         for (const auto &element : observers) {
441             if (element.syncObserver.get() == observer.get()) {
442                 LOG_ERROR("duplicate observer, storeName:%{public}s", SqliteUtils::Anonymous(store).c_str());
443                 return true;
444             }
445         }
446         status = DoRegister(param);
447         if (status == RDB_OK) {
448             observers.push_back({ observer, param.bundleName_ });
449         }
450         return !observers.empty();
451     });
452     return status;
453 }
454 
DoRegister(const RdbSyncerParam & param)455 int32_t RdbServiceProxy::DoRegister(const RdbSyncerParam &param)
456 {
457     MessageParcel reply;
458     int32_t status = IPC_SEND(
459         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
460     if (status != RDB_OK) {
461         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
462             SqliteUtils::Anonymous(param.storeName_).c_str());
463     }
464     return status;
465 }
466 
UnregisterAutoSyncCallback(const RdbSyncerParam & param,std::shared_ptr<DetailProgressObserver> observer)467 int32_t RdbServiceProxy::UnregisterAutoSyncCallback(
468     const RdbSyncerParam &param, std::shared_ptr<DetailProgressObserver> observer)
469 {
470     if (observer == nullptr || param.storeName_.empty()) {
471         LOG_ERROR("bundleName:%{public}s, storeName:%{public}s, syncObserver is nullptr", param.bundleName_.c_str(),
472             SqliteUtils::Anonymous(param.storeName_).c_str());
473         return RDB_ERROR;
474     }
475     int32_t status = RDB_OK;
476     auto name = RemoveSuffix(param.storeName_);
477     syncObservers_.ComputeIfPresent(name, [this, &param, &status, observer](const auto &storeName, auto &observers) {
478         for (auto it = observers.begin(); it != observers.end();) {
479             if (it->syncObserver.get() != observer.get()) {
480                 ++it;
481                 continue;
482             }
483             status = DoUnRegister(param);
484             if (status == RDB_OK) {
485                 it = observers.erase(it);
486             }
487         }
488         return !observers.empty();
489     });
490     return status;
491 }
492 
DoUnRegister(const RdbSyncerParam & param)493 int32_t RdbServiceProxy::DoUnRegister(const RdbSyncerParam &param)
494 {
495     MessageParcel reply;
496     int32_t status = IPC_SEND(
497         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNREGISTER_AUTOSYNC_PROGRESS_OBSERVER), reply, param);
498     if (status != RDB_OK) {
499         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
500             SqliteUtils::Anonymous(param.storeName_).c_str());
501     }
502     return status;
503 }
504 
OnDataChange(const Origin & origin,const RdbServiceProxy::PrimaryFields & primaries,RdbServiceProxy::ChangeInfo && changeInfo)505 void RdbServiceProxy::OnDataChange(
506     const Origin &origin, const RdbServiceProxy::PrimaryFields &primaries, RdbServiceProxy::ChangeInfo &&changeInfo)
507 {
508     LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
509         SqliteUtils::Anonymous(origin.store).c_str(),
510         origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(), origin.dataType,
511         origin.origin);
512     auto name = RdbServiceProxy::RemoveSuffix(origin.store);
513     observers_.ComputeIfPresent(name, [&origin, &primaries, info = std::move(changeInfo)](
514                                           const auto &key, const std::list<ObserverParam> &value) mutable {
515         auto size = value.size();
516         for (const auto &params : value) {
517             auto obs = params.observer.lock();
518             size--;
519             if (obs != nullptr) {
520                 obs->OnChange(origin, primaries, size > 0 ? ChangeInfo(info) : std::move(info));
521             }
522         }
523         return !value.empty();
524     });
525 }
526 
OnRemoteDeadSyncComplete()527 void RdbServiceProxy::OnRemoteDeadSyncComplete()
528 {
529     std::map<std::string, ProgressDetail> result;
530     result.insert(std::pair<std::string, ProgressDetail>("", {Progress::SYNC_FINISH, ProgressCode::UNKNOWN_ERROR}));
531     auto callbacks = std::move(syncCallbacks_);
532     callbacks.ForEach([&result](const auto &key, const AsyncDetail &callback) {
533         LOG_INFO("remote dead, compensate progress notification");
534         std::map<std::string, ProgressDetail> copyResult = result;
535         if (callback != nullptr) {
536             callback(std::move(result));
537         }
538         return false;
539     });
540 }
541 
OnSyncComplete(uint32_t seqNum,Details && result)542 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
543 {
544     syncCallbacks_.ComputeIfPresent(seqNum, [&result](const auto &key, const AsyncDetail &callback) {
545         auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
546         LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
547         if (callback != nullptr) {
548             callback(std::move(result));
549         }
550         return !finished;
551     });
552 }
553 
OnSyncComplete(const std::string & storeName,Details && result)554 void RdbServiceProxy::OnSyncComplete(const std::string &storeName, Details &&result)
555 {
556     syncObservers_.ComputeIfPresent(storeName, [&result](const auto &key, const auto &observers) {
557         LOG_DEBUG("Sync complete, storeName%{public}s, result size:%{public}zu", SqliteUtils::Anonymous(key).c_str(),
558             result.size());
559         for (const auto &it : observers) {
560             if (it.syncObserver != nullptr) {
561                 it.syncObserver->ProgressNotification(result);
562             }
563         }
564         return true;
565     });
566 }
567 
SetSearchable(const RdbSyncerParam & param,bool isSearchable)568 int32_t RdbServiceProxy::SetSearchable(const RdbSyncerParam &param, bool isSearchable)
569 {
570     MessageParcel reply;
571     int32_t status =
572         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_SEARCHABLE), reply, param, isSearchable);
573     if (status != RDB_OK) {
574         LOG_ERROR("RdbServiceProxy SetSearchable fail, status:%{public}d, "
575                   "bundleName:%{public}s, storeName:%{public}s",
576             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
577     }
578     return status;
579 }
580 
NotifyDataChange(const RdbSyncerParam & param,const RdbChangedData & rdbChangedData,const RdbNotifyConfig & rdbNotifyConfig)581 int32_t RdbServiceProxy::NotifyDataChange(
582     const RdbSyncerParam &param, const RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig)
583 {
584     MessageParcel reply;
585     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_NOTIFY_DATA_CHANGE), reply, param,
586         rdbChangedData, rdbNotifyConfig);
587     if (status != RDB_OK) {
588         LOG_ERROR("RdbServiceProxy NotifyDataChange fail, status:%{public}d, "
589                   "bundleName:%{public}s, storeName:%{public}s",
590             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
591     }
592     return status;
593 }
594 
Disable(const RdbSyncerParam & param)595 int32_t RdbServiceProxy::Disable(const RdbSyncerParam &param)
596 {
597     MessageParcel reply;
598     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DISABLE), reply, param);
599     if (status != RDB_OK) {
600         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
601             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
602     }
603     return status;
604 }
605 
Enable(const RdbSyncerParam & param)606 int32_t RdbServiceProxy::Enable(const RdbSyncerParam &param)
607 {
608     MessageParcel reply;
609     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ENABLE), reply, param);
610     if (status != RDB_OK) {
611         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
612             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
613     }
614     return status;
615 }
616 
GetPassword(const RdbSyncerParam & param,std::vector<std::vector<uint8_t>> & key)617 int32_t RdbServiceProxy::GetPassword(const RdbSyncerParam &param, std::vector<std::vector<uint8_t>> &key)
618 {
619     MessageParcel reply;
620     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_PASSWORD), reply, param);
621     if (status != RDB_OK) {
622         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
623             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
624         return status;
625     }
626     if (!ITypesUtil::Unmarshal(reply, key)) {
627         LOG_ERROR("unmarshal key failed.");
628         status = RDB_ERROR;
629     }
630     return status;
631 }
632 
LockCloudContainer(const RdbSyncerParam & param)633 std::pair<int32_t, uint32_t> RdbServiceProxy::LockCloudContainer(const RdbSyncerParam &param)
634 {
635     MessageParcel reply;
636     uint32_t expiredTime = 0;
637     int32_t status = IPC_SEND(
638         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_LOCK_CLOUD_CONTAINER), reply, param, expiredTime);
639     if (status != RDB_OK) {
640         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
641             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
642         return { status, expiredTime };
643     }
644     if (!ITypesUtil::Unmarshal(reply, expiredTime)) {
645         LOG_ERROR("Unmarshal failed");
646         status = RDB_ERROR;
647     }
648     return { status, expiredTime };
649 }
650 
UnlockCloudContainer(const RdbSyncerParam & param)651 int32_t RdbServiceProxy::UnlockCloudContainer(const RdbSyncerParam &param)
652 {
653     MessageParcel reply;
654     int32_t status =
655         IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNLOCK_CLOUD_CONTAINER), reply, param);
656     if (status != RDB_OK) {
657         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
658             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
659     }
660     return status;
661 }
662 
GetDebugInfo(const RdbSyncerParam & param,std::map<std::string,RdbDebugInfo> & debugInfo)663 int32_t RdbServiceProxy::GetDebugInfo(const RdbSyncerParam &param, std::map<std::string, RdbDebugInfo> &debugInfo)
664 {
665     MessageParcel reply;
666     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DEBUG_INFO), reply, param);
667     if (status != RDB_OK) {
668         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
669             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
670     }
671     if (!ITypesUtil::Unmarshal(reply, debugInfo)) {
672         LOG_ERROR("Unmarshal failed");
673         status = RDB_ERROR;
674     }
675     return status;
676 }
677 
GetDfxInfo(const RdbSyncerParam & param,DistributedRdb::RdbDfxInfo & dfxInfo)678 int32_t RdbServiceProxy::GetDfxInfo(const RdbSyncerParam &param, DistributedRdb::RdbDfxInfo &dfxInfo)
679 {
680     MessageParcel reply;
681     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_DFX_INFO), reply, param);
682     if (status != RDB_OK) {
683         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
684             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
685     }
686     if (!ITypesUtil::Unmarshal(reply, dfxInfo)) {
687         LOG_ERROR("Unmarshal failed, bundleName:%{public}s, storeName:%{public}s",
688             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
689         status = RDB_ERROR;
690     }
691     return status;
692 }
693 
VerifyPromiseInfo(const RdbSyncerParam & param)694 int32_t RdbServiceProxy::VerifyPromiseInfo(const RdbSyncerParam &param)
695 {
696     MessageParcel reply;
697     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_VERIFY_PROMISE_INFO), reply, param);
698     if (status != RDB_OK) {
699         LOG_ERROR("fail, status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status,
700             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
701     }
702     return status;
703 }
704 
~RdbServiceProxy()705 RdbServiceProxy::~RdbServiceProxy()
706 {
707     int32_t retry = 0;
708     while (notifier_ != nullptr && notifier_->GetSptrRefCount() > 1 && retry < MAX_RETRY) {
709         retry++;
710         std::this_thread::sleep_for(std::chrono::milliseconds(1));
711     }
712     if (notifier_ != nullptr && notifier_->GetSptrRefCount() > 1) {
713         LOG_WARN("notifier_ has other in use:%{public}d!", notifier_->GetSptrRefCount());
714     }
715 }
716 } // namespace OHOS::DistributedRdb