• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "published_data_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23 
24 namespace OHOS {
25 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)26 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
27     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
28     const PublishedDataCallback &callback)
29 {
30     if (proxy == nullptr) {
31         LOG_ERROR("proxy is nullptr");
32         return std::vector<OperationResult>();
33     }
34     std::vector<Key> keys;
35     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
36         keys.emplace_back(uri, subscriberId);
37     });
38     return BaseCallbacks::AddObservers(
39         keys, subscriber, std::make_shared<Observer>(callback),
40         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
41             Emit(localRegisterKeys, observer);
42         },
43         [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
44             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
45             std::vector<std::string> firstAddUris;
46             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
47                 firstAddUris.emplace_back(result);
48             });
49             if (firstAddUris.empty()) {
50                 return;
51             }
52 
53             auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
54             std::vector<Key> failedKeys;
55             for (auto &subResult : subResults) {
56                 opResult.emplace_back(subResult);
57                 if (subResult.errCode_ != E_OK) {
58                     failedKeys.emplace_back(subResult.key_, subscriberId);
59                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
60                 }
61             }
62             if (failedKeys.size() > 0) {
63                 BaseCallbacks::DelObservers(failedKeys, subscriber);
64             }
65         });
66 }
67 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)68 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
69     std::shared_ptr<DataShareServiceProxy> proxy)
70 {
71     if (proxy == nullptr) {
72         LOG_ERROR("proxy is nullptr");
73         return std::vector<OperationResult>();
74     }
75     return BaseCallbacks::DelObservers(subscriber,
76         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
77             // delete all obs by subscriber
78             std::map<int64_t, std::vector<std::string>> keysMap;
79             for (auto const &key : lastDelKeys) {
80                 lastChangeNodeMap_.Erase(key);
81                 keysMap[key.subscriberId_].emplace_back(key.uri_);
82             }
83             for (auto const &[subscriberId, uris] : keysMap) {
84                 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
85                 opResult.insert(opResult.end(), results.begin(), results.end());
86             }
87         });
88 }
89 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)90 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
91     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
92 {
93     if (proxy == nullptr) {
94         LOG_ERROR("proxy is nullptr");
95         return std::vector<OperationResult>();
96     }
97     if (uris.empty()) {
98         return DelObservers(subscriber, proxy);
99     }
100 
101     std::vector<Key> keys;
102     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
103         keys.emplace_back(uri, subscriberId);
104     });
105     return BaseCallbacks::DelObservers(keys, subscriber,
106         [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
107             std::vector<std::string> lastDelUris;
108             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
109                 lastChangeNodeMap_.Erase(result);
110                 lastDelUris.emplace_back(result);
111             });
112             if (lastDelUris.empty()) {
113                 return;
114             }
115             auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
116             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
117         });
118 }
119 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)120 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
121     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
122 {
123     if (proxy == nullptr) {
124         LOG_ERROR("proxy is nullptr");
125         return std::vector<OperationResult>();
126     }
127     std::vector<Key> keys;
128     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
129         keys.emplace_back(uri, subscriberId);
130     });
131     return BaseCallbacks::EnableObservers(
132         keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
133             EmitOnEnable(obsMap);
134         },
135         [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
136         std::vector<OperationResult> &opResult) {
137             std::vector<std::string> firstAddUris;
138             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
139                 firstAddUris.emplace_back(result);
140             });
141             if (firstAddUris.empty()) {
142                 return;
143             }
144             auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
145             std::vector<Key> failedKeys;
146             for (auto &subResult : subResults) {
147                 opResult.emplace_back(subResult);
148                 if (subResult.errCode_ != E_OK) {
149                     failedKeys.emplace_back(subResult.key_, subscriberId);
150                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
151                 }
152             }
153             if (failedKeys.size() > 0) {
154                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
155             }
156         });
157 }
158 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)159 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
160     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
161 {
162     if (proxy == nullptr) {
163         LOG_ERROR("proxy is nullptr");
164         return std::vector<OperationResult>();
165     }
166     std::vector<Key> keys;
167     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
168         keys.emplace_back(uri, subscriberId);
169     });
170     return BaseCallbacks::DisableObservers(keys, subscriber,
171         [&proxy, &subscriberId, this](const std::vector<Key> &lastDisabledKeys,
172         std::vector<OperationResult> &opResult) {
173             std::vector<std::string> lastDisabledUris;
174             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
175                 lastDisabledUris.emplace_back(result);
176             });
177             if (lastDisabledUris.empty()) {
178                 return;
179             }
180 
181             auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
182             opResult.insert(opResult.end(), results.begin(), results.end());
183         });
184 }
185 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)186 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
187 {
188     if (proxy == nullptr) {
189         LOG_ERROR("proxy is nullptr");
190         return;
191     }
192 
193     std::map<int64_t, std::vector<std::string>> keysMap;
194     std::vector<Key> keys = CallbacksManager::GetKeys();
195     for (const auto& key : keys) {
196         keysMap[key.subscriberId_].emplace_back(key.uri_);
197     }
198     for (const auto &[subscriberId, uris] : keysMap) {
199         auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
200         for (const auto& result : results) {
201             if (result.errCode_ != E_OK) {
202                 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
203                          result.key_.c_str(), result.errCode_);
204             }
205         }
206     }
207 }
208 
Emit(PublishedDataChangeNode & changeNode)209 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
210 {
211     for (auto &data : changeNode.datas_) {
212         Key key(data.key_, data.subscriberId_);
213         lastChangeNodeMap_.Compute(key, [](const Key &, PublishedDataChangeNode &value) {
214             value.datas_.clear();
215             return true;
216         });
217     }
218     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
219     for (auto &data : changeNode.datas_) {
220         PublishedObserverMapKey key(data.key_, data.subscriberId_);
221         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
222         if (callbacks.empty()) {
223             LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
224             continue;
225         }
226         lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
227             value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
228             value.ownerBundleName_ = changeNode.ownerBundleName_;
229             return true;
230         });
231         for (auto const &obs : callbacks) {
232             results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
233         }
234         BaseCallbacks::SetObserversNotifiedOnEnabled(key);
235     }
236     for (auto &[callback, node] : results) {
237         node.ownerBundleName_ = changeNode.ownerBundleName_;
238         callback->OnChange(node);
239     }
240 }
241 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)242 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
243 {
244     PublishedDataChangeNode node;
245     for (auto &key : keys) {
246         lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
247             for (auto &data : value.datas_) {
248                 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
249             }
250             node.ownerBundleName_ = value.ownerBundleName_;
251             return true;
252         });
253     }
254     observer->OnChange(node);
255 }
256 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)257 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
258 {
259     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
260     for (auto &[key, obsVector] : obsMap) {
261         uint32_t num = 0;
262         lastChangeNodeMap_.ComputeIfPresent(key, [&obsVector = obsVector, &results, &num](const Key &,
263             PublishedDataChangeNode &value) {
264             for (auto &data : value.datas_) {
265                 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
266                 for (auto &obs : obsVector) {
267                     if (obs.isNotifyOnEnabled_) {
268                         num++;
269                         results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
270                         results[obs.observer_].ownerBundleName_ = value.ownerBundleName_;
271                         obs.isNotifyOnEnabled_ = false;
272                     }
273                 }
274             }
275             return true;
276         });
277         if (num > 0) {
278             LOG_INFO("%{public}u will refresh, total %{public}zu, uri %{public}s, subscribeId %{public}" PRId64,
279                 num, obsVector.size(), DataShareStringUtils::Anonymous(key.uri_).c_str(), key.subscriberId_);
280         }
281     }
282     for (auto &[callback, node] : results) {
283         callback->OnChange(node);
284     }
285 }
286 
GetInstance()287 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
288 {
289     static PublishedDataSubscriberManager manager;
290     return manager;
291 }
292 
PublishedDataSubscriberManager()293 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
294 {
295     serviceCallback_ = new PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
296         Emit(changeNode);
297     });
298 }
299 
PublishedDataObserver(const PublishedDataCallback & callback)300 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
301 
OnChange(PublishedDataChangeNode & changeNode)302 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
303 {
304     callback_(changeNode);
305 }
306 
operator ==(const PublishedDataObserver & rhs) const307 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
308 {
309     return false;
310 }
311 
operator !=(const PublishedDataObserver & rhs) const312 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
313 {
314     return !(rhs == *this);
315 }
316 } // namespace DataShare
317 } // namespace OHOS
318