1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "published_data_subscriber_manager.h"
17
18 #include "data_proxy_observer_stub.h"
19 #include "datashare_log.h"
20
21 namespace OHOS {
22 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)23 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
24 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
25 const PublishedDataCallback &callback)
26 {
27 if (proxy == nullptr) {
28 LOG_ERROR("proxy is nullptr");
29 return std::vector<OperationResult>();
30 }
31 std::vector<Key> keys;
32 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
33 keys.emplace_back(uri, subscriberId);
34 });
35 return BaseCallbacks::AddObservers(
36 keys, subscriber, std::make_shared<Observer>(callback),
37 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
38 Emit(localRegisterKeys, observer);
39 },
40 [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
41 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
42 std::vector<std::string> firstAddUris;
43 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
44 firstAddUris.emplace_back(result);
45 });
46 if (firstAddUris.empty()) {
47 return;
48 }
49
50 Init();
51 auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
52 std::vector<Key> failedKeys;
53 for (auto &subResult : subResults) {
54 opResult.emplace_back(subResult);
55 if (subResult.errCode_ != E_OK) {
56 failedKeys.emplace_back(subResult.key_, subscriberId);
57 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
58 }
59 }
60 if (failedKeys.size() > 0) {
61 BaseCallbacks::DelObservers(failedKeys, subscriber);
62 }
63 Destroy();
64 });
65 }
66
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)67 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
68 std::shared_ptr<DataShareServiceProxy> proxy)
69 {
70 if (proxy == nullptr) {
71 LOG_ERROR("proxy is nullptr");
72 return std::vector<OperationResult>();
73 }
74 return BaseCallbacks::DelObservers(subscriber,
75 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
76 // delete all obs by subscriber
77 std::map<int64_t, std::vector<std::string>> keysMap;
78 for (auto const &key : lastDelKeys) {
79 lastChangeNodeMap_.erase(key);
80 keysMap[key.subscriberId_].emplace_back(key.uri_);
81 }
82 for (auto const &[subscriberId, uris] : keysMap) {
83 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
84 opResult.insert(opResult.end(), results.begin(), results.end());
85 }
86 Destroy();
87 });
88 }
89
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)90 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
91 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
92 {
93 if (proxy == nullptr) {
94 LOG_ERROR("proxy is nullptr");
95 return std::vector<OperationResult>();
96 }
97 if (uris.empty()) {
98 return DelObservers(subscriber, proxy);
99 }
100
101 std::vector<Key> keys;
102 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
103 keys.emplace_back(uri, subscriberId);
104 });
105 return BaseCallbacks::DelObservers(keys, subscriber,
106 [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
107 std::vector<std::string> lastDelUris;
108 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
109 lastChangeNodeMap_.erase(result);
110 lastDelUris.emplace_back(result);
111 });
112 if (lastDelUris.empty()) {
113 return;
114 }
115 auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
116 if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
117 LOG_INFO("no valid subscriber, delete callback");
118 serviceCallback_ = nullptr;
119 }
120 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
121 Destroy();
122 });
123 }
124
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)125 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
126 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
127 {
128 if (proxy == nullptr) {
129 LOG_ERROR("proxy is nullptr");
130 return std::vector<OperationResult>();
131 }
132 std::vector<Key> keys;
133 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
134 keys.emplace_back(uri, subscriberId);
135 });
136 return BaseCallbacks::EnableObservers(
137 keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
138 EmitOnEnable(obsMap);
139 },
140 [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
141 std::vector<OperationResult> &opResult) {
142 std::vector<std::string> firstAddUris;
143 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
144 firstAddUris.emplace_back(result);
145 });
146 if (firstAddUris.empty()) {
147 return;
148 }
149 auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
150 std::vector<Key> failedKeys;
151 for (auto &subResult : subResults) {
152 opResult.emplace_back(subResult);
153 if (subResult.errCode_ != E_OK) {
154 failedKeys.emplace_back(subResult.key_, subscriberId);
155 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
156 }
157 }
158 if (failedKeys.size() > 0) {
159 BaseCallbacks::DisableObservers(failedKeys, subscriber);
160 }
161 });
162 }
163
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)164 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
165 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
166 {
167 if (proxy == nullptr) {
168 LOG_ERROR("proxy is nullptr");
169 return std::vector<OperationResult>();
170 }
171 std::vector<Key> keys;
172 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
173 keys.emplace_back(uri, subscriberId);
174 });
175 return BaseCallbacks::DisableObservers(keys, subscriber,
176 [&proxy, &subscriberId, this](const std::vector<Key> &lastDisabledKeys,
177 std::vector<OperationResult> &opResult) {
178 std::vector<std::string> lastDisabledUris;
179 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
180 lastDisabledUris.emplace_back(result);
181 });
182 if (lastDisabledUris.empty()) {
183 return;
184 }
185
186 auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
187 opResult.insert(opResult.end(), results.begin(), results.end());
188 });
189 }
190
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)191 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
192 {
193 if (proxy == nullptr) {
194 LOG_ERROR("proxy is nullptr");
195 return;
196 }
197 return BaseCallbacks::RecoverObservers([&proxy, this](const std::vector<Key> &Keys) {
198 std::map<int64_t, std::vector<std::string>> keysMap;
199 for (auto const &key : Keys) {
200 keysMap[key.subscriberId_].emplace_back(key.uri_);
201 }
202 for (auto const &[subscriberId, uris] : keysMap) {
203 auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
204 for (const auto& result : results) {
205 if (result.errCode_ != E_OK) {
206 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
207 result.key_.c_str(), result.errCode_);
208 }
209 }
210 }
211 });
212 }
213
Emit(PublishedDataChangeNode & changeNode)214 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
215 {
216 for (auto &data : changeNode.datas_) {
217 Key key(data.key_, data.subscriberId_);
218 lastChangeNodeMap_[key].datas_.clear();
219 }
220 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
221 for (auto &data : changeNode.datas_) {
222 PublishedObserverMapKey key(data.key_, data.subscriberId_);
223 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
224 if (callbacks.empty()) {
225 LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
226 continue;
227 }
228 lastChangeNodeMap_[key].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
229 lastChangeNodeMap_[key].ownerBundleName_ = changeNode.ownerBundleName_;
230 for (auto const &obs : callbacks) {
231 results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
232 }
233 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
234 }
235 for (auto &[callback, node] : results) {
236 node.ownerBundleName_ = changeNode.ownerBundleName_;
237 callback->OnChange(node);
238 }
239 }
240
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)241 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
242 {
243 PublishedDataChangeNode node;
244 for (auto &key : keys) {
245 auto it = lastChangeNodeMap_.find(key);
246 if (it == lastChangeNodeMap_.end()) {
247 continue;
248 }
249 for (auto &data : it->second.datas_) {
250 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
251 }
252 node.ownerBundleName_ = it->second.ownerBundleName_;
253 }
254 observer->OnChange(node);
255 }
256
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)257 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
258 {
259 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
260 for (auto &[key, obsVector] : obsMap) {
261 auto it = lastChangeNodeMap_.find(key);
262 if (it == lastChangeNodeMap_.end()) {
263 continue;
264 }
265 for (auto &data : it->second.datas_) {
266 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
267 for (auto &obs : obsVector) {
268 if (obs.isNotifyOnEnabled_) {
269 results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
270 results[obs.observer_].ownerBundleName_ = it->second.ownerBundleName_;
271 }
272 }
273 }
274 }
275 for (auto &[callback, node] : results) {
276 callback->OnChange(node);
277 }
278 }
279
Init()280 bool PublishedDataSubscriberManager::Init()
281 {
282 if (serviceCallback_ == nullptr) {
283 LOG_INFO("callback init");
284 serviceCallback_ = new PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
285 Emit(changeNode);
286 });
287 }
288 return true;
289 }
290
Destroy()291 void PublishedDataSubscriberManager::Destroy()
292 {
293 if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
294 if (serviceCallback_ != nullptr) {
295 serviceCallback_->ClearCallback();
296 }
297 LOG_INFO("no valid subscriber, delete callback");
298 serviceCallback_ = nullptr;
299 }
300 }
301
GetInstance()302 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
303 {
304 static PublishedDataSubscriberManager manager;
305 return manager;
306 }
307
PublishedDataSubscriberManager()308 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
309 {
310 serviceCallback_ = nullptr;
311 }
312
PublishedDataObserver(const PublishedDataCallback & callback)313 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
314
OnChange(PublishedDataChangeNode & changeNode)315 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
316 {
317 callback_(changeNode);
318 }
319
operator ==(const PublishedDataObserver & rhs) const320 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
321 {
322 return false;
323 }
324
operator !=(const PublishedDataObserver & rhs) const325 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
326 {
327 return !(rhs == *this);
328 }
329 } // namespace DataShare
330 } // namespace OHOS
331