1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_service_proxy.h"
17
18 #include "itypes_util.h"
19 #include "logger.h"
20 #include "sqlite_utils.h"
21
22 namespace OHOS::DistributedRdb {
23 using namespace OHOS::Rdb;
24 using SqliteUtils = OHOS::NativeRdb::SqliteUtils;
25 using RdbServiceCode = OHOS::DistributedRdb::RelationalStore::RdbServiceInterfaceCode;
26
27 #define IPC_SEND(code, reply, ...) \
28 ({ \
29 int32_t __status = RDB_OK; \
30 do { \
31 MessageParcel request; \
32 if (!request.WriteInterfaceToken(GetDescriptor())) { \
33 __status = RDB_ERROR; \
34 break; \
35 } \
36 if (!ITypesUtil::Marshal(request, ##__VA_ARGS__)) { \
37 __status = RDB_ERROR; \
38 break; \
39 } \
40 MessageOption option; \
41 auto result = remote_->SendRequest((code), request, reply, option); \
42 if (result != 0) { \
43 __status = RDB_ERROR; \
44 break; \
45 } \
46 \
47 ITypesUtil::Unmarshal(reply, __status); \
48 } while (0); \
49 __status; \
50 })
51
RdbServiceProxy(const sptr<IRemoteObject> & object)52 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
53 : IRemoteProxy<IRdbService>(object)
54 {
55 remote_ = Remote();
56 }
57
OnSyncComplete(uint32_t seqNum,Details && result)58 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, Details &&result)
59 {
60 syncCallbacks_.ComputeIfPresent(seqNum, [&result] (const auto& key, const AsyncDetail & callback) {
61 auto finished = result.empty() || (result.begin()->second.progress == SYNC_FINISH);
62 LOG_DEBUG("Sync complete, seqNum%{public}d, result size:%{public}zu", key, result.size());
63 if (callback!=nullptr) {
64 callback(std::move(result));
65 }
66 return !finished;
67 });
68 }
69
OnDataChange(const Origin & origin,const PrimaryFields & primaries,ChangeInfo && changeInfo)70 void RdbServiceProxy::OnDataChange(const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo)
71 {
72 LOG_DEBUG("store:%{public}s data change from :%{public}s, dataType:%{public}d, origin:%{public}d.",
73 SqliteUtils::Anonymous(origin.store).c_str(),
74 origin.id.empty() ? "empty" : SqliteUtils::Anonymous(*origin.id.begin()).c_str(),
75 origin.dataType, origin.origin);
76 auto name = RemoveSuffix(origin.store);
77 observers_.ComputeIfPresent(name,
78 [&origin, &primaries, info = std::move(changeInfo)](const auto &key, const std::list<ObserverParam> &value)
79 mutable {
80 auto size = value.size();
81 for (const auto ¶ms : value) {
82 params.observer->OnChange(origin, primaries, --size > 0 ? ChangeInfo(info) : std::move(info));
83 }
84 return !value.empty();
85 });
86 }
87
ObtainDistributedTableName(const std::string & device,const std::string & table)88 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
89 {
90 return "";
91 }
92
InitNotifier(const RdbSyncerParam & param)93 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m)
94 {
95 notifier_ = new (std::nothrow) RdbNotifierStub(
96 [this] (uint32_t seqNum, Details &&result) {
97 OnSyncComplete(seqNum, std::move(result));
98 },
99 [this](const Origin &origin, const PrimaryFields &primaries, ChangeInfo &&changeInfo) {
100 OnDataChange(origin, primaries, std::move(changeInfo));
101 });
102 if (notifier_ == nullptr) {
103 LOG_ERROR("create notifier failed");
104 return RDB_ERROR;
105 }
106
107 if (InitNotifier(param, notifier_->AsObject()) != RDB_OK) {
108 notifier_ = nullptr;
109 return RDB_ERROR;
110 }
111
112 LOG_INFO("success");
113 return RDB_OK;
114 }
115
InitNotifier(const RdbSyncerParam & param,sptr<IRemoteObject> notifier)116 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m, sptr<IRemoteObject> notifier)
117 {
118 MessageParcel reply;
119 int32_t status = IPC_SEND(
120 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_INIT_NOTIFIER), reply, param, notifier);
121 if (status != RDB_OK) {
122 LOG_ERROR("status:%{public}d, bundleName:%{public}s", status, param.bundleName_.c_str());
123 }
124 return status;
125 }
126
GetSeqNum()127 uint32_t RdbServiceProxy::GetSeqNum()
128 {
129 uint32_t value = ++seqNum_;
130 if (value == 0) {
131 value = ++seqNum_;
132 }
133 return value;
134 }
135
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)136 std::pair<int32_t, Details> RdbServiceProxy::DoSync(const RdbSyncerParam& param, const Option &option,
137 const PredicatesMemo &predicates)
138 {
139 std::pair<int32_t, Details> result{RDB_ERROR, {}};
140 MessageParcel reply;
141 auto &[status, details] = result;
142 status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SYNC), reply, param, option, predicates);
143 if (status != RDB_OK) {
144 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
145 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
146 return result;
147 }
148
149 if (!ITypesUtil::Unmarshal(reply, details)) {
150 LOG_ERROR("read result failed");
151 status = RDB_ERROR;
152 return result;
153 }
154 return result;
155 }
156
DoSync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)157 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates,
158 const AsyncDetail &async)
159 {
160 auto [status, details] = DoSync(param, option, predicates);
161 if (status != RDB_OK) {
162 LOG_INFO("failed");
163 return RDB_ERROR;
164 }
165 LOG_INFO("success");
166
167 if (async != nullptr) {
168 async(std::move(details));
169 }
170 return RDB_OK;
171 }
172
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates)173 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates)
174 {
175 MessageParcel reply;
176 int32_t status = IPC_SEND(
177 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_ASYNC), reply, param, option, predicates);
178 if (status != RDB_OK) {
179 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, seqNum:%{public}u", status,
180 param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), option.seqNum);
181 }
182 return status;
183 }
184
DoAsync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & callback)185 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, const Option &option,
186 const PredicatesMemo &predicates, const AsyncDetail & callback)
187 {
188 Option asyncOption = option;
189 if (callback != nullptr) {
190 asyncOption.seqNum = GetSeqNum();
191 if (!syncCallbacks_.Insert(asyncOption.seqNum, callback)) {
192 LOG_INFO("insert callback failed");
193 return RDB_ERROR;
194 }
195 }
196 LOG_INFO("num=%{public}u", asyncOption.seqNum);
197 if (DoAsync(param, asyncOption, predicates) != RDB_OK) {
198 LOG_ERROR("failed");
199 syncCallbacks_.Erase(asyncOption.seqNum);
200 return RDB_ERROR;
201 }
202
203 LOG_INFO("success");
204 return RDB_OK;
205 }
206
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables,int32_t type)207 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam& param, const std::vector<std::string> &tables,
208 int32_t type)
209 {
210 MessageParcel reply;
211 int32_t status = IPC_SEND(
212 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SET_DIST_TABLE), reply, param, tables, type);
213 if (status != RDB_OK) {
214 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, type:%{public}d",
215 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), type);
216 }
217 return status;
218 }
219
Sync(const RdbSyncerParam & param,const Option & option,const PredicatesMemo & predicates,const AsyncDetail & async)220 int32_t RdbServiceProxy::Sync(const RdbSyncerParam ¶m, const Option &option, const PredicatesMemo &predicates,
221 const AsyncDetail &async)
222 {
223 if (option.isAsync) {
224 return DoAsync(param, option, predicates, async);
225 }
226 return DoSync(param, option, predicates, async);
227 }
228
RemoveSuffix(const std::string & name)229 std::string RdbServiceProxy::RemoveSuffix(const std::string& name)
230 {
231 std::string suffix(".db");
232 auto pos = name.rfind(suffix);
233 if (pos == std::string::npos || pos < name.length() - suffix.length()) {
234 return name;
235 }
236 return { name, 0, pos };
237 }
238
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)239 int32_t RdbServiceProxy::Subscribe(const RdbSyncerParam ¶m, const SubscribeOption &option,
240 RdbStoreObserver *observer)
241 {
242 if (observer == nullptr) {
243 return RDB_ERROR;
244 }
245 if (option.mode < SubscribeMode::REMOTE || option.mode >= SUBSCRIBE_MODE_MAX) {
246 LOG_ERROR("subscribe mode invalid");
247 return RDB_ERROR;
248 }
249 if (DoSubscribe(param, option) != RDB_OK) {
250 return RDB_ERROR;
251 }
252 auto name = RemoveSuffix(param.storeName_);
253 observers_.Compute(name, [observer, ¶m, &option](const auto &key, std::list<ObserverParam> &value) {
254 for (const auto &element : value) {
255 if (element.observer == observer) {
256 LOG_ERROR("duplicate observer");
257 return true;
258 }
259 }
260 value.push_back({ observer, param.bundleName_, option });
261 return true;
262 });
263 return RDB_OK;
264 }
265
DoSubscribe(const RdbSyncerParam & param,const SubscribeOption & option)266 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option)
267 {
268 MessageParcel reply;
269 int32_t status = IPC_SEND(
270 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_SUBSCRIBE), reply, param, option);
271 if (status != RDB_OK) {
272 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
273 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
274 }
275 return status;
276 }
277
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)278 int32_t RdbServiceProxy::UnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option,
279 RdbStoreObserver *observer)
280 {
281 if (observer == nullptr) {
282 LOG_ERROR("observer is null");
283 return RDB_ERROR;
284 }
285 if (DoUnSubscribe(param) != RDB_OK) {
286 return RDB_ERROR;
287 }
288 auto name = RemoveSuffix(param.storeName_);
289 observers_.ComputeIfPresent(name, [observer](const auto &key, std::list<ObserverParam> &value) {
290 LOG_INFO("before remove size=%{public}d", static_cast<int>(value.size()));
291 value.remove_if([observer](const ObserverParam ¶m) {
292 return param.observer == observer;
293 });
294 LOG_INFO("after remove size=%{public}d", static_cast<int>(value.size()));
295 return !(value.empty());
296 });
297 return RDB_OK;
298 }
299
DoUnSubscribe(const RdbSyncerParam & param)300 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam ¶m)
301 {
302 MessageParcel reply;
303 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_UNSUBSCRIBE), reply, param);
304 if (status != RDB_OK) {
305 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s",
306 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str());
307 }
308 return status;
309 }
310
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs,sptr<IRemoteObject> & resultSet)311 int32_t RdbServiceProxy::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql,
312 const std::vector<std::string>& selectionArgs, sptr<IRemoteObject>& resultSet)
313 {
314 MessageParcel reply;
315 int32_t status = IPC_SEND(
316 static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_REMOTE_QUERY), reply, param, device, sql, selectionArgs);
317 if (status != RDB_OK) {
318 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s, device:%{public}.6s",
319 status, param.bundleName_.c_str(), SqliteUtils::Anonymous(param.storeName_).c_str(), device.c_str());
320 return status;
321 }
322
323 sptr<IRemoteObject> remote = reply.ReadRemoteObject();
324 if (remote == nullptr) {
325 LOG_ERROR("read remote object is null");
326 return RDB_ERROR;
327 }
328 resultSet = remote;
329 return RDB_OK;
330 }
331
ExportObservers()332 RdbServiceProxy::Observers RdbServiceProxy::ExportObservers()
333 {
334 return observers_;
335 }
336
ImportObservers(Observers & observers)337 void RdbServiceProxy::ImportObservers(Observers &observers)
338 {
339 LOG_INFO("enter");
340 observers.ForEach([this](const std::string &key, const std::list<ObserverParam> &value) {
341 RdbSyncerParam syncerParam;
342 for (const auto ¶m : value) {
343 syncerParam.bundleName_ = param.bundleName;
344 syncerParam.storeName_ = key;
345 Subscribe(syncerParam, param.subscribeOption, param.observer);
346 }
347 return false;
348 });
349 }
350
GetSchema(const RdbSyncerParam & param)351 int32_t RdbServiceProxy::GetSchema(const RdbSyncerParam ¶m)
352 {
353 MessageParcel reply;
354 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_GET_SCHEMA), reply, param);
355 if (status != RDB_OK) {
356 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
357 SqliteUtils::Anonymous(param.storeName_).c_str());
358 }
359 return status;
360 }
361
Delete(const RdbSyncerParam & param)362 int32_t RdbServiceProxy::Delete(const RdbSyncerParam ¶m)
363 {
364 MessageParcel reply;
365 int32_t status = IPC_SEND(static_cast<uint32_t>(RdbServiceCode::RDB_SERVICE_CMD_DELETE), reply, param);
366 if (status != RDB_OK) {
367 LOG_ERROR("status:%{public}d, bundleName:%{public}s, storeName:%{public}s", status, param.bundleName_.c_str(),
368 SqliteUtils::Anonymous(param.storeName_).c_str());
369 }
370 return status;
371 }
372 } // namespace OHOS::DistributedRdb