• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbSubscriberManager"
16 
17 #include "rdb_subscriber_manager.h"
18 
19 #include "ipc_skeleton.h"
20 #include "general/load_config_data_info_strategy.h"
21 #include "log_print.h"
22 #include "scheduler_manager.h"
23 #include "template_data.h"
24 #include "uri_utils.h"
25 #include "utils/anonymous.h"
26 
27 namespace OHOS::DataShare {
Get(const Key & key,int32_t userId,Template & tpl)28 bool TemplateManager::Get(const Key &key, int32_t userId, Template &tpl)
29 {
30     return TemplateData::Query(Id(TemplateData::GenId(key.uri, key.bundleName, key.subscriberId), userId), tpl) == E_OK;
31 }
32 
Add(const Key & key,int32_t userId,const Template & tpl)33 int32_t TemplateManager::Add(const Key &key, int32_t userId, const Template &tpl)
34 {
35     auto status = TemplateData::Add(key.uri, userId, key.bundleName, key.subscriberId, tpl);
36     if (!status) {
37         ZLOGE("Add failed, %{public}d", status);
38         return E_ERROR;
39     }
40     return E_OK;
41 }
42 
Delete(const Key & key,int32_t userId)43 int32_t TemplateManager::Delete(const Key &key, int32_t userId)
44 {
45     auto status = TemplateData::Delete(key.uri, userId, key.bundleName, key.subscriberId);
46     if (!status) {
47         ZLOGE("Delete failed, %{public}d", status);
48         return E_ERROR;
49     }
50     SchedulerManager::GetInstance().RemoveTimer(key);
51     return E_OK;
52 }
53 
Key(const std::string & uri,int64_t subscriberId,const std::string & bundleName)54 Key::Key(const std::string &uri, int64_t subscriberId, const std::string &bundleName)
55     : uri(uri), subscriberId(subscriberId), bundleName(bundleName)
56 {
57 }
58 
operator ==(const Key & rhs) const59 bool Key::operator==(const Key &rhs) const
60 {
61     return uri == rhs.uri && subscriberId == rhs.subscriberId && bundleName == rhs.bundleName;
62 }
63 
operator !=(const Key & rhs) const64 bool Key::operator!=(const Key &rhs) const
65 {
66     return !(rhs == *this);
67 }
operator <(const Key & rhs) const68 bool Key::operator<(const Key &rhs) const
69 {
70     if (uri < rhs.uri) {
71         return true;
72     }
73     if (rhs.uri < uri) {
74         return false;
75     }
76     if (subscriberId < rhs.subscriberId) {
77         return true;
78     }
79     if (rhs.subscriberId < subscriberId) {
80         return false;
81     }
82     return bundleName < rhs.bundleName;
83 }
operator >(const Key & rhs) const84 bool Key::operator>(const Key &rhs) const
85 {
86     return rhs < *this;
87 }
operator <=(const Key & rhs) const88 bool Key::operator<=(const Key &rhs) const
89 {
90     return !(rhs < *this);
91 }
operator >=(const Key & rhs) const92 bool Key::operator>=(const Key &rhs) const
93 {
94     return !(*this < rhs);
95 }
96 
TemplateManager()97 TemplateManager::TemplateManager() {}
98 
GetInstance()99 TemplateManager &TemplateManager::GetInstance()
100 {
101     static TemplateManager manager;
102     return manager;
103 }
104 
GetInstance()105 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
106 {
107     static RdbSubscriberManager manager;
108     return manager;
109 }
110 
Add(const Key & key,const sptr<IDataProxyRdbObserver> observer,std::shared_ptr<Context> context,std::shared_ptr<ExecutorPool> executorPool)111 int RdbSubscriberManager::Add(const Key &key, const sptr<IDataProxyRdbObserver> observer,
112     std::shared_ptr<Context> context, std::shared_ptr<ExecutorPool> executorPool)
113 {
114     int result = E_OK;
115     rdbCache_.Compute(key, [&observer, &context, executorPool, this](const auto &key, auto &value) {
116         ZLOGI("add subscriber, uri %{private}s tokenId 0x%{public}x", key.uri.c_str(), context->callerTokenId);
117         auto callerTokenId = IPCSkeleton::GetCallingTokenID();
118         value.emplace_back(observer, context->callerTokenId, callerTokenId);
119         std::vector<ObserverNode> node;
120         node.emplace_back(observer, context->callerTokenId, callerTokenId);
121         ExecutorPool::Task task = [key, node, context, this]() {
122             LoadConfigDataInfoStrategy loadDataInfo;
123             if (!loadDataInfo(context)) {
124                 ZLOGE("loadDataInfo failed, uri %{public}s tokenId 0x%{public}x",
125                     DistributedData::Anonymous::Change(key.uri).c_str(), context->callerTokenId);
126                 return;
127             }
128             Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
129             if (GetEnableObserverCount(key) == 1) {
130                 SchedulerManager::GetInstance().Execute(
131                     key, context->currentUserId, context->calledSourceDir, context->version);
132             }
133         };
134         executorPool->Execute(task);
135         return true;
136     });
137     return result;
138 }
139 
Delete(const Key & key,uint32_t firstCallerTokenId)140 int RdbSubscriberManager::Delete(const Key &key, uint32_t firstCallerTokenId)
141 {
142     auto result =
143         rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
144             std::vector<ObserverNode> &value) {
145             ZLOGI("delete subscriber, uri %{public}s tokenId 0x%{public}x",
146                 DistributedData::Anonymous::Change(key.uri).c_str(), firstCallerTokenId);
147             for (auto it = value.begin(); it != value.end();) {
148                 if (it->firstCallerTokenId == firstCallerTokenId) {
149                     ZLOGI("erase start");
150                     it = value.erase(it);
151                 } else {
152                     it++;
153                 }
154             }
155             if (GetEnableObserverCount(key) == 0) {
156                 SchedulerManager::GetInstance().RemoveTimer(key);
157             }
158             return !value.empty();
159         });
160     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
161 }
162 
Delete(uint32_t callerTokenId)163 void RdbSubscriberManager::Delete(uint32_t callerTokenId)
164 {
165     rdbCache_.EraseIf([&callerTokenId, this](const auto &key, std::vector<ObserverNode> &value) {
166         for (auto it = value.begin(); it != value.end();) {
167             if (it->callerTokenId == callerTokenId) {
168                 ZLOGI("erase start, uri is %{public}s, tokenId 0x%{public}x",
169                     DistributedData::Anonymous::Change(key.uri).c_str(), callerTokenId);
170                 it = value.erase(it);
171             } else {
172                 it++;
173             }
174         }
175         if (GetEnableObserverCount(key) == 0) {
176             SchedulerManager::GetInstance().RemoveTimer(key);
177         }
178         return value.empty();
179     });
180 }
181 
Disable(const Key & key,uint32_t firstCallerTokenId)182 int RdbSubscriberManager::Disable(const Key &key, uint32_t firstCallerTokenId)
183 {
184     auto result =
185         rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
186             std::vector<ObserverNode> &value) {
187             for (auto it = value.begin(); it != value.end(); it++) {
188                 if (it->firstCallerTokenId == firstCallerTokenId) {
189                     it->enabled = false;
190                     it->isNotifyOnEnabled = false;
191                 }
192             }
193             return true;
194         });
195     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
196 }
197 
Enable(const Key & key,std::shared_ptr<Context> context)198 int RdbSubscriberManager::Enable(const Key &key, std::shared_ptr<Context> context)
199 {
200     auto result = rdbCache_.ComputeIfPresent(key, [&context, this](const auto &key, std::vector<ObserverNode> &value) {
201         for (auto it = value.begin(); it != value.end(); it++) {
202             if (it->firstCallerTokenId != context->callerTokenId) {
203                 continue;
204             }
205             it->enabled = true;
206             if (it->isNotifyOnEnabled) {
207                 std::vector<ObserverNode> node;
208                 node.emplace_back(it->observer, context->callerTokenId);
209                 LoadConfigDataInfoStrategy loadDataInfo;
210                 if (loadDataInfo(context)) {
211                     Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
212                 }
213             }
214         }
215         return true;
216     });
217     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
218 }
219 
Emit(const std::string & uri,std::shared_ptr<Context> context)220 void RdbSubscriberManager::Emit(const std::string &uri, std::shared_ptr<Context> context)
221 {
222     if (!URIUtils::IsDataProxyURI(uri)) {
223         return;
224     }
225     if (context->calledSourceDir.empty()) {
226         LoadConfigDataInfoStrategy loadDataInfo;
227         loadDataInfo(context);
228     }
229     rdbCache_.ForEach([&uri, &context, this](const Key &key, std::vector<ObserverNode> &val) {
230         if (key.uri != uri) {
231             return false;
232         }
233         Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
234         SetObserverNotifyOnEnabled(val);
235         return false;
236     });
237     SchedulerManager::GetInstance().Execute(
238         uri, context->currentUserId, context->calledSourceDir, context->version);
239 }
240 
SetObserverNotifyOnEnabled(std::vector<ObserverNode> & nodes)241 void RdbSubscriberManager::SetObserverNotifyOnEnabled(std::vector<ObserverNode> &nodes)
242 {
243     for (auto &node : nodes) {
244         if (!node.enabled) {
245             node.isNotifyOnEnabled = true;
246         }
247     }
248 }
249 
GetKeysByUri(const std::string & uri)250 std::vector<Key> RdbSubscriberManager::GetKeysByUri(const std::string &uri)
251 {
252     std::vector<Key> results;
253     rdbCache_.ForEach([&uri, &results](const Key &key, std::vector<ObserverNode> &val) {
254         if (key.uri != uri) {
255             return false;
256         }
257         results.emplace_back(key);
258         return false;
259     });
260     return results;
261 }
262 
EmitByKey(const Key & key,int32_t userId,const std::string & rdbPath,int version)263 void RdbSubscriberManager::EmitByKey(const Key &key, int32_t userId, const std::string &rdbPath, int version)
264 {
265     if (!URIUtils::IsDataProxyURI(key.uri)) {
266         return;
267     }
268     rdbCache_.ComputeIfPresent(key, [&rdbPath, &version, &userId, this](const Key &key, auto &val) {
269         Notify(key, userId, val, rdbPath, version);
270         SetObserverNotifyOnEnabled(val);
271         return true;
272     });
273 }
274 
GetEnableObserverCount(const Key & key)275 int RdbSubscriberManager::GetEnableObserverCount(const Key &key)
276 {
277     auto pair = rdbCache_.Find(key);
278     if (!pair.first) {
279         return 0;
280     }
281     int count = 0;
282     for (const auto &observer : pair.second) {
283         if (observer.enabled) {
284             count++;
285         }
286     }
287     return count;
288 }
289 
Notify(const Key & key,int32_t userId,const std::vector<ObserverNode> & val,const std::string & rdbDir,int rdbVersion)290 int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vector<ObserverNode> &val,
291     const std::string &rdbDir, int rdbVersion)
292 {
293     Template tpl;
294     if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
295         ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
296             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
297         return E_TEMPLATE_NOT_EXIST;
298     }
299     auto delegate = DBDelegate::Create(rdbDir, rdbVersion, true);
300     if (delegate == nullptr) {
301         ZLOGE("Create fail %{public}s %{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
302             key.bundleName.c_str());
303         return E_ERROR;
304     }
305     RdbChangeNode changeNode;
306     changeNode.uri_ = key.uri;
307     changeNode.templateId_.subscriberId_ = key.subscriberId;
308     changeNode.templateId_.bundleName_ = key.bundleName;
309     for (const auto &predicate : tpl.predicates_) {
310         std::string result = delegate->Query(predicate.selectSql_);
311         if (result.empty()) {
312             continue;
313         }
314         changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
315     }
316 
317     ZLOGI("emit, size %{public}zu %{private}s", val.size(), changeNode.uri_.c_str());
318     for (const auto &callback : val) {
319         if (callback.enabled && callback.observer != nullptr) {
320             callback.observer->OnChangeFromRdb(changeNode);
321         }
322     }
323     return E_OK;
324 }
325 
Clear()326 void RdbSubscriberManager::Clear()
327 {
328     rdbCache_.Clear();
329 }
330 
Emit(const std::string & uri,int64_t subscriberId,std::shared_ptr<Context> context)331 void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId, std::shared_ptr<Context> context)
332 {
333     if (!URIUtils::IsDataProxyURI(uri)) {
334         return;
335     }
336     if (context->calledSourceDir.empty()) {
337         LoadConfigDataInfoStrategy loadDataInfo;
338         loadDataInfo(context);
339     }
340     rdbCache_.ForEach([&uri, &context, &subscriberId, this](const Key &key, std::vector<ObserverNode> &val) {
341         if (key.uri != uri || key.subscriberId != subscriberId) {
342             return false;
343         }
344         Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
345         SetObserverNotifyOnEnabled(val);
346         return false;
347     });
348     SchedulerManager::GetInstance().Execute(
349         uri, context->currentUserId, context->calledSourceDir, context->version);
350 }
ObserverNode(const sptr<IDataProxyRdbObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)351 RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer,
352     uint32_t firstCallerTokenId, uint32_t callerTokenId)
353     : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
354 {
355 }
356 } // namespace OHOS::DataShare