• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

#include "datashare_errno.h"
#include "datashare_log.h"
#include "datashare_template.h"
#include "dataproxy_handle_common.h"
26 
27 namespace OHOS::DataShare {
28 template<class Key, class Observer>
29 class CallbacksManager {
30 public:
31     struct ObserverNodeOnEnabled {
32         ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
observer_ObserverNodeOnEnabled33             : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
34         std::shared_ptr<Observer> observer_;
35         bool isNotifyOnEnabled_;
36     };
37 
38     std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
39         const std::shared_ptr<Observer> observer,
40         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
41         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
42             std::vector<OperationResult> &)>);
43 
44     std::vector<DataProxyResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
45         const std::shared_ptr<Observer> observer,
46         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
47             std::vector<DataProxyResult> &)>);
48 
49     std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
50         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
51             CallbacksManager::DefaultProcess);
52 
53     std::vector<DataProxyResult> DelProxyDataObservers(const std::vector<Key> &keys, void *subscriber,
54         std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel =
55             CallbacksManager::ProxyDataDefaultProcess);
56 
57     std::vector<OperationResult> DelObservers(void *subscriber,
58         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
59             CallbacksManager::DefaultProcess);
60 
61     std::vector<DataProxyResult> DelObservers(void *subscriber,
62         std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel);
63 
64     std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
65         std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled =
66             CallbacksManager::DefaultProcessOnLocalEnabled,
67         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnFirstAdd =
68             CallbacksManager::DefaultProcess);
69 
70     std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
71         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
72             CallbacksManager::DefaultProcess);
73 
74     std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
75     std::vector<std::shared_ptr<Observer>> GetObserversAndSetNotifiedOn(const Key &);
76 
77     int GetAllSubscriberSize();
78     int GetAllSubscriberSize(const Key &key);
79     std::vector<Key> GetKeys();
80     void SetObserversNotifiedOnEnabled(const Key &key);
81     bool IsObserversNotifiedOnEnabled(const Key &key, std::shared_ptr<Observer> &observer);
82 
83 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)84     static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
ProxyDataDefaultProcess(const std::vector<Key> &,std::vector<DataProxyResult> &)85     static void ProxyDataDefaultProcess(const std::vector<Key> &, std::vector<DataProxyResult> &){};
DefaultProcessOnLocalEnabled(std::map<Key,std::vector<ObserverNodeOnEnabled>> &)86     static void DefaultProcessOnLocalEnabled(std::map<Key, std::vector<ObserverNodeOnEnabled>> &){};
87     struct ObserverNode {
88         std::shared_ptr<Observer> observer_;
89         bool enabled_;
90         void *subscriber_;
91         bool isNotifyOnEnabled_;
ObserverNodeObserverNode92         ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
93             : observer_(observer), subscriber_(subscriber)
94         {
95             enabled_ = true;
96             isNotifyOnEnabled_ = false;
97         };
98     };
99     void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
100         std::vector<OperationResult> &result);
101     void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
102         std::vector<DataProxyResult> &result);
103     void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
104     void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<DataProxyResult> &result);
105     std::recursive_mutex mutex_{};
106     std::map<Key, std::vector<ObserverNode>> callbacks_;
107 };
108 
109 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)110 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
111     void *subscriber, const std::shared_ptr<Observer> observer,
112     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
113     std::function<void(const std::vector<Key> &,
114         const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
115 {
116     std::vector<OperationResult> result;
117     std::vector<Key> firstRegisterKey;
118     std::vector<Key> localRegisterKey;
119     {
120         std::lock_guard<decltype(mutex_)> lck(mutex_);
121         for (auto &key : keys) {
122             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
123             if (enabledObservers.empty()) {
124                 callbacks_[key].emplace_back(observer, subscriber);
125                 firstRegisterKey.emplace_back(key);
126                 continue;
127             }
128             localRegisterKey.emplace_back(key);
129             callbacks_[key].emplace_back(observer, subscriber);
130             result.emplace_back(key, E_OK);
131         }
132     }
133     if (!localRegisterKey.empty()) {
134         processOnLocalAdd(localRegisterKey, observer);
135     }
136     processOnFirstAdd(firstRegisterKey, observer, result);
137     return result;
138 }
139 
140 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<DataProxyResult> &)> processOnFirstAdd)141 std::vector<DataProxyResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
142     void *subscriber, const std::shared_ptr<Observer> observer,
143     std::function<void(const std::vector<Key> &,
144         const std::shared_ptr<Observer> &observer, std::vector<DataProxyResult> &)> processOnFirstAdd)
145 {
146     std::vector<DataProxyResult> result;
147     {
148         std::lock_guard<decltype(mutex_)> lck(mutex_);
149         for (auto &key : keys) {
150             callbacks_[key].emplace_back(observer, subscriber);
151         }
152     }
153     processOnFirstAdd(keys, observer, result);
154     return result;
155 }
156 
157 template<class Key, class Observer>
GetKeys()158 std::vector<Key> CallbacksManager<Key, Observer>::GetKeys()
159 {
160     std::vector<Key> keys;
161     {
162         std::lock_guard<decltype(mutex_)> lck(mutex_);
163         for (auto &it : callbacks_) {
164             keys.emplace_back(it.first);
165         }
166     }
167     return keys;
168 }
169 
170 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)171 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
172     std::vector<OperationResult> &result)
173 {
174     for (auto &it : callbacks_) {
175         DelLocalObservers(it.first, subscriber, lastDelKeys, result);
176     }
177 }
178 
179 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<DataProxyResult> & result)180 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
181     std::vector<DataProxyResult> &result)
182 {
183     for (auto &it : callbacks_) {
184         DelLocalObservers(it.first, subscriber, lastDelKeys, result);
185     }
186 }
187 
188 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)189 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
190     std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
191 {
192     auto it = callbacks_.find(key);
193     if (it == callbacks_.end()) {
194         result.emplace_back(key, E_UNREGISTERED_EMPTY);
195         return;
196     }
197     std::vector<ObserverNode> &callbacks = it->second;
198     auto callbackIt = callbacks.begin();
199     while (callbackIt != callbacks.end()) {
200         if (callbackIt->subscriber_ != subscriber) {
201             callbackIt++;
202             continue;
203         }
204         callbackIt = callbacks.erase(callbackIt);
205     }
206     if (!it->second.empty()) {
207         result.emplace_back(key, E_OK);
208         return;
209     }
210     lastDelKeys.emplace_back(key);
211 }
212 
213 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<DataProxyResult> & result)214 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
215     std::vector<Key> &lastDelKeys, std::vector<DataProxyResult> &result)
216 {
217     auto it = callbacks_.find(key);
218     if (it == callbacks_.end()) {
219         result.emplace_back(key, INNER_ERROR);
220         return;
221     }
222     std::vector<ObserverNode> &callbacks = it->second;
223     auto callbackIt = callbacks.begin();
224     while (callbackIt != callbacks.end()) {
225         if (callbackIt->subscriber_ != subscriber) {
226             callbackIt++;
227             continue;
228         }
229         callbackIt = callbacks.erase(callbackIt);
230     }
231     if (!it->second.empty()) {
232         result.emplace_back(key, INNER_ERROR);
233         return;
234     }
235     lastDelKeys.emplace_back(key);
236 }
237 
238 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)239 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
240     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
241 {
242     std::vector<OperationResult> result;
243     std::vector<Key> lastDelKeys;
244     {
245         std::lock_guard<decltype(mutex_)> lck(mutex_);
246         DelLocalObservers(subscriber, lastDelKeys, result);
247         if (lastDelKeys.empty()) {
248             return result;
249         }
250         for (auto &key : lastDelKeys) {
251             callbacks_.erase(key);
252         }
253     }
254     processOnLastDel(lastDelKeys, result);
255     return result;
256 }
257 
258 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)259 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
260     void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
261 {
262     std::vector<OperationResult> result;
263     std::vector<Key> lastDelKeys;
264     {
265         std::lock_guard<decltype(mutex_)> lck(mutex_);
266         for (auto &key : keys) {
267             DelLocalObservers(key, subscriber, lastDelKeys, result);
268         }
269         if (lastDelKeys.empty()) {
270             return result;
271         }
272         for (auto &key : lastDelKeys) {
273             callbacks_.erase(key);
274         }
275     }
276     processOnLastDel(lastDelKeys, result);
277     return result;
278 }
279 
280 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<DataProxyResult> &)> processOnLastDel)281 std::vector<DataProxyResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
282     std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel)
283 {
284     std::vector<DataProxyResult> result;
285     std::vector<Key> lastDelKeys;
286     {
287         std::lock_guard<decltype(mutex_)> lck(mutex_);
288         DelLocalObservers(subscriber, lastDelKeys, result);
289         if (lastDelKeys.empty()) {
290             return result;
291         }
292         for (auto &key : lastDelKeys) {
293             callbacks_.erase(key);
294         }
295     }
296     processOnLastDel(lastDelKeys, result);
297     return result;
298 }
299 
300 template<class Key, class Observer>
DelProxyDataObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<DataProxyResult> &)> processOnLastDel)301 std::vector<DataProxyResult> CallbacksManager<Key, Observer>::DelProxyDataObservers(const std::vector<Key> &keys,
302     void *subscriber, std::function<void(const std::vector<Key> &, std::vector<DataProxyResult> &)> processOnLastDel)
303 {
304     std::vector<DataProxyResult> result;
305     std::vector<Key> lastDelKeys;
306     {
307         std::lock_guard<decltype(mutex_)> lck(mutex_);
308         for (auto &key : keys) {
309             DelLocalObservers(key, subscriber, lastDelKeys, result);
310         }
311         if (lastDelKeys.empty()) {
312             return result;
313         }
314         for (auto &key : lastDelKeys) {
315             callbacks_.erase(key);
316         }
317     }
318     processOnLastDel(lastDelKeys, result);
319     return result;
320 }
321 
322 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)323 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
324 {
325     std::lock_guard<decltype(mutex_)> lck(mutex_);
326     auto it = callbacks_.find(inputKey);
327     if (it == callbacks_.end()) {
328         return std::vector<std::shared_ptr<Observer>>();
329     }
330     std::vector<std::shared_ptr<Observer>> results;
331     for (const auto &value : it->second) {
332         if (value.enabled_ && value.observer_ != nullptr) {
333             results.emplace_back(value.observer_);
334         }
335     }
336     return results;
337 }
338 
339 template<class Key, class Observer>
GetObserversAndSetNotifiedOn(const Key & inputKey)340 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetObserversAndSetNotifiedOn(
341     const Key &inputKey)
342 {
343     std::lock_guard<decltype(mutex_)> lck(mutex_);
344     auto it = callbacks_.find(inputKey);
345     if (it == callbacks_.end()) {
346         return std::vector<std::shared_ptr<Observer>>();
347     }
348     std::vector<std::shared_ptr<Observer>> results;
349     uint32_t num = 0;
350     std::vector<ObserverNode> &callbacks = it->second;
351     for (auto &value : callbacks) {
352         if ((value.enabled_ && value.observer_ != nullptr)) {
353             results.emplace_back(value.observer_);
354             // if get this enabled observer and notify, it's isNotifyOnEnabled flag should be reset to false
355             value.isNotifyOnEnabled_ = false;
356         } else {
357             num++;
358             value.isNotifyOnEnabled_ = true;
359         }
360     }
361     if (num > 0) {
362         LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num);
363     }
364     return results;
365 }
366 
367 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> enableServiceFunc)368 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
369     const std::vector<Key> &keys, void *subscriber,
370     std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,
371     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> enableServiceFunc)
372 {
373     std::vector<OperationResult> result;
374     std::vector<Key> sendServiceKeys;
375     std::map<Key, std::vector<ObserverNodeOnEnabled>> refreshObservers;
376     {
377         std::lock_guard<decltype(mutex_)> lck(mutex_);
378         for (auto &key : keys) {
379             auto it = callbacks_.find(key);
380             if (it == callbacks_.end()) {
381                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
382                 continue;
383             }
384 
385             auto& allObservers = it->second;
386             auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) {
387                 if (node.subscriber_ == subscriber) {
388                     return true;
389                 }
390                 return false;
391             });
392             if (iterator == allObservers.end()) {
393                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
394                 continue;
395             }
396             if (iterator->enabled_) {
397                 result.emplace_back(key, E_OK);
398                 continue;
399             }
400 
401             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
402             if (enabledObservers.empty()) {
403                 sendServiceKeys.emplace_back(key);
404             }
405             refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_);
406             iterator->enabled_ = true;
407         }
408     }
409     enableServiceFunc(sendServiceKeys, result);
410     enableLocalFunc(refreshObservers);
411 
412     return result;
413 }
414 
415 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)416 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
417     void *subscriber,
418     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
419 {
420     std::vector<OperationResult> result;
421     std::vector<Key> lastDisabledKeys;
422     {
423         std::lock_guard<decltype(mutex_)> lck(mutex_);
424         for (auto &key : keys) {
425             auto it = callbacks_.find(key);
426             if (it == callbacks_.end()) {
427                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
428                 continue;
429             }
430             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
431             if (enabledObservers.empty()) {
432                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
433                 continue;
434             }
435 
436             bool hasDisabled = false;
437             for (auto &item : callbacks_[key]) {
438                 if (item.subscriber_ == subscriber) {
439                     if (item.enabled_) {
440                         item.enabled_ = false;
441                         item.isNotifyOnEnabled_ = false;
442                     }
443                     hasDisabled = true;
444                 }
445             }
446             if (!hasDisabled) {
447                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
448                 continue;
449             }
450             enabledObservers = GetEnabledObservers(key);
451             if (!enabledObservers.empty()) {
452                 result.emplace_back(key, E_OK);
453                 continue;
454             }
455             lastDisabledKeys.emplace_back(key);
456         }
457     }
458     processOnLastDisable(lastDisabledKeys, result);
459     return result;
460 }
461 
462 template<class Key, class Observer>
GetAllSubscriberSize()463 int CallbacksManager<Key, Observer>::GetAllSubscriberSize()
464 {
465     int count = 0;
466     std::lock_guard<decltype(mutex_)> lck(mutex_);
467     for (auto &[key, value] : callbacks_) {
468         count += GetAllSubscriberSize(key);
469     }
470     return count;
471 }
472 
473 template<class Key, class Observer>
GetAllSubscriberSize(const Key & key)474 int CallbacksManager<Key, Observer>::GetAllSubscriberSize(const Key &key)
475 {
476     std::lock_guard<decltype(mutex_)> lck(mutex_);
477     auto it = callbacks_.find(key);
478     if (it == callbacks_.end()) {
479         return 0;
480     }
481     return it->second.size();
482 }
483 
484 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)485 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
486 {
487     std::lock_guard<decltype(mutex_)> lck(mutex_);
488     auto it = callbacks_.find(key);
489     if (it == callbacks_.end()) {
490         return;
491     }
492     std::vector<ObserverNode> &callbacks = it->second;
493     uint32_t num = 0;
494     for (auto &observerNode : callbacks) {
495         if (!observerNode.enabled_) {
496             num++;
497             observerNode.isNotifyOnEnabled_ = true;
498         }
499     }
500     if (num > 0) {
501         LOG_INFO("total %{public}zu, not refreshed %{public}u", callbacks.size(), num);
502     }
503 }
504 
505 template<class Key, class Observer>
IsObserversNotifiedOnEnabled(const Key & key,std::shared_ptr<Observer> & observer)506 bool CallbacksManager<Key, Observer>::IsObserversNotifiedOnEnabled(const Key &key, std::shared_ptr<Observer> &observer)
507 {
508     std::lock_guard<decltype(mutex_)> lck(mutex_);
509     auto it = callbacks_.find(key);
510     if (it == callbacks_.end()) {
511         return false;
512     }
513     std::vector<ObserverNode> &callbacks = it->second;
514     for (auto &node : callbacks) {
515         if (node.observer_ == observer) {
516             return node.isNotifyOnEnabled_;
517         }
518     }
519     return false;
520 }
521 } // namespace OHOS::DataShare
522 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
523