1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "published_data_subscriber_manager.h"
17
18 #include <cinttypes>
19
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23
24 namespace OHOS {
25 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)26 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
27 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
28 const PublishedDataCallback &callback)
29 {
30 if (proxy == nullptr) {
31 LOG_ERROR("proxy is nullptr");
32 return std::vector<OperationResult>();
33 }
34 std::vector<Key> keys;
35 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
36 keys.emplace_back(uri, subscriberId);
37 });
38 return BaseCallbacks::AddObservers(
39 keys, subscriber, std::make_shared<Observer>(callback),
40 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
41 Emit(localRegisterKeys, observer);
42 },
43 [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
44 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
45 std::vector<std::string> firstAddUris;
46 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
47 firstAddUris.emplace_back(result);
48 });
49 if (firstAddUris.empty()) {
50 return;
51 }
52
53 auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
54 std::vector<Key> failedKeys;
55 for (auto &subResult : subResults) {
56 opResult.emplace_back(subResult);
57 if (subResult.errCode_ != E_OK) {
58 failedKeys.emplace_back(subResult.key_, subscriberId);
59 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
60 }
61 }
62 if (failedKeys.size() > 0) {
63 BaseCallbacks::DelObservers(failedKeys, subscriber);
64 }
65 });
66 }
67
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)68 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
69 std::shared_ptr<DataShareServiceProxy> proxy)
70 {
71 if (proxy == nullptr) {
72 LOG_ERROR("proxy is nullptr");
73 return std::vector<OperationResult>();
74 }
75 return BaseCallbacks::DelObservers(subscriber,
76 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
77 // delete all obs by subscriber
78 std::map<int64_t, std::vector<std::string>> keysMap;
79 for (auto const &key : lastDelKeys) {
80 lastChangeNodeMap_.Erase(key);
81 keysMap[key.subscriberId_].emplace_back(key.uri_);
82 }
83 for (auto const &[subscriberId, uris] : keysMap) {
84 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
85 opResult.insert(opResult.end(), results.begin(), results.end());
86 }
87 });
88 }
89
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)90 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
91 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
92 {
93 if (proxy == nullptr) {
94 LOG_ERROR("proxy is nullptr");
95 return std::vector<OperationResult>();
96 }
97 if (uris.empty()) {
98 return DelObservers(subscriber, proxy);
99 }
100
101 std::vector<Key> keys;
102 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
103 keys.emplace_back(uri, subscriberId);
104 });
105 return BaseCallbacks::DelObservers(keys, subscriber,
106 [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
107 std::vector<std::string> lastDelUris;
108 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
109 lastChangeNodeMap_.Erase(result);
110 lastDelUris.emplace_back(result);
111 });
112 if (lastDelUris.empty()) {
113 return;
114 }
115 auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
116 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
117 });
118 }
119
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)120 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
121 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
122 {
123 if (proxy == nullptr) {
124 LOG_ERROR("proxy is nullptr");
125 return std::vector<OperationResult>();
126 }
127 std::vector<Key> keys;
128 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
129 keys.emplace_back(uri, subscriberId);
130 });
131 return BaseCallbacks::EnableObservers(
132 keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
133 EmitOnEnable(obsMap);
134 },
135 [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
136 std::vector<OperationResult> &opResult) {
137 std::vector<std::string> firstAddUris;
138 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
139 firstAddUris.emplace_back(result);
140 });
141 if (firstAddUris.empty()) {
142 return;
143 }
144 auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
145 std::vector<Key> failedKeys;
146 for (auto &subResult : subResults) {
147 opResult.emplace_back(subResult);
148 if (subResult.errCode_ != E_OK) {
149 failedKeys.emplace_back(subResult.key_, subscriberId);
150 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
151 }
152 }
153 if (failedKeys.size() > 0) {
154 BaseCallbacks::DisableObservers(failedKeys, subscriber);
155 }
156 });
157 }
158
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)159 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
160 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
161 {
162 if (proxy == nullptr) {
163 LOG_ERROR("proxy is nullptr");
164 return std::vector<OperationResult>();
165 }
166 std::vector<Key> keys;
167 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
168 keys.emplace_back(uri, subscriberId);
169 });
170 return BaseCallbacks::DisableObservers(keys, subscriber,
171 [&proxy, &subscriberId, this](const std::vector<Key> &lastDisabledKeys,
172 std::vector<OperationResult> &opResult) {
173 std::vector<std::string> lastDisabledUris;
174 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
175 lastDisabledUris.emplace_back(result);
176 });
177 if (lastDisabledUris.empty()) {
178 return;
179 }
180
181 auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
182 opResult.insert(opResult.end(), results.begin(), results.end());
183 });
184 }
185
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)186 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
187 {
188 if (proxy == nullptr) {
189 LOG_ERROR("proxy is nullptr");
190 return;
191 }
192
193 std::map<int64_t, std::vector<std::string>> keysMap;
194 std::vector<Key> keys = CallbacksManager::GetKeys();
195 for (const auto& key : keys) {
196 keysMap[key.subscriberId_].emplace_back(key.uri_);
197 }
198 for (const auto &[subscriberId, uris] : keysMap) {
199 auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
200 for (const auto& result : results) {
201 if (result.errCode_ != E_OK) {
202 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
203 result.key_.c_str(), result.errCode_);
204 }
205 }
206 }
207 }
208
Emit(PublishedDataChangeNode & changeNode)209 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
210 {
211 for (auto &data : changeNode.datas_) {
212 Key key(data.key_, data.subscriberId_);
213 lastChangeNodeMap_.Compute(key, [](const Key &, PublishedDataChangeNode &value) {
214 value.datas_.clear();
215 return true;
216 });
217 }
218 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
219 for (auto &data : changeNode.datas_) {
220 PublishedObserverMapKey key(data.key_, data.subscriberId_);
221 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
222 if (callbacks.empty()) {
223 LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
224 continue;
225 }
226 lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
227 value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
228 value.ownerBundleName_ = changeNode.ownerBundleName_;
229 return true;
230 });
231 for (auto const &obs : callbacks) {
232 results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
233 }
234 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
235 }
236 for (auto &[callback, node] : results) {
237 node.ownerBundleName_ = changeNode.ownerBundleName_;
238 callback->OnChange(node);
239 }
240 }
241
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)242 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
243 {
244 PublishedDataChangeNode node;
245 for (auto &key : keys) {
246 lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
247 for (auto &data : value.datas_) {
248 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
249 }
250 node.ownerBundleName_ = value.ownerBundleName_;
251 return true;
252 });
253 }
254 observer->OnChange(node);
255 }
256
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)257 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
258 {
259 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
260 for (auto &[key, obsVector] : obsMap) {
261 uint32_t num = 0;
262 lastChangeNodeMap_.ComputeIfPresent(key, [&obsVector = obsVector, &results, &num](const Key &,
263 PublishedDataChangeNode &value) {
264 for (auto &data : value.datas_) {
265 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
266 for (auto &obs : obsVector) {
267 if (obs.isNotifyOnEnabled_) {
268 num++;
269 results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
270 results[obs.observer_].ownerBundleName_ = value.ownerBundleName_;
271 obs.isNotifyOnEnabled_ = false;
272 }
273 }
274 }
275 return true;
276 });
277 if (num > 0) {
278 LOG_INFO("%{public}u will refresh, total %{public}zu, uri %{public}s, subscribeId %{public}" PRId64,
279 num, obsVector.size(), DataShareStringUtils::Anonymous(key.uri_).c_str(), key.subscriberId_);
280 }
281 }
282 for (auto &[callback, node] : results) {
283 callback->OnChange(node);
284 }
285 }
286
GetInstance()287 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
288 {
289 static PublishedDataSubscriberManager manager;
290 return manager;
291 }
292
PublishedDataSubscriberManager()293 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
294 {
295 serviceCallback_ = new PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
296 Emit(changeNode);
297 });
298 }
299
PublishedDataObserver(const PublishedDataCallback & callback)300 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
301
OnChange(PublishedDataChangeNode & changeNode)302 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
303 {
304 callback_(changeNode);
305 }
306
operator ==(const PublishedDataObserver & rhs) const307 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
308 {
309 return false;
310 }
311
operator !=(const PublishedDataObserver & rhs) const312 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
313 {
314 return !(rhs == *this);
315 }
316 } // namespace DataShare
317 } // namespace OHOS
318