1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ObserverBridge"
16 #include "observer_bridge.h"
17 #include "kvdb_service_client.h"
18 #include "kvstore_observer_client.h"
19 #include "log_print.h"
20 namespace OHOS::DistributedKv {
21 constexpr uint32_t INVALID_SUBSCRIBE_TYPE = 0;
ObserverBridge(AppId appId,StoreId storeId,int32_t subUser,std::shared_ptr<Observer> observer,const Convertor & cvt)22 ObserverBridge::ObserverBridge(AppId appId, StoreId storeId, int32_t subUser, std::shared_ptr<Observer> observer,
23 const Convertor &cvt) : appId_(std::move(appId)), storeId_(std::move(storeId)), subUser_(subUser),
24 observer_(std::move(observer)), convert_(cvt)
25 {
26 }
27
~ObserverBridge()28 ObserverBridge::~ObserverBridge()
29 {
30 if (remote_ == nullptr) {
31 return;
32 }
33 auto service = KVDBServiceClient::GetInstance();
34 if (service == nullptr) {
35 return;
36 }
37 service->Unsubscribe(appId_, storeId_, subUser_, remote_);
38 }
39
RegisterRemoteObserver(uint32_t realType)40 Status ObserverBridge::RegisterRemoteObserver(uint32_t realType)
41 {
42 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
43 if (remote_ != nullptr) {
44 remote_->realType_ |= realType;
45 return SUCCESS;
46 }
47
48 auto service = KVDBServiceClient::GetInstance();
49 if (service == nullptr) {
50 return SERVER_UNAVAILABLE;
51 }
52
53 remote_ = new (std::nothrow) ObserverClient(observer_, convert_);
54 if (remote_ == nullptr) {
55 ZLOGE("New ObserverClient failed, appId:%{public}s", appId_.appId.c_str());
56 return ERROR;
57 }
58 auto status = service->Subscribe(appId_, storeId_, subUser_, remote_);
59 if (status != SUCCESS) {
60 remote_ = nullptr;
61 } else {
62 remote_->realType_ = realType;
63 }
64 return status;
65 }
66
UnregisterRemoteObserver(uint32_t realType)67 Status ObserverBridge::UnregisterRemoteObserver(uint32_t realType)
68 {
69 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
70 if (remote_ == nullptr) {
71 return SUCCESS;
72 }
73
74 auto service = KVDBServiceClient::GetInstance();
75 if (service == nullptr) {
76 return SERVER_UNAVAILABLE;
77 }
78
79 Status status = Status::SUCCESS;
80 remote_->realType_ &= ~SUBSCRIBE_TYPE_LOCAL;
81 remote_->realType_ &= ~realType;
82 if (remote_->realType_ == 0) {
83 status = service->Unsubscribe(appId_, storeId_, subUser_, remote_);
84 remote_ = nullptr;
85 }
86 return status;
87 }
88
OnChange(const DBChangedData & data)89 void ObserverBridge::OnChange(const DBChangedData &data)
90 {
91 std::string deviceId;
92 auto inserted = ConvertDB(data.GetEntriesInserted(), deviceId, convert_);
93 auto updated = ConvertDB(data.GetEntriesUpdated(), deviceId, convert_);
94 auto deleted = ConvertDB(data.GetEntriesDeleted(), deviceId, convert_);
95 ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
96 observer_->OnChange(notice);
97 }
98
ObserverClient(std::shared_ptr<Observer> observer,const Convertor & cvt)99 ObserverBridge::ObserverClient::ObserverClient(std::shared_ptr<Observer> observer, const Convertor &cvt)
100 : KvStoreObserverClient(observer), convert_(cvt), realType_(INVALID_SUBSCRIBE_TYPE)
101 {
102 }
103
OnChange(const ChangeNotification & data)104 void ObserverBridge::ObserverClient::OnChange(const ChangeNotification &data)
105 {
106 if ((realType_ & SUBSCRIBE_TYPE_REMOTE) != SUBSCRIBE_TYPE_REMOTE) {
107 return;
108 }
109 std::string deviceId;
110 auto inserted = ObserverBridge::ConvertDB(data.GetInsertEntries(), deviceId, convert_);
111 auto updated = ObserverBridge::ConvertDB(data.GetUpdateEntries(), deviceId, convert_);
112 auto deleted = ObserverBridge::ConvertDB(data.GetDeleteEntries(), deviceId, convert_);
113 ChangeNotification notice(std::move(inserted), std::move(updated), std::move(deleted), deviceId, false);
114 KvStoreObserverClient::OnChange(notice);
115 }
116
OnChange(const DataOrigin & origin,Keys && keys)117 void ObserverBridge::ObserverClient::OnChange(const DataOrigin &origin, Keys &&keys)
118 {
119 if ((realType_ & SUBSCRIBE_TYPE_CLOUD) != SUBSCRIBE_TYPE_CLOUD) {
120 return;
121 }
122 KvStoreObserverClient::OnChange(origin, std::move(keys));
123 }
124
125 template<class T>
ConvertDB(const T & dbEntries,std::string & deviceId,const Convertor & convert)126 std::vector<Entry> ObserverBridge::ConvertDB(const T &dbEntries, std::string &deviceId, const Convertor &convert)
127 {
128 std::vector<Entry> entries(dbEntries.size());
129 auto it = entries.begin();
130 for (const auto &dbEntry : dbEntries) {
131 Entry &entry = *it;
132 entry.key = convert.ToKey(DBKey(dbEntry.key), deviceId);
133 entry.value = dbEntry.value;
134 ++it;
135 }
136 return entries;
137 }
138
OnServiceDeath()139 void ObserverBridge::OnServiceDeath()
140 {
141 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
142 if (remote_ == nullptr) {
143 return;
144 }
145 remote_ = nullptr;
146 }
147 } // namespace OHOS::DistributedKv
148