1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "napi_subscriber_manager.h"
17
18 #include "datashare_log.h"
19
20 namespace OHOS {
21 namespace DataShare {
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)22 std::vector<OperationResult> NapiRdbSubscriberManager::AddObservers(napi_env env, napi_value callback,
23 const std::vector<std::string> &uris, const TemplateId &templateId)
24 {
25 auto datashareHelper = dataShareHelper_.lock();
26 if (datashareHelper == nullptr) {
27 LOG_ERROR("datashareHelper is nullptr");
28 return std::vector<OperationResult>();
29 }
30
31 std::vector<Key> keys;
32 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
33 keys.emplace_back(uri, templateId);
34 });
35 return BaseCallbacks::AddObservers(
36 keys, std::make_shared<Observer>(env, callback),
37 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
38 Emit(localRegisterKeys, observer);
39 },
40 [&datashareHelper, &templateId, this](const std::vector<Key> &firstAddKeys,
41 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
42 std::vector<std::string> firstAddUris;
43 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
44 firstAddUris.emplace_back(result);
45 });
46 if (firstAddUris.empty()) {
47 return;
48 }
49 auto subResults =
50 datashareHelper->SubscribeRdbData(firstAddUris, templateId, [this](const RdbChangeNode &changeNode) {
51 Emit(changeNode);
52 });
53 std::vector<Key> failedKeys;
54 for (auto &subResult : subResults) {
55 opResult.emplace_back(subResult);
56 if (subResult.errCode_ != E_OK) {
57 failedKeys.emplace_back(subResult.key_, templateId);
58 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
59 }
60 }
61 if (failedKeys.size() > 0) {
62 BaseCallbacks::DelObservers(failedKeys, observer);
63 }
64 });
65 }
66
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)67 std::vector<OperationResult> NapiRdbSubscriberManager::DelObservers(napi_env env, napi_value callback,
68 const std::vector<std::string> &uris, const TemplateId &templateId)
69 {
70 auto dataShareHelper = dataShareHelper_.lock();
71 if (dataShareHelper == nullptr) {
72 LOG_ERROR("nativeManager is nullptr");
73 return std::vector<OperationResult>();
74 }
75 std::vector<Key> keys;
76 std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
77 keys.emplace_back(uri, templateId);
78 });
79 return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
80 [&dataShareHelper, &templateId, this](const std::vector<Key> &lastDelKeys,
81 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
82 std::vector<std::string> lastDelUris;
83 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
84 lastChangeNodeMap_.Erase(result);
85 lastDelUris.emplace_back(result);
86 });
87 if (lastDelUris.empty()) {
88 return;
89 }
90 auto unsubResult = dataShareHelper->UnsubscribeRdbData(lastDelUris, templateId);
91 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
92 });
93 }
94
Emit(const RdbChangeNode & changeNode)95 void NapiRdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
96 {
97 Key key(changeNode.uri_, changeNode.templateId_);
98 lastChangeNodeMap_.InsertOrAssign(key, changeNode);
99 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
100 for (auto &obs : callbacks) {
101 if (obs != nullptr) {
102 obs->OnChange(changeNode);
103 }
104 }
105 }
106
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)107 void NapiRdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
108 {
109 for (auto const &key : keys) {
110 bool isExist = false;
111 RdbChangeNode node;
112 lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
113 node = value;
114 isExist = true;
115 return true;
116 });
117 if (isExist) {
118 observer->OnChange(node);
119 }
120 }
121 }
122
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)123 std::vector<OperationResult> NapiPublishedSubscriberManager::AddObservers(napi_env env, napi_value callback,
124 const std::vector<std::string> &uris, int64_t subscriberId)
125 {
126 auto dataShareHelper = dataShareHelper_.lock();
127 if (dataShareHelper == nullptr) {
128 LOG_ERROR("datashareHelper is nullptr");
129 return std::vector<OperationResult>();
130 }
131
132 std::vector<Key> keys;
133 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
134 keys.emplace_back(uri, subscriberId);
135 });
136 return BaseCallbacks::AddObservers(
137 keys, std::make_shared<Observer>(env, callback),
138 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
139 Emit(localRegisterKeys, observer);
140 },
141 [&dataShareHelper, &subscriberId, this](const std::vector<Key> &firstAddKeys,
142 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
143 std::vector<std::string> firstAddUris;
144 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
145 firstAddUris.emplace_back(result);
146 });
147 if (firstAddUris.empty()) {
148 return;
149 }
150 auto subResults = dataShareHelper->SubscribePublishedData(firstAddUris, subscriberId,
151 [this](const PublishedDataChangeNode &changeNode) {
152 Emit(changeNode);
153 });
154 std::vector<Key> failedKeys;
155 for (auto &subResult : subResults) {
156 opResult.emplace_back(subResult);
157 if (subResult.errCode_ != E_OK) {
158 failedKeys.emplace_back(subResult.key_, subscriberId);
159 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
160 }
161 }
162 if (failedKeys.size() > 0) {
163 BaseCallbacks::DelObservers(failedKeys, observer);
164 }
165 });
166 }
167
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)168 std::vector<OperationResult> NapiPublishedSubscriberManager::DelObservers(napi_env env, napi_value callback,
169 const std::vector<std::string> &uris, int64_t subscriberId)
170 {
171 auto dataShareHelper = dataShareHelper_.lock();
172 if (dataShareHelper == nullptr) {
173 LOG_ERROR("nativeManager is nullptr");
174 return std::vector<OperationResult>();
175 }
176 std::vector<Key> keys;
177 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
178 keys.emplace_back(uri, subscriberId);
179 });
180 return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
181 [&dataShareHelper, &subscriberId, &callback, &uris, this](const std::vector<Key> &lastDelKeys,
182 const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
183 std::vector<std::string> lastDelUris;
184 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
185 lastChangeNodeMap_.Erase(result);
186 lastDelUris.emplace_back(result);
187 });
188 if (lastDelUris.empty()) {
189 return;
190 }
191 auto unsubResult = dataShareHelper->UnsubscribePublishedData(lastDelUris, subscriberId);
192 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
193 });
194 }
195
Emit(const PublishedDataChangeNode & changeNode)196 void NapiPublishedSubscriberManager::Emit(const PublishedDataChangeNode &changeNode)
197 {
198 for (auto &data : changeNode.datas_) {
199 Key key(data.key_, data.subscriberId_);
200 lastChangeNodeMap_.Compute(key, [](const Key &, PublishedDataChangeNode &value) {
201 value.datas_.clear();
202 return true;
203 });
204 }
205 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
206 for (auto &data : changeNode.datas_) {
207 Key key(data.key_, data.subscriberId_);
208 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
209 if (callbacks.empty()) {
210 LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
211 continue;
212 }
213 lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
214 value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
215 value.ownerBundleName_ = changeNode.ownerBundleName_;
216 return true;
217 });
218 for (auto const &obs : callbacks) {
219 results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
220 }
221 }
222 for (auto &[callback, node] : results) {
223 node.ownerBundleName_ = changeNode.ownerBundleName_;
224 callback->OnChange(node);
225 }
226 }
227
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)228 void NapiPublishedSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
229 {
230 PublishedDataChangeNode node;
231 for (auto &key : keys) {
232 lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
233 for (auto &data : value.datas_) {
234 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
235 }
236 node.ownerBundleName_ = value.ownerBundleName_;
237 return true;
238 });
239 }
240 observer->OnChange(node);
241 }
242 } // namespace DataShare
243 } // namespace OHOS
244