1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_subscriber_manager.h"
17
18 #include "data_proxy_observer_stub.h"
19 #include "datashare_log.h"
20
21 namespace OHOS {
22 namespace DataShare {
GetInstance()23 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
24 {
25 static RdbSubscriberManager manager;
26 return manager;
27 }
28
RdbSubscriberManager()29 RdbSubscriberManager::RdbSubscriberManager()
30 {
31 serviceCallback_ = nullptr;
32 }
33
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)34 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
35 std::shared_ptr<DataShareServiceProxy> proxy,
36 const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
37 {
38 if (proxy == nullptr) {
39 LOG_ERROR("proxy is nullptr");
40 return std::vector<OperationResult>();
41 }
42 std::vector<Key> keys;
43 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
44 keys.emplace_back(uri, templateId);
45 });
46 return BaseCallbacks::AddObservers(
47 keys, subscriber, std::make_shared<Observer>(callback),
48 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
49 Emit(localRegisterKeys, observer);
50 },
51 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
52 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
53 std::vector<std::string> firstAddUris;
54 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
55 firstAddUris.emplace_back(result);
56 });
57 if (firstAddUris.empty()) {
58 return;
59 }
60
61 Init();
62 auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
63 std::vector<Key> failedKeys;
64 for (auto &subResult : subResults) {
65 opResult.emplace_back(subResult);
66 if (subResult.errCode_ != E_OK) {
67 failedKeys.emplace_back(subResult.key_, templateId);
68 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
69 }
70 }
71 if (!failedKeys.empty()) {
72 BaseCallbacks::DelObservers(failedKeys, subscriber);
73 }
74 Destroy();
75 });
76 }
77
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)78 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
79 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
80 {
81 if (proxy == nullptr) {
82 LOG_ERROR("proxy is nullptr");
83 return std::vector<OperationResult>();
84 }
85 if (uris.empty()) {
86 return DelObservers(subscriber, proxy);
87 }
88
89 std::vector<Key> keys;
90 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
91 keys.emplace_back(uri, templateId);
92 });
93 return BaseCallbacks::DelObservers(keys, subscriber,
94 [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
95 std::vector<std::string> lastDelUris;
96 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
97 lastDelUris.emplace_back(result);
98 lastChangeNodeMap_.erase(result);
99 });
100 if (lastDelUris.empty()) {
101 return;
102 }
103 auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
104 if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
105 LOG_INFO("no valid subscriber, delete callback");
106 serviceCallback_ = nullptr;
107 }
108 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
109 Destroy();
110 });
111 }
112
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)113 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
114 std::shared_ptr<DataShareServiceProxy> proxy)
115 {
116 if (proxy == nullptr) {
117 LOG_ERROR("proxy is nullptr");
118 return std::vector<OperationResult>();
119 }
120 return BaseCallbacks::DelObservers(subscriber,
121 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
122 // delete all obs by subscriber
123 for (const auto &key : lastDelKeys) {
124 lastChangeNodeMap_.erase(key);
125 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
126 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
127 }
128 Destroy();
129 });
130 }
131
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)132 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
133 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
134 {
135 if (proxy == nullptr) {
136 LOG_ERROR("proxy is nullptr");
137 return std::vector<OperationResult>();
138 }
139 std::vector<Key> keys;
140 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
141 keys.emplace_back(uri, templateId);
142 });
143 return BaseCallbacks::EnableObservers(keys, subscriber,
144 [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
145 EmitOnEnable(obsMap);
146 },
147 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
148 std::vector<OperationResult> &opResult) {
149 std::vector<std::string> firstAddUris;
150 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
151 firstAddUris.emplace_back(result);
152 });
153 if (firstAddUris.empty()) {
154 return;
155 }
156 auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
157 std::vector<Key> failedKeys;
158 for (auto &subResult : subResults) {
159 opResult.emplace_back(subResult);
160 if (subResult.errCode_ != E_OK) {
161 failedKeys.emplace_back(subResult.key_, templateId);
162 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
163 }
164 }
165 if (!failedKeys.empty()) {
166 BaseCallbacks::DisableObservers(failedKeys, subscriber);
167 }
168 });
169 }
170
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)171 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
172 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
173 {
174 if (proxy == nullptr) {
175 LOG_ERROR("proxy is nullptr");
176 return std::vector<OperationResult>();
177 }
178 std::vector<Key> keys;
179 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
180 keys.emplace_back(uri, templateId);
181 });
182 return BaseCallbacks::DisableObservers(keys, subscriber,
183 [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
184 std::vector<std::string> lastDisabledUris;
185 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
186 lastDisabledUris.emplace_back(result);
187 });
188 if (lastDisabledUris.empty()) {
189 return;
190 }
191
192 auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
193 opResult.insert(opResult.end(), results.begin(), results.end());
194 });
195 }
196
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)197 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
198 {
199 if (proxy == nullptr) {
200 LOG_ERROR("proxy is nullptr");
201 return;
202 }
203 std::map<TemplateId, std::vector<std::string>> keysMap;
204 std::vector<Key> keys = CallbacksManager::GetKeys();
205 for (const auto& key : keys) {
206 keysMap[key.templateId_].emplace_back(key.uri_);
207 }
208 for (const auto& [templateId, uris] : keysMap) {
209 auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
210 for (const auto& result : results) {
211 if (result.errCode_ != E_OK) {
212 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d", result.key_.c_str(),
213 result.errCode_);
214 }
215 }
216 }
217 }
218
Emit(const RdbChangeNode & changeNode)219 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
220 {
221 RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
222 lastChangeNodeMap_[key] = changeNode;
223 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
224 for (auto &obs : callbacks) {
225 if (obs != nullptr) {
226 obs->OnChange(changeNode);
227 }
228 }
229 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
230 }
231
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)232 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
233 {
234 for (auto const &key : keys) {
235 auto it = lastChangeNodeMap_.find(key);
236 if (it != lastChangeNodeMap_.end()) {
237 observer->OnChange(it->second);
238 }
239 }
240 }
241
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)242 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
243 {
244 for (auto &[key, obsVector] : obsMap) {
245 auto it = lastChangeNodeMap_.find(key);
246 if (it == lastChangeNodeMap_.end()) {
247 continue;
248 }
249 for (auto &obs : obsVector) {
250 if (obs.isNotifyOnEnabled_) {
251 obs.observer_->OnChange(it->second);
252 }
253 }
254 }
255 }
256
Init()257 bool RdbSubscriberManager::Init()
258 {
259 if (serviceCallback_ == nullptr) {
260 LOG_INFO("callback init");
261 serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
262 Emit(changeNode);
263 });
264 }
265 return true;
266 }
267
Destroy()268 void RdbSubscriberManager::Destroy()
269 {
270 if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
271 if (serviceCallback_ != nullptr) {
272 serviceCallback_->ClearCallback();
273 }
274 LOG_INFO("no valid subscriber, delete callback");
275 serviceCallback_ = nullptr;
276 }
277 }
278
RdbObserver(const RdbCallback & callback)279 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
280
OnChange(const RdbChangeNode & changeNode)281 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
282 {
283 callback_(changeNode);
284 }
285
operator ==(const RdbObserver & rhs) const286 bool RdbObserver::operator==(const RdbObserver &rhs) const
287 {
288 return false;
289 }
290
operator !=(const RdbObserver & rhs) const291 bool RdbObserver::operator!=(const RdbObserver &rhs) const
292 {
293 return !(rhs == *this);
294 }
295 } // namespace DataShare
296 } // namespace OHOS
297