• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_subscriber_manager.h"
17 
18 #include "data_proxy_observer_stub.h"
19 #include "datashare_log.h"
20 
21 namespace OHOS {
22 namespace DataShare {
GetInstance()23 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
24 {
25     static RdbSubscriberManager manager;
26     return manager;
27 }
28 
RdbSubscriberManager()29 RdbSubscriberManager::RdbSubscriberManager()
30 {
31     serviceCallback_ = nullptr;
32 }
33 
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)34 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
35     std::shared_ptr<DataShareServiceProxy> proxy,
36     const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
37 {
38     if (proxy == nullptr) {
39         LOG_ERROR("proxy is nullptr");
40         return std::vector<OperationResult>();
41     }
42     std::vector<Key> keys;
43     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
44         keys.emplace_back(uri, templateId);
45     });
46     return BaseCallbacks::AddObservers(
47         keys, subscriber, std::make_shared<Observer>(callback),
48         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
49             Emit(localRegisterKeys, observer);
50         },
51         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
52             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
53             std::vector<std::string> firstAddUris;
54             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
55                 firstAddUris.emplace_back(result);
56             });
57             if (firstAddUris.empty()) {
58                 return;
59             }
60 
61             Init();
62             auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
63             std::vector<Key> failedKeys;
64             for (auto &subResult : subResults) {
65                 opResult.emplace_back(subResult);
66                 if (subResult.errCode_ != E_OK) {
67                     failedKeys.emplace_back(subResult.key_, templateId);
68                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
69                 }
70             }
71             if (!failedKeys.empty()) {
72                 BaseCallbacks::DelObservers(failedKeys, subscriber);
73             }
74             Destroy();
75         });
76 }
77 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)78 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
79     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
80 {
81     if (proxy == nullptr) {
82         LOG_ERROR("proxy is nullptr");
83         return std::vector<OperationResult>();
84     }
85     if (uris.empty()) {
86         return DelObservers(subscriber, proxy);
87     }
88 
89     std::vector<Key> keys;
90     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
91         keys.emplace_back(uri, templateId);
92     });
93     return BaseCallbacks::DelObservers(keys, subscriber,
94         [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
95             std::vector<std::string> lastDelUris;
96             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
97                 lastDelUris.emplace_back(result);
98                 lastChangeNodeMap_.erase(result);
99             });
100             if (lastDelUris.empty()) {
101                 return;
102             }
103             auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
104             if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
105                 LOG_INFO("no valid subscriber, delete callback");
106                 serviceCallback_ = nullptr;
107             }
108             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
109             Destroy();
110         });
111 }
112 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)113 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
114     std::shared_ptr<DataShareServiceProxy> proxy)
115 {
116     if (proxy == nullptr) {
117         LOG_ERROR("proxy is nullptr");
118         return std::vector<OperationResult>();
119     }
120     return BaseCallbacks::DelObservers(subscriber,
121         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
122             // delete all obs by subscriber
123             for (const auto &key : lastDelKeys) {
124                 lastChangeNodeMap_.erase(key);
125                 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
126                 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
127             }
128             Destroy();
129         });
130 }
131 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)132 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
133     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
134 {
135     if (proxy == nullptr) {
136         LOG_ERROR("proxy is nullptr");
137         return std::vector<OperationResult>();
138     }
139     std::vector<Key> keys;
140     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
141         keys.emplace_back(uri, templateId);
142     });
143     return BaseCallbacks::EnableObservers(keys, subscriber,
144         [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
145             EmitOnEnable(obsMap);
146         },
147         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
148         std::vector<OperationResult> &opResult) {
149             std::vector<std::string> firstAddUris;
150             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
151                 firstAddUris.emplace_back(result);
152             });
153             if (firstAddUris.empty()) {
154                 return;
155             }
156             auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
157             std::vector<Key> failedKeys;
158             for (auto &subResult : subResults) {
159                 opResult.emplace_back(subResult);
160                 if (subResult.errCode_ != E_OK) {
161                     failedKeys.emplace_back(subResult.key_, templateId);
162                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
163                 }
164             }
165             if (!failedKeys.empty()) {
166                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
167             }
168         });
169 }
170 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)171 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
172     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
173 {
174     if (proxy == nullptr) {
175         LOG_ERROR("proxy is nullptr");
176         return std::vector<OperationResult>();
177     }
178     std::vector<Key> keys;
179     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
180         keys.emplace_back(uri, templateId);
181     });
182     return BaseCallbacks::DisableObservers(keys, subscriber,
183         [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
184             std::vector<std::string> lastDisabledUris;
185             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
186                 lastDisabledUris.emplace_back(result);
187             });
188             if (lastDisabledUris.empty()) {
189                 return;
190             }
191 
192             auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
193             opResult.insert(opResult.end(), results.begin(), results.end());
194         });
195 }
196 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)197 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
198 {
199     if (proxy == nullptr) {
200         LOG_ERROR("proxy is nullptr");
201         return;
202     }
203     return BaseCallbacks::RecoverObservers([&proxy, this](const std::vector<Key> &Keys) {
204         std::map<TemplateId, std::vector<std::string>> keysMap;
205         for (auto const &key : Keys) {
206             keysMap[key.templateId_].emplace_back(key.uri_);
207         }
208         for (auto const &[templateId, uris] : keysMap) {
209             auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
210             for (const auto& result : results) {
211                 if (result.errCode_ != E_OK) {
212                     LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
213                         result.key_.c_str(), result.errCode_);
214                 }
215             }
216         }
217     });
218 }
219 
Emit(const RdbChangeNode & changeNode)220 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
221 {
222     RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
223     lastChangeNodeMap_[key] = changeNode;
224     auto callbacks = BaseCallbacks::GetEnabledObservers(key);
225     for (auto &obs : callbacks) {
226         if (obs != nullptr) {
227             obs->OnChange(changeNode);
228         }
229     }
230     BaseCallbacks::SetObserversNotifiedOnEnabled(key);
231 }
232 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)233 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
234 {
235     for (auto const &key : keys) {
236         auto it = lastChangeNodeMap_.find(key);
237         if (it != lastChangeNodeMap_.end()) {
238             observer->OnChange(it->second);
239         }
240     }
241 }
242 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)243 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
244 {
245     for (auto &[key, obsVector] : obsMap) {
246         auto it = lastChangeNodeMap_.find(key);
247         if (it == lastChangeNodeMap_.end()) {
248             continue;
249         }
250         for (auto &obs : obsVector) {
251             if (obs.isNotifyOnEnabled_) {
252                 obs.observer_->OnChange(it->second);
253             }
254         }
255     }
256 }
257 
Init()258 bool RdbSubscriberManager::Init()
259 {
260     if (serviceCallback_ == nullptr) {
261         LOG_INFO("callback init");
262         serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
263             Emit(changeNode);
264         });
265     }
266     return true;
267 }
268 
Destroy()269 void RdbSubscriberManager::Destroy()
270 {
271     if (BaseCallbacks::GetEnabledSubscriberSize() == 0) {
272         if (serviceCallback_ != nullptr) {
273             serviceCallback_->ClearCallback();
274         }
275         LOG_INFO("no valid subscriber, delete callback");
276         serviceCallback_ = nullptr;
277     }
278 }
279 
RdbObserver(const RdbCallback & callback)280 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
281 
OnChange(const RdbChangeNode & changeNode)282 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
283 {
284     callback_(changeNode);
285 }
286 
operator ==(const RdbObserver & rhs) const287 bool RdbObserver::operator==(const RdbObserver &rhs) const
288 {
289     return false;
290 }
291 
operator !=(const RdbObserver & rhs) const292 bool RdbObserver::operator!=(const RdbObserver &rhs) const
293 {
294     return !(rhs == *this);
295 }
296 } // namespace DataShare
297 } // namespace OHOS
298