1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_subscriber_manager.h"
17
18 #include "data_proxy_observer_stub.h"
19 #include "datashare_log.h"
20
21 namespace OHOS {
22 namespace DataShare {
GetInstance()23 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
24 {
25 static RdbSubscriberManager manager;
26 return manager;
27 }
28
RdbSubscriberManager()29 RdbSubscriberManager::RdbSubscriberManager()
30 {
31 serviceCallback_ = nullptr;
32 }
33
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)34 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
35 std::shared_ptr<DataShareServiceProxy> proxy,
36 const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
37 {
38 if (proxy == nullptr) {
39 LOG_ERROR("proxy is nullptr");
40 return std::vector<OperationResult>();
41 }
42 std::vector<Key> keys;
43 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
44 keys.emplace_back(uri, templateId);
45 });
46 return BaseCallbacks::AddObservers(
47 keys, subscriber, std::make_shared<Observer>(callback),
48 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
49 Emit(localRegisterKeys, observer);
50 },
51 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
52 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
53 std::vector<std::string> firstAddUris;
54 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
55 firstAddUris.emplace_back(result);
56 });
57 if (firstAddUris.empty()) {
58 return;
59 }
60
61 Init();
62 auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
63 std::vector<Key> failedKeys;
64 for (auto &subResult : subResults) {
65 opResult.emplace_back(subResult);
66 if (subResult.errCode_ != E_OK) {
67 failedKeys.emplace_back(subResult.key_, templateId);
68 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
69 }
70 }
71 if (!failedKeys.empty()) {
72 BaseCallbacks::DelObservers(failedKeys, subscriber);
73 }
74 Destroy();
75 });
76 }
77
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)78 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
79 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
80 {
81 if (proxy == nullptr) {
82 LOG_ERROR("proxy is nullptr");
83 return std::vector<OperationResult>();
84 }
85 if (uris.empty()) {
86 return DelObservers(subscriber, proxy);
87 }
88
89 std::vector<Key> keys;
90 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
91 keys.emplace_back(uri, templateId);
92 });
93 return BaseCallbacks::DelObservers(keys, subscriber,
94 [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
95 std::vector<std::string> lastDelUris;
96 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
97 lastDelUris.emplace_back(result);
98 lastChangeNodeMap_.erase(result);
99 });
100 if (lastDelUris.empty()) {
101 return;
102 }
103 auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
104 if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
105 LOG_INFO("no valid subscriber, delete callback");
106 serviceCallback_ = nullptr;
107 }
108 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
109 Destroy();
110 });
111 }
112
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)113 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
114 std::shared_ptr<DataShareServiceProxy> proxy)
115 {
116 if (proxy == nullptr) {
117 LOG_ERROR("proxy is nullptr");
118 return std::vector<OperationResult>();
119 }
120 return BaseCallbacks::DelObservers(subscriber,
121 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
122 // delete all obs by subscriber
123 for (const auto &key : lastDelKeys) {
124 lastChangeNodeMap_.erase(key);
125 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
126 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
127 }
128 Destroy();
129 });
130 }
131
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)132 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
133 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
134 {
135 if (proxy == nullptr) {
136 LOG_ERROR("proxy is nullptr");
137 return std::vector<OperationResult>();
138 }
139 std::vector<Key> keys;
140 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
141 keys.emplace_back(uri, templateId);
142 });
143 return BaseCallbacks::EnableObservers(keys, subscriber,
144 [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
145 EmitOnEnable(obsMap);
146 },
147 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
148 std::vector<OperationResult> &opResult) {
149 std::vector<std::string> firstAddUris;
150 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
151 firstAddUris.emplace_back(result);
152 });
153 if (firstAddUris.empty()) {
154 return;
155 }
156 auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
157 std::vector<Key> failedKeys;
158 for (auto &subResult : subResults) {
159 opResult.emplace_back(subResult);
160 if (subResult.errCode_ != E_OK) {
161 failedKeys.emplace_back(subResult.key_, templateId);
162 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
163 }
164 }
165 if (!failedKeys.empty()) {
166 BaseCallbacks::DisableObservers(failedKeys, subscriber);
167 }
168 });
169 }
170
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)171 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
172 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
173 {
174 if (proxy == nullptr) {
175 LOG_ERROR("proxy is nullptr");
176 return std::vector<OperationResult>();
177 }
178 std::vector<Key> keys;
179 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
180 keys.emplace_back(uri, templateId);
181 });
182 return BaseCallbacks::DisableObservers(keys, subscriber,
183 [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
184 std::vector<std::string> lastDisabledUris;
185 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
186 lastDisabledUris.emplace_back(result);
187 });
188 if (lastDisabledUris.empty()) {
189 return;
190 }
191
192 auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
193 opResult.insert(opResult.end(), results.begin(), results.end());
194 });
195 }
196
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)197 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
198 {
199 if (proxy == nullptr) {
200 LOG_ERROR("proxy is nullptr");
201 return;
202 }
203 return BaseCallbacks::RecoverObservers([&proxy, this](const std::vector<Key> &Keys) {
204 std::map<TemplateId, std::vector<std::string>> keysMap;
205 for (auto const &key : Keys) {
206 keysMap[key.templateId_].emplace_back(key.uri_);
207 }
208 for (auto const &[templateId, uris] : keysMap) {
209 auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
210 for (const auto& result : results) {
211 if (result.errCode_ != E_OK) {
212 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
213 result.key_.c_str(), result.errCode_);
214 }
215 }
216 }
217 });
218 }
219
Emit(const RdbChangeNode & changeNode)220 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
221 {
222 RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
223 lastChangeNodeMap_[key] = changeNode;
224 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
225 for (auto &obs : callbacks) {
226 if (obs != nullptr) {
227 obs->OnChange(changeNode);
228 }
229 }
230 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
231 }
232
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)233 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
234 {
235 for (auto const &key : keys) {
236 auto it = lastChangeNodeMap_.find(key);
237 if (it != lastChangeNodeMap_.end()) {
238 observer->OnChange(it->second);
239 }
240 }
241 }
242
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)243 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
244 {
245 for (auto &[key, obsVector] : obsMap) {
246 auto it = lastChangeNodeMap_.find(key);
247 if (it == lastChangeNodeMap_.end()) {
248 continue;
249 }
250 for (auto &obs : obsVector) {
251 if (obs.isNotifyOnEnabled_) {
252 obs.observer_->OnChange(it->second);
253 }
254 }
255 }
256 }
257
Init()258 bool RdbSubscriberManager::Init()
259 {
260 if (serviceCallback_ == nullptr) {
261 LOG_INFO("callback init");
262 serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
263 Emit(changeNode);
264 });
265 }
266 return true;
267 }
268
Destroy()269 void RdbSubscriberManager::Destroy()
270 {
271 if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
272 if (serviceCallback_ != nullptr) {
273 serviceCallback_->ClearCallback();
274 }
275 LOG_INFO("no valid subscriber, delete callback");
276 serviceCallback_ = nullptr;
277 }
278 }
279
RdbObserver(const RdbCallback & callback)280 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
281
OnChange(const RdbChangeNode & changeNode)282 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
283 {
284 callback_(changeNode);
285 }
286
operator ==(const RdbObserver & rhs) const287 bool RdbObserver::operator==(const RdbObserver &rhs) const
288 {
289 return false;
290 }
291
operator !=(const RdbObserver & rhs) const292 bool RdbObserver::operator!=(const RdbObserver &rhs) const
293 {
294 return !(rhs == *this);
295 }
296 } // namespace DataShare
297 } // namespace OHOS
298