• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
18 #include <map>
19 #include <mutex>
20 #include <vector>
21 
22 #include "datashare_errno.h"
23 #include "datashare_template.h"
24 
25 namespace OHOS::DataShare {
26 template<class Key, class Observer>
27 class CallbacksManager {
28 public:
29     struct ObserverNodeOnEnabled {
30         ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
observer_ObserverNodeOnEnabled31             : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
32         std::shared_ptr<Observer> observer_;
33         bool isNotifyOnEnabled_;
34     };
35 
36     std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
37         const std::shared_ptr<Observer> observer,
38         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
39         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
40             std::vector<OperationResult> &)>);
41 
42     std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
43         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
44             CallbacksManager::DefaultProcess);
45 
46     std::vector<OperationResult> DelObservers(void *subscriber,
47         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
48             CallbacksManager::DefaultProcess);
49 
50     std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
51         std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
52         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)>);
53 
54     std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
55         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
56             CallbacksManager::DefaultProcess);
57 
58     std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
59 
60     int GetEnabledSubscriberSize();
61     int GetEnabledSubscriberSize(const Key &key);
62     void RecoverObservers(std::function<void(const std::vector<Key> &)> recoverObservers);
63 
64     void SetObserversNotifiedOnEnabled(const Key &key);
65 
66 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)67     static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
68     struct ObserverNode {
69         std::shared_ptr<Observer> observer_;
70         bool enabled_;
71         void *subscriber_;
72         bool isNotifyOnEnabled_;
ObserverNodeObserverNode73         ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
74             : observer_(observer), subscriber_(subscriber)
75         {
76             enabled_ = true;
77             isNotifyOnEnabled_ = false;
78         };
79     };
80     void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
81         std::vector<OperationResult> &result);
82     void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
83     std::recursive_mutex mutex_{};
84     std::map<Key, std::vector<ObserverNode>> callbacks_;
85 };
86 
87 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)88 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
89     void *subscriber, const std::shared_ptr<Observer> observer,
90     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
91     std::function<void(const std::vector<Key> &,
92         const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
93 {
94     std::vector<OperationResult> result;
95     std::vector<Key> firstRegisterKey;
96     std::vector<Key> localRegisterKey;
97     {
98         std::lock_guard<decltype(mutex_)> lck(mutex_);
99         for (auto &key : keys) {
100             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
101             if (enabledObservers.empty()) {
102                 callbacks_[key].emplace_back(observer, subscriber);
103                 firstRegisterKey.emplace_back(key);
104                 continue;
105             }
106             localRegisterKey.emplace_back(key);
107             callbacks_[key].emplace_back(observer, subscriber);
108             result.emplace_back(key, E_OK);
109         }
110     }
111     if (!localRegisterKey.empty()) {
112         processOnLocalAdd(localRegisterKey, observer);
113     }
114     processOnFirstAdd(firstRegisterKey, observer, result);
115     return result;
116 }
117 
118 template<class Key, class Observer>
RecoverObservers(std::function<void (const std::vector<Key> &)> recoverObservers)119 void CallbacksManager<Key, Observer>::RecoverObservers(std::function<void(const std::vector<Key> &)> recoverObservers)
120 {
121     std::vector<Key> keys;
122     {
123         std::lock_guard<decltype(mutex_)> lck(mutex_);
124         for (auto &it : callbacks_) {
125             if (GetEnabledSubscriberSize(it.first) > 0) {
126                 keys.emplace_back(it.first);
127             }
128         }
129     }
130     recoverObservers(keys);
131 }
132 
133 
134 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)135 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
136     std::vector<OperationResult> &result)
137 {
138     for (auto &it : callbacks_) {
139         DelLocalObservers(it.first, subscriber, lastDelKeys, result);
140     }
141 }
142 
143 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)144 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
145     std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
146 {
147     auto it = callbacks_.find(key);
148     if (it == callbacks_.end()) {
149         result.emplace_back(key, E_UNREGISTERED_EMPTY);
150         return;
151     }
152     std::vector<ObserverNode> &callbacks = it->second;
153     auto callbackIt = callbacks.begin();
154     while (callbackIt != callbacks.end()) {
155         if (callbackIt->subscriber_ != subscriber) {
156             callbackIt++;
157             continue;
158         }
159         callbackIt = callbacks.erase(callbackIt);
160     }
161     if (!it->second.empty()) {
162         result.emplace_back(key, E_OK);
163         return;
164     }
165     lastDelKeys.emplace_back(key);
166 }
167 
168 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)169 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
170     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
171 {
172     std::vector<OperationResult> result;
173     std::vector<Key> lastDelKeys;
174     {
175         std::lock_guard<decltype(mutex_)> lck(mutex_);
176         DelLocalObservers(subscriber, lastDelKeys, result);
177         if (lastDelKeys.empty()) {
178             return result;
179         }
180         for (auto &key : lastDelKeys) {
181             callbacks_.erase(key);
182         }
183     }
184     processOnLastDel(lastDelKeys, result);
185     return result;
186 }
187 
188 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)189 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
190     void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
191 {
192     std::vector<OperationResult> result;
193     std::vector<Key> lastDelKeys;
194     {
195         std::lock_guard<decltype(mutex_)> lck(mutex_);
196         for (auto &key : keys) {
197             DelLocalObservers(key, subscriber, lastDelKeys, result);
198         }
199         if (lastDelKeys.empty()) {
200             return result;
201         }
202         for (auto &key : lastDelKeys) {
203             callbacks_.erase(key);
204         }
205     }
206     processOnLastDel(lastDelKeys, result);
207     return result;
208 }
209 
210 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)211 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
212 {
213     std::lock_guard<decltype(mutex_)> lck(mutex_);
214     auto it = callbacks_.find(inputKey);
215     if (it == callbacks_.end()) {
216         return std::vector<std::shared_ptr<Observer>>();
217     }
218     std::vector<std::shared_ptr<Observer>> results;
219     for (const auto &value : it->second) {
220         if (value.enabled_ && value.observer_ != nullptr) {
221             results.emplace_back(value.observer_);
222         }
223     }
224     return results;
225 }
226 
227 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnFirstEnabled)228 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
229     const std::vector<Key> &keys, void *subscriber,
230     std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
231     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnFirstEnabled)
232 {
233     std::vector<OperationResult> result;
234     std::vector<Key> firstRegisterKey;
235     std::map<Key, std::vector<ObserverNodeOnEnabled>> localEnabledObservers;
236     {
237         std::lock_guard<decltype(mutex_)> lck(mutex_);
238         for (auto &key : keys) {
239             auto it = callbacks_.find(key);
240             if (it == callbacks_.end()) {
241                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
242                 continue;
243             }
244             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
245             bool hasEnabled = false;
246             for (auto &item : callbacks_[key]) {
247                 if (item.subscriber_ != subscriber) {
248                     continue;
249                 }
250                 hasEnabled = true;
251                 if (item.enabled_) {
252                     continue;
253                 }
254                 localEnabledObservers[key].emplace_back(item.observer_, item.isNotifyOnEnabled_);
255                 item.enabled_ = true;
256             }
257             if (!hasEnabled) {
258                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
259                 continue;
260             }
261             if (!enabledObservers.empty()) {
262                 result.emplace_back(key, E_OK);
263                 continue;
264             }
265             localEnabledObservers.erase(key);
266             firstRegisterKey.emplace_back(key);
267         }
268     }
269     if (!localEnabledObservers.empty()) {
270         processOnLocalEnabled(localEnabledObservers);
271     }
272     processOnFirstEnabled(firstRegisterKey, result);
273     return result;
274 }
275 
276 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)277 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
278     void *subscriber,
279     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
280 {
281     std::vector<OperationResult> result;
282     std::vector<Key> lastDisabledKeys;
283     {
284         std::lock_guard<decltype(mutex_)> lck(mutex_);
285         for (auto &key : keys) {
286             auto it = callbacks_.find(key);
287             if (it == callbacks_.end()) {
288                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
289                 continue;
290             }
291             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
292             if (enabledObservers.empty()) {
293                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
294                 continue;
295             }
296 
297             bool hasDisabled = false;
298             for (auto &item : callbacks_[key]) {
299                 if (item.subscriber_ == subscriber) {
300                     item.enabled_ = false;
301                     item.isNotifyOnEnabled_ = false;
302                     hasDisabled = true;
303                 }
304             }
305             if (!hasDisabled) {
306                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
307                 continue;
308             }
309             enabledObservers = GetEnabledObservers(key);
310             if (!enabledObservers.empty()) {
311                 result.emplace_back(key, E_OK);
312                 continue;
313             }
314             lastDisabledKeys.emplace_back(key);
315         }
316     }
317     processOnLastDisable(lastDisabledKeys, result);
318     return result;
319 }
320 
321 template<class Key, class Observer>
GetEnabledSubscriberSize()322 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize()
323 {
324     int count = 0;
325     std::lock_guard<decltype(mutex_)> lck(mutex_);
326     for (auto &[key, value] : callbacks_) {
327         count += GetEnabledSubscriberSize(key);
328     }
329     return count;
330 }
331 
332 template<class Key, class Observer>
GetEnabledSubscriberSize(const Key & key)333 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize(const Key &key)
334 {
335     std::lock_guard<decltype(mutex_)> lck(mutex_);
336     int count = 0;
337     auto it = callbacks_.find(key);
338     if (it == callbacks_.end()) {
339         return count;
340     }
341     std::vector<ObserverNode> &callbacks = it->second;
342     for (const auto &callback : callbacks) {
343         if (callback.enabled_) {
344             count++;
345         }
346     }
347     return count;
348 }
349 
350 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)351 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
352 {
353     std::lock_guard<decltype(mutex_)> lck(mutex_);
354     auto it = callbacks_.find(key);
355     if (it == callbacks_.end()) {
356         return;
357     }
358     std::vector<ObserverNode> &callbacks = it->second;
359     for (auto &observerNode : callbacks) {
360         if (!observerNode.enabled_) {
361             observerNode.isNotifyOnEnabled_ = true;
362         }
363     }
364 }
365 } // namespace OHOS::DataShare
366 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
367