• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "napi_subscriber_manager.h"
17 
18 #include "datashare_log.h"
19 
20 namespace OHOS {
21 namespace DataShare {
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)22 std::vector<OperationResult> NapiRdbSubscriberManager::AddObservers(napi_env env, napi_value callback,
23     const std::vector<std::string> &uris, const TemplateId &templateId)
24 {
25     auto datashareHelper = dataShareHelper_.lock();
26     if (datashareHelper == nullptr) {
27         LOG_ERROR("datashareHelper is nullptr");
28         return std::vector<OperationResult>();
29     }
30 
31     std::vector<Key> keys;
32     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
33         keys.emplace_back(uri, templateId);
34     });
35     return BaseCallbacks::AddObservers(
36         keys, std::make_shared<Observer>(env, callback),
37         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
38             Emit(localRegisterKeys, observer);
39         },
40         [&datashareHelper, &templateId, this](const std::vector<Key> &firstAddKeys,
41             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
42             std::vector<std::string> firstAddUris;
43             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
44                 firstAddUris.emplace_back(result);
45             });
46             if (firstAddUris.empty()) {
47                 return;
48             }
49             auto subResults =
50                 datashareHelper->SubscribeRdbData(firstAddUris, templateId, [this](const RdbChangeNode &changeNode) {
51                     Emit(changeNode);
52                 });
53             std::vector<Key> failedKeys;
54             for (auto &subResult : subResults) {
55                 opResult.emplace_back(subResult);
56                 if (subResult.errCode_ != E_OK) {
57                     failedKeys.emplace_back(subResult.key_, templateId);
58                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
59                 }
60             }
61             if (failedKeys.size() > 0) {
62                 BaseCallbacks::DelObservers(failedKeys, observer);
63             }
64         });
65 }
66 
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,const TemplateId & templateId)67 std::vector<OperationResult> NapiRdbSubscriberManager::DelObservers(napi_env env, napi_value callback,
68     const std::vector<std::string> &uris, const TemplateId &templateId)
69 {
70     auto dataShareHelper = dataShareHelper_.lock();
71     if (dataShareHelper == nullptr) {
72         LOG_ERROR("nativeManager is nullptr");
73         return std::vector<OperationResult>();
74     }
75     std::vector<Key> keys;
76     std::for_each(uris.begin(), uris.end(), [&keys, &templateId](auto &uri) {
77         keys.emplace_back(uri, templateId);
78     });
79     return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
80         [&dataShareHelper, &templateId, this](const std::vector<Key> &lastDelKeys,
81             const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
82             std::vector<std::string> lastDelUris;
83             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
84                 lastChangeNodeMap_.Erase(result);
85                 lastDelUris.emplace_back(result);
86             });
87             if (lastDelUris.empty()) {
88                 return;
89             }
90             auto unsubResult = dataShareHelper->UnsubscribeRdbData(lastDelUris, templateId);
91             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
92         });
93 }
94 
Emit(const RdbChangeNode & changeNode)95 void NapiRdbSubscriberManager::Emit(const RdbChangeNode &changeNode)
96 {
97     Key key(changeNode.uri_, changeNode.templateId_);
98     lastChangeNodeMap_.InsertOrAssign(key, changeNode);
99     auto callbacks = BaseCallbacks::GetEnabledObservers(key);
100     for (auto &obs : callbacks) {
101         if (obs != nullptr) {
102             obs->OnChange(changeNode);
103         }
104     }
105 }
106 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)107 void NapiRdbSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
108 {
109     for (auto const &key : keys) {
110         bool isExist = false;
111         RdbChangeNode node;
112         lastChangeNodeMap_.ComputeIfPresent(key, [&node, &isExist](const Key &, const RdbChangeNode &value) {
113             node = value;
114             isExist = true;
115             return true;
116         });
117         if (isExist) {
118             observer->OnChange(node);
119         }
120     }
121 }
122 
AddObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)123 std::vector<OperationResult> NapiPublishedSubscriberManager::AddObservers(napi_env env, napi_value callback,
124     const std::vector<std::string> &uris, int64_t subscriberId)
125 {
126     auto dataShareHelper = dataShareHelper_.lock();
127     if (dataShareHelper == nullptr) {
128         LOG_ERROR("datashareHelper is nullptr");
129         return std::vector<OperationResult>();
130     }
131 
132     std::vector<Key> keys;
133     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
134         keys.emplace_back(uri, subscriberId);
135     });
136     return BaseCallbacks::AddObservers(
137         keys, std::make_shared<Observer>(env, callback),
138         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
139             Emit(localRegisterKeys, observer);
140         },
141         [&dataShareHelper, &subscriberId, this](const std::vector<Key> &firstAddKeys,
142             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
143             std::vector<std::string> firstAddUris;
144             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
145                 firstAddUris.emplace_back(result);
146             });
147             if (firstAddUris.empty()) {
148                 return;
149             }
150             auto subResults = dataShareHelper->SubscribePublishedData(firstAddUris, subscriberId,
151                 [this](const PublishedDataChangeNode &changeNode) {
152                     Emit(changeNode);
153                 });
154             std::vector<Key> failedKeys;
155             for (auto &subResult : subResults) {
156                 opResult.emplace_back(subResult);
157                 if (subResult.errCode_ != E_OK) {
158                     failedKeys.emplace_back(subResult.key_, subscriberId);
159                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
160                 }
161             }
162             if (failedKeys.size() > 0) {
163                 BaseCallbacks::DelObservers(failedKeys, observer);
164             }
165         });
166 }
167 
DelObservers(napi_env env,napi_value callback,const std::vector<std::string> & uris,int64_t subscriberId)168 std::vector<OperationResult> NapiPublishedSubscriberManager::DelObservers(napi_env env, napi_value callback,
169     const std::vector<std::string> &uris, int64_t subscriberId)
170 {
171     auto dataShareHelper = dataShareHelper_.lock();
172     if (dataShareHelper == nullptr) {
173         LOG_ERROR("nativeManager is nullptr");
174         return std::vector<OperationResult>();
175     }
176     std::vector<Key> keys;
177     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
178         keys.emplace_back(uri, subscriberId);
179     });
180     return BaseCallbacks::DelObservers(keys, callback == nullptr ? nullptr : std::make_shared<Observer>(env, callback),
181         [&dataShareHelper, &subscriberId, &callback, &uris, this](const std::vector<Key> &lastDelKeys,
182             const std::shared_ptr<Observer> &observer, std::vector<OperationResult> &opResult) {
183             std::vector<std::string> lastDelUris;
184             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
185                 lastChangeNodeMap_.Erase(result);
186                 lastDelUris.emplace_back(result);
187             });
188             if (lastDelUris.empty()) {
189                 return;
190             }
191             auto unsubResult = dataShareHelper->UnsubscribePublishedData(lastDelUris, subscriberId);
192             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
193         });
194 }
195 
Emit(const PublishedDataChangeNode & changeNode)196 void NapiPublishedSubscriberManager::Emit(const PublishedDataChangeNode &changeNode)
197 {
198     for (auto &data : changeNode.datas_) {
199         Key key(data.key_, data.subscriberId_);
200         lastChangeNodeMap_.Compute(key, [](const Key &, PublishedDataChangeNode &value) {
201             value.datas_.clear();
202             return true;
203         });
204     }
205     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
206     for (auto &data : changeNode.datas_) {
207         Key key(data.key_, data.subscriberId_);
208         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
209         if (callbacks.empty()) {
210             LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
211             continue;
212         }
213         lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
214             value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
215             value.ownerBundleName_ = changeNode.ownerBundleName_;
216             return true;
217         });
218         for (auto const &obs : callbacks) {
219             results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
220         }
221     }
222     for (auto &[callback, node] : results) {
223         node.ownerBundleName_ = changeNode.ownerBundleName_;
224         callback->OnChange(node);
225     }
226 }
227 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)228 void NapiPublishedSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
229 {
230     PublishedDataChangeNode node;
231     for (auto &key : keys) {
232         lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
233             for (auto &data : value.datas_) {
234                 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
235             }
236             node.ownerBundleName_ = value.ownerBundleName_;
237             return true;
238         });
239     }
240     observer->OnChange(node);
241 }
242 } // namespace DataShare
243 } // namespace OHOS
244