• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rdb_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23 
24 namespace OHOS {
25 namespace DataShare {
GetInstance()26 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
27 {
28     static RdbSubscriberManager manager;
29     return manager;
30 }
31 
RdbSubscriberManager()32 RdbSubscriberManager::RdbSubscriberManager()
33 {
34     serviceCallback_ = new RdbObserverStub([this](const RdbChangeNode &changeNode) {
35         Emit(changeNode);
36     });
37 }
38 
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)39 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
40     std::shared_ptr<DataShareServiceProxy> proxy,
41     const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
42 {
43     if (proxy == nullptr) {
44         LOG_ERROR("proxy is nullptr");
45         return std::vector<OperationResult>();
46     }
47     std::vector<Key> keys;
48     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
49         keys.emplace_back(uri, templateId);
50     });
51     return BaseCallbacks::AddObservers(
52         keys, subscriber, std::make_shared<Observer>(callback),
53         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
54             Emit(localRegisterKeys, observer);
55         },
56         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
57             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
58             std::vector<std::string> firstAddUris;
59             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
60                 firstAddUris.emplace_back(result);
61             });
62             if (firstAddUris.empty()) {
63                 return;
64             }
65 
66             auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
67             std::vector<Key> failedKeys;
68             for (auto &subResult : subResults) {
69                 opResult.emplace_back(subResult);
70                 if (subResult.errCode_ != E_OK) {
71                     failedKeys.emplace_back(subResult.key_, templateId);
72                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
73                 }
74             }
75             if (!failedKeys.empty()) {
76                 BaseCallbacks::DelObservers(failedKeys, subscriber);
77             }
78         });
79 }
80 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)81 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
82     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
83 {
84     if (proxy == nullptr) {
85         LOG_ERROR("proxy is nullptr");
86         return std::vector<OperationResult>();
87     }
88     if (uris.empty()) {
89         return DelObservers(subscriber, proxy);
90     }
91 
92     std::vector<Key> keys;
93     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
94         keys.emplace_back(uri, templateId);
95     });
96     return BaseCallbacks::DelObservers(keys, subscriber,
97         [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
98             std::vector<std::string> lastDelUris;
99             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
100                 lastDelUris.emplace_back(result);
101                 lastChangeNodeMap_.Erase(result);
102             });
103             if (lastDelUris.empty()) {
104                 return;
105             }
106             auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
107             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
108         });
109 }
110 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)111 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
112     std::shared_ptr<DataShareServiceProxy> proxy)
113 {
114     if (proxy == nullptr) {
115         LOG_ERROR("proxy is nullptr");
116         return std::vector<OperationResult>();
117     }
118     return BaseCallbacks::DelObservers(subscriber,
119         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
120             // delete all obs by subscriber
121             for (const auto &key : lastDelKeys) {
122                 lastChangeNodeMap_.Erase(key);
123                 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
124                 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
125             }
126         });
127 }
128 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)129 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
130     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
131 {
132     if (proxy == nullptr) {
133         LOG_ERROR("proxy is nullptr");
134         return std::vector<OperationResult>();
135     }
136     std::vector<Key> keys;
137     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
138         keys.emplace_back(uri, templateId);
139     });
140     return BaseCallbacks::EnableObservers(keys, subscriber,
141         [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
142             EmitOnEnable(obsMap);
143         },
144         [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
145         std::vector<OperationResult> &opResult) {
146             std::vector<std::string> firstAddUris;
147             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
148                 firstAddUris.emplace_back(result);
149             });
150             if (firstAddUris.empty()) {
151                 return;
152             }
153             auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
154             std::vector<Key> failedKeys;
155             for (auto &subResult : subResults) {
156                 opResult.emplace_back(subResult);
157                 if (subResult.errCode_ != E_OK) {
158                     failedKeys.emplace_back(subResult.key_, templateId);
159                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
160                 }
161             }
162             if (!failedKeys.empty()) {
163                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
164             }
165         });
166 }
167 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)168 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
169     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
170 {
171     if (proxy == nullptr) {
172         LOG_ERROR("proxy is nullptr");
173         return std::vector<OperationResult>();
174     }
175     std::vector<Key> keys;
176     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
177         keys.emplace_back(uri, templateId);
178     });
179     return BaseCallbacks::DisableObservers(keys, subscriber,
180         [&proxy, &templateId, this](const std::vector<Key> &lastDisabledKeys, std::vector<OperationResult> &opResult) {
181             std::vector<std::string> lastDisabledUris;
182             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
183                 lastDisabledUris.emplace_back(result);
184             });
185             if (lastDisabledUris.empty()) {
186                 return;
187             }
188 
189             auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
190             opResult.insert(opResult.end(), results.begin(), results.end());
191         });
192 }
193 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)194 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
195 {
196     if (proxy == nullptr) {
197         LOG_ERROR("proxy is nullptr");
198         return;
199     }
200     std::map<TemplateId, std::vector<std::string>> keysMap;
201     std::vector<Key> keys = CallbacksManager::GetKeys();
202     for (const auto& key : keys) {
203         keysMap[key.templateId_].emplace_back(key.uri_);
204     }
205     for (const auto& [templateId, uris] : keysMap) {
206         auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
207         for (const auto& result : results) {
208             if (result.errCode_ != E_OK) {
209                 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d", result.key_.c_str(),
210                     result.errCode_);
211             }
212         }
213     }
214 }
215 
Emit(const RdbChangeNode & changeNode)216 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
217 {
218     RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
219     lastChangeNodeMap_.InsertOrAssign(key, changeNode);
220     auto callbacks = BaseCallbacks::GetEnabledObservers(key);
221     for (auto &obs : callbacks) {
222         if (obs != nullptr) {
223             LOG_INFO("Client send data to form, uri is %{public}s, subscriberId is %{public}" PRId64,
224                 DataShareStringUtils::Anonymous(key.uri_).c_str(), key.templateId_.subscriberId_);
225             obs->OnChange(changeNode);
226         }
227     }
228     BaseCallbacks::SetObserversNotifiedOnEnabled(key);
229 }
230 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)231 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
232 {
233     for (auto const &key : keys) {
234         bool isExist = false;
235         RdbChangeNode node;
236         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
237             node = value;
238             isExist = true;
239             return true;
240         });
241         if (isExist) {
242             observer->OnChange(node);
243         }
244     }
245 }
246 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)247 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
248 {
249     for (auto &[key, obsVector] : obsMap) {
250         bool isExist = false;
251         RdbChangeNode node;
252         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
253             node = value;
254             isExist = true;
255             return true;
256         });
257         if (!isExist) {
258             continue;
259         }
260         for (auto &obs : obsVector) {
261             if (obs.isNotifyOnEnabled_) {
262                 obs.isNotifyOnEnabled_ = false;
263                 obs.observer_->OnChange(node);
264             }
265         }
266     }
267 }
268 
RdbObserver(const RdbCallback & callback)269 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
270 
OnChange(const RdbChangeNode & changeNode)271 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
272 {
273     callback_(changeNode);
274 }
275 
operator ==(const RdbObserver & rhs) const276 bool RdbObserver::operator==(const RdbObserver &rhs) const
277 {
278     return false;
279 }
280 
operator !=(const RdbObserver & rhs) const281 bool RdbObserver::operator!=(const RdbObserver &rhs) const
282 {
283     return !(rhs == *this);
284 }
285 } // namespace DataShare
286 } // namespace OHOS
287