• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23 
24 namespace OHOS {
25 namespace DataShare {
GetInstance()26 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
27 {
28     static RdbSubscriberManager manager;
29     return manager;
30 }
31 
RdbSubscriberManager()32 RdbSubscriberManager::RdbSubscriberManager()
33 {
34     serviceCallback_ = new (std::nothrow)RdbObserverStub([this](const RdbChangeNode &changeNode) {
35         Emit(changeNode);
36     });
37 }
38 
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)39 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
40     std::shared_ptr<DataShareServiceProxy> proxy,
41     const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
42 {
43     if (proxy == nullptr) {
44         LOG_ERROR("proxy is nullptr");
45         return std::vector<OperationResult>();
46     }
47     std::vector<Key> keys;
48     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
49         keys.emplace_back(uri, templateId);
50     });
51     return BaseCallbacks::AddObservers(
52         keys, subscriber, std::make_shared<Observer>(callback),
53         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
54             Emit(localRegisterKeys, observer);
55         },
56         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
57             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
58             std::vector<std::string> firstAddUris;
59             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
60                 firstAddUris.emplace_back(result);
61             });
62             if (firstAddUris.empty()) {
63                 return;
64             }
65 
66             auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
67             std::vector<Key> failedKeys;
68             for (auto &subResult : subResults) {
69                 opResult.emplace_back(subResult);
70                 if (subResult.errCode_ != E_OK) {
71                     failedKeys.emplace_back(subResult.key_, templateId);
72                     LOG_WARN("registered failed, uri is %{public}s",
73                         DataShareStringUtils::Anonymous(subResult.key_).c_str());
74                 }
75             }
76             if (!failedKeys.empty()) {
77                 BaseCallbacks::DelObservers(failedKeys, subscriber);
78             }
79         });
80 }
81 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)82 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
83     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
84 {
85     if (proxy == nullptr) {
86         LOG_ERROR("proxy is nullptr");
87         return std::vector<OperationResult>();
88     }
89     if (uris.empty()) {
90         return DelObservers(subscriber, proxy);
91     }
92 
93     std::vector<Key> keys;
94     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
95         keys.emplace_back(uri, templateId);
96     });
97     return BaseCallbacks::DelObservers(keys, subscriber,
98         [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
99             std::vector<std::string> lastDelUris;
100             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
101                 lastDelUris.emplace_back(result);
102                 lastChangeNodeMap_.Erase(result);
103             });
104             if (lastDelUris.empty()) {
105                 return;
106             }
107             auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
108             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
109         });
110 }
111 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)112 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
113     std::shared_ptr<DataShareServiceProxy> proxy)
114 {
115     if (proxy == nullptr) {
116         LOG_ERROR("proxy is nullptr");
117         return std::vector<OperationResult>();
118     }
119     return BaseCallbacks::DelObservers(subscriber,
120         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
121             // delete all obs by subscriber
122             for (const auto &key : lastDelKeys) {
123                 lastChangeNodeMap_.Erase(key);
124                 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
125                 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
126             }
127         });
128 }
129 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)130 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
131     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
132 {
133     if (proxy == nullptr) {
134         LOG_ERROR("proxy is nullptr");
135         return std::vector<OperationResult>();
136     }
137     std::vector<Key> keys;
138     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
139         keys.emplace_back(uri, templateId);
140     });
141     return BaseCallbacks::EnableObservers(keys, subscriber,
142         [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
143             EmitOnEnable(obsMap);
144         },
145         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
146         std::vector<OperationResult> &opResult) {
147             std::vector<std::string> firstAddUris;
148             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
149                 firstAddUris.emplace_back(result);
150             });
151             if (firstAddUris.empty()) {
152                 return;
153             }
154             auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
155             std::vector<Key> failedKeys;
156             for (auto &subResult : subResults) {
157                 opResult.emplace_back(subResult);
158                 if (subResult.errCode_ != E_OK) {
159                     failedKeys.emplace_back(subResult.key_, templateId);
160                     LOG_WARN("registered failed, uri is %{public}s",
161                         DataShareStringUtils::Anonymous(subResult.key_).c_str());
162                 }
163             }
164             if (!failedKeys.empty()) {
165                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
166             }
167         });
168 }
169 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)170 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
171     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
172 {
173     if (proxy == nullptr) {
174         LOG_ERROR("proxy is nullptr");
175         return std::vector<OperationResult>();
176     }
177     std::vector<Key> keys;
178     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
179         keys.emplace_back(uri, templateId);
180     });
181     return BaseCallbacks::DisableObservers(keys, subscriber,
182         [&proxy, subscriber, &templateId, this](const std::vector<Key> &lastDisabledKeys,
183         std::vector<OperationResult> &opResult) {
184             std::vector<std::string> lastDisabledUris;
185             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
186                 lastDisabledUris.emplace_back(result);
187             });
188             if (lastDisabledUris.empty()) {
189                 return;
190             }
191 
192             auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
193             std::vector<Key> failedKeys;
194             for (auto &result : results) {
195                 opResult.emplace_back(result);
196                 if (result.errCode_ != E_OK) {
197                     failedKeys.emplace_back(result.key_, templateId);
198                     LOG_WARN("DisableObservers failed, uri is %{public}s, errCode is %{public}d",
199                         DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
200                 }
201             }
202             if (!failedKeys.empty()) {
203                 BaseCallbacks::EnableObservers(failedKeys, subscriber);
204             }
205         });
206 }
207 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)208 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
209 {
210     if (proxy == nullptr) {
211         LOG_ERROR("proxy is nullptr");
212         return;
213     }
214     std::map<TemplateId, std::vector<std::string>> keysMap;
215     std::vector<Key> keys = CallbacksManager::GetKeys();
216     for (const auto& key : keys) {
217         keysMap[key.templateId_].emplace_back(key.uri_);
218     }
219     for (const auto& [templateId, uris] : keysMap) {
220         auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
221         for (const auto& result : results) {
222             if (result.errCode_ != E_OK) {
223                 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
224                     DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
225             }
226         }
227     }
228 }
229 
Emit(const RdbChangeNode & changeNode)230 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
231 {
232     RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
233     lastChangeNodeMap_.InsertOrAssign(key, changeNode);
234     auto callbacks = BaseCallbacks::GetObserversAndSetNotifiedOn(key);
235     for (auto &obs : callbacks) {
236         if (obs != nullptr) {
237             LOG_INFO("Client send data to form, uri is %{public}s, subscriberId is %{public}" PRId64,
238                 DataShareStringUtils::Anonymous(key.uri_).c_str(), key.templateId_.subscriberId_);
239             obs->OnChange(changeNode);
240         }
241     }
242 }
243 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)244 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
245 {
246     for (auto const &key : keys) {
247         bool isExist = false;
248         RdbChangeNode node;
249         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
250             node = value;
251             isExist = true;
252             return true;
253         });
254         if (isExist) {
255             observer->OnChange(node);
256         }
257     }
258 }
259 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)260 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
261 {
262     for (auto &[key, obsVector] : obsMap) {
263         bool isExist = false;
264         RdbChangeNode node;
265         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
266             node = value;
267             isExist = true;
268             return true;
269         });
270         if (!isExist) {
271             continue;
272         }
273         for (auto &obs : obsVector) {
274             // after the flag in callbacks is put into obsMap, the real flag in callbacks maybe modified
275             // before read in obsMap here
276             if (BaseCallbacks::IsObserversNotifiedOnEnabled(key, obs.observer_)) {
277                 obs.observer_->OnChange(node);
278             }
279         }
280     }
281 }
282 
RdbObserver(const RdbCallback & callback)283 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
284 
OnChange(const RdbChangeNode & changeNode)285 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
286 {
287     callback_(changeNode);
288 }
289 
operator ==(const RdbObserver & rhs) const290 bool RdbObserver::operator==(const RdbObserver &rhs) const
291 {
292     return false;
293 }
294 
operator !=(const RdbObserver & rhs) const295 bool RdbObserver::operator!=(const RdbObserver &rhs) const
296 {
297     return !(rhs == *this);
298 }
299 } // namespace DataShare
300 } // namespace OHOS
301