1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "RdbServiceProxy"
17
18 #include "rdb_service_proxy.h"
19 #include "itypes_util.h"
20 #include "log_print.h"
21
22 namespace OHOS::DistributedRdb {
RdbServiceProxy(const sptr<IRemoteObject> & object)23 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
24 : IRemoteProxy<IRdbService>(object)
25 {
26 ZLOGI("construct");
27 }
28
OnSyncComplete(uint32_t seqNum,const SyncResult & result)29 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, const SyncResult &result)
30 {
31 syncCallbacks_.ComputeIfPresent(seqNum, [&result] (const auto& key, const SyncCallback& callback) {
32 callback(result);
33 return true;
34 });
35 syncCallbacks_.Erase(seqNum);
36 }
37
OnDataChange(const std::string & storeName,const std::vector<std::string> & devices)38 void RdbServiceProxy::OnDataChange(const std::string& storeName, const std::vector<std::string> &devices)
39 {
40 ZLOGI("%{public}s", storeName.c_str());
41 auto name = RemoveSuffix(storeName);
42 observers_.ComputeIfPresent(
43 name, [&devices] (const auto& key, const ObserverMapValue& value) {
44 for (const auto& observer : value.first) {
45 observer->OnChange(devices);
46 }
47 return true;
48 });
49 }
50
ObtainDistributedTableName(const std::string & device,const std::string & table)51 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
52 {
53 MessageParcel data;
54 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
55 ZLOGE("write descriptor failed");
56 return "";
57 }
58 if (!DistributedKv::ITypesUtil::Marshal(data, device, table)) {
59 ZLOGE("write to message parcel failed");
60 return "";
61 }
62
63 MessageParcel reply;
64 MessageOption option;
65 if (Remote()->SendRequest(RDB_SERVICE_CMD_OBTAIN_TABLE, data, reply, option) != 0) {
66 ZLOGE("send request failed");
67 return "";
68 }
69 return reply.ReadString();
70 }
71
InitNotifier(const RdbSyncerParam & param)72 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam& param)
73 {
74 notifier_ = new (std::nothrow) RdbNotifierStub(
75 [this] (uint32_t seqNum, const SyncResult& result) {
76 OnSyncComplete(seqNum, result);
77 },
78 [this] (const std::string& storeName, const std::vector<std::string>& devices) {
79 OnDataChange(storeName, devices);
80 }
81 );
82 if (notifier_ == nullptr) {
83 ZLOGE("create notifier failed");
84 return RDB_ERROR;
85 }
86
87 if (InitNotifier(param, notifier_->AsObject().GetRefPtr()) != RDB_OK) {
88 notifier_ = nullptr;
89 return RDB_ERROR;
90 }
91
92 ZLOGI("success");
93 return RDB_OK;
94 }
95
InitNotifier(const RdbSyncerParam & param,const sptr<IRemoteObject> notifier)96 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam ¶m, const sptr<IRemoteObject> notifier)
97 {
98 MessageParcel data;
99 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
100 ZLOGE("write descriptor failed");
101 return RDB_ERROR;
102 }
103 if (!DistributedKv::ITypesUtil::Marshal(data, param, notifier)) {
104 ZLOGE("write to message parcel failed");
105 return RDB_ERROR;
106 }
107
108 MessageParcel reply;
109 MessageOption option;
110 if (Remote()->SendRequest(RDB_SERVICE_CMD_INIT_NOTIFIER, data, reply, option) != 0) {
111 ZLOGE("send request failed");
112 return RDB_ERROR;
113 }
114
115 int32_t res = RDB_ERROR;
116 return reply.ReadInt32(res) ? res : RDB_ERROR;
117 }
118
GetSeqNum()119 uint32_t RdbServiceProxy::GetSeqNum()
120 {
121 return seqNum_++;
122 }
123
DoSync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,SyncResult & result)124 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam& param, const SyncOption &option,
125 const RdbPredicates &predicates, SyncResult& result)
126 {
127 MessageParcel data;
128 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
129 ZLOGE("write descriptor failed");
130 return RDB_ERROR;
131 }
132 if (!DistributedKv::ITypesUtil::Marshal(data, param, option, predicates)) {
133 ZLOGE("write to message parcel failed");
134 return RDB_ERROR;
135 }
136
137 MessageParcel reply;
138 MessageOption opt;
139 if (Remote()->SendRequest(RDB_SERVICE_CMD_SYNC, data, reply, opt) != 0) {
140 ZLOGE("send request failed");
141 return RDB_ERROR;
142 }
143
144 if (!DistributedKv::ITypesUtil::Unmarshal(reply, result)) {
145 ZLOGE("read result failed");
146 return RDB_ERROR;
147 }
148 return RDB_OK;
149 }
150
DoSync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,const SyncCallback & callback)151 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam& param, const SyncOption &option,
152 const RdbPredicates &predicates, const SyncCallback& callback)
153 {
154 SyncResult result;
155 if (DoSync(param, option, predicates, result) != RDB_OK) {
156 ZLOGI("failed");
157 return RDB_ERROR;
158 }
159 ZLOGI("success");
160
161 if (callback != nullptr) {
162 callback(result);
163 }
164 return RDB_OK;
165 }
166
DoAsync(const RdbSyncerParam & param,uint32_t seqNum,const SyncOption & option,const RdbPredicates & predicates)167 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, uint32_t seqNum, const SyncOption &option,
168 const RdbPredicates &predicates)
169 {
170 MessageParcel data;
171 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
172 ZLOGE("write descriptor failed");
173 return RDB_ERROR;
174 }
175 if (!DistributedKv::ITypesUtil::Marshal(data, param, seqNum, option, predicates)) {
176 ZLOGE("write to message parcel failed");
177 return RDB_ERROR;
178 }
179
180 MessageParcel reply;
181 MessageOption opt;
182 if (Remote()->SendRequest(RDB_SERVICE_CMD_ASYNC, data, reply, opt) != 0) {
183 ZLOGE("send request failed");
184 return RDB_ERROR;
185 }
186
187 int32_t res = RDB_ERROR;
188 return reply.ReadInt32(res) ? res : RDB_ERROR;
189 }
190
DoAsync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,const SyncCallback & callback)191 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, const SyncOption &option,
192 const RdbPredicates &predicates, const SyncCallback& callback)
193 {
194 uint32_t num = GetSeqNum();
195 if (!syncCallbacks_.Insert(num, callback)) {
196 ZLOGI("insert callback failed");
197 return RDB_ERROR;
198 }
199 ZLOGI("num=%{public}u", num);
200
201 if (DoAsync(param, num, option, predicates) != RDB_OK) {
202 ZLOGE("failed");
203 syncCallbacks_.Erase(num);
204 return RDB_ERROR;
205 }
206
207 ZLOGI("success");
208 return RDB_OK;
209 }
210
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables)211 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam& param, const std::vector<std::string> &tables)
212 {
213 MessageParcel data;
214 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
215 ZLOGE("write descriptor failed");
216 return RDB_ERROR;
217 }
218 if (!DistributedKv::ITypesUtil::Marshal(data, param, tables)) {
219 ZLOGE("write to message parcel failed");
220 return RDB_ERROR;
221 }
222
223 MessageParcel reply;
224 MessageOption option;
225 if (Remote()->SendRequest(RDB_SERVICE_CMD_SET_DIST_TABLE, data, reply, option) != 0) {
226 ZLOGE("send request failed");
227 return RDB_ERROR;
228 }
229
230 int32_t res = RDB_ERROR;
231 return reply.ReadInt32(res) ? res : RDB_ERROR;
232 }
233
Sync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,const SyncCallback & callback)234 int32_t RdbServiceProxy::Sync(const RdbSyncerParam& param, const SyncOption &option,
235 const RdbPredicates &predicates, const SyncCallback &callback)
236 {
237 if (option.isBlock) {
238 return DoSync(param, option, predicates, callback);
239 }
240 return DoAsync(param, option, predicates, callback);
241 }
242
RemoveSuffix(const std::string & name)243 std::string RdbServiceProxy::RemoveSuffix(const std::string& name)
244 {
245 std::string suffix(".db");
246 auto pos = name.rfind(suffix);
247 if (pos == std::string::npos || pos < name.length() - suffix.length()) {
248 return name;
249 }
250 return { name, 0, pos };
251 }
252
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)253 int32_t RdbServiceProxy::Subscribe(const RdbSyncerParam ¶m, const SubscribeOption &option,
254 RdbStoreObserver *observer)
255 {
256 if (option.mode != SubscribeMode::REMOTE) {
257 ZLOGE("subscribe mode invalid");
258 return RDB_ERROR;
259 }
260 if (DoSubscribe(param) != RDB_OK) {
261 ZLOGI("communicate to server failed");
262 return RDB_ERROR;
263 }
264 auto name = RemoveSuffix(param.storeName_);
265 observers_.Compute(
266 name, [observer] (const auto& key, ObserverMapValue& value) {
267 for (const auto& element : value.first) {
268 if (element == observer) {
269 ZLOGE("duplicate observer");
270 return true;
271 }
272 }
273 value.first.push_back(observer);
274 return true;
275 });
276 return RDB_OK;
277 }
278
DoSubscribe(const RdbSyncerParam & param)279 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam ¶m)
280 {
281 MessageParcel data;
282 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
283 ZLOGE("write descriptor failed");
284 return RDB_ERROR;
285 }
286 if (!DistributedKv::ITypesUtil::Marshalling(param, data)) {
287 ZLOGE("write to message parcel failed");
288 return RDB_ERROR;
289 }
290
291 MessageParcel reply;
292 MessageOption option;
293 if (Remote()->SendRequest(RDB_SERVICE_CMD_SUBSCRIBE, data, reply, option) != 0) {
294 ZLOGE("send request failed");
295 return RDB_ERROR;
296 }
297
298 int32_t res = RDB_ERROR;
299 return reply.ReadInt32(res) ? res : RDB_ERROR;
300 }
301
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)302 int32_t RdbServiceProxy::UnSubscribe(const RdbSyncerParam ¶m, const SubscribeOption &option,
303 RdbStoreObserver *observer)
304 {
305 DoUnSubscribe(param);
306 auto name = RemoveSuffix(param.storeName_);
307 observers_.ComputeIfPresent(
308 name, [observer](const auto& key, ObserverMapValue& value) {
309 ZLOGI("before remove size=%{public}d", static_cast<int>(value.first.size()));
310 value.first.remove(observer);
311 ZLOGI("after remove size=%{public}d", static_cast<int>(value.first.size()));
312 return !(value.first.empty());
313 });
314 return RDB_OK;
315 }
316
DoUnSubscribe(const RdbSyncerParam & param)317 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam ¶m)
318 {
319 MessageParcel data;
320 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
321 ZLOGE("write descriptor failed");
322 return RDB_ERROR;
323 }
324 if (!DistributedKv::ITypesUtil::Marshalling(param, data)) {
325 ZLOGE("write to message parcel failed");
326 return RDB_ERROR;
327 }
328
329 MessageParcel reply;
330 MessageOption option;
331 if (Remote()->SendRequest(RDB_SERVICE_CMD_UNSUBSCRIBE, data, reply, option) != 0) {
332 ZLOGE("send request failed");
333 return RDB_ERROR;
334 }
335
336 int32_t res = RDB_ERROR;
337 return reply.ReadInt32(res) ? res : RDB_ERROR;
338 }
339
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs,sptr<IRemoteObject> & resultSet)340 int32_t RdbServiceProxy::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql,
341 const std::vector<std::string>& selectionArgs, sptr<IRemoteObject>& resultSet)
342 {
343 MessageParcel data;
344 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
345 ZLOGE("write descriptor failed");
346 return RDB_ERROR;
347 }
348 if (!DistributedKv::ITypesUtil::Marshal(data, param, device, sql, selectionArgs)) {
349 ZLOGE("write to message parcel failed");
350 return RDB_ERROR;
351 }
352
353 MessageParcel reply;
354 MessageOption option;
355 if (Remote()->SendRequest(RDB_SERVICE_CMD_REMOTE_QUERY, data, reply, option) != 0) {
356 ZLOGE("send request failed");
357 return RDB_ERROR;
358 }
359
360 int32_t status = reply.ReadInt32();
361 if (status != RdbStatus::RDB_OK) {
362 ZLOGE("remote query failed, server side status is %{public}d", status);
363 return status;
364 }
365
366 sptr<IRemoteObject> remote = reply.ReadRemoteObject();
367 if (remote == nullptr) {
368 ZLOGE("read remote object is null");
369 return RDB_ERROR;
370 }
371 resultSet = remote;
372 return RDB_OK;
373 }
374
ExportObservers()375 RdbServiceProxy::ObserverMap RdbServiceProxy::ExportObservers()
376 {
377 return observers_;
378 }
379
ImportObservers(ObserverMap & observers)380 void RdbServiceProxy::ImportObservers(ObserverMap &observers)
381 {
382 ZLOGI("enter");
383 SubscribeOption option {SubscribeMode::REMOTE};
384 observers.ForEach([this, &option](const std::string& key, const ObserverMapValue& value) {
385 for (auto& observer : value.first) {
386 Subscribe(value.second, option, observer);
387 }
388 return false;
389 });
390 }
CreateRDBTable(const RdbSyncerParam & param,const std::string & writePermission,const std::string & readPermission)391 int32_t RdbServiceProxy::CreateRDBTable(
392 const RdbSyncerParam ¶m, const std::string &writePermission, const std::string &readPermission)
393 {
394 MessageParcel data;
395 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
396 ZLOGE("write descriptor failed");
397 return RDB_ERROR;
398 }
399 if (!DistributedKv::ITypesUtil::Marshal(data, param, writePermission, readPermission)) {
400 ZLOGE("write to message parcel failed");
401 return RDB_ERROR;
402 }
403
404 MessageParcel reply;
405 MessageOption option;
406 if (Remote()->SendRequest(RDB_SERVICE_CREATE_RDB_TABLE, data, reply, option) != 0) {
407 ZLOGE("send request failed");
408 return RDB_ERROR;
409 }
410
411 int32_t status = reply.ReadInt32();
412 if (status != RdbStatus::RDB_OK) {
413 ZLOGE("remote query failed, server side status is %{public}d", status);
414 return status;
415 }
416 return RDB_OK;
417 }
418
DestroyRDBTable(const RdbSyncerParam & param)419 int32_t RdbServiceProxy::DestroyRDBTable(const RdbSyncerParam ¶m)
420 {
421 MessageParcel data;
422 if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
423 ZLOGE("write descriptor failed");
424 return RDB_ERROR;
425 }
426 if (!DistributedKv::ITypesUtil::Marshal(data, param)) {
427 ZLOGE("write to message parcel failed");
428 return RDB_ERROR;
429 }
430
431 MessageParcel reply;
432 MessageOption option;
433 if (Remote()->SendRequest(RDB_SERVICE_DESTROY_RDB_TABLE, data, reply, option) != 0) {
434 ZLOGE("send request failed");
435 return RDB_ERROR;
436 }
437
438 int32_t status = reply.ReadInt32();
439 if (status != RdbStatus::RDB_OK) {
440 ZLOGE("remote query failed, server side status is %{public}d", status);
441 return status;
442 }
443 return RDB_OK;
444 }
445 } // namespace OHOS::DistributedRdb
446