1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
18 #include <map>
19 #include <mutex>
20 #include <vector>
21
22 #include "datashare_errno.h"
23 #include "datashare_template.h"
24
25 namespace OHOS::DataShare {
26 template<class Key, class Observer>
27 class CallbacksManager {
28 public:
29 struct ObserverNodeOnEnabled {
30 ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
observer_ObserverNodeOnEnabled31 : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
32 std::shared_ptr<Observer> observer_;
33 bool isNotifyOnEnabled_;
34 };
35
36 std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
37 const std::shared_ptr<Observer> observer,
38 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
39 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
40 std::vector<OperationResult> &)>);
41
42 std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
43 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
44 CallbacksManager::DefaultProcess);
45
46 std::vector<OperationResult> DelObservers(void *subscriber,
47 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
48 CallbacksManager::DefaultProcess);
49
50 std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
51 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
52 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)>);
53
54 std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
55 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
56 CallbacksManager::DefaultProcess);
57
58 std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
59
60 int GetEnabledSubscriberSize();
61 int GetEnabledSubscriberSize(const Key &key);
62 void RecoverObservers(std::function<void(const std::vector<Key> &)> recoverObservers);
63
64 void SetObserversNotifiedOnEnabled(const Key &key);
65
66 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)67 static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
68 struct ObserverNode {
69 std::shared_ptr<Observer> observer_;
70 bool enabled_;
71 void *subscriber_;
72 bool isNotifyOnEnabled_;
ObserverNodeObserverNode73 ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
74 : observer_(observer), subscriber_(subscriber)
75 {
76 enabled_ = true;
77 isNotifyOnEnabled_ = false;
78 };
79 };
80 void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
81 std::vector<OperationResult> &result);
82 void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
83 std::recursive_mutex mutex_{};
84 std::map<Key, std::vector<ObserverNode>> callbacks_;
85 };
86
87 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)88 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
89 void *subscriber, const std::shared_ptr<Observer> observer,
90 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
91 std::function<void(const std::vector<Key> &,
92 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
93 {
94 std::vector<OperationResult> result;
95 std::vector<Key> firstRegisterKey;
96 std::vector<Key> localRegisterKey;
97 {
98 std::lock_guard<decltype(mutex_)> lck(mutex_);
99 for (auto &key : keys) {
100 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
101 if (enabledObservers.empty()) {
102 callbacks_[key].emplace_back(observer, subscriber);
103 firstRegisterKey.emplace_back(key);
104 continue;
105 }
106 localRegisterKey.emplace_back(key);
107 callbacks_[key].emplace_back(observer, subscriber);
108 result.emplace_back(key, E_OK);
109 }
110 }
111 if (!localRegisterKey.empty()) {
112 processOnLocalAdd(localRegisterKey, observer);
113 }
114 processOnFirstAdd(firstRegisterKey, observer, result);
115 return result;
116 }
117
118 template<class Key, class Observer>
RecoverObservers(std::function<void (const std::vector<Key> &)> recoverObservers)119 void CallbacksManager<Key, Observer>::RecoverObservers(std::function<void(const std::vector<Key> &)> recoverObservers)
120 {
121 std::vector<Key> keys;
122 {
123 std::lock_guard<decltype(mutex_)> lck(mutex_);
124 for (auto &it : callbacks_) {
125 if (GetEnabledSubscriberSize(it.first) > 0) {
126 keys.emplace_back(it.first);
127 }
128 }
129 }
130 recoverObservers(keys);
131 }
132
133
134 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)135 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
136 std::vector<OperationResult> &result)
137 {
138 for (auto &it : callbacks_) {
139 DelLocalObservers(it.first, subscriber, lastDelKeys, result);
140 }
141 }
142
143 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)144 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
145 std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
146 {
147 auto it = callbacks_.find(key);
148 if (it == callbacks_.end()) {
149 result.emplace_back(key, E_UNREGISTERED_EMPTY);
150 return;
151 }
152 std::vector<ObserverNode> &callbacks = it->second;
153 auto callbackIt = callbacks.begin();
154 while (callbackIt != callbacks.end()) {
155 if (callbackIt->subscriber_ != subscriber) {
156 callbackIt++;
157 continue;
158 }
159 callbackIt = callbacks.erase(callbackIt);
160 }
161 if (!it->second.empty()) {
162 result.emplace_back(key, E_OK);
163 return;
164 }
165 lastDelKeys.emplace_back(key);
166 }
167
168 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)169 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
170 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
171 {
172 std::vector<OperationResult> result;
173 std::vector<Key> lastDelKeys;
174 {
175 std::lock_guard<decltype(mutex_)> lck(mutex_);
176 DelLocalObservers(subscriber, lastDelKeys, result);
177 if (lastDelKeys.empty()) {
178 return result;
179 }
180 for (auto &key : lastDelKeys) {
181 callbacks_.erase(key);
182 }
183 }
184 processOnLastDel(lastDelKeys, result);
185 return result;
186 }
187
188 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)189 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
190 void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
191 {
192 std::vector<OperationResult> result;
193 std::vector<Key> lastDelKeys;
194 {
195 std::lock_guard<decltype(mutex_)> lck(mutex_);
196 for (auto &key : keys) {
197 DelLocalObservers(key, subscriber, lastDelKeys, result);
198 }
199 if (lastDelKeys.empty()) {
200 return result;
201 }
202 for (auto &key : lastDelKeys) {
203 callbacks_.erase(key);
204 }
205 }
206 processOnLastDel(lastDelKeys, result);
207 return result;
208 }
209
210 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)211 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
212 {
213 std::lock_guard<decltype(mutex_)> lck(mutex_);
214 auto it = callbacks_.find(inputKey);
215 if (it == callbacks_.end()) {
216 return std::vector<std::shared_ptr<Observer>>();
217 }
218 std::vector<std::shared_ptr<Observer>> results;
219 for (const auto &value : it->second) {
220 if (value.enabled_ && value.observer_ != nullptr) {
221 results.emplace_back(value.observer_);
222 }
223 }
224 return results;
225 }
226
227 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnFirstEnabled)228 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
229 const std::vector<Key> &keys, void *subscriber,
230 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
231 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnFirstEnabled)
232 {
233 std::vector<OperationResult> result;
234 std::vector<Key> firstRegisterKey;
235 std::map<Key, std::vector<ObserverNodeOnEnabled>> localEnabledObservers;
236 {
237 std::lock_guard<decltype(mutex_)> lck(mutex_);
238 for (auto &key : keys) {
239 auto it = callbacks_.find(key);
240 if (it == callbacks_.end()) {
241 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
242 continue;
243 }
244 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
245 bool hasEnabled = false;
246 for (auto &item : callbacks_[key]) {
247 if (item.subscriber_ != subscriber) {
248 continue;
249 }
250 hasEnabled = true;
251 if (item.enabled_) {
252 continue;
253 }
254 localEnabledObservers[key].emplace_back(item.observer_, item.isNotifyOnEnabled_);
255 item.enabled_ = true;
256 }
257 if (!hasEnabled) {
258 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
259 continue;
260 }
261 if (!enabledObservers.empty()) {
262 result.emplace_back(key, E_OK);
263 continue;
264 }
265 localEnabledObservers.erase(key);
266 firstRegisterKey.emplace_back(key);
267 }
268 }
269 if (!localEnabledObservers.empty()) {
270 processOnLocalEnabled(localEnabledObservers);
271 }
272 processOnFirstEnabled(firstRegisterKey, result);
273 return result;
274 }
275
276 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)277 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
278 void *subscriber,
279 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
280 {
281 std::vector<OperationResult> result;
282 std::vector<Key> lastDisabledKeys;
283 {
284 std::lock_guard<decltype(mutex_)> lck(mutex_);
285 for (auto &key : keys) {
286 auto it = callbacks_.find(key);
287 if (it == callbacks_.end()) {
288 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
289 continue;
290 }
291 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
292 if (enabledObservers.empty()) {
293 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
294 continue;
295 }
296
297 bool hasDisabled = false;
298 for (auto &item : callbacks_[key]) {
299 if (item.subscriber_ == subscriber) {
300 item.enabled_ = false;
301 item.isNotifyOnEnabled_ = false;
302 hasDisabled = true;
303 }
304 }
305 if (!hasDisabled) {
306 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
307 continue;
308 }
309 enabledObservers = GetEnabledObservers(key);
310 if (!enabledObservers.empty()) {
311 result.emplace_back(key, E_OK);
312 continue;
313 }
314 lastDisabledKeys.emplace_back(key);
315 }
316 }
317 processOnLastDisable(lastDisabledKeys, result);
318 return result;
319 }
320
321 template<class Key, class Observer>
GetEnabledSubscriberSize()322 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize()
323 {
324 int count = 0;
325 std::lock_guard<decltype(mutex_)> lck(mutex_);
326 for (auto &[key, value] : callbacks_) {
327 count += GetEnabledSubscriberSize(key);
328 }
329 return count;
330 }
331
332 template<class Key, class Observer>
GetEnabledSubscriberSize(const Key & key)333 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize(const Key &key)
334 {
335 std::lock_guard<decltype(mutex_)> lck(mutex_);
336 int count = 0;
337 auto it = callbacks_.find(key);
338 if (it == callbacks_.end()) {
339 return count;
340 }
341 std::vector<ObserverNode> &callbacks = it->second;
342 for (const auto &callback : callbacks) {
343 if (callback.enabled_) {
344 count++;
345 }
346 }
347 return count;
348 }
349
350 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)351 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
352 {
353 std::lock_guard<decltype(mutex_)> lck(mutex_);
354 auto it = callbacks_.find(key);
355 if (it == callbacks_.end()) {
356 return;
357 }
358 std::vector<ObserverNode> &callbacks = it->second;
359 for (auto &observerNode : callbacks) {
360 if (!observerNode.enabled_) {
361 observerNode.isNotifyOnEnabled_ = true;
362 }
363 }
364 }
365 } // namespace OHOS::DataShare
366 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
367