• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "published_data_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23 
24 namespace OHOS {
25 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)26 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
27     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
28     const PublishedDataCallback &callback)
29 {
30     if (proxy == nullptr) {
31         LOG_ERROR("proxy is nullptr");
32         return std::vector<OperationResult>();
33     }
34     std::vector<Key> keys;
35     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
36         keys.emplace_back(uri, subscriberId);
37     });
38     return BaseCallbacks::AddObservers(
39         keys, subscriber, std::make_shared<Observer>(callback),
40         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
41             Emit(localRegisterKeys, observer);
42         },
43         [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
44             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
45             std::vector<std::string> firstAddUris;
46             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
47                 firstAddUris.emplace_back(result);
48             });
49             if (firstAddUris.empty()) {
50                 return;
51             }
52 
53             auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
54             std::vector<Key> failedKeys;
55             for (auto &subResult : subResults) {
56                 opResult.emplace_back(subResult);
57                 if (subResult.errCode_ != E_OK) {
58                     failedKeys.emplace_back(subResult.key_, subscriberId);
59                     LOG_WARN("registered failed, uri is %{public}s",
60                         DataShareStringUtils::Anonymous(subResult.key_).c_str());
61                 }
62             }
63             if (failedKeys.size() > 0) {
64                 BaseCallbacks::DelObservers(failedKeys, subscriber);
65             }
66         });
67 }
68 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)69 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
70     std::shared_ptr<DataShareServiceProxy> proxy)
71 {
72     if (proxy == nullptr) {
73         LOG_ERROR("proxy is nullptr");
74         return std::vector<OperationResult>();
75     }
76     return BaseCallbacks::DelObservers(subscriber,
77         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
78             // delete all obs by subscriber
79             std::map<int64_t, std::vector<std::string>> keysMap;
80             for (auto const &key : lastDelKeys) {
81                 lastChangeNodeMap_.Erase(key);
82                 keysMap[key.subscriberId_].emplace_back(key.uri_);
83             }
84             for (auto const &[subscriberId, uris] : keysMap) {
85                 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
86                 opResult.insert(opResult.end(), results.begin(), results.end());
87             }
88         });
89 }
90 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)91 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
92     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
93 {
94     if (proxy == nullptr) {
95         LOG_ERROR("proxy is nullptr");
96         return std::vector<OperationResult>();
97     }
98     if (uris.empty()) {
99         return DelObservers(subscriber, proxy);
100     }
101 
102     std::vector<Key> keys;
103     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
104         keys.emplace_back(uri, subscriberId);
105     });
106     return BaseCallbacks::DelObservers(keys, subscriber,
107         [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
108             std::vector<std::string> lastDelUris;
109             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
110                 lastChangeNodeMap_.Erase(result);
111                 lastDelUris.emplace_back(result);
112             });
113             if (lastDelUris.empty()) {
114                 return;
115             }
116             auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
117             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
118         });
119 }
120 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)121 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
122     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
123 {
124     if (proxy == nullptr) {
125         LOG_ERROR("proxy is nullptr");
126         return std::vector<OperationResult>();
127     }
128     std::vector<Key> keys;
129     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
130         keys.emplace_back(uri, subscriberId);
131     });
132     return BaseCallbacks::EnableObservers(
133         keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
134             EmitOnEnable(obsMap);
135         },
136         [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
137         std::vector<OperationResult> &opResult) {
138             std::vector<std::string> firstAddUris;
139             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
140                 firstAddUris.emplace_back(result);
141             });
142             if (firstAddUris.empty()) {
143                 return;
144             }
145             auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
146             std::vector<Key> failedKeys;
147             for (auto &subResult : subResults) {
148                 opResult.emplace_back(subResult);
149                 if (subResult.errCode_ != E_OK) {
150                     failedKeys.emplace_back(subResult.key_, subscriberId);
151                     LOG_WARN("registered failed, uri is %{public}s",
152                         DataShareStringUtils::Anonymous(subResult.key_).c_str());
153                 }
154             }
155             if (failedKeys.size() > 0) {
156                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
157             }
158         });
159 }
160 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)161 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
162     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
163 {
164     if (proxy == nullptr) {
165         LOG_ERROR("proxy is nullptr");
166         return std::vector<OperationResult>();
167     }
168     std::vector<Key> keys;
169     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
170         keys.emplace_back(uri, subscriberId);
171     });
172     return BaseCallbacks::DisableObservers(keys, subscriber,
173         [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &lastDisabledKeys,
174         std::vector<OperationResult> &opResult) {
175             std::vector<std::string> lastDisabledUris;
176             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
177                 lastDisabledUris.emplace_back(result);
178             });
179             if (lastDisabledUris.empty()) {
180                 return;
181             }
182 
183             auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
184             std::vector<Key> failedKeys;
185             for (auto &result : results) {
186                 opResult.emplace_back(result);
187                 if (result.errCode_ != E_OK) {
188                     failedKeys.emplace_back(result.key_, subscriberId);
189                     LOG_WARN("DisableObservers failed, uri is %{public}s, errCode is %{public}d",
190                         DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
191                 }
192             }
193             if (failedKeys.size() > 0) {
194                 BaseCallbacks::EnableObservers(failedKeys, subscriber);
195             }
196         });
197 }
198 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)199 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
200 {
201     if (proxy == nullptr) {
202         LOG_ERROR("proxy is nullptr");
203         return;
204     }
205 
206     std::map<int64_t, std::vector<std::string>> keysMap;
207     std::vector<Key> keys = CallbacksManager::GetKeys();
208     for (const auto& key : keys) {
209         keysMap[key.subscriberId_].emplace_back(key.uri_);
210     }
211     for (const auto &[subscriberId, uris] : keysMap) {
212         auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
213         for (const auto& result : results) {
214             if (result.errCode_ != E_OK) {
215                 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
216                     DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
217             }
218         }
219     }
220 }
221 
Emit(PublishedDataChangeNode & changeNode)222 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
223 {
224     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
225     for (auto &data : changeNode.datas_) {
226         PublishedObserverMapKey key(data.key_, data.subscriberId_);
227         // Still set observer was notified flag and store data if there is no enabled observer.
228         BaseCallbacks::SetObserversNotifiedOnEnabled(key);
229         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
230         lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
231             value.datas_.clear();
232             value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
233             value.ownerBundleName_ = changeNode.ownerBundleName_;
234             return true;
235         });
236 
237         if (callbacks.empty()) {
238             LOG_WARN("%{public}s nobody subscribe, but still notify",
239                 DataShareStringUtils::Anonymous(data.key_).c_str());
240             continue;
241         }
242         for (auto const &obs : callbacks) {
243             results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
244         }
245     }
246     for (auto &[callback, node] : results) {
247         node.ownerBundleName_ = changeNode.ownerBundleName_;
248         callback->OnChange(node);
249     }
250 }
251 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)252 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
253 {
254     PublishedDataChangeNode node;
255     for (auto &key : keys) {
256         lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
257             for (auto &data : value.datas_) {
258                 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
259             }
260             node.ownerBundleName_ = value.ownerBundleName_;
261             return true;
262         });
263     }
264     observer->OnChange(node);
265 }
266 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)267 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
268 {
269     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
270     for (auto &[key, obsVector] : obsMap) {
271         uint32_t num = 0;
272         lastChangeNodeMap_.ComputeIfPresent(key, [&obsVector = obsVector, &results, &num](const Key &,
273             PublishedDataChangeNode &value) {
274             for (auto &data : value.datas_) {
275                 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
276                 for (auto &obs : obsVector) {
277                     if (obs.isNotifyOnEnabled_) {
278                         num++;
279                         results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
280                         results[obs.observer_].ownerBundleName_ = value.ownerBundleName_;
281                     }
282                 }
283             }
284             return true;
285         });
286         if (num > 0) {
287             LOG_INFO("%{public}u will refresh, total %{public}zu, uri %{public}s, subscribeId %{public}" PRId64,
288                 num, obsVector.size(), DataShareStringUtils::Anonymous(key.uri_).c_str(), key.subscriberId_);
289         }
290     }
291     for (auto &[callback, node] : results) {
292         callback->OnChange(node);
293     }
294 }
295 
GetInstance()296 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
297 {
298     static PublishedDataSubscriberManager manager;
299     return manager;
300 }
301 
PublishedDataSubscriberManager()302 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
303 {
304     serviceCallback_ = new (std::nothrow)PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
305         Emit(changeNode);
306     });
307 }
308 
PublishedDataObserver(const PublishedDataCallback & callback)309 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
310 
OnChange(PublishedDataChangeNode & changeNode)311 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
312 {
313     callback_(changeNode);
314 }
315 
operator ==(const PublishedDataObserver & rhs) const316 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
317 {
318     return false;
319 }
320 
operator !=(const PublishedDataObserver & rhs) const321 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
322 {
323     return !(rhs == *this);
324 }
325 } // namespace DataShare
326 } // namespace OHOS
327