• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "published_data_subscriber_manager.h"
17 
18 #include "data_proxy_observer_stub.h"
19 #include "datashare_log.h"
20 
21 namespace OHOS {
22 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)23 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
24     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
25     const PublishedDataCallback &callback)
26 {
27     if (proxy == nullptr) {
28         LOG_ERROR("proxy is nullptr");
29         return std::vector<OperationResult>();
30     }
31     std::vector<Key> keys;
32     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
33         keys.emplace_back(uri, subscriberId);
34     });
35     return BaseCallbacks::AddObservers(
36         keys, subscriber, std::make_shared<Observer>(callback),
37         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
38             Emit(localRegisterKeys, observer);
39         },
40         [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
41             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
42             std::vector<std::string> firstAddUris;
43             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
44                 firstAddUris.emplace_back(result);
45             });
46             if (firstAddUris.empty()) {
47                 return;
48             }
49 
50             Init();
51             auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
52             std::vector<Key> failedKeys;
53             for (auto &subResult : subResults) {
54                 opResult.emplace_back(subResult);
55                 if (subResult.errCode_ != E_OK) {
56                     failedKeys.emplace_back(subResult.key_, subscriberId);
57                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
58                 }
59             }
60             if (failedKeys.size() > 0) {
61                 BaseCallbacks::DelObservers(failedKeys, subscriber);
62             }
63             Destroy();
64         });
65 }
66 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)67 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
68     std::shared_ptr<DataShareServiceProxy> proxy)
69 {
70     if (proxy == nullptr) {
71         LOG_ERROR("proxy is nullptr");
72         return std::vector<OperationResult>();
73     }
74     return BaseCallbacks::DelObservers(subscriber,
75         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
76             // delete all obs by subscriber
77             std::map<int64_t, std::vector<std::string>> keysMap;
78             for (auto const &key : lastDelKeys) {
79                 lastChangeNodeMap_.erase(key);
80                 keysMap[key.subscriberId_].emplace_back(key.uri_);
81             }
82             for (auto const &[subscriberId, uris] : keysMap) {
83                 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
84                 opResult.insert(opResult.end(), results.begin(), results.end());
85             }
86             Destroy();
87         });
88 }
89 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)90 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
91     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
92 {
93     if (proxy == nullptr) {
94         LOG_ERROR("proxy is nullptr");
95         return std::vector<OperationResult>();
96     }
97     if (uris.empty()) {
98         return DelObservers(subscriber, proxy);
99     }
100 
101     std::vector<Key> keys;
102     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
103         keys.emplace_back(uri, subscriberId);
104     });
105     return BaseCallbacks::DelObservers(keys, subscriber,
106         [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
107             std::vector<std::string> lastDelUris;
108             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
109                 lastChangeNodeMap_.erase(result);
110                 lastDelUris.emplace_back(result);
111             });
112             if (lastDelUris.empty()) {
113                 return;
114             }
115             auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
116             if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
117                 LOG_INFO("no valid subscriber, delete callback");
118                 serviceCallback_ = nullptr;
119             }
120             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
121             Destroy();
122         });
123 }
124 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)125 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
126     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
127 {
128     if (proxy == nullptr) {
129         LOG_ERROR("proxy is nullptr");
130         return std::vector<OperationResult>();
131     }
132     std::vector<Key> keys;
133     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
134         keys.emplace_back(uri, subscriberId);
135     });
136     return BaseCallbacks::EnableObservers(
137         keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
138             EmitOnEnable(obsMap);
139         },
140         [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
141         std::vector<OperationResult> &opResult) {
142             std::vector<std::string> firstAddUris;
143             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
144                 firstAddUris.emplace_back(result);
145             });
146             if (firstAddUris.empty()) {
147                 return;
148             }
149             auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
150             std::vector<Key> failedKeys;
151             for (auto &subResult : subResults) {
152                 opResult.emplace_back(subResult);
153                 if (subResult.errCode_ != E_OK) {
154                     failedKeys.emplace_back(subResult.key_, subscriberId);
155                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
156                 }
157             }
158             if (failedKeys.size() > 0) {
159                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
160             }
161         });
162 }
163 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)164 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
165     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
166 {
167     if (proxy == nullptr) {
168         LOG_ERROR("proxy is nullptr");
169         return std::vector<OperationResult>();
170     }
171     std::vector<Key> keys;
172     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
173         keys.emplace_back(uri, subscriberId);
174     });
175     return BaseCallbacks::DisableObservers(keys, subscriber,
176         [&proxy, &subscriberId, this](const std::vector<Key> &lastDisabledKeys,
177         std::vector<OperationResult> &opResult) {
178             std::vector<std::string> lastDisabledUris;
179             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
180                 lastDisabledUris.emplace_back(result);
181             });
182             if (lastDisabledUris.empty()) {
183                 return;
184             }
185 
186             auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
187             opResult.insert(opResult.end(), results.begin(), results.end());
188         });
189 }
190 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)191 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
192 {
193     if (proxy == nullptr) {
194         LOG_ERROR("proxy is nullptr");
195         return;
196     }
197     return BaseCallbacks::RecoverObservers([&proxy, this](const std::vector<Key> &Keys) {
198         std::map<int64_t, std::vector<std::string>> keysMap;
199         for (auto const &key : Keys) {
200             keysMap[key.subscriberId_].emplace_back(key.uri_);
201         }
202         for (auto const &[subscriberId, uris] : keysMap) {
203             auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
204             for (const auto& result : results) {
205                 if (result.errCode_ != E_OK) {
206                     LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
207                         result.key_.c_str(), result.errCode_);
208                 }
209             }
210         }
211     });
212 }
213 
Emit(PublishedDataChangeNode & changeNode)214 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
215 {
216     for (auto &data : changeNode.datas_) {
217         Key key(data.key_, data.subscriberId_);
218         lastChangeNodeMap_[key].datas_.clear();
219     }
220     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
221     for (auto &data : changeNode.datas_) {
222         PublishedObserverMapKey key(data.key_, data.subscriberId_);
223         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
224         if (callbacks.empty()) {
225             LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
226             continue;
227         }
228         lastChangeNodeMap_[key].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
229         lastChangeNodeMap_[key].ownerBundleName_ = changeNode.ownerBundleName_;
230         for (auto const &obs : callbacks) {
231             results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
232         }
233         BaseCallbacks::SetObserversNotifiedOnEnabled(key);
234     }
235     for (auto &[callback, node] : results) {
236         node.ownerBundleName_ = changeNode.ownerBundleName_;
237         callback->OnChange(node);
238     }
239 }
240 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)241 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
242 {
243     PublishedDataChangeNode node;
244     for (auto &key : keys) {
245         auto it = lastChangeNodeMap_.find(key);
246         if (it == lastChangeNodeMap_.end()) {
247             continue;
248         }
249         for (auto &data : it->second.datas_) {
250             node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
251         }
252         node.ownerBundleName_ = it->second.ownerBundleName_;
253     }
254     observer->OnChange(node);
255 }
256 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)257 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
258 {
259     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
260     for (auto &[key, obsVector] : obsMap) {
261         auto it = lastChangeNodeMap_.find(key);
262         if (it == lastChangeNodeMap_.end()) {
263             continue;
264         }
265         for (auto &data : it->second.datas_) {
266             PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
267             for (auto &obs : obsVector) {
268                 if (obs.isNotifyOnEnabled_) {
269                     results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
270                     results[obs.observer_].ownerBundleName_ = it->second.ownerBundleName_;
271                 }
272             }
273         }
274     }
275     for (auto &[callback, node] : results) {
276         callback->OnChange(node);
277     }
278 }
279 
Init()280 bool PublishedDataSubscriberManager::Init()
281 {
282     if (serviceCallback_ == nullptr) {
283         LOG_INFO("callback init");
284         serviceCallback_ = new PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
285             Emit(changeNode);
286         });
287     }
288     return true;
289 }
290 
Destroy()291 void PublishedDataSubscriberManager::Destroy()
292 {
293     if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
294         if (serviceCallback_ != nullptr) {
295             serviceCallback_->ClearCallback();
296         }
297         LOG_INFO("no valid subscriber, delete callback");
298         serviceCallback_ = nullptr;
299     }
300 }
301 
GetInstance()302 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
303 {
304     static PublishedDataSubscriberManager manager;
305     return manager;
306 }
307 
PublishedDataSubscriberManager()308 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
309 {
310     serviceCallback_ = nullptr;
311 }
312 
PublishedDataObserver(const PublishedDataCallback & callback)313 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
314 
OnChange(PublishedDataChangeNode & changeNode)315 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
316 {
317     callback_(changeNode);
318 }
319 
operator ==(const PublishedDataObserver & rhs) const320 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
321 {
322     return false;
323 }
324 
operator !=(const PublishedDataObserver & rhs) const325 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
326 {
327     return !(rhs == *this);
328 }
329 } // namespace DataShare
330 } // namespace OHOS
331