• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

#include "datashare_errno.h"
#include "datashare_template.h"
24 
25 namespace OHOS::DataShare {
26 template<class Key, class Observer>
27 class CallbacksManager {
28 public:
29     struct ObserverNodeOnEnabled {
30         ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
observer_ObserverNodeOnEnabled31             : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
32         std::shared_ptr<Observer> observer_;
33         bool isNotifyOnEnabled_;
34     };
35 
36     std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
37         const std::shared_ptr<Observer> observer,
38         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
39         std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
40             std::vector<OperationResult> &)>);
41 
42     std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
43         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
44             CallbacksManager::DefaultProcess);
45 
46     std::vector<OperationResult> DelObservers(void *subscriber,
47         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
48             CallbacksManager::DefaultProcess);
49 
50     std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
51         std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
52         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)>);
53 
54     std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
55         std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
56             CallbacksManager::DefaultProcess);
57 
58     std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
59 
60     int GetEnabledSubscriberSize();
61     int GetEnabledSubscriberSize(const Key &key);
62     std::vector<Key> GetKeys();
63     void SetObserversNotifiedOnEnabled(const Key &key);
64 
65 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)66     static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
67     struct ObserverNode {
68         std::shared_ptr<Observer> observer_;
69         bool enabled_;
70         void *subscriber_;
71         bool isNotifyOnEnabled_;
ObserverNodeObserverNode72         ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
73             : observer_(observer), subscriber_(subscriber)
74         {
75             enabled_ = true;
76             isNotifyOnEnabled_ = false;
77         };
78     };
79     void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
80         std::vector<OperationResult> &result);
81     void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
82     std::recursive_mutex mutex_{};
83     std::map<Key, std::vector<ObserverNode>> callbacks_;
84 };
85 
86 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)87 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
88     void *subscriber, const std::shared_ptr<Observer> observer,
89     std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
90     std::function<void(const std::vector<Key> &,
91         const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
92 {
93     std::vector<OperationResult> result;
94     std::vector<Key> firstRegisterKey;
95     std::vector<Key> localRegisterKey;
96     {
97         std::lock_guard<decltype(mutex_)> lck(mutex_);
98         for (auto &key : keys) {
99             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
100             if (enabledObservers.empty()) {
101                 callbacks_[key].emplace_back(observer, subscriber);
102                 firstRegisterKey.emplace_back(key);
103                 continue;
104             }
105             localRegisterKey.emplace_back(key);
106             callbacks_[key].emplace_back(observer, subscriber);
107             result.emplace_back(key, E_OK);
108         }
109     }
110     if (!localRegisterKey.empty()) {
111         processOnLocalAdd(localRegisterKey, observer);
112     }
113     processOnFirstAdd(firstRegisterKey, observer, result);
114     return result;
115 }
116 
117 template<class Key, class Observer>
GetKeys()118 std::vector<Key> CallbacksManager<Key, Observer>::GetKeys()
119 {
120     std::vector<Key> keys;
121     {
122         std::lock_guard<decltype(mutex_)> lck(mutex_);
123         for (auto &it : callbacks_) {
124             keys.emplace_back(it.first);
125         }
126     }
127     return keys;
128 }
129 
130 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)131 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
132     std::vector<OperationResult> &result)
133 {
134     for (auto &it : callbacks_) {
135         DelLocalObservers(it.first, subscriber, lastDelKeys, result);
136     }
137 }
138 
139 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)140 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
141     std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
142 {
143     auto it = callbacks_.find(key);
144     if (it == callbacks_.end()) {
145         result.emplace_back(key, E_UNREGISTERED_EMPTY);
146         return;
147     }
148     std::vector<ObserverNode> &callbacks = it->second;
149     auto callbackIt = callbacks.begin();
150     while (callbackIt != callbacks.end()) {
151         if (callbackIt->subscriber_ != subscriber) {
152             callbackIt++;
153             continue;
154         }
155         callbackIt = callbacks.erase(callbackIt);
156     }
157     if (!it->second.empty()) {
158         result.emplace_back(key, E_OK);
159         return;
160     }
161     lastDelKeys.emplace_back(key);
162 }
163 
164 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)165 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
166     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
167 {
168     std::vector<OperationResult> result;
169     std::vector<Key> lastDelKeys;
170     {
171         std::lock_guard<decltype(mutex_)> lck(mutex_);
172         DelLocalObservers(subscriber, lastDelKeys, result);
173         if (lastDelKeys.empty()) {
174             return result;
175         }
176         for (auto &key : lastDelKeys) {
177             callbacks_.erase(key);
178         }
179     }
180     processOnLastDel(lastDelKeys, result);
181     return result;
182 }
183 
184 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)185 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
186     void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
187 {
188     std::vector<OperationResult> result;
189     std::vector<Key> lastDelKeys;
190     {
191         std::lock_guard<decltype(mutex_)> lck(mutex_);
192         for (auto &key : keys) {
193             DelLocalObservers(key, subscriber, lastDelKeys, result);
194         }
195         if (lastDelKeys.empty()) {
196             return result;
197         }
198         for (auto &key : lastDelKeys) {
199             callbacks_.erase(key);
200         }
201     }
202     processOnLastDel(lastDelKeys, result);
203     return result;
204 }
205 
206 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)207 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
208 {
209     std::lock_guard<decltype(mutex_)> lck(mutex_);
210     auto it = callbacks_.find(inputKey);
211     if (it == callbacks_.end()) {
212         return std::vector<std::shared_ptr<Observer>>();
213     }
214     std::vector<std::shared_ptr<Observer>> results;
215     for (const auto &value : it->second) {
216         if (value.enabled_ && value.observer_ != nullptr) {
217             results.emplace_back(value.observer_);
218         }
219     }
220     return results;
221 }
222 
223 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> enableServiceFunc)224 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
225     const std::vector<Key> &keys, void *subscriber,
226     std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,
227     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> enableServiceFunc)
228 {
229     std::vector<OperationResult> result;
230     std::vector<Key> sendServiceKeys;
231     std::map<Key, std::vector<ObserverNodeOnEnabled>> refreshObservers;
232     {
233         std::lock_guard<decltype(mutex_)> lck(mutex_);
234         for (auto &key : keys) {
235             auto it = callbacks_.find(key);
236             if (it == callbacks_.end()) {
237                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
238                 continue;
239             }
240 
241             auto& allObservers = it->second;
242             auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) {
243                 if (node.subscriber_ == subscriber) {
244                     return true;
245                 }
246                 return false;
247             });
248             if (iterator == allObservers.end()) {
249                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
250                 continue;
251             }
252             if (iterator->enabled_) {
253                 result.emplace_back(key, E_OK);
254                 continue;
255             }
256 
257             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
258             if (enabledObservers.empty()) {
259                 sendServiceKeys.emplace_back(key);
260             }
261             refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_);
262             iterator->enabled_ = true;
263         }
264     }
265     enableServiceFunc(sendServiceKeys, result);
266     enableLocalFunc(refreshObservers);
267 
268     return result;
269 }
270 
271 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)272 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
273     void *subscriber,
274     std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
275 {
276     std::vector<OperationResult> result;
277     std::vector<Key> lastDisabledKeys;
278     {
279         std::lock_guard<decltype(mutex_)> lck(mutex_);
280         for (auto &key : keys) {
281             auto it = callbacks_.find(key);
282             if (it == callbacks_.end()) {
283                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
284                 continue;
285             }
286             std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
287             if (enabledObservers.empty()) {
288                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
289                 continue;
290             }
291 
292             bool hasDisabled = false;
293             for (auto &item : callbacks_[key]) {
294                 if (item.subscriber_ == subscriber) {
295                     item.enabled_ = false;
296                     item.isNotifyOnEnabled_ = false;
297                     hasDisabled = true;
298                 }
299             }
300             if (!hasDisabled) {
301                 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
302                 continue;
303             }
304             enabledObservers = GetEnabledObservers(key);
305             if (!enabledObservers.empty()) {
306                 result.emplace_back(key, E_OK);
307                 continue;
308             }
309             lastDisabledKeys.emplace_back(key);
310         }
311     }
312     processOnLastDisable(lastDisabledKeys, result);
313     return result;
314 }
315 
316 template<class Key, class Observer>
GetEnabledSubscriberSize()317 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize()
318 {
319     int count = 0;
320     std::lock_guard<decltype(mutex_)> lck(mutex_);
321     for (auto &[key, value] : callbacks_) {
322         count += GetEnabledSubscriberSize(key);
323     }
324     return count;
325 }
326 
327 template<class Key, class Observer>
GetEnabledSubscriberSize(const Key & key)328 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize(const Key &key)
329 {
330     std::lock_guard<decltype(mutex_)> lck(mutex_);
331     int count = 0;
332     auto it = callbacks_.find(key);
333     if (it == callbacks_.end()) {
334         return count;
335     }
336     std::vector<ObserverNode> &callbacks = it->second;
337     for (const auto &callback : callbacks) {
338         if (callback.enabled_) {
339             count++;
340         }
341     }
342     return count;
343 }
344 
345 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)346 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
347 {
348     std::lock_guard<decltype(mutex_)> lck(mutex_);
349     auto it = callbacks_.find(key);
350     if (it == callbacks_.end()) {
351         return;
352     }
353     std::vector<ObserverNode> &callbacks = it->second;
354     for (auto &observerNode : callbacks) {
355         if (!observerNode.enabled_) {
356             observerNode.isNotifyOnEnabled_ = true;
357         }
358     }
359 }
360 } // namespace OHOS::DataShare
361 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
362