1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rdb_subscriber_manager.h"
17
18 #include <cinttypes>
19
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23
24 namespace OHOS {
25 namespace DataShare {
GetInstance()26 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
27 {
28 static RdbSubscriberManager manager;
29 return manager;
30 }
31
RdbSubscriberManager()32 RdbSubscriberManager::RdbSubscriberManager()
33 {
34 serviceCallback_ = new (std::nothrow)RdbObserverStub([this](const RdbChangeNode &changeNode) {
35 Emit(changeNode);
36 });
37 }
38
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId,const RdbCallback & callback)39 std::vector<OperationResult> RdbSubscriberManager::AddObservers(void *subscriber,
40 std::shared_ptr<DataShareServiceProxy> proxy,
41 const std::vector<std::string> &uris, const TemplateId &templateId, const RdbCallback &callback)
42 {
43 if (proxy == nullptr) {
44 LOG_ERROR("proxy is nullptr");
45 return std::vector<OperationResult>();
46 }
47 std::vector<Key> keys;
48 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
49 keys.emplace_back(uri, templateId);
50 });
51 return BaseCallbacks::AddObservers(
52 keys, subscriber, std::make_shared<Observer>(callback),
53 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
54 Emit(localRegisterKeys, observer);
55 },
56 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
57 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
58 std::vector<std::string> firstAddUris;
59 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
60 firstAddUris.emplace_back(result);
61 });
62 if (firstAddUris.empty()) {
63 return;
64 }
65
66 auto subResults = proxy->SubscribeRdbData(firstAddUris, templateId, serviceCallback_);
67 std::vector<Key> failedKeys;
68 for (auto &subResult : subResults) {
69 opResult.emplace_back(subResult);
70 if (subResult.errCode_ != E_OK) {
71 failedKeys.emplace_back(subResult.key_, templateId);
72 LOG_WARN("registered failed, uri is %{public}s",
73 DataShareStringUtils::Anonymous(subResult.key_).c_str());
74 }
75 }
76 if (!failedKeys.empty()) {
77 BaseCallbacks::DelObservers(failedKeys, subscriber);
78 }
79 });
80 }
81
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)82 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
83 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
84 {
85 if (proxy == nullptr) {
86 LOG_ERROR("proxy is nullptr");
87 return std::vector<OperationResult>();
88 }
89 if (uris.empty()) {
90 return DelObservers(subscriber, proxy);
91 }
92
93 std::vector<Key> keys;
94 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
95 keys.emplace_back(uri, templateId);
96 });
97 return BaseCallbacks::DelObservers(keys, subscriber,
98 [&proxy, &templateId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
99 std::vector<std::string> lastDelUris;
100 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
101 lastDelUris.emplace_back(result);
102 lastChangeNodeMap_.Erase(result);
103 });
104 if (lastDelUris.empty()) {
105 return;
106 }
107 auto unsubResult = proxy->UnSubscribeRdbData(lastDelUris, templateId);
108 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
109 });
110 }
111
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)112 std::vector<OperationResult> RdbSubscriberManager::DelObservers(void *subscriber,
113 std::shared_ptr<DataShareServiceProxy> proxy)
114 {
115 if (proxy == nullptr) {
116 LOG_ERROR("proxy is nullptr");
117 return std::vector<OperationResult>();
118 }
119 return BaseCallbacks::DelObservers(subscriber,
120 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
121 // delete all obs by subscriber
122 for (const auto &key : lastDelKeys) {
123 lastChangeNodeMap_.Erase(key);
124 auto unsubResult = proxy->UnSubscribeRdbData(std::vector<std::string>(1, key.uri_), key.templateId_);
125 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
126 }
127 });
128 }
129
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)130 std::vector<OperationResult> RdbSubscriberManager::EnableObservers(void *subscriber,
131 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
132 {
133 if (proxy == nullptr) {
134 LOG_ERROR("proxy is nullptr");
135 return std::vector<OperationResult>();
136 }
137 std::vector<Key> keys;
138 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
139 keys.emplace_back(uri, templateId);
140 });
141 return BaseCallbacks::EnableObservers(keys, subscriber,
142 [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
143 EmitOnEnable(obsMap);
144 },
145 [&proxy, subscriber, &templateId, this](const std::vector<Key> &firstAddKeys,
146 std::vector<OperationResult> &opResult) {
147 std::vector<std::string> firstAddUris;
148 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
149 firstAddUris.emplace_back(result);
150 });
151 if (firstAddUris.empty()) {
152 return;
153 }
154 auto subResults = proxy->EnableSubscribeRdbData(firstAddUris, templateId);
155 std::vector<Key> failedKeys;
156 for (auto &subResult : subResults) {
157 opResult.emplace_back(subResult);
158 if (subResult.errCode_ != E_OK) {
159 failedKeys.emplace_back(subResult.key_, templateId);
160 LOG_WARN("registered failed, uri is %{public}s",
161 DataShareStringUtils::Anonymous(subResult.key_).c_str());
162 }
163 }
164 if (!failedKeys.empty()) {
165 BaseCallbacks::DisableObservers(failedKeys, subscriber);
166 }
167 });
168 }
169
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const TemplateId & templateId)170 std::vector<OperationResult> RdbSubscriberManager::DisableObservers(void *subscriber,
171 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, const TemplateId &templateId)
172 {
173 if (proxy == nullptr) {
174 LOG_ERROR("proxy is nullptr");
175 return std::vector<OperationResult>();
176 }
177 std::vector<Key> keys;
178 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
179 keys.emplace_back(uri, templateId);
180 });
181 return BaseCallbacks::DisableObservers(keys, subscriber,
182 [&proxy, subscriber, &templateId, this](const std::vector<Key> &lastDisabledKeys,
183 std::vector<OperationResult> &opResult) {
184 std::vector<std::string> lastDisabledUris;
185 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
186 lastDisabledUris.emplace_back(result);
187 });
188 if (lastDisabledUris.empty()) {
189 return;
190 }
191
192 auto results = proxy->DisableSubscribeRdbData(lastDisabledUris, templateId);
193 std::vector<Key> failedKeys;
194 for (auto &result : results) {
195 opResult.emplace_back(result);
196 if (result.errCode_ != E_OK) {
197 failedKeys.emplace_back(result.key_, templateId);
198 LOG_WARN("DisableObservers failed, uri is %{public}s, errCode is %{public}d",
199 DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
200 }
201 }
202 if (!failedKeys.empty()) {
203 BaseCallbacks::EnableObservers(failedKeys, subscriber);
204 }
205 });
206 }
207
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)208 void RdbSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
209 {
210 if (proxy == nullptr) {
211 LOG_ERROR("proxy is nullptr");
212 return;
213 }
214 std::map<TemplateId, std::vector<std::string>> keysMap;
215 std::vector<Key> keys = CallbacksManager::GetKeys();
216 for (const auto& key : keys) {
217 keysMap[key.templateId_].emplace_back(key.uri_);
218 }
219 for (const auto& [templateId, uris] : keysMap) {
220 auto results = proxy->SubscribeRdbData(uris, templateId, serviceCallback_);
221 for (const auto& result : results) {
222 if (result.errCode_ != E_OK) {
223 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
224 DataShareStringUtils::Anonymous(result.key_).c_str(), result.errCode_);
225 }
226 }
227 }
228 }
229
Emit(const RdbChangeNode & changeNode)230 void RdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
231 {
232 RdbObserverMapKey key(changeNode.uri_, changeNode.templateId_);
233 lastChangeNodeMap_.InsertOrAssign(key, changeNode);
234 auto callbacks = BaseCallbacks::GetObserversAndSetNotifiedOn(key);
235 for (auto &obs : callbacks) {
236 if (obs != nullptr) {
237 LOG_INFO("Client send data to form, uri is %{public}s, subscriberId is %{public}" PRId64,
238 DataShareStringUtils::Anonymous(key.uri_).c_str(), key.templateId_.subscriberId_);
239 obs->OnChange(changeNode);
240 }
241 }
242 }
243
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)244 void RdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
245 {
246 for (auto const &key : keys) {
247 bool isExist = false;
248 RdbChangeNode node;
249 lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
250 node = value;
251 isExist = true;
252 return true;
253 });
254 if (isExist) {
255 observer->OnChange(node);
256 }
257 }
258 }
259
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)260 void RdbSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
261 {
262 for (auto &[key, obsVector] : obsMap) {
263 bool isExist = false;
264 RdbChangeNode node;
265 lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
266 node = value;
267 isExist = true;
268 return true;
269 });
270 if (!isExist) {
271 continue;
272 }
273 for (auto &obs : obsVector) {
274 // after the flag in callbacks is put into obsMap, the real flag in callbacks maybe modified
275 // before read in obsMap here
276 if (BaseCallbacks::IsObserversNotifiedOnEnabled(key, obs.observer_)) {
277 obs.observer_->OnChange(node);
278 }
279 }
280 }
281 }
282
RdbObserver(const RdbCallback & callback)283 RdbObserver::RdbObserver(const RdbCallback &callback) : callback_(callback) {}
284
OnChange(const RdbChangeNode & changeNode)285 void RdbObserver::OnChange(const RdbChangeNode &changeNode)
286 {
287 callback_(changeNode);
288 }
289
operator ==(const RdbObserver & rhs) const290 bool RdbObserver::operator==(const RdbObserver &rhs) const
291 {
292 return false;
293 }
294
operator !=(const RdbObserver & rhs) const295 bool RdbObserver::operator!=(const RdbObserver &rhs) const
296 {
297 return !(rhs == *this);
298 }
299 } // namespace DataShare
300 } // namespace OHOS
301