1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_subscriber_manager.h"
17
18 #include <cinttypes>
19
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23
24 namespace OHOS {
25 namespace DataShare {
GetInstance()26 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
27 {
28 static RdbSubscriberManager manager;
29 return manager;
30 }
31
RdbSubscriberManager()32 RdbSubscriberManager::RdbSubscriberManager()
33 {
34 serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
35 Emit(changeNode);
36 });
37 }
38
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)39 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
40 std::shared_ptr<DataShareServiceProxy> proxy,
41 const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
42 {
43 if (proxy == nullptr) {
44 LOG_ERROR("proxy is nullptr");
45 return std::vector<OperationResult>();
46 }
47 std::vector<Key> keys;
48 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
49 keys.emplace_back(uri, templateId);
50 });
51 return BaseCallbacks::AddObservers(
52 keys, subscriber, std::make_shared<Observer>(callback),
53 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
54 Emit(localRegisterKeys, observer);
55 },
56 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
57 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
58 std::vector<std::string> firstAddUris;
59 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
60 firstAddUris.emplace_back(result);
61 });
62 if (firstAddUris.empty()) {
63 return;
64 }
65
66 auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
67 std::vector<Key> failedKeys;
68 for (auto &subResult : subResults) {
69 opResult.emplace_back(subResult);
70 if (subResult.errCode_ != E_OK) {
71 failedKeys.emplace_back(subResult.key_, templateId);
72 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
73 }
74 }
75 if (!failedKeys.empty()) {
76 BaseCallbacks::DelObservers(failedKeys, subscriber);
77 }
78 });
79 }
80
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)81 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
82 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
83 {
84 if (proxy == nullptr) {
85 LOG_ERROR("proxy is nullptr");
86 return std::vector<OperationResult>();
87 }
88 if (uris.empty()) {
89 return DelObservers(subscriber, proxy);
90 }
91
92 std::vector<Key> keys;
93 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
94 keys.emplace_back(uri, templateId);
95 });
96 return BaseCallbacks::DelObservers(keys, subscriber,
97 [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
98 std::vector<std::string> lastDelUris;
99 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
100 lastDelUris.emplace_back(result);
101 lastChangeNodeMap_.Erase(result);
102 });
103 if (lastDelUris.empty()) {
104 return;
105 }
106 auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
107 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
108 });
109 }
110
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)111 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
112 std::shared_ptr<DataShareServiceProxy> proxy)
113 {
114 if (proxy == nullptr) {
115 LOG_ERROR("proxy is nullptr");
116 return std::vector<OperationResult>();
117 }
118 return BaseCallbacks::DelObservers(subscriber,
119 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
120 // delete all obs by subscriber
121 for (const auto &key : lastDelKeys) {
122 lastChangeNodeMap_.Erase(key);
123 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
124 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
125 }
126 });
127 }
128
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)129 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
130 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
131 {
132 if (proxy == nullptr) {
133 LOG_ERROR("proxy is nullptr");
134 return std::vector<OperationResult>();
135 }
136 std::vector<Key> keys;
137 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
138 keys.emplace_back(uri, templateId);
139 });
140 return BaseCallbacks::EnableObservers(keys, subscriber,
141 [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
142 EmitOnEnable(obsMap);
143 },
144 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
145 std::vector<OperationResult> &opResult) {
146 std::vector<std::string> firstAddUris;
147 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
148 firstAddUris.emplace_back(result);
149 });
150 if (firstAddUris.empty()) {
151 return;
152 }
153 auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
154 std::vector<Key> failedKeys;
155 for (auto &subResult : subResults) {
156 opResult.emplace_back(subResult);
157 if (subResult.errCode_ != E_OK) {
158 failedKeys.emplace_back(subResult.key_, templateId);
159 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
160 }
161 }
162 if (!failedKeys.empty()) {
163 BaseCallbacks::DisableObservers(failedKeys, subscriber);
164 }
165 });
166 }
167
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)168 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
169 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
170 {
171 if (proxy == nullptr) {
172 LOG_ERROR("proxy is nullptr");
173 return std::vector<OperationResult>();
174 }
175 std::vector<Key> keys;
176 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
177 keys.emplace_back(uri, templateId);
178 });
179 return BaseCallbacks::DisableObservers(keys, subscriber,
180 [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
181 std::vector<std::string> lastDisabledUris;
182 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
183 lastDisabledUris.emplace_back(result);
184 });
185 if (lastDisabledUris.empty()) {
186 return;
187 }
188
189 auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
190 opResult.insert(opResult.end(), results.begin(), results.end());
191 });
192 }
193
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)194 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
195 {
196 if (proxy == nullptr) {
197 LOG_ERROR("proxy is nullptr");
198 return;
199 }
200 std::map<TemplateId, std::vector<std::string>> keysMap;
201 std::vector<Key> keys = CallbacksManager::GetKeys();
202 for (const auto& key : keys) {
203 keysMap[key.templateId_].emplace_back(key.uri_);
204 }
205 for (const auto& [templateId, uris] : keysMap) {
206 auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
207 for (const auto& result : results) {
208 if (result.errCode_ != E_OK) {
209 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d", result.key_.c_str(),
210 result.errCode_);
211 }
212 }
213 }
214 }
215
Emit(const RdbChangeNode & changeNode)216 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
217 {
218 RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
219 lastChangeNodeMap_.InsertOrAssign(key, changeNode);
220 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
221 for (auto &obs : callbacks) {
222 if (obs != nullptr) {
223 LOG_INFO("Client send data to form, uri is %{public}s, subscriberId is %{public}" PRId64,
224 DataShareStringUtils::Anonymous(key.uri_).c_str(), key.templateId_.subscriberId_);
225 obs->OnChange(changeNode);
226 }
227 }
228 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
229 }
230
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)231 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
232 {
233 for (auto const &key : keys) {
234 bool isExist = false;
235 RdbChangeNode node;
236 lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
237 node = value;
238 isExist = true;
239 return true;
240 });
241 if (isExist) {
242 observer->OnChange(node);
243 }
244 }
245 }
246
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)247 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
248 {
249 for (auto &[key, obsVector] : obsMap) {
250 bool isExist = false;
251 RdbChangeNode node;
252 lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
253 node = value;
254 isExist = true;
255 return true;
256 });
257 if (!isExist) {
258 continue;
259 }
260 for (auto &obs : obsVector) {
261 if (obs.isNotifyOnEnabled_) {
262 obs.isNotifyOnEnabled_ = false;
263 obs.observer_->OnChange(node);
264 }
265 }
266 }
267 }
268
RdbObserver(const RdbCallback & callback)269 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
270
OnChange(const RdbChangeNode & changeNode)271 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
272 {
273 callback_(changeNode);
274 }
275
operator ==(const RdbObserver & rhs) const276 bool RdbObserver::operator==(const RdbObserver &rhs) const
277 {
278 return false;
279 }
280
operator !=(const RdbObserver & rhs) const281 bool RdbObserver::operator!=(const RdbObserver &rhs) const
282 {
283 return !(rhs == *this);
284 }
285 } // namespace DataShare
286 } // namespace OHOS
287