• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbSubscriberManager"
16 
17 #include "rdb_subscriber_manager.h"
18 
19 #include <cinttypes>
20 #include <utility>
21 
22 #include "ipc_skeleton.h"
23 #include "general/load_config_data_info_strategy.h"
24 #include "log_print.h"
25 #include "scheduler_manager.h"
26 #include "template_data.h"
27 #include "utils.h"
28 #include "utils/anonymous.h"
29 
30 namespace OHOS::DataShare {
Get(const Key & key,int32_t userId,Template & tpl)31 bool TemplateManager::Get(const Key &key, int32_t userId, Template &tpl)
32 {
33     return TemplateData::Query(Id(TemplateData::GenId(key.uri, key.bundleName, key.subscriberId), userId), tpl) == E_OK;
34 }
35 
Add(const Key & key,int32_t userId,const Template & tpl)36 int32_t TemplateManager::Add(const Key &key, int32_t userId, const Template &tpl)
37 {
38     auto status = TemplateData::Add(key.uri, userId, key.bundleName, key.subscriberId, tpl);
39     if (!status) {
40         ZLOGE("Add failed, %{public}d", status);
41         return E_ERROR;
42     }
43     return E_OK;
44 }
45 
Delete(const Key & key,int32_t userId)46 int32_t TemplateManager::Delete(const Key &key, int32_t userId)
47 {
48     auto status = TemplateData::Delete(key.uri, userId, key.bundleName, key.subscriberId);
49     if (!status) {
50         ZLOGE("Delete failed, %{public}d", status);
51         return E_ERROR;
52     }
53     SchedulerManager::GetInstance().Stop(key);
54     return E_OK;
55 }
56 
Key(const std::string & uri,int64_t subscriberId,const std::string & bundleName)57 Key::Key(const std::string &uri, int64_t subscriberId, const std::string &bundleName)
58     : uri(uri), subscriberId(subscriberId), bundleName(bundleName)
59 {
60 }
61 
operator ==(const Key & rhs) const62 bool Key::operator==(const Key &rhs) const
63 {
64     return uri == rhs.uri && subscriberId == rhs.subscriberId && bundleName == rhs.bundleName;
65 }
66 
operator !=(const Key & rhs) const67 bool Key::operator!=(const Key &rhs) const
68 {
69     return !(rhs == *this);
70 }
operator <(const Key & rhs) const71 bool Key::operator<(const Key &rhs) const
72 {
73     if (uri < rhs.uri) {
74         return true;
75     }
76     if (rhs.uri < uri) {
77         return false;
78     }
79     if (subscriberId < rhs.subscriberId) {
80         return true;
81     }
82     if (rhs.subscriberId < subscriberId) {
83         return false;
84     }
85     return bundleName < rhs.bundleName;
86 }
operator >(const Key & rhs) const87 bool Key::operator>(const Key &rhs) const
88 {
89     return rhs < *this;
90 }
operator <=(const Key & rhs) const91 bool Key::operator<=(const Key &rhs) const
92 {
93     return !(rhs < *this);
94 }
operator >=(const Key & rhs) const95 bool Key::operator>=(const Key &rhs) const
96 {
97     return !(*this < rhs);
98 }
99 
TemplateManager()100 TemplateManager::TemplateManager() {}
101 
GetInstance()102 TemplateManager &TemplateManager::GetInstance()
103 {
104     static TemplateManager manager;
105     return manager;
106 }
107 
GetInstance()108 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
109 {
110     static RdbSubscriberManager manager;
111     return manager;
112 }
113 
Add(const Key & key,const sptr<IDataProxyRdbObserver> observer,std::shared_ptr<Context> context,std::shared_ptr<ExecutorPool> executorPool)114 int RdbSubscriberManager::Add(const Key &key, const sptr<IDataProxyRdbObserver> observer,
115     std::shared_ptr<Context> context, std::shared_ptr<ExecutorPool> executorPool)
116 {
117     int result = E_OK;
118     rdbCache_.Compute(key, [&observer, &context, executorPool, this](const auto &key, auto &value) {
119         ZLOGI("add subscriber, uri %{public}s tokenId 0x%{public}x",
120             URIUtils::Anonymous(key.uri).c_str(), context->callerTokenId);
121         auto callerTokenId = IPCSkeleton::GetCallingTokenID();
122         auto callerPid = IPCSkeleton::GetCallingPid();
123         value.emplace_back(observer, context->callerTokenId, callerTokenId, callerPid, context->visitedUserId);
124         std::vector<ObserverNode> node;
125         node.emplace_back(observer, context->callerTokenId, callerTokenId, callerPid, context->visitedUserId);
126         ExecutorPool::Task task = [key, node, context, this]() {
127             LoadConfigDataInfoStrategy loadDataInfo;
128             if (!loadDataInfo(context)) {
129                 ZLOGE("loadDataInfo failed, uri %{public}s tokenId 0x%{public}x",
130                     URIUtils::Anonymous(key.uri).c_str(), context->callerTokenId);
131                 return;
132             }
133             DistributedData::StoreMetaData metaData = RdbSubscriberManager::GenMetaDataFromContext(context);
134             Notify(key, context->visitedUserId, node, metaData);
135             if (GetEnableObserverCount(key) == 1) {
136                 SchedulerManager::GetInstance().Start(key, context->visitedUserId, metaData);
137             }
138         };
139         executorPool->Execute(task);
140         return true;
141     });
142     return result;
143 }
144 
Delete(const Key & key,uint32_t firstCallerTokenId)145 int RdbSubscriberManager::Delete(const Key &key, uint32_t firstCallerTokenId)
146 {
147     auto result =
148         rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
149             std::vector<ObserverNode> &value) {
150             ZLOGI("delete subscriber, uri %{public}s tokenId 0x%{public}x",
151                 URIUtils::Anonymous(key.uri).c_str(), firstCallerTokenId);
152             for (auto it = value.begin(); it != value.end();) {
153                 if (it->firstCallerTokenId == firstCallerTokenId) {
154                     ZLOGI("erase start");
155                     it = value.erase(it);
156                 } else {
157                     it++;
158                 }
159             }
160             if (value.empty()) {
161                 SchedulerManager::GetInstance().Stop(key);
162             }
163             return !value.empty();
164         });
165     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
166 }
167 
Delete(uint32_t callerTokenId,uint32_t callerPid)168 void RdbSubscriberManager::Delete(uint32_t callerTokenId, uint32_t callerPid)
169 {
170     rdbCache_.EraseIf([&callerTokenId, &callerPid, this](const auto &key, std::vector<ObserverNode> &value) {
171         for (auto it = value.begin(); it != value.end();) {
172             if (it->callerTokenId == callerTokenId && it->callerPid == callerPid) {
173                 it = value.erase(it);
174             } else {
175                 it++;
176             }
177         }
178         if (value.empty()) {
179             ZLOGI("delete timer, subId %{public}" PRId64 ", bundleName %{public}s, tokenId %{public}x, uri %{public}s.",
180                 key.subscriberId, key.bundleName.c_str(), callerTokenId,
181                 URIUtils::Anonymous(key.uri).c_str());
182             SchedulerManager::GetInstance().Stop(key);
183         }
184         return value.empty();
185     });
186 }
187 
Disable(const Key & key,uint32_t firstCallerTokenId)188 int RdbSubscriberManager::Disable(const Key &key, uint32_t firstCallerTokenId)
189 {
190     bool isAllDisabled = true;
191     bool result =
192         rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, &isAllDisabled, this](const auto &key,
193             std::vector<ObserverNode> &value) {
194             for (auto it = value.begin(); it != value.end(); it++) {
195                 if (it->firstCallerTokenId == firstCallerTokenId) {
196                     it->enabled = false;
197                     it->isNotifyOnEnabled = false;
198                 }
199                 if (it->enabled) {
200                     isAllDisabled = false;
201                 }
202             }
203             return true;
204         });
205     if (isAllDisabled) {
206         SchedulerManager::GetInstance().Disable(key);
207     }
208     if (!result) {
209         ZLOGE("disable failed, uri is %{public}s, bundleName is %{public}s, subscriberId is %{public}" PRId64,
210             URIUtils::Anonymous(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
211     }
212     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
213 }
214 
Enable(const Key & key,std::shared_ptr<Context> context)215 int RdbSubscriberManager::Enable(const Key &key, std::shared_ptr<Context> context)
216 {
217     bool isChanged = false;
218     DistributedData::StoreMetaData metaData;
219     bool result = rdbCache_.ComputeIfPresent(key, [&context, &metaData, &isChanged, this](const auto &key,
220         std::vector<ObserverNode> &value) {
221         for (auto it = value.begin(); it != value.end(); it++) {
222             if (it->firstCallerTokenId != context->callerTokenId) {
223                 continue;
224             }
225             it->enabled = true;
226             LoadConfigDataInfoStrategy loadDataInfo;
227             if (!loadDataInfo(context)) {
228                 return true;
229             }
230             isChanged = true;
231             metaData = RdbSubscriberManager::GenMetaDataFromContext(context);
232             if (it->isNotifyOnEnabled) {
233                 std::vector<ObserverNode> node;
234                 node.emplace_back(it->observer, context->callerTokenId);
235                 Notify(key, context->visitedUserId, node, metaData);
236             }
237         }
238         return true;
239     });
240     if (isChanged) {
241         SchedulerManager::GetInstance().Enable(key, context->visitedUserId, metaData);
242     }
243     if (!result) {
244         ZLOGE("enable failed, uri is %{public}s, bundleName is %{public}s, subscriberId is %{public}" PRId64,
245             URIUtils::Anonymous(key.uri).c_str(), key.bundleName.c_str(), key.subscriberId);
246     }
247     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
248 }
249 
Emit(const std::string & uri,std::shared_ptr<Context> context)250 void RdbSubscriberManager::Emit(const std::string &uri, std::shared_ptr<Context> context)
251 {
252     if (!URIUtils::IsDataProxyURI(uri)) {
253         return;
254     }
255     if (context->calledSourceDir.empty()) {
256         LoadConfigDataInfoStrategy loadDataInfo;
257         loadDataInfo(context);
258     }
259     DistributedData::StoreMetaData metaData = RdbSubscriberManager::GenMetaDataFromContext(context);
260     rdbCache_.ForEach([&uri, &context, &metaData, this](const Key &key, std::vector<ObserverNode> &val) {
261         if (key.uri != uri) {
262             return false;
263         }
264         Notify(key, context->visitedUserId, val, metaData);
265         SetObserverNotifyOnEnabled(val);
266         return false;
267     });
268     SchedulerManager::GetInstance().Execute(
269         uri, context->visitedUserId, metaData);
270 }
271 
Emit(const std::string & uri,int32_t userId,DistributedData::StoreMetaData & metaData)272 void RdbSubscriberManager::Emit(const std::string &uri, int32_t userId,
273     DistributedData::StoreMetaData &metaData)
274 {
275     if (!URIUtils::IsDataProxyURI(uri)) {
276         return;
277     }
278     bool hasObserver = false;
279     rdbCache_.ForEach([&uri, &userId, &metaData, &hasObserver, this](const Key &key, std::vector<ObserverNode> &val) {
280         if (key.uri != uri) {
281             return false;
282         }
283         hasObserver = true;
284         Notify(key, userId, val, metaData);
285         SetObserverNotifyOnEnabled(val);
286         return false;
287     });
288     if (!hasObserver) {
289         return;
290     }
291     SchedulerManager::GetInstance().Execute(
292         uri, userId, metaData);
293 }
294 
SetObserverNotifyOnEnabled(std::vector<ObserverNode> & nodes)295 void RdbSubscriberManager::SetObserverNotifyOnEnabled(std::vector<ObserverNode> &nodes)
296 {
297     for (auto &node : nodes) {
298         if (!node.enabled) {
299             node.isNotifyOnEnabled = true;
300         }
301     }
302 }
303 
GetKeysByUri(const std::string & uri)304 std::vector<Key> RdbSubscriberManager::GetKeysByUri(const std::string &uri)
305 {
306     std::vector<Key> results;
307     rdbCache_.ForEach([&uri, &results](const Key &key, std::vector<ObserverNode> &val) {
308         if (key.uri != uri) {
309             return false;
310         }
311         results.emplace_back(key);
312         return false;
313     });
314     return results;
315 }
316 
EmitByKey(const Key & key,int32_t userId,const DistributedData::StoreMetaData & metaData)317 void RdbSubscriberManager::EmitByKey(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
318 {
319     if (!URIUtils::IsDataProxyURI(key.uri)) {
320         return;
321     }
322     rdbCache_.ComputeIfPresent(key, [&userId, &metaData, this](const Key &key, auto &val) {
323         Notify(key, userId, val, metaData);
324         SetObserverNotifyOnEnabled(val);
325         return true;
326     });
327 }
328 
GetEnableObserverCount(const Key & key)329 int RdbSubscriberManager::GetEnableObserverCount(const Key &key)
330 {
331     auto pair = rdbCache_.Find(key);
332     if (!pair.first) {
333         return 0;
334     }
335     int count = 0;
336     for (const auto &observer : pair.second) {
337         if (observer.enabled) {
338             count++;
339         }
340     }
341     return count;
342 }
343 
Notify(const Key & key,int32_t userId,const std::vector<ObserverNode> & val,const DistributedData::StoreMetaData & metaData)344 int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vector<ObserverNode> &val,
345     const DistributedData::StoreMetaData &metaData)
346 {
347     Template tpl;
348     if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
349         ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
350             URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
351         return E_TEMPLATE_NOT_EXIST;
352     }
353     DistributedData::StoreMetaData meta = metaData;
354     meta.user = std::to_string(userId);
355     auto delegate = DBDelegate::Create(meta, key.uri);
356     if (delegate == nullptr) {
357         ZLOGE("Create fail %{public}s %{public}s", URIUtils::Anonymous(key.uri).c_str(),
358             key.bundleName.c_str());
359         return E_ERROR;
360     }
361     RdbChangeNode changeNode;
362     changeNode.uri_ = key.uri;
363     changeNode.templateId_.subscriberId_ = key.subscriberId;
364     changeNode.templateId_.bundleName_ = key.bundleName;
365     for (const auto &predicate : tpl.predicates_) {
366         std::string result = delegate->Query(predicate.selectSql_);
367         if (result.empty()) {
368             continue;
369         }
370         changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
371     }
372     if (!tpl.update_.empty()) {
373         auto [errCode, rowCount] = delegate->UpdateSql(tpl.update_);
374         if (errCode != E_OK) {
375             ZLOGE("Update failed, err:%{public}d, %{public}s, %{public}" PRId64 ", %{public}s",
376             errCode, URIUtils::Anonymous(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
377         }
378     }
379 
380     ZLOGI("emit, valSize: %{public}zu, dataSize:%{public}zu, uri:%{public}s,",
381         val.size(), changeNode.data_.size(), URIUtils::Anonymous(changeNode.uri_).c_str());
382     for (const auto &callback : val) {
383         // not notify across user
384         if (callback.userId != userId && userId != 0 && callback.userId != 0) {
385             ZLOGI("Not allow across notify, uri:%{public}s, from %{public}d to %{public}d.",
386                 URIUtils::Anonymous(changeNode.uri_).c_str(), userId, callback.userId);
387             continue;
388         }
389         if (callback.enabled && callback.observer != nullptr) {
390             callback.observer->OnChangeFromRdb(changeNode);
391         }
392     }
393     return E_OK;
394 }
395 
Clear()396 void RdbSubscriberManager::Clear()
397 {
398     rdbCache_.Clear();
399 }
400 
Emit(const std::string & uri,int64_t subscriberId,const std::string & bundleName,std::shared_ptr<Context> context)401 void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId,
402     const std::string &bundleName, std::shared_ptr<Context> context)
403 {
404     if (!URIUtils::IsDataProxyURI(uri)) {
405         return;
406     }
407     if (context->calledSourceDir.empty()) {
408         LoadConfigDataInfoStrategy loadDataInfo;
409         loadDataInfo(context);
410     }
411     DistributedData::StoreMetaData metaData = RdbSubscriberManager::GenMetaDataFromContext(context);
412     rdbCache_.ForEach([&uri, &context, &subscriberId, &metaData, this](const Key &key, std::vector<ObserverNode> &val) {
413         if (key.uri != uri || key.subscriberId != subscriberId) {
414             return false;
415         }
416         Notify(key, context->visitedUserId, val, metaData);
417         SetObserverNotifyOnEnabled(val);
418         return false;
419     });
420     Key executeKey(uri, subscriberId, bundleName);
421     SchedulerManager::GetInstance().Start(executeKey, context->visitedUserId, metaData);
422 }
423 
GenMetaDataFromContext(const std::shared_ptr<Context> context)424 DistributedData::StoreMetaData RdbSubscriberManager::GenMetaDataFromContext(const std::shared_ptr<Context> context)
425 {
426     DistributedData::StoreMetaData metaData;
427     metaData.tokenId = context->calledTokenId;
428     metaData.dataDir = context->calledSourceDir;
429     metaData.storeId = context->calledStoreName;
430     metaData.haMode = context->haMode;
431     metaData.isEncrypt = context->isEncryptDb;
432     metaData.bundleName = context->calledBundleName;
433     return metaData;
434 }
435 
ObserverNode(const sptr<IDataProxyRdbObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId,uint32_t callerPid,int32_t userId)436 RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer,
437     uint32_t firstCallerTokenId, uint32_t callerTokenId, uint32_t callerPid, int32_t userId): observer(observer),
438     firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId), callerPid(callerPid), userId(userId)
439 {
440 }
441 } // namespace OHOS::DataShare