• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_service_proxy.h"
17 
18 #include "itypes_util.h"
19 #include "logger.h"
20 #include "sqlite_utils.h"
21 
22 namespace OHOS::DistributedRdb {
23 using namespace OHOS::Rdb;
24 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
25 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
26 
/*
 * Sends one IPC request through remote_ and evaluates to an int32_t status.
 *
 * The request is built from the interface token returned by GetDescriptor()
 * followed by the marshalled variadic arguments; `reply` receives the response.
 * The expression evaluates to RDB_ERROR when the token or the arguments cannot be
 * written, when SendRequest returns a non-zero result, or when the leading status
 * value cannot be read back from `reply` (previously an unreadable reply was left
 * at the initial RDB_OK and reported as success). Otherwise it evaluates to the
 * status value read from `reply`.
 *
 * Uses the ({ ... }) statement-expression compiler extension.
 */
#define IPC_SEND(code, reply, ...)                                          \
({                                                                          \
    int32_t __status = RDB_OK;                                              \
    do {                                                                    \
        MessageParcel request;                                              \
        if (!request.WriteInterfaceToken(GetDescriptor())) {                \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) {                 \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
        MessageOption option;                                               \
        auto result = remote_->SendRequest((code), request, reply, option); \
        if (result != 0) {                                                  \
            __status = RDB_ERROR;                                           \
            break;                                                          \
        }                                                                   \
                                                                            \
        if (!ITypesUtil::Unmarshal(reply, __status)) {                      \
            __status = RDB_ERROR;                                           \
        }                                                                   \
    } while (0);                                                            \
    __status;                                                               \
})
51 
RdbServiceProxy(const sptr<IRemoteObject> & object)52 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
53     : IRemoteProxy<IRdbService>(object)
54 {
55     remote_ = Remote();
56 }
57 
OnSyncComplete(uint32_t seqNum,Details && result)58 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
59 {
60     syncCallbacks_.ComputeIfPresent(seqNum, [&result] (const auto& key, const AsyncDetail & callback) {
61         auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
62         LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
63         if (callback!=nullptr) {
64             callback(std::move(result));
65         }
66         return !finished;
67     });
68 }
69 
OnDataChange(const Origin & origin,const PrimaryFields & primaries,ChangeInfo && changeInfo)70 void RdbServiceProxy::OnDataChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo)
71 {
72     LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
73         SqliteUtils::Anonymous(origin.store).c_str(),
74         origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(),
75         origin.dataType, origin.origin);
76     auto name = RemoveSuffix(origin.store);
77     observers_.ComputeIfPresent(name,
78         [&origin, &primaries, info = std::move(changeInfo)](const auto &key, const std::list<ObserverParam> &value)
79             mutable {
80                 auto size = value.size();
81                 for (const auto &params : value) {
82                     params.observer->OnChange(origin, primaries, --size > 0 ? ChangeInfo(info) : std::move(info));
83                 }
84                 return !value.empty();
85             });
86 }
87 
ObtainDistributedTableName(const std::string & device,const std::string & table)88 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
89 {
90     return "";
91 }
92 
InitNotifier(const RdbSyncerParam & param)93 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param)
94 {
95     notifier_ = new (std::nothrow) RdbNotifierStub(
96         [this] (uint32_t seqNum, Details &&result) {
97             OnSyncComplete(seqNum, std::move(result));
98         },
99         [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
100             OnDataChange(origin, primaries, std::move(changeInfo));
101         });
102     if (notifier_ == nullptr) {
103         LOG_ERROR("create notifier failed");
104         return RDB_ERROR;
105     }
106 
107     if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
108         notifier_ = nullptr;
109         return RDB_ERROR;
110     }
111 
112     LOG_INFO("success");
113     return RDB_OK;
114 }
115 
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)116 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param, sptr<IRemoteObject> notifier)
117 {
118     MessageParcel reply;
119     int32_t status = IPC_SEND(
120         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
121     if (status != RDB_OK) {
122         LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
123     }
124     return status;
125 }
126 
GetSeqNum()127 uint32_t RdbServiceProxy::GetSeqNum()
128 {
129     uint32_t value = ++seqNum_;
130     if (value == 0) {
131         value = ++seqNum_;
132     }
133     return value;
134 }
135 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)136 std::pair<int32_t, Details> RdbServiceProxy::DoSync(const RdbSyncerParam& param, const Option &option,
137     const PredicatesMemo &predicates)
138 {
139     std::pair<int32_t, Details> result{RDB_ERROR, {}};
140     MessageParcel reply;
141     auto &[status, details] = result;
142     status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
143     if (status != RDB_OK) {
144         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
145             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
146         return result;
147     }
148 
149     if (!ITypesUtil::Unmarshal(reply, details)) {
150         LOG_ERROR("read result failed");
151         status = RDB_ERROR;
152         return result;
153     }
154     return result;
155 }
156 
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)157 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates,
158     const AsyncDetail &async)
159 {
160     auto [status, details] = DoSync(param, option, predicates);
161     if (status != RDB_OK) {
162         LOG_INFO("failed");
163         return RDB_ERROR;
164     }
165     LOG_INFO("success");
166 
167     if (async != nullptr) {
168         async(std::move(details));
169     }
170     return RDB_OK;
171 }
172 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)173 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates)
174 {
175     MessageParcel reply;
176     int32_t status = IPC_SEND(
177         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
178     if (status != RDB_OK) {
179         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
180             param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
181     }
182     return status;
183 }
184 
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)185 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, const Option &option,
186                                  const PredicatesMemo &predicates, const AsyncDetail & callback)
187 {
188     Option asyncOption = option;
189     if (callback != nullptr) {
190         asyncOption.seqNum = GetSeqNum();
191         if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
192             LOG_INFO("insert callback failed");
193             return RDB_ERROR;
194         }
195     }
196     LOG_INFO("num=%{public}u", asyncOption.seqNum);
197     if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
198         LOG_ERROR("failed");
199         syncCallbacks_.Erase(asyncOption.seqNum);
200         return RDB_ERROR;
201     }
202 
203     LOG_INFO("success");
204     return RDB_OK;
205 }
206 
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,int32_t type)207 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam& param, const std::vector<std::string> &tables,
208     int32_t type)
209 {
210     MessageParcel reply;
211     int32_t status = IPC_SEND(
212         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param, tables, type);
213     if (status != RDB_OK) {
214         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d",
215             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
216     }
217     return status;
218 }
219 
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)220 int32_t RdbServiceProxy::Sync(const RdbSyncerParam &param, const Option &option, const PredicatesMemo &predicates,
221                               const AsyncDetail &async)
222 {
223     if (option.isAsync) {
224         return DoAsync(param, option, predicates, async);
225     }
226     return DoSync(param, option, predicates, async);
227 }
228 
RemoveSuffix(const std::string & name)229 std::string RdbServiceProxy::RemoveSuffix(const std::string& name)
230 {
231     std::string suffix(".db");
232     auto pos = name.rfind(suffix);
233     if (pos == std::string::npos || pos < name.length() - suffix.length()) {
234         return name;
235     }
236     return { name, 0, pos };
237 }
238 
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)239 int32_t RdbServiceProxy::Subscribe(const RdbSyncerParam &param, const SubscribeOption &option,
240                                    RdbStoreObserver *observer)
241 {
242     if (observer == nullptr) {
243         return RDB_ERROR;
244     }
245     if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
246         LOG_ERROR("subscribe mode invalid");
247         return RDB_ERROR;
248     }
249     if (DoSubscribe(param, option) != RDB_OK) {
250         return RDB_ERROR;
251     }
252     auto name = RemoveSuffix(param.storeName_);
253     observers_.Compute(name, [observer, &param, &option](const auto &key, std::list<ObserverParam> &value) {
254         for (const auto &element : value) {
255             if (element.observer == observer) {
256                 LOG_ERROR("duplicate observer");
257                 return true;
258             }
259         }
260         value.push_back({ observer, param.bundleName_, option });
261         return true;
262     });
263     return RDB_OK;
264 }
265 
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)266 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam &param, const SubscribeOption &option)
267 {
268     MessageParcel reply;
269     int32_t status = IPC_SEND(
270         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
271     if (status != RDB_OK) {
272         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
273             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
274     }
275     return status;
276 }
277 
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)278 int32_t RdbServiceProxy::UnSubscribe(const RdbSyncerParam &param, const SubscribeOption &option,
279                                      RdbStoreObserver *observer)
280 {
281     if (observer == nullptr) {
282         LOG_ERROR("observer is null");
283         return RDB_ERROR;
284     }
285     if (DoUnSubscribe(param) != RDB_OK) {
286         return RDB_ERROR;
287     }
288     auto name = RemoveSuffix(param.storeName_);
289     observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
290         LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
291         value.remove_if([observer](const ObserverParam &param) {
292             return param.observer == observer;
293         });
294         LOG_INFO("after  remove size=%{public}d", static_cast<int>(value.size()));
295         return !(value.empty());
296     });
297     return RDB_OK;
298 }
299 
DoUnSubscribe(const RdbSyncerParam & param)300 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam &param)
301 {
302     MessageParcel reply;
303     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param);
304     if (status != RDB_OK) {
305         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
306             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
307     }
308     return status;
309 }
310 
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs,sptr<IRemoteObject> & resultSet)311 int32_t RdbServiceProxy::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql,
312                                      const std::vector<std::string>& selectionArgs, sptr<IRemoteObject>& resultSet)
313 {
314     MessageParcel reply;
315     int32_t status = IPC_SEND(
316         static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
317     if (status != RDB_OK) {
318         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s",
319             status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), device.c_str());
320         return status;
321     }
322 
323     sptr<IRemoteObject> remote = reply.ReadRemoteObject();
324     if (remote == nullptr) {
325         LOG_ERROR("read remote object is null");
326         return RDB_ERROR;
327     }
328     resultSet = remote;
329     return RDB_OK;
330 }
331 
ExportObservers()332 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
333 {
334     return observers_;
335 }
336 
ImportObservers(Observers & observers)337 void RdbServiceProxy::ImportObservers(Observers &observers)
338 {
339     LOG_INFO("enter");
340     observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
341         RdbSyncerParam syncerParam;
342         for (const auto &param : value) {
343             syncerParam.bundleName_ = param.bundleName;
344             syncerParam.storeName_ = key;
345             Subscribe(syncerParam, param.subscribeOption, param.observer);
346         }
347         return false;
348     });
349 }
350 
GetSchema(const RdbSyncerParam & param)351 int32_t RdbServiceProxy::GetSchema(const RdbSyncerParam &param)
352 {
353     MessageParcel reply;
354     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_SCHEMA), reply, param);
355     if (status != RDB_OK) {
356         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
357             SqliteUtils::Anonymous(param.storeName_).c_str());
358     }
359     return status;
360 }
361 
Delete(const RdbSyncerParam & param)362 int32_t RdbServiceProxy::Delete(const RdbSyncerParam &param)
363 {
364     MessageParcel reply;
365     int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
366     if (status != RDB_OK) {
367         LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
368             SqliteUtils::Anonymous(param.storeName_).c_str());
369     }
370     return status;
371 }
372 } // namespace OHOS::DistributedRdb