1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef DATA_SHARE_CALLBACKS_MANAGER_H
17 #define DATA_SHARE_CALLBACKS_MANAGER_H
18 #include <map>
19 #include <mutex>
20 #include <vector>
21
22 #include "datashare_errno.h"
23 #include "datashare_template.h"
24
25 namespace OHOS::DataShare {
26 template<class Key, class Observer>
27 class CallbacksManager {
28 public:
29 struct ObserverNodeOnEnabled {
30 ObserverNodeOnEnabled(const std::shared_ptr<Observer> &observer, bool isNotifyOnEnabled = false)
observer_ObserverNodeOnEnabled31 : observer_(observer), isNotifyOnEnabled_(isNotifyOnEnabled) {};
32 std::shared_ptr<Observer> observer_;
33 bool isNotifyOnEnabled_;
34 };
35
36 std::vector<OperationResult> AddObservers(const std::vector<Key> &keys, void *subscriber,
37 const std::shared_ptr<Observer> observer,
38 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)>,
39 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer,
40 std::vector<OperationResult> &)>);
41
42 std::vector<OperationResult> DelObservers(const std::vector<Key> &keys, void *subscriber,
43 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
44 CallbacksManager::DefaultProcess);
45
46 std::vector<OperationResult> DelObservers(void *subscriber,
47 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
48 CallbacksManager::DefaultProcess);
49
50 std::vector<OperationResult> EnableObservers(const std::vector<Key> &keys, void *subscriber,
51 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> processOnLocalEnabled,
52 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)>);
53
54 std::vector<OperationResult> DisableObservers(const std::vector<Key> &keys, void *subscriber,
55 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel =
56 CallbacksManager::DefaultProcess);
57
58 std::vector<std::shared_ptr<Observer>> GetEnabledObservers(const Key &);
59
60 int GetEnabledSubscriberSize();
61 int GetEnabledSubscriberSize(const Key &key);
62 std::vector<Key> GetKeys();
63 void SetObserversNotifiedOnEnabled(const Key &key);
64
65 private:
DefaultProcess(const std::vector<Key> &,std::vector<OperationResult> &)66 static void DefaultProcess(const std::vector<Key> &, std::vector<OperationResult> &){};
67 struct ObserverNode {
68 std::shared_ptr<Observer> observer_;
69 bool enabled_;
70 void *subscriber_;
71 bool isNotifyOnEnabled_;
ObserverNodeObserverNode72 ObserverNode(const std::shared_ptr<Observer> &observer, void *subscriber)
73 : observer_(observer), subscriber_(subscriber)
74 {
75 enabled_ = true;
76 isNotifyOnEnabled_ = false;
77 };
78 };
79 void DelLocalObservers(const Key &key, void *subscriber, std::vector<Key> &lastDelKeys,
80 std::vector<OperationResult> &result);
81 void DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result);
82 std::recursive_mutex mutex_{};
83 std::map<Key, std::vector<ObserverNode>> callbacks_;
84 };
85
86 template<class Key, class Observer>
AddObservers(const std::vector<Key> & keys,void * subscriber,const std::shared_ptr<Observer> observer,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer)> processOnLocalAdd,std::function<void (const std::vector<Key> &,const std::shared_ptr<Observer> & observer,std::vector<OperationResult> &)> processOnFirstAdd)87 std::vector<OperationResult> CallbacksManager<Key, Observer>::AddObservers(const std::vector<Key> &keys,
88 void *subscriber, const std::shared_ptr<Observer> observer,
89 std::function<void(const std::vector<Key> &, const std::shared_ptr<Observer> &observer)> processOnLocalAdd,
90 std::function<void(const std::vector<Key> &,
91 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &)> processOnFirstAdd)
92 {
93 std::vector<OperationResult> result;
94 std::vector<Key> firstRegisterKey;
95 std::vector<Key> localRegisterKey;
96 {
97 std::lock_guard<decltype(mutex_)> lck(mutex_);
98 for (auto &key : keys) {
99 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
100 if (enabledObservers.empty()) {
101 callbacks_[key].emplace_back(observer, subscriber);
102 firstRegisterKey.emplace_back(key);
103 continue;
104 }
105 localRegisterKey.emplace_back(key);
106 callbacks_[key].emplace_back(observer, subscriber);
107 result.emplace_back(key, E_OK);
108 }
109 }
110 if (!localRegisterKey.empty()) {
111 processOnLocalAdd(localRegisterKey, observer);
112 }
113 processOnFirstAdd(firstRegisterKey, observer, result);
114 return result;
115 }
116
117 template<class Key, class Observer>
GetKeys()118 std::vector<Key> CallbacksManager<Key, Observer>::GetKeys()
119 {
120 std::vector<Key> keys;
121 {
122 std::lock_guard<decltype(mutex_)> lck(mutex_);
123 for (auto &it : callbacks_) {
124 keys.emplace_back(it.first);
125 }
126 }
127 return keys;
128 }
129
130 template<class Key, class Observer>
DelLocalObservers(void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)131 void CallbacksManager<Key, Observer>::DelLocalObservers(void *subscriber, std::vector<Key> &lastDelKeys,
132 std::vector<OperationResult> &result)
133 {
134 for (auto &it : callbacks_) {
135 DelLocalObservers(it.first, subscriber, lastDelKeys, result);
136 }
137 }
138
139 template<class Key, class Observer>
DelLocalObservers(const Key & key,void * subscriber,std::vector<Key> & lastDelKeys,std::vector<OperationResult> & result)140 void CallbacksManager<Key, Observer>::DelLocalObservers(const Key &key, void *subscriber,
141 std::vector<Key> &lastDelKeys, std::vector<OperationResult> &result)
142 {
143 auto it = callbacks_.find(key);
144 if (it == callbacks_.end()) {
145 result.emplace_back(key, E_UNREGISTERED_EMPTY);
146 return;
147 }
148 std::vector<ObserverNode> &callbacks = it->second;
149 auto callbackIt = callbacks.begin();
150 while (callbackIt != callbacks.end()) {
151 if (callbackIt->subscriber_ != subscriber) {
152 callbackIt++;
153 continue;
154 }
155 callbackIt = callbacks.erase(callbackIt);
156 }
157 if (!it->second.empty()) {
158 result.emplace_back(key, E_OK);
159 return;
160 }
161 lastDelKeys.emplace_back(key);
162 }
163
164 template<class Key, class Observer>
DelObservers(void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)165 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(void *subscriber,
166 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
167 {
168 std::vector<OperationResult> result;
169 std::vector<Key> lastDelKeys;
170 {
171 std::lock_guard<decltype(mutex_)> lck(mutex_);
172 DelLocalObservers(subscriber, lastDelKeys, result);
173 if (lastDelKeys.empty()) {
174 return result;
175 }
176 for (auto &key : lastDelKeys) {
177 callbacks_.erase(key);
178 }
179 }
180 processOnLastDel(lastDelKeys, result);
181 return result;
182 }
183
184 template<class Key, class Observer>
DelObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDel)185 std::vector<OperationResult> CallbacksManager<Key, Observer>::DelObservers(const std::vector<Key> &keys,
186 void *subscriber, std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDel)
187 {
188 std::vector<OperationResult> result;
189 std::vector<Key> lastDelKeys;
190 {
191 std::lock_guard<decltype(mutex_)> lck(mutex_);
192 for (auto &key : keys) {
193 DelLocalObservers(key, subscriber, lastDelKeys, result);
194 }
195 if (lastDelKeys.empty()) {
196 return result;
197 }
198 for (auto &key : lastDelKeys) {
199 callbacks_.erase(key);
200 }
201 }
202 processOnLastDel(lastDelKeys, result);
203 return result;
204 }
205
206 template<class Key, class Observer>
GetEnabledObservers(const Key & inputKey)207 std::vector<std::shared_ptr<Observer>> CallbacksManager<Key, Observer>::GetEnabledObservers(const Key &inputKey)
208 {
209 std::lock_guard<decltype(mutex_)> lck(mutex_);
210 auto it = callbacks_.find(inputKey);
211 if (it == callbacks_.end()) {
212 return std::vector<std::shared_ptr<Observer>>();
213 }
214 std::vector<std::shared_ptr<Observer>> results;
215 for (const auto &value : it->second) {
216 if (value.enabled_ && value.observer_ != nullptr) {
217 results.emplace_back(value.observer_);
218 }
219 }
220 return results;
221 }
222
223 template<class Key, class Observer>
EnableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (std::map<Key,std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> enableServiceFunc)224 std::vector<OperationResult> CallbacksManager<Key, Observer>::EnableObservers(
225 const std::vector<Key> &keys, void *subscriber,
226 std::function<void(std::map<Key, std::vector<ObserverNodeOnEnabled>> &)> enableLocalFunc,
227 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> enableServiceFunc)
228 {
229 std::vector<OperationResult> result;
230 std::vector<Key> sendServiceKeys;
231 std::map<Key, std::vector<ObserverNodeOnEnabled>> refreshObservers;
232 {
233 std::lock_guard<decltype(mutex_)> lck(mutex_);
234 for (auto &key : keys) {
235 auto it = callbacks_.find(key);
236 if (it == callbacks_.end()) {
237 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
238 continue;
239 }
240
241 auto& allObservers = it->second;
242 auto iterator = std::find_if(allObservers.begin(), allObservers.end(), [&subscriber](ObserverNode node) {
243 if (node.subscriber_ == subscriber) {
244 return true;
245 }
246 return false;
247 });
248 if (iterator == allObservers.end()) {
249 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
250 continue;
251 }
252 if (iterator->enabled_) {
253 result.emplace_back(key, E_OK);
254 continue;
255 }
256
257 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
258 if (enabledObservers.empty()) {
259 sendServiceKeys.emplace_back(key);
260 }
261 refreshObservers[key].emplace_back(iterator->observer_, iterator->isNotifyOnEnabled_);
262 iterator->enabled_ = true;
263 }
264 }
265 enableServiceFunc(sendServiceKeys, result);
266 enableLocalFunc(refreshObservers);
267
268 return result;
269 }
270
271 template<class Key, class Observer>
DisableObservers(const std::vector<Key> & keys,void * subscriber,std::function<void (const std::vector<Key> &,std::vector<OperationResult> &)> processOnLastDisable)272 std::vector<OperationResult> CallbacksManager<Key, Observer>::DisableObservers(const std::vector<Key> &keys,
273 void *subscriber,
274 std::function<void(const std::vector<Key> &, std::vector<OperationResult> &)> processOnLastDisable)
275 {
276 std::vector<OperationResult> result;
277 std::vector<Key> lastDisabledKeys;
278 {
279 std::lock_guard<decltype(mutex_)> lck(mutex_);
280 for (auto &key : keys) {
281 auto it = callbacks_.find(key);
282 if (it == callbacks_.end()) {
283 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
284 continue;
285 }
286 std::vector<std::shared_ptr<Observer>> enabledObservers = GetEnabledObservers(key);
287 if (enabledObservers.empty()) {
288 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
289 continue;
290 }
291
292 bool hasDisabled = false;
293 for (auto &item : callbacks_[key]) {
294 if (item.subscriber_ == subscriber) {
295 item.enabled_ = false;
296 item.isNotifyOnEnabled_ = false;
297 hasDisabled = true;
298 }
299 }
300 if (!hasDisabled) {
301 result.emplace_back(key, E_SUBSCRIBER_NOT_EXIST);
302 continue;
303 }
304 enabledObservers = GetEnabledObservers(key);
305 if (!enabledObservers.empty()) {
306 result.emplace_back(key, E_OK);
307 continue;
308 }
309 lastDisabledKeys.emplace_back(key);
310 }
311 }
312 processOnLastDisable(lastDisabledKeys, result);
313 return result;
314 }
315
316 template<class Key, class Observer>
GetEnabledSubscriberSize()317 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize()
318 {
319 int count = 0;
320 std::lock_guard<decltype(mutex_)> lck(mutex_);
321 for (auto &[key, value] : callbacks_) {
322 count += GetEnabledSubscriberSize(key);
323 }
324 return count;
325 }
326
327 template<class Key, class Observer>
GetEnabledSubscriberSize(const Key & key)328 int CallbacksManager<Key, Observer>::GetEnabledSubscriberSize(const Key &key)
329 {
330 std::lock_guard<decltype(mutex_)> lck(mutex_);
331 int count = 0;
332 auto it = callbacks_.find(key);
333 if (it == callbacks_.end()) {
334 return count;
335 }
336 std::vector<ObserverNode> &callbacks = it->second;
337 for (const auto &callback : callbacks) {
338 if (callback.enabled_) {
339 count++;
340 }
341 }
342 return count;
343 }
344
345 template<class Key, class Observer>
SetObserversNotifiedOnEnabled(const Key & key)346 void CallbacksManager<Key, Observer>::SetObserversNotifiedOnEnabled(const Key &key)
347 {
348 std::lock_guard<decltype(mutex_)> lck(mutex_);
349 auto it = callbacks_.find(key);
350 if (it == callbacks_.end()) {
351 return;
352 }
353 std::vector<ObserverNode> &callbacks = it->second;
354 for (auto &observerNode : callbacks) {
355 if (!observerNode.enabled_) {
356 observerNode.isNotifyOnEnabled_ = true;
357 }
358 }
359 }
360 } // namespace OHOS::DataShare
361 #endif // DATA_SHARE_CALLBACKS_MANAGER_H
362