• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23 
24 namespace OHOS {
25 namespace DataShare {
GetInstance()26 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
27 {
28     static RdbSubscriberManager manager;
29     return manager;
30 }
31 
RdbSubscriberManager()32 RdbSubscriberManager::RdbSubscriberManager()
33 {
34     serviceCallback_ = nullptr;
35 }
36 
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)37 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
38     std::shared_ptr<DataShareServiceProxy> proxy,
39     const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
40 {
41     if (proxy == nullptr) {
42         LOG_ERROR("proxy is nullptr");
43         return std::vector<OperationResult>();
44     }
45     std::vector<Key> keys;
46     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
47         keys.emplace_back(uri, templateId);
48     });
49     return BaseCallbacks::AddObservers(
50         keys, subscriber, std::make_shared<Observer>(callback),
51         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
52             Emit(localRegisterKeys, observer);
53         },
54         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
55             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
56             std::vector<std::string> firstAddUris;
57             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
58                 firstAddUris.emplace_back(result);
59             });
60             if (firstAddUris.empty()) {
61                 return;
62             }
63 
64             Init();
65             auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
66             std::vector<Key> failedKeys;
67             for (auto &subResult : subResults) {
68                 opResult.emplace_back(subResult);
69                 if (subResult.errCode_ != E_OK) {
70                     failedKeys.emplace_back(subResult.key_, templateId);
71                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
72                 }
73             }
74             if (!failedKeys.empty()) {
75                 BaseCallbacks::DelObservers(failedKeys, subscriber);
76             }
77             Destroy();
78         });
79 }
80 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)81 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
82     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
83 {
84     if (proxy == nullptr) {
85         LOG_ERROR("proxy is nullptr");
86         return std::vector<OperationResult>();
87     }
88     if (uris.empty()) {
89         return DelObservers(subscriber, proxy);
90     }
91 
92     std::vector<Key> keys;
93     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
94         keys.emplace_back(uri, templateId);
95     });
96     return BaseCallbacks::DelObservers(keys, subscriber,
97         [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
98             std::vector<std::string> lastDelUris;
99             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
100                 lastDelUris.emplace_back(result);
101                 lastChangeNodeMap_.Erase(result);
102             });
103             if (lastDelUris.empty()) {
104                 return;
105             }
106             auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
107             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
108             Destroy();
109         });
110 }
111 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)112 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
113     std::shared_ptr<DataShareServiceProxy> proxy)
114 {
115     if (proxy == nullptr) {
116         LOG_ERROR("proxy is nullptr");
117         return std::vector<OperationResult>();
118     }
119     return BaseCallbacks::DelObservers(subscriber,
120         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
121             // delete all obs by subscriber
122             for (const auto &key : lastDelKeys) {
123                 lastChangeNodeMap_.Erase(key);
124                 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
125                 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
126             }
127             Destroy();
128         });
129 }
130 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)131 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
132     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
133 {
134     if (proxy == nullptr) {
135         LOG_ERROR("proxy is nullptr");
136         return std::vector<OperationResult>();
137     }
138     std::vector<Key> keys;
139     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
140         keys.emplace_back(uri, templateId);
141     });
142     return BaseCallbacks::EnableObservers(keys, subscriber,
143         [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
144             EmitOnEnable(obsMap);
145         },
146         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
147         std::vector<OperationResult> &opResult) {
148             std::vector<std::string> firstAddUris;
149             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
150                 firstAddUris.emplace_back(result);
151             });
152             if (firstAddUris.empty()) {
153                 return;
154             }
155             auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
156             std::vector<Key> failedKeys;
157             for (auto &subResult : subResults) {
158                 opResult.emplace_back(subResult);
159                 if (subResult.errCode_ != E_OK) {
160                     failedKeys.emplace_back(subResult.key_, templateId);
161                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
162                 }
163             }
164             if (!failedKeys.empty()) {
165                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
166             }
167         });
168 }
169 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)170 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
171     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
172 {
173     if (proxy == nullptr) {
174         LOG_ERROR("proxy is nullptr");
175         return std::vector<OperationResult>();
176     }
177     std::vector<Key> keys;
178     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
179         keys.emplace_back(uri, templateId);
180     });
181     return BaseCallbacks::DisableObservers(keys, subscriber,
182         [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
183             std::vector<std::string> lastDisabledUris;
184             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
185                 lastDisabledUris.emplace_back(result);
186             });
187             if (lastDisabledUris.empty()) {
188                 return;
189             }
190 
191             auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
192             opResult.insert(opResult.end(), results.begin(), results.end());
193         });
194 }
195 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)196 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
197 {
198     if (proxy == nullptr) {
199         LOG_ERROR("proxy is nullptr");
200         return;
201     }
202     std::map<TemplateId, std::vector<std::string>> keysMap;
203     std::vector<Key> keys = CallbacksManager::GetKeys();
204     for (const auto& key : keys) {
205         keysMap[key.templateId_].emplace_back(key.uri_);
206     }
207     for (const auto& [templateId, uris] : keysMap) {
208         auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
209         for (const auto& result : results) {
210             if (result.errCode_ != E_OK) {
211                 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d", result.key_.c_str(),
212                     result.errCode_);
213             }
214         }
215     }
216 }
217 
Emit(const RdbChangeNode & changeNode)218 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
219 {
220     RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
221     lastChangeNodeMap_.InsertOrAssign(key, changeNode);
222     auto callbacks = BaseCallbacks::GetEnabledObservers(key);
223     for (auto &obs : callbacks) {
224         if (obs != nullptr) {
225             LOG_INFO("Client send data to form, uri is %{public}s, subscriberId is %{public}" PRId64,
226                 DataShareStringUtils::Anonymous(key.uri_).c_str(), key.templateId_.subscriberId_);
227             obs->OnChange(changeNode);
228         }
229     }
230     BaseCallbacks::SetObserversNotifiedOnEnabled(key);
231 }
232 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)233 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
234 {
235     for (auto const &key : keys) {
236         bool isExist = false;
237         RdbChangeNode node;
238         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
239             node = value;
240             isExist = true;
241             return true;
242         });
243         if (isExist) {
244             observer->OnChange(node);
245         }
246     }
247 }
248 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)249 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
250 {
251     for (auto &[key, obsVector] : obsMap) {
252         bool isExist = false;
253         RdbChangeNode node;
254         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
255             node = value;
256             isExist = true;
257             return true;
258         });
259         if (!isExist) {
260             continue;
261         }
262         for (auto &obs : obsVector) {
263             if (obs.isNotifyOnEnabled_) {
264                 obs.observer_->OnChange(node);
265             }
266         }
267     }
268 }
269 
Init()270 bool RdbSubscriberManager::Init()
271 {
272     if (serviceCallback_ == nullptr) {
273         LOG_INFO("callback init");
274         serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
275             Emit(changeNode);
276         });
277     }
278     return true;
279 }
280 
Destroy()281 void RdbSubscriberManager::Destroy()
282 {
283     if (BaseCallbacks::GetAllSubscriberSize() == 0) {
284         if (serviceCallback_ != nullptr) {
285             serviceCallback_->ClearCallback();
286         }
287         LOG_INFO("no valid subscriber, delete callback");
288         serviceCallback_ = nullptr;
289     }
290 }
291 
RdbObserver(const RdbCallback & callback)292 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
293 
OnChange(const RdbChangeNode & changeNode)294 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
295 {
296     callback_(changeNode);
297 }
298 
operator ==(const RdbObserver & rhs) const299 bool RdbObserver::operator==(const RdbObserver &rhs) const
300 {
301     return false;
302 }
303 
operator !=(const RdbObserver & rhs) const304 bool RdbObserver::operator!=(const RdbObserver &rhs) const
305 {
306     return !(rhs == *this);
307 }
308 } // namespace DataShare
309 } // namespace OHOS
310