1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

#include "datashare_errno.h"
#include "datashare_log.h"
#include "datashare_template.h"
#include "dataproxy_handle_common.h"
26
27 namespace OHOS::DataShare {
28 template<class Key, class Observer>
29 class CallbacksManager {
30 public:
31 struct ObserverNodeOnEnabled {
32 ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
observer_ObserverNodeOnEnabled33 : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
34 std::shared_ptr<Observer> observer_;
35 bool isNotifyOnEnabled_;
36 };
37
38 std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
39 const std::shared_ptr<Observer> observer,
40 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
41 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
42 std::vector<OperationResult> &)>);
43
44 std::vector<DataProxyResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
45 const std::shared_ptr<Observer> observer,
46 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
47 std::vector<DataProxyResult> &)>);
48
49 std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
50 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
51 CallbacksManager::DefaultProcess);
52
53 std::vector<DataProxyResult> DelProxyDataObservers(const std::vector<Key> &keys, void *subscriber,
54 std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel =
55 CallbacksManager::ProxyDataDefaultProcess);
56
57 std::vector<OperationResult> DelObservers(void *subscriber,
58 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
59 CallbacksManager::DefaultProcess);
60
61 std::vector<DataProxyResult> DelObservers(void *subscriber,
62 std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel);
63
64 std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
65 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled =
66 CallbacksManager::DefaultProcessOnLocalEnabled,
67 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnFirstAdd =
68 CallbacksManager::DefaultProcess);
69
70 std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
71 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
72 CallbacksManager::DefaultProcess);
73
74 std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
75 std::vector<std::shared_ptr<Observer>> GetObserversAndSetNotifiedOn(const Key &);
76
77 int GetAllSubscriberSize();
78 int GetAllSubscriberSize(const Key &key);
79 std::vector<Key> GetKeys();
80 void SetObserversNotifiedOnEnabled(const Key &key);
81 bool IsObserversNotifiedOnEnabled(const Key &key, std::shared_ptr<Observer> &observer);
82
83 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)84 static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
ProxyDataDefaultProcess(const std::vector<Key> &,std::vector<DataProxyResult> &)85 static void ProxyDataDefaultProcess(const std::vector<Key> &, std::vector<DataProxyResult> &){};
DefaultProcessOnLocalEnabled(std::map<Key,std::vector<ObserverNodeOnEnabled>> &)86 static void DefaultProcessOnLocalEnabled(std::map<Key, std::vector<ObserverNodeOnEnabled>> &){};
87 struct ObserverNode {
88 std::shared_ptr<Observer> observer_;
89 bool enabled_;
90 void *subscriber_;
91 bool isNotifyOnEnabled_;
ObserverNodeObserverNode92 ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
93 : observer_(observer), subscriber_(subscriber)
94 {
95 enabled_ = true;
96 isNotifyOnEnabled_ = false;
97 };
98 };
99 void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
100 std::vector<OperationResult> &result);
101 void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
102 std::vector<DataProxyResult> &result);
103 void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
104 void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<DataProxyResult> &result);
105 std::recursive_mutex mutex_{};
106 std::map<Key, std::vector<ObserverNode>> callbacks_;
107 };
108
109 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)110 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
111 void *subscriber, const std::shared_ptr<Observer> observer,
112 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
113 std::function<void(const std::vector<Key> &,
114 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
115 {
116 std::vector<OperationResult> result;
117 std::vector<Key> firstRegisterKey;
118 std::vector<Key> localRegisterKey;
119 {
120 std::lock_guard<decltype(mutex_)> lck(mutex_);
121 for (auto &key : keys) {
122 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
123 if (enabledObservers.empty()) {
124 callbacks_[key].emplace_back(observer, subscriber);
125 firstRegisterKey.emplace_back(key);
126 continue;
127 }
128 localRegisterKey.emplace_back(key);
129 callbacks_[key].emplace_back(observer, subscriber);
130 result.emplace_back(key, E_OK);
131 }
132 }
133 if (!localRegisterKey.empty()) {
134 processOnLocalAdd(localRegisterKey, observer);
135 }
136 processOnFirstAdd(firstRegisterKey, observer, result);
137 return result;
138 }
139
140 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<DataProxyResult> &)> processOnFirstAdd)141 std::vector<DataProxyResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
142 void *subscriber, const std::shared_ptr<Observer> observer,
143 std::function<void(const std::vector<Key> &,
144 const std::shared_ptr<Observer> &observer, std::vector<DataProxyResult> &)> processOnFirstAdd)
145 {
146 std::vector<DataProxyResult> result;
147 {
148 std::lock_guard<decltype(mutex_)> lck(mutex_);
149 for (auto &key : keys) {
150 callbacks_[key].emplace_back(observer, subscriber);
151 }
152 }
153 processOnFirstAdd(keys, observer, result);
154 return result;
155 }
156
157 template<class Key, class Observer>
GetKeys()158 std::vector<Key> CallbacksManager<Key, Observer>::GetKeys()
159 {
160 std::vector<Key> keys;
161 {
162 std::lock_guard<decltype(mutex_)> lck(mutex_);
163 for (auto &it : callbacks_) {
164 keys.emplace_back(it.first);
165 }
166 }
167 return keys;
168 }
169
170 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)171 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
172 std::vector<OperationResult> &result)
173 {
174 for (auto &it : callbacks_) {
175 DelLocalObservers(it.first, subscriber, lastDelKeys, result);
176 }
177 }
178
179 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<DataProxyResult> & result)180 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
181 std::vector<DataProxyResult> &result)
182 {
183 for (auto &it : callbacks_) {
184 DelLocalObservers(it.first, subscriber, lastDelKeys, result);
185 }
186 }
187
188 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)189 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
190 std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
191 {
192 auto it = callbacks_.find(key);
193 if (it == callbacks_.end()) {
194 result.emplace_back(key, E_UNREGISTERED_EMPTY);
195 return;
196 }
197 std::vector<ObserverNode> &callbacks = it->second;
198 auto callbackIt = callbacks.begin();
199 while (callbackIt != callbacks.end()) {
200 if (callbackIt->subscriber_ != subscriber) {
201 callbackIt++;
202 continue;
203 }
204 callbackIt = callbacks.erase(callbackIt);
205 }
206 if (!it->second.empty()) {
207 result.emplace_back(key, E_OK);
208 return;
209 }
210 lastDelKeys.emplace_back(key);
211 }
212
213 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<DataProxyResult> & result)214 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
215 std::vector<Key> &lastDelKeys, std::vector<DataProxyResult> &result)
216 {
217 auto it = callbacks_.find(key);
218 if (it == callbacks_.end()) {
219 result.emplace_back(key, INNER_ERROR);
220 return;
221 }
222 std::vector<ObserverNode> &callbacks = it->second;
223 auto callbackIt = callbacks.begin();
224 while (callbackIt != callbacks.end()) {
225 if (callbackIt->subscriber_ != subscriber) {
226 callbackIt++;
227 continue;
228 }
229 callbackIt = callbacks.erase(callbackIt);
230 }
231 if (!it->second.empty()) {
232 result.emplace_back(key, INNER_ERROR);
233 return;
234 }
235 lastDelKeys.emplace_back(key);
236 }
237
238 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)239 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
240 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
241 {
242 std::vector<OperationResult> result;
243 std::vector<Key> lastDelKeys;
244 {
245 std::lock_guard<decltype(mutex_)> lck(mutex_);
246 DelLocalObservers(subscriber, lastDelKeys, result);
247 if (lastDelKeys.empty()) {
248 return result;
249 }
250 for (auto &key : lastDelKeys) {
251 callbacks_.erase(key);
252 }
253 }
254 processOnLastDel(lastDelKeys, result);
255 return result;
256 }
257
258 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)259 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
260 void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
261 {
262 std::vector<OperationResult> result;
263 std::vector<Key> lastDelKeys;
264 {
265 std::lock_guard<decltype(mutex_)> lck(mutex_);
266 for (auto &key : keys) {
267 DelLocalObservers(key, subscriber, lastDelKeys, result);
268 }
269 if (lastDelKeys.empty()) {
270 return result;
271 }
272 for (auto &key : lastDelKeys) {
273 callbacks_.erase(key);
274 }
275 }
276 processOnLastDel(lastDelKeys, result);
277 return result;
278 }
279
280 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<DataProxyResult> &)> processOnLastDel)281 std::vector<DataProxyResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
282 std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel)
283 {
284 std::vector<DataProxyResult> result;
285 std::vector<Key> lastDelKeys;
286 {
287 std::lock_guard<decltype(mutex_)> lck(mutex_);
288 DelLocalObservers(subscriber, lastDelKeys, result);
289 if (lastDelKeys.empty()) {
290 return result;
291 }
292 for (auto &key : lastDelKeys) {
293 callbacks_.erase(key);
294 }
295 }
296 processOnLastDel(lastDelKeys, result);
297 return result;
298 }
299
300 template<class Key, class Observer>
DelProxyDataObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<DataProxyResult> &)> processOnLastDel)301 std::vector<DataProxyResult> CallbacksManager<Key, Observer>::DelProxyDataObservers(const std::vector<Key> &keys,
302 void *subscriber, std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel)
303 {
304 std::vector<DataProxyResult> result;
305 std::vector<Key> lastDelKeys;
306 {
307 std::lock_guard<decltype(mutex_)> lck(mutex_);
308 for (auto &key : keys) {
309 DelLocalObservers(key, subscriber, lastDelKeys, result);
310 }
311 if (lastDelKeys.empty()) {
312 return result;
313 }
314 for (auto &key : lastDelKeys) {
315 callbacks_.erase(key);
316 }
317 }
318 processOnLastDel(lastDelKeys, result);
319 return result;
320 }
321
322 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)323 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
324 {
325 std::lock_guard<decltype(mutex_)> lck(mutex_);
326 auto it = callbacks_.find(inputKey);
327 if (it == callbacks_.end()) {
328 return std::vector<std::shared_ptr<Observer>>();
329 }
330 std::vector<std::shared_ptr<Observer>> results;
331 for (const auto &value : it->second) {
332 if (value.enabled_ && value.observer_ != nullptr) {
333 results.emplace_back(value.observer_);
334 }
335 }
336 return results;
337 }
338
339 template<class Key, class Observer>
GetObserversAndSetNotifiedOn(const Key & inputKey)340 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetObserversAndSetNotifiedOn(
341 const Key &inputKey)
342 {
343 std::lock_guard<decltype(mutex_)> lck(mutex_);
344 auto it = callbacks_.find(inputKey);
345 if (it == callbacks_.end()) {
346 return std::vector<std::shared_ptr<Observer>>();
347 }
348 std::vector<std::shared_ptr<Observer>> results;
349 uint32_t num = 0;
350 std::vector<ObserverNode> &callbacks = it->second;
351 for (auto &value : callbacks) {
352 if ((value.enabled_ && value.observer_ != nullptr)) {
353 results.emplace_back(value.observer_);
354 // if get this enabled observer and notify, it's isNotifyOnEnabled flag should be reset to false
355 value.isNotifyOnEnabled_ = false;
356 } else {
357 num++;
358 value.isNotifyOnEnabled_ = true;
359 }
360 }
361 if (num > 0) {
362 LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num);
363 }
364 return results;
365 }
366
367 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> enableServiceFunc)368 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
369 const std::vector<Key> &keys, void *subscriber,
370 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,
371 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> enableServiceFunc)
372 {
373 std::vector<OperationResult> result;
374 std::vector<Key> sendServiceKeys;
375 std::map<Key, std::vector<ObserverNodeOnEnabled>> refreshObservers;
376 {
377 std::lock_guard<decltype(mutex_)> lck(mutex_);
378 for (auto &key : keys) {
379 auto it = callbacks_.find(key);
380 if (it == callbacks_.end()) {
381 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
382 continue;
383 }
384
385 auto& allObservers = it->second;
386 auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) {
387 if (node.subscriber_ == subscriber) {
388 return true;
389 }
390 return false;
391 });
392 if (iterator == allObservers.end()) {
393 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
394 continue;
395 }
396 if (iterator->enabled_) {
397 result.emplace_back(key, E_OK);
398 continue;
399 }
400
401 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
402 if (enabledObservers.empty()) {
403 sendServiceKeys.emplace_back(key);
404 }
405 refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_);
406 iterator->enabled_ = true;
407 }
408 }
409 enableServiceFunc(sendServiceKeys, result);
410 enableLocalFunc(refreshObservers);
411
412 return result;
413 }
414
415 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)416 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
417 void *subscriber,
418 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
419 {
420 std::vector<OperationResult> result;
421 std::vector<Key> lastDisabledKeys;
422 {
423 std::lock_guard<decltype(mutex_)> lck(mutex_);
424 for (auto &key : keys) {
425 auto it = callbacks_.find(key);
426 if (it == callbacks_.end()) {
427 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
428 continue;
429 }
430 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
431 if (enabledObservers.empty()) {
432 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
433 continue;
434 }
435
436 bool hasDisabled = false;
437 for (auto &item : callbacks_[key]) {
438 if (item.subscriber_ == subscriber) {
439 if (item.enabled_) {
440 item.enabled_ = false;
441 item.isNotifyOnEnabled_ = false;
442 }
443 hasDisabled = true;
444 }
445 }
446 if (!hasDisabled) {
447 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
448 continue;
449 }
450 enabledObservers = GetEnabledObservers(key);
451 if (!enabledObservers.empty()) {
452 result.emplace_back(key, E_OK);
453 continue;
454 }
455 lastDisabledKeys.emplace_back(key);
456 }
457 }
458 processOnLastDisable(lastDisabledKeys, result);
459 return result;
460 }
461
462 template<class Key, class Observer>
GetAllSubscriberSize()463 int CallbacksManager<Key, Observer>::GetAllSubscriberSize()
464 {
465 int count = 0;
466 std::lock_guard<decltype(mutex_)> lck(mutex_);
467 for (auto &[key, value] : callbacks_) {
468 count += GetAllSubscriberSize(key);
469 }
470 return count;
471 }
472
473 template<class Key, class Observer>
GetAllSubscriberSize(const Key & key)474 int CallbacksManager<Key, Observer>::GetAllSubscriberSize(const Key &key)
475 {
476 std::lock_guard<decltype(mutex_)> lck(mutex_);
477 auto it = callbacks_.find(key);
478 if (it == callbacks_.end()) {
479 return 0;
480 }
481 return it->second.size();
482 }
483
484 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)485 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
486 {
487 std::lock_guard<decltype(mutex_)> lck(mutex_);
488 auto it = callbacks_.find(key);
489 if (it == callbacks_.end()) {
490 return;
491 }
492 std::vector<ObserverNode> &callbacks = it->second;
493 uint32_t num = 0;
494 for (auto &observerNode : callbacks) {
495 if (!observerNode.enabled_) {
496 num++;
497 observerNode.isNotifyOnEnabled_ = true;
498 }
499 }
500 if (num > 0) {
501 LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num);
502 }
503 }
504
505 template<class Key, class Observer>
IsObserversNotifiedOnEnabled(const Key & key,std::shared_ptr<Observer> & observer)506 bool CallbacksManager<Key, Observer>::IsObserversNotifiedOnEnabled(const Key &key, std::shared_ptr<Observer> &observer)
507 {
508 std::lock_guard<decltype(mutex_)> lck(mutex_);
509 auto it = callbacks_.find(key);
510 if (it == callbacks_.end()) {
511 return false;
512 }
513 std::vector<ObserverNode> &callbacks = it->second;
514 for (auto &node : callbacks) {
515 if (node.observer_ == observer) {
516 return node.isNotifyOnEnabled_;
517 }
518 }
519 return false;
520 }
521 } // namespace OHOS::DataShare
522 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
523