1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "published_data_subscriber_manager.h"
17
18 #include <cinttypes>
19
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23
24 namespace OHOS {
25 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)26 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
27 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
28 const PublishedDataCallback &callback)
29 {
30 if (proxy == nullptr) {
31 LOG_ERROR("proxy is nullptr");
32 return std::vector<OperationResult>();
33 }
34 std::vector<Key> keys;
35 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
36 keys.emplace_back(uri, subscriberId);
37 });
38 return BaseCallbacks::AddObservers(
39 keys, subscriber, std::make_shared<Observer>(callback),
40 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
41 Emit(localRegisterKeys, observer);
42 },
43 [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
44 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
45 std::vector<std::string> firstAddUris;
46 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
47 firstAddUris.emplace_back(result);
48 });
49 if (firstAddUris.empty()) {
50 return;
51 }
52
53 auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
54 std::vector<Key> failedKeys;
55 for (auto &subResult : subResults) {
56 opResult.emplace_back(subResult);
57 if (subResult.errCode_ != E_OK) {
58 failedKeys.emplace_back(subResult.key_, subscriberId);
59 LOG_WARN("registered failed, uri is %{public}s",
60 DataShareStringUtils::Anonymous(subResult.key_).c_str());
61 }
62 }
63 if (failedKeys.size() > 0) {
64 BaseCallbacks::DelObservers(failedKeys, subscriber);
65 }
66 });
67 }
68
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)69 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
70 std::shared_ptr<DataShareServiceProxy> proxy)
71 {
72 if (proxy == nullptr) {
73 LOG_ERROR("proxy is nullptr");
74 return std::vector<OperationResult>();
75 }
76 return BaseCallbacks::DelObservers(subscriber,
77 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
78 // delete all obs by subscriber
79 std::map<int64_t, std::vector<std::string>> keysMap;
80 for (auto const &key : lastDelKeys) {
81 lastChangeNodeMap_.Erase(key);
82 keysMap[key.subscriberId_].emplace_back(key.uri_);
83 }
84 for (auto const &[subscriberId, uris] : keysMap) {
85 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
86 opResult.insert(opResult.end(), results.begin(), results.end());
87 }
88 });
89 }
90
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)91 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
92 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
93 {
94 if (proxy == nullptr) {
95 LOG_ERROR("proxy is nullptr");
96 return std::vector<OperationResult>();
97 }
98 if (uris.empty()) {
99 return DelObservers(subscriber, proxy);
100 }
101
102 std::vector<Key> keys;
103 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
104 keys.emplace_back(uri, subscriberId);
105 });
106 return BaseCallbacks::DelObservers(keys, subscriber,
107 [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
108 std::vector<std::string> lastDelUris;
109 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
110 lastChangeNodeMap_.Erase(result);
111 lastDelUris.emplace_back(result);
112 });
113 if (lastDelUris.empty()) {
114 return;
115 }
116 auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
117 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
118 });
119 }
120
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)121 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
122 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
123 {
124 if (proxy == nullptr) {
125 LOG_ERROR("proxy is nullptr");
126 return std::vector<OperationResult>();
127 }
128 std::vector<Key> keys;
129 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
130 keys.emplace_back(uri, subscriberId);
131 });
132 return BaseCallbacks::EnableObservers(
133 keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
134 EmitOnEnable(obsMap);
135 },
136 [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
137 std::vector<OperationResult> &opResult) {
138 std::vector<std::string> firstAddUris;
139 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
140 firstAddUris.emplace_back(result);
141 });
142 if (firstAddUris.empty()) {
143 return;
144 }
145 auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
146 std::vector<Key> failedKeys;
147 for (auto &subResult : subResults) {
148 opResult.emplace_back(subResult);
149 if (subResult.errCode_ != E_OK) {
150 failedKeys.emplace_back(subResult.key_, subscriberId);
151 LOG_WARN("registered failed, uri is %{public}s",
152 DataShareStringUtils::Anonymous(subResult.key_).c_str());
153 }
154 }
155 if (failedKeys.size() > 0) {
156 BaseCallbacks::DisableObservers(failedKeys, subscriber);
157 }
158 });
159 }
160
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)161 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
162 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
163 {
164 if (proxy == nullptr) {
165 LOG_ERROR("proxy is nullptr");
166 return std::vector<OperationResult>();
167 }
168 std::vector<Key> keys;
169 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
170 keys.emplace_back(uri, subscriberId);
171 });
172 return BaseCallbacks::DisableObservers(keys, subscriber,
173 [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &lastDisabledKeys,
174 std::vector<OperationResult> &opResult) {
175 std::vector<std::string> lastDisabledUris;
176 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
177 lastDisabledUris.emplace_back(result);
178 });
179 if (lastDisabledUris.empty()) {
180 return;
181 }
182
183 auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
184 std::vector<Key> failedKeys;
185 for (auto &result : results) {
186 opResult.emplace_back(result);
187 if (result.errCode_ != E_OK) {
188 failedKeys.emplace_back(result.key_, subscriberId);
189 LOG_WARN("DisableObservers failed, uri is %{public}s, errCode is %{public}d",
190 DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
191 }
192 }
193 if (failedKeys.size() > 0) {
194 BaseCallbacks::EnableObservers(failedKeys, subscriber);
195 }
196 });
197 }
198
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)199 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
200 {
201 if (proxy == nullptr) {
202 LOG_ERROR("proxy is nullptr");
203 return;
204 }
205
206 std::map<int64_t, std::vector<std::string>> keysMap;
207 std::vector<Key> keys = CallbacksManager::GetKeys();
208 for (const auto& key : keys) {
209 keysMap[key.subscriberId_].emplace_back(key.uri_);
210 }
211 for (const auto &[subscriberId, uris] : keysMap) {
212 auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
213 for (const auto& result : results) {
214 if (result.errCode_ != E_OK) {
215 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
216 DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
217 }
218 }
219 }
220 }
221
Emit(PublishedDataChangeNode & changeNode)222 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
223 {
224 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
225 for (auto &data : changeNode.datas_) {
226 PublishedObserverMapKey key(data.key_, data.subscriberId_);
227 // Still set observer was notified flag and store data if there is no enabled observer.
228 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
229 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
230 lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
231 value.datas_.clear();
232 value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
233 value.ownerBundleName_ = changeNode.ownerBundleName_;
234 return true;
235 });
236
237 if (callbacks.empty()) {
238 LOG_WARN("%{public}s nobody subscribe, but still notify",
239 DataShareStringUtils::Anonymous(data.key_).c_str());
240 continue;
241 }
242 for (auto const &obs : callbacks) {
243 results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
244 }
245 }
246 for (auto &[callback, node] : results) {
247 node.ownerBundleName_ = changeNode.ownerBundleName_;
248 callback->OnChange(node);
249 }
250 }
251
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)252 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
253 {
254 PublishedDataChangeNode node;
255 for (auto &key : keys) {
256 lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
257 for (auto &data : value.datas_) {
258 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
259 }
260 node.ownerBundleName_ = value.ownerBundleName_;
261 return true;
262 });
263 }
264 observer->OnChange(node);
265 }
266
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)267 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
268 {
269 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
270 for (auto &[key, obsVector] : obsMap) {
271 uint32_t num = 0;
272 lastChangeNodeMap_.ComputeIfPresent(key, [&obsVector = obsVector, &results, &num](const Key &,
273 PublishedDataChangeNode &value) {
274 for (auto &data : value.datas_) {
275 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
276 for (auto &obs : obsVector) {
277 if (obs.isNotifyOnEnabled_) {
278 num++;
279 results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
280 results[obs.observer_].ownerBundleName_ = value.ownerBundleName_;
281 }
282 }
283 }
284 return true;
285 });
286 if (num > 0) {
287 LOG_INFO("%{public}u will refresh, total %{public}zu, uri %{public}s, subscribeId %{public}" PRId64,
288 num, obsVector.size(), DataShareStringUtils::Anonymous(key.uri_).c_str(), key.subscriberId_);
289 }
290 }
291 for (auto &[callback, node] : results) {
292 callback->OnChange(node);
293 }
294 }
295
GetInstance()296 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
297 {
298 static PublishedDataSubscriberManager manager;
299 return manager;
300 }
301
PublishedDataSubscriberManager()302 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
303 {
304 serviceCallback_ = new (std::nothrow)PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
305 Emit(changeNode);
306 });
307 }
308
PublishedDataObserver(const PublishedDataCallback & callback)309 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
310
OnChange(PublishedDataChangeNode & changeNode)311 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
312 {
313 callback_(changeNode);
314 }
315
operator ==(const PublishedDataObserver & rhs) const316 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
317 {
318 return false;
319 }
320
operator !=(const PublishedDataObserver & rhs) const321 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
322 {
323 return !(rhs == *this);
324 }
325 } // namespace DataShare
326 } // namespace OHOS
327