• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "RdbServiceProxy"
17 
18 #include "rdb_service_proxy.h"
19 #include "itypes_util.h"
20 #include "log_print.h"
21 
22 namespace OHOS::DistributedRdb {
RdbServiceProxy(const sptr<IRemoteObject> & object)23 RdbServiceProxy::RdbServiceProxy(const sptr<IRemoteObject> &object)
24     : IRemoteProxy<IRdbService>(object)
25 {
26     ZLOGI("construct");
27 }
28 
OnSyncComplete(uint32_t seqNum,const SyncResult & result)29 void RdbServiceProxy::OnSyncComplete(uint32_t seqNum, const SyncResult &result)
30 {
31     syncCallbacks_.ComputeIfPresent(seqNum, [&result] (const auto& key, const SyncCallback& callback) {
32         callback(result);
33         return true;
34     });
35     syncCallbacks_.Erase(seqNum);
36 }
37 
OnDataChange(const std::string & storeName,const std::vector<std::string> & devices)38 void RdbServiceProxy::OnDataChange(const std::string& storeName, const std::vector<std::string> &devices)
39 {
40     ZLOGI("%{public}s", storeName.c_str());
41     auto name = RemoveSuffix(storeName);
42     observers_.ComputeIfPresent(
43         name, [&devices] (const auto& key, const ObserverMapValue& value) {
44             for (const auto& observer : value.first) {
45                 observer->OnChange(devices);
46             }
47             return true;
48         });
49 }
50 
ObtainDistributedTableName(const std::string & device,const std::string & table)51 std::string RdbServiceProxy::ObtainDistributedTableName(const std::string &device, const std::string &table)
52 {
53     MessageParcel data;
54     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
55         ZLOGE("write descriptor failed");
56         return "";
57     }
58     if (!DistributedKv::ITypesUtil::Marshal(data, device, table)) {
59         ZLOGE("write to message parcel failed");
60         return "";
61     }
62 
63     MessageParcel reply;
64     MessageOption option;
65     if (Remote()->SendRequest(RDB_SERVICE_CMD_OBTAIN_TABLE, data, reply, option) != 0) {
66         ZLOGE("send request failed");
67         return "";
68     }
69     return reply.ReadString();
70 }
71 
InitNotifier(const RdbSyncerParam & param)72 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam& param)
73 {
74     notifier_ = new (std::nothrow) RdbNotifierStub(
75         [this] (uint32_t seqNum, const SyncResult& result) {
76             OnSyncComplete(seqNum, result);
77         },
78         [this] (const std::string& storeName, const std::vector<std::string>& devices) {
79             OnDataChange(storeName, devices);
80         }
81     );
82     if (notifier_ == nullptr) {
83         ZLOGE("create notifier failed");
84         return RDB_ERROR;
85     }
86 
87     if (InitNotifier(param, notifier_->AsObject().GetRefPtr()) != RDB_OK) {
88         notifier_ = nullptr;
89         return RDB_ERROR;
90     }
91 
92     ZLOGI("success");
93     return RDB_OK;
94 }
95 
InitNotifier(const RdbSyncerParam & param,const sptr<IRemoteObject> notifier)96 int32_t RdbServiceProxy::InitNotifier(const RdbSyncerParam &param, const sptr<IRemoteObject> notifier)
97 {
98     MessageParcel data;
99     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
100         ZLOGE("write descriptor failed");
101         return RDB_ERROR;
102     }
103     if (!DistributedKv::ITypesUtil::Marshal(data, param, notifier)) {
104         ZLOGE("write to message parcel failed");
105         return RDB_ERROR;
106     }
107 
108     MessageParcel reply;
109     MessageOption option;
110     if (Remote()->SendRequest(RDB_SERVICE_CMD_INIT_NOTIFIER, data, reply, option) != 0) {
111         ZLOGE("send request failed");
112         return RDB_ERROR;
113     }
114 
115     int32_t res = RDB_ERROR;
116     return reply.ReadInt32(res) ? res : RDB_ERROR;
117 }
118 
GetSeqNum()119 uint32_t RdbServiceProxy::GetSeqNum()
120 {
121     return seqNum_++;
122 }
123 
DoSync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,SyncResult & result)124 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam& param, const SyncOption &option,
125                                 const RdbPredicates &predicates, SyncResult& result)
126 {
127     MessageParcel data;
128     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
129         ZLOGE("write descriptor failed");
130         return RDB_ERROR;
131     }
132     if (!DistributedKv::ITypesUtil::Marshal(data, param, option, predicates)) {
133         ZLOGE("write to message parcel failed");
134         return RDB_ERROR;
135     }
136 
137     MessageParcel reply;
138     MessageOption opt;
139     if (Remote()->SendRequest(RDB_SERVICE_CMD_SYNC, data, reply, opt) != 0) {
140         ZLOGE("send request failed");
141         return RDB_ERROR;
142     }
143 
144     if (!DistributedKv::ITypesUtil::Unmarshal(reply, result)) {
145         ZLOGE("read result failed");
146         return RDB_ERROR;
147     }
148     return RDB_OK;
149 }
150 
DoSync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,const SyncCallback & callback)151 int32_t RdbServiceProxy::DoSync(const RdbSyncerParam& param, const SyncOption &option,
152                                 const RdbPredicates &predicates, const SyncCallback& callback)
153 {
154     SyncResult result;
155     if (DoSync(param, option, predicates, result) != RDB_OK) {
156         ZLOGI("failed");
157         return RDB_ERROR;
158     }
159     ZLOGI("success");
160 
161     if (callback != nullptr) {
162         callback(result);
163     }
164     return RDB_OK;
165 }
166 
DoAsync(const RdbSyncerParam & param,uint32_t seqNum,const SyncOption & option,const RdbPredicates & predicates)167 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, uint32_t seqNum, const SyncOption &option,
168                                  const RdbPredicates &predicates)
169 {
170     MessageParcel data;
171     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
172         ZLOGE("write descriptor failed");
173         return RDB_ERROR;
174     }
175     if (!DistributedKv::ITypesUtil::Marshal(data, param, seqNum, option, predicates)) {
176         ZLOGE("write to message parcel failed");
177         return RDB_ERROR;
178     }
179 
180     MessageParcel reply;
181     MessageOption opt;
182     if (Remote()->SendRequest(RDB_SERVICE_CMD_ASYNC, data, reply, opt) != 0) {
183         ZLOGE("send request failed");
184         return RDB_ERROR;
185     }
186 
187     int32_t res = RDB_ERROR;
188     return reply.ReadInt32(res) ? res : RDB_ERROR;
189 }
190 
DoAsync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,const SyncCallback & callback)191 int32_t RdbServiceProxy::DoAsync(const RdbSyncerParam& param, const SyncOption &option,
192                                  const RdbPredicates &predicates, const SyncCallback& callback)
193 {
194     uint32_t num = GetSeqNum();
195     if (!syncCallbacks_.Insert(num, callback)) {
196         ZLOGI("insert callback failed");
197         return RDB_ERROR;
198     }
199     ZLOGI("num=%{public}u", num);
200 
201     if (DoAsync(param, num, option, predicates) != RDB_OK) {
202         ZLOGE("failed");
203         syncCallbacks_.Erase(num);
204         return RDB_ERROR;
205     }
206 
207     ZLOGI("success");
208     return RDB_OK;
209 }
210 
SetDistributedTables(const RdbSyncerParam & param,const std::vector<std::string> & tables)211 int32_t RdbServiceProxy::SetDistributedTables(const RdbSyncerParam& param, const std::vector<std::string> &tables)
212 {
213     MessageParcel data;
214     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
215         ZLOGE("write descriptor failed");
216         return RDB_ERROR;
217     }
218     if (!DistributedKv::ITypesUtil::Marshal(data, param, tables)) {
219         ZLOGE("write to message parcel failed");
220         return RDB_ERROR;
221     }
222 
223     MessageParcel reply;
224     MessageOption option;
225     if (Remote()->SendRequest(RDB_SERVICE_CMD_SET_DIST_TABLE, data, reply, option) != 0) {
226         ZLOGE("send request failed");
227         return RDB_ERROR;
228     }
229 
230     int32_t res = RDB_ERROR;
231     return reply.ReadInt32(res) ? res : RDB_ERROR;
232 }
233 
Sync(const RdbSyncerParam & param,const SyncOption & option,const RdbPredicates & predicates,const SyncCallback & callback)234 int32_t RdbServiceProxy::Sync(const RdbSyncerParam& param, const SyncOption &option,
235                               const RdbPredicates &predicates, const SyncCallback &callback)
236 {
237     if (option.isBlock) {
238         return DoSync(param, option, predicates, callback);
239     }
240     return DoAsync(param, option, predicates, callback);
241 }
242 
RemoveSuffix(const std::string & name)243 std::string RdbServiceProxy::RemoveSuffix(const std::string& name)
244 {
245     std::string suffix(".db");
246     auto pos = name.rfind(suffix);
247     if (pos == std::string::npos || pos < name.length() - suffix.length()) {
248         return name;
249     }
250     return { name, 0, pos };
251 }
252 
Subscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)253 int32_t RdbServiceProxy::Subscribe(const RdbSyncerParam &param, const SubscribeOption &option,
254                                    RdbStoreObserver *observer)
255 {
256     if (option.mode != SubscribeMode::REMOTE) {
257         ZLOGE("subscribe mode invalid");
258         return RDB_ERROR;
259     }
260     if (DoSubscribe(param) != RDB_OK) {
261         ZLOGI("communicate to server failed");
262         return RDB_ERROR;
263     }
264     auto name = RemoveSuffix(param.storeName_);
265     observers_.Compute(
266         name, [observer] (const auto& key, ObserverMapValue& value) {
267             for (const auto& element : value.first) {
268                 if (element == observer) {
269                     ZLOGE("duplicate observer");
270                     return true;
271                 }
272             }
273             value.first.push_back(observer);
274             return true;
275         });
276     return RDB_OK;
277 }
278 
DoSubscribe(const RdbSyncerParam & param)279 int32_t RdbServiceProxy::DoSubscribe(const RdbSyncerParam &param)
280 {
281     MessageParcel data;
282     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
283         ZLOGE("write descriptor failed");
284         return RDB_ERROR;
285     }
286     if (!DistributedKv::ITypesUtil::Marshalling(param, data)) {
287         ZLOGE("write to message parcel failed");
288         return RDB_ERROR;
289     }
290 
291     MessageParcel reply;
292     MessageOption option;
293     if (Remote()->SendRequest(RDB_SERVICE_CMD_SUBSCRIBE, data, reply, option) != 0) {
294         ZLOGE("send request failed");
295         return RDB_ERROR;
296     }
297 
298     int32_t res = RDB_ERROR;
299     return reply.ReadInt32(res) ? res : RDB_ERROR;
300 }
301 
UnSubscribe(const RdbSyncerParam & param,const SubscribeOption & option,RdbStoreObserver * observer)302 int32_t RdbServiceProxy::UnSubscribe(const RdbSyncerParam &param, const SubscribeOption &option,
303                                      RdbStoreObserver *observer)
304 {
305     DoUnSubscribe(param);
306     auto name = RemoveSuffix(param.storeName_);
307     observers_.ComputeIfPresent(
308         name, [observer](const auto& key, ObserverMapValue& value) {
309             ZLOGI("before remove size=%{public}d", static_cast<int>(value.first.size()));
310             value.first.remove(observer);
311             ZLOGI("after  remove size=%{public}d", static_cast<int>(value.first.size()));
312             return !(value.first.empty());
313     });
314     return RDB_OK;
315 }
316 
DoUnSubscribe(const RdbSyncerParam & param)317 int32_t RdbServiceProxy::DoUnSubscribe(const RdbSyncerParam &param)
318 {
319     MessageParcel data;
320     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
321         ZLOGE("write descriptor failed");
322         return RDB_ERROR;
323     }
324     if (!DistributedKv::ITypesUtil::Marshalling(param, data)) {
325         ZLOGE("write to message parcel failed");
326         return RDB_ERROR;
327     }
328 
329     MessageParcel reply;
330     MessageOption option;
331     if (Remote()->SendRequest(RDB_SERVICE_CMD_UNSUBSCRIBE, data, reply, option) != 0) {
332         ZLOGE("send request failed");
333         return RDB_ERROR;
334     }
335 
336     int32_t res = RDB_ERROR;
337     return reply.ReadInt32(res) ? res : RDB_ERROR;
338 }
339 
RemoteQuery(const RdbSyncerParam & param,const std::string & device,const std::string & sql,const std::vector<std::string> & selectionArgs,sptr<IRemoteObject> & resultSet)340 int32_t RdbServiceProxy::RemoteQuery(const RdbSyncerParam& param, const std::string& device, const std::string& sql,
341                                      const std::vector<std::string>& selectionArgs, sptr<IRemoteObject>& resultSet)
342 {
343     MessageParcel data;
344     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
345         ZLOGE("write descriptor failed");
346         return RDB_ERROR;
347     }
348     if (!DistributedKv::ITypesUtil::Marshal(data, param, device, sql, selectionArgs)) {
349         ZLOGE("write to message parcel failed");
350         return RDB_ERROR;
351     }
352 
353     MessageParcel reply;
354     MessageOption option;
355     if (Remote()->SendRequest(RDB_SERVICE_CMD_REMOTE_QUERY, data, reply, option) != 0) {
356         ZLOGE("send request failed");
357         return RDB_ERROR;
358     }
359 
360     int32_t status = reply.ReadInt32();
361     if (status != RdbStatus::RDB_OK) {
362         ZLOGE("remote query failed, server side status is %{public}d", status);
363         return status;
364     }
365 
366     sptr<IRemoteObject> remote = reply.ReadRemoteObject();
367     if (remote == nullptr) {
368         ZLOGE("read remote object is null");
369         return RDB_ERROR;
370     }
371     resultSet = remote;
372     return RDB_OK;
373 }
374 
ExportObservers()375 RdbServiceProxy::ObserverMap RdbServiceProxy::ExportObservers()
376 {
377     return observers_;
378 }
379 
ImportObservers(ObserverMap & observers)380 void RdbServiceProxy::ImportObservers(ObserverMap &observers)
381 {
382     ZLOGI("enter");
383     SubscribeOption option {SubscribeMode::REMOTE};
384     observers.ForEach([this, &option](const std::string& key, const ObserverMapValue& value) {
385         for (auto& observer : value.first) {
386             Subscribe(value.second, option, observer);
387         }
388         return false;
389     });
390 }
CreateRDBTable(const RdbSyncerParam & param,const std::string & writePermission,const std::string & readPermission)391 int32_t RdbServiceProxy::CreateRDBTable(
392     const RdbSyncerParam &param, const std::string &writePermission, const std::string &readPermission)
393 {
394     MessageParcel data;
395     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
396         ZLOGE("write descriptor failed");
397         return RDB_ERROR;
398     }
399     if (!DistributedKv::ITypesUtil::Marshal(data, param, writePermission, readPermission)) {
400         ZLOGE("write to message parcel failed");
401         return RDB_ERROR;
402     }
403 
404     MessageParcel reply;
405     MessageOption option;
406     if (Remote()->SendRequest(RDB_SERVICE_CREATE_RDB_TABLE, data, reply, option) != 0) {
407         ZLOGE("send request failed");
408         return RDB_ERROR;
409     }
410 
411     int32_t status = reply.ReadInt32();
412     if (status != RdbStatus::RDB_OK) {
413         ZLOGE("remote query failed, server side status is %{public}d", status);
414         return status;
415     }
416     return RDB_OK;
417 }
418 
DestroyRDBTable(const RdbSyncerParam & param)419 int32_t RdbServiceProxy::DestroyRDBTable(const RdbSyncerParam &param)
420 {
421     MessageParcel data;
422     if (!data.WriteInterfaceToken(IRdbService::GetDescriptor())) {
423         ZLOGE("write descriptor failed");
424         return RDB_ERROR;
425     }
426     if (!DistributedKv::ITypesUtil::Marshal(data, param)) {
427         ZLOGE("write to message parcel failed");
428         return RDB_ERROR;
429     }
430 
431     MessageParcel reply;
432     MessageOption option;
433     if (Remote()->SendRequest(RDB_SERVICE_DESTROY_RDB_TABLE, data, reply, option) != 0) {
434         ZLOGE("send request failed");
435         return RDB_ERROR;
436     }
437 
438     int32_t status = reply.ReadInt32();
439     if (status != RdbStatus::RDB_OK) {
440         ZLOGE("remote query failed, server side status is %{public}d", status);
441         return status;
442     }
443     return RDB_OK;
444 }
445 } // namespace OHOS::DistributedRdb
446