1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbSubscriberManager"
16
17 #include "rdb_subscriber_manager.h"
18
19 #include <cinttypes>
20 #include <utility>
21
22 #include "ipc_skeleton.h"
23 #include "general/load_config_data_info_strategy.h"
24 #include "log_print.h"
25 #include "scheduler_manager.h"
26 #include "template_data.h"
27 #include "uri_utils.h"
28 #include "utils/anonymous.h"
29
30 namespace OHOS::DataShare {
Get(const Key & key,int32_t userId,Template & tpl)31 bool TemplateManager::Get(const Key &key, int32_t userId, Template &tpl)
32 {
33 return TemplateData::Query(Id(TemplateData::GenId(key.uri, key.bundleName, key.subscriberId), userId), tpl) == E_OK;
34 }
35
Add(const Key & key,int32_t userId,const Template & tpl)36 int32_t TemplateManager::Add(const Key &key, int32_t userId, const Template &tpl)
37 {
38 auto status = TemplateData::Add(key.uri, userId, key.bundleName, key.subscriberId, tpl);
39 if (!status) {
40 ZLOGE("Add failed, %{public}d", status);
41 return E_ERROR;
42 }
43 return E_OK;
44 }
45
Delete(const Key & key,int32_t userId)46 int32_t TemplateManager::Delete(const Key &key, int32_t userId)
47 {
48 auto status = TemplateData::Delete(key.uri, userId, key.bundleName, key.subscriberId);
49 if (!status) {
50 ZLOGE("Delete failed, %{public}d", status);
51 return E_ERROR;
52 }
53 SchedulerManager::GetInstance().RemoveTimer(key);
54 return E_OK;
55 }
56
Key(const std::string & uri,int64_t subscriberId,const std::string & bundleName)57 Key::Key(const std::string &uri, int64_t subscriberId, const std::string &bundleName)
58 : uri(uri), subscriberId(subscriberId), bundleName(bundleName)
59 {
60 }
61
operator ==(const Key & rhs) const62 bool Key::operator==(const Key &rhs) const
63 {
64 return uri == rhs.uri && subscriberId == rhs.subscriberId && bundleName == rhs.bundleName;
65 }
66
operator !=(const Key & rhs) const67 bool Key::operator!=(const Key &rhs) const
68 {
69 return !(rhs == *this);
70 }
operator <(const Key & rhs) const71 bool Key::operator<(const Key &rhs) const
72 {
73 if (uri < rhs.uri) {
74 return true;
75 }
76 if (rhs.uri < uri) {
77 return false;
78 }
79 if (subscriberId < rhs.subscriberId) {
80 return true;
81 }
82 if (rhs.subscriberId < subscriberId) {
83 return false;
84 }
85 return bundleName < rhs.bundleName;
86 }
operator >(const Key & rhs) const87 bool Key::operator>(const Key &rhs) const
88 {
89 return rhs < *this;
90 }
operator <=(const Key & rhs) const91 bool Key::operator<=(const Key &rhs) const
92 {
93 return !(rhs < *this);
94 }
operator >=(const Key & rhs) const95 bool Key::operator>=(const Key &rhs) const
96 {
97 return !(*this < rhs);
98 }
99
TemplateManager()100 TemplateManager::TemplateManager() {}
101
GetInstance()102 TemplateManager &TemplateManager::GetInstance()
103 {
104 static TemplateManager manager;
105 return manager;
106 }
107
GetInstance()108 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
109 {
110 static RdbSubscriberManager manager;
111 return manager;
112 }
113
Add(const Key & key,const sptr<IDataProxyRdbObserver> observer,std::shared_ptr<Context> context,std::shared_ptr<ExecutorPool> executorPool)114 int RdbSubscriberManager::Add(const Key &key, const sptr<IDataProxyRdbObserver> observer,
115 std::shared_ptr<Context> context, std::shared_ptr<ExecutorPool> executorPool)
116 {
117 int result = E_OK;
118 rdbCache_.Compute(key, [&observer, &context, executorPool, this](const auto &key, auto &value) {
119 ZLOGI("add subscriber, uri %{private}s tokenId 0x%{public}x", key.uri.c_str(), context->callerTokenId);
120 auto callerTokenId = IPCSkeleton::GetCallingTokenID();
121 value.emplace_back(observer, context->callerTokenId, callerTokenId);
122 std::vector<ObserverNode> node;
123 node.emplace_back(observer, context->callerTokenId, callerTokenId);
124 ExecutorPool::Task task = [key, node, context, this]() {
125 LoadConfigDataInfoStrategy loadDataInfo;
126 if (!loadDataInfo(context)) {
127 ZLOGE("loadDataInfo failed, uri %{public}s tokenId 0x%{public}x",
128 DistributedData::Anonymous::Change(key.uri).c_str(), context->callerTokenId);
129 return;
130 }
131 Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
132 if (GetEnableObserverCount(key) == 1) {
133 SchedulerManager::GetInstance().Execute(
134 key, context->currentUserId, context->calledSourceDir, context->version);
135 }
136 };
137 executorPool->Execute(task);
138 return true;
139 });
140 return result;
141 }
142
Delete(const Key & key,uint32_t firstCallerTokenId)143 int RdbSubscriberManager::Delete(const Key &key, uint32_t firstCallerTokenId)
144 {
145 auto result =
146 rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
147 std::vector<ObserverNode> &value) {
148 ZLOGI("delete subscriber, uri %{public}s tokenId 0x%{public}x",
149 DistributedData::Anonymous::Change(key.uri).c_str(), firstCallerTokenId);
150 for (auto it = value.begin(); it != value.end();) {
151 if (it->firstCallerTokenId == firstCallerTokenId) {
152 ZLOGI("erase start");
153 it = value.erase(it);
154 } else {
155 it++;
156 }
157 }
158 if (value.empty()) {
159 SchedulerManager::GetInstance().RemoveTimer(key);
160 }
161 return !value.empty();
162 });
163 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
164 }
165
Delete(uint32_t callerTokenId)166 void RdbSubscriberManager::Delete(uint32_t callerTokenId)
167 {
168 rdbCache_.EraseIf([&callerTokenId, this](const auto &key, std::vector<ObserverNode> &value) {
169 for (auto it = value.begin(); it != value.end();) {
170 if (it->callerTokenId == callerTokenId) {
171 it = value.erase(it);
172 } else {
173 it++;
174 }
175 }
176 if (value.empty()) {
177 ZLOGI("delete timer, subId %{public}" PRId64 ", bundleName %{public}s, tokenId %{public}x, uri %{public}s.",
178 key.subscriberId, key.bundleName.c_str(), callerTokenId,
179 DistributedData::Anonymous::Change(key.uri).c_str());
180 SchedulerManager::GetInstance().RemoveTimer(key);
181 }
182 return value.empty();
183 });
184 }
185
Disable(const Key & key,uint32_t firstCallerTokenId)186 int RdbSubscriberManager::Disable(const Key &key, uint32_t firstCallerTokenId)
187 {
188 auto result =
189 rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
190 std::vector<ObserverNode> &value) {
191 for (auto it = value.begin(); it != value.end(); it++) {
192 if (it->firstCallerTokenId == firstCallerTokenId) {
193 it->enabled = false;
194 it->isNotifyOnEnabled = false;
195 }
196 }
197 return true;
198 });
199 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
200 }
201
Enable(const Key & key,std::shared_ptr<Context> context)202 int RdbSubscriberManager::Enable(const Key &key, std::shared_ptr<Context> context)
203 {
204 auto result = rdbCache_.ComputeIfPresent(key, [&context, this](const auto &key, std::vector<ObserverNode> &value) {
205 for (auto it = value.begin(); it != value.end(); it++) {
206 if (it->firstCallerTokenId != context->callerTokenId) {
207 continue;
208 }
209 it->enabled = true;
210 if (it->isNotifyOnEnabled) {
211 std::vector<ObserverNode> node;
212 node.emplace_back(it->observer, context->callerTokenId);
213 LoadConfigDataInfoStrategy loadDataInfo;
214 if (loadDataInfo(context)) {
215 Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
216 }
217 }
218 }
219 return true;
220 });
221 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
222 }
223
Emit(const std::string & uri,std::shared_ptr<Context> context)224 void RdbSubscriberManager::Emit(const std::string &uri, std::shared_ptr<Context> context)
225 {
226 if (!URIUtils::IsDataProxyURI(uri)) {
227 return;
228 }
229 if (context->calledSourceDir.empty()) {
230 LoadConfigDataInfoStrategy loadDataInfo;
231 loadDataInfo(context);
232 }
233 rdbCache_.ForEach([&uri, &context, this](const Key &key, std::vector<ObserverNode> &val) {
234 if (key.uri != uri) {
235 return false;
236 }
237 Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
238 SetObserverNotifyOnEnabled(val);
239 return false;
240 });
241 SchedulerManager::GetInstance().Execute(
242 uri, context->currentUserId, context->calledSourceDir, context->version, context->calledBundleName);
243 }
244
Emit(const std::string & uri,int32_t userId,DistributedData::StoreMetaData & metaData)245 void RdbSubscriberManager::Emit(const std::string &uri, int32_t userId,
246 DistributedData::StoreMetaData &metaData)
247 {
248 if (!URIUtils::IsDataProxyURI(uri)) {
249 return;
250 }
251 bool hasObserver = false;
252 rdbCache_.ForEach([&uri, &userId, &metaData, &hasObserver, this](const Key &key, std::vector<ObserverNode> &val) {
253 if (key.uri != uri) {
254 return false;
255 }
256 hasObserver = true;
257 Notify(key, userId, val, metaData.dataDir, metaData.version);
258 SetObserverNotifyOnEnabled(val);
259 return false;
260 });
261 if (!hasObserver) {
262 return;
263 }
264 SchedulerManager::GetInstance().Execute(
265 uri, userId, metaData.dataDir, metaData.version, metaData.bundleName);
266 }
267
SetObserverNotifyOnEnabled(std::vector<ObserverNode> & nodes)268 void RdbSubscriberManager::SetObserverNotifyOnEnabled(std::vector<ObserverNode> &nodes)
269 {
270 for (auto &node : nodes) {
271 if (!node.enabled) {
272 node.isNotifyOnEnabled = true;
273 }
274 }
275 }
276
GetKeysByUri(const std::string & uri)277 std::vector<Key> RdbSubscriberManager::GetKeysByUri(const std::string &uri)
278 {
279 std::vector<Key> results;
280 rdbCache_.ForEach([&uri, &results](const Key &key, std::vector<ObserverNode> &val) {
281 if (key.uri != uri) {
282 return false;
283 }
284 results.emplace_back(key);
285 return false;
286 });
287 return results;
288 }
289
EmitByKey(const Key & key,int32_t userId,const std::string & rdbPath,int version)290 void RdbSubscriberManager::EmitByKey(const Key &key, int32_t userId, const std::string &rdbPath, int version)
291 {
292 if (!URIUtils::IsDataProxyURI(key.uri)) {
293 return;
294 }
295 rdbCache_.ComputeIfPresent(key, [&rdbPath, &version, &userId, this](const Key &key, auto &val) {
296 Notify(key, userId, val, rdbPath, version);
297 SetObserverNotifyOnEnabled(val);
298 return true;
299 });
300 }
301
GetEnableObserverCount(const Key & key)302 int RdbSubscriberManager::GetEnableObserverCount(const Key &key)
303 {
304 auto pair = rdbCache_.Find(key);
305 if (!pair.first) {
306 return 0;
307 }
308 int count = 0;
309 for (const auto &observer : pair.second) {
310 if (observer.enabled) {
311 count++;
312 }
313 }
314 return count;
315 }
316
Notify(const Key & key,int32_t userId,const std::vector<ObserverNode> & val,const std::string & rdbDir,int rdbVersion)317 int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vector<ObserverNode> &val,
318 const std::string &rdbDir, int rdbVersion)
319 {
320 Template tpl;
321 if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
322 ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
323 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
324 return E_TEMPLATE_NOT_EXIST;
325 }
326 DistributedData::StoreMetaData meta;
327 meta.dataDir = rdbDir;
328 meta.bundleName = key.bundleName;
329 auto delegate = DBDelegate::Create(meta, key.uri);
330 if (delegate == nullptr) {
331 ZLOGE("Create fail %{public}s %{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
332 key.bundleName.c_str());
333 return E_ERROR;
334 }
335 RdbChangeNode changeNode;
336 changeNode.uri_ = key.uri;
337 changeNode.templateId_.subscriberId_ = key.subscriberId;
338 changeNode.templateId_.bundleName_ = key.bundleName;
339 for (const auto &predicate : tpl.predicates_) {
340 std::string result = delegate->Query(predicate.selectSql_);
341 if (result.empty()) {
342 continue;
343 }
344 changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
345 }
346
347 ZLOGI("emit, valSize: %{public}zu, dataSize:%{public}zu, uri:%{public}s,",
348 val.size(), changeNode.data_.size(), DistributedData::Anonymous::Change(changeNode.uri_).c_str());
349 for (const auto &callback : val) {
350 if (callback.enabled && callback.observer != nullptr) {
351 callback.observer->OnChangeFromRdb(changeNode);
352 }
353 }
354 return E_OK;
355 }
356
Clear()357 void RdbSubscriberManager::Clear()
358 {
359 rdbCache_.Clear();
360 }
361
Emit(const std::string & uri,int64_t subscriberId,std::shared_ptr<Context> context)362 void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId, std::shared_ptr<Context> context)
363 {
364 if (!URIUtils::IsDataProxyURI(uri)) {
365 return;
366 }
367 if (context->calledSourceDir.empty()) {
368 LoadConfigDataInfoStrategy loadDataInfo;
369 loadDataInfo(context);
370 }
371 rdbCache_.ForEach([&uri, &context, &subscriberId, this](const Key &key, std::vector<ObserverNode> &val) {
372 if (key.uri != uri || key.subscriberId != subscriberId) {
373 return false;
374 }
375 Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
376 SetObserverNotifyOnEnabled(val);
377 return false;
378 });
379 SchedulerManager::GetInstance().Execute(
380 uri, context->currentUserId, context->calledSourceDir, context->version, context->calledBundleName);
381 }
ObserverNode(const sptr<IDataProxyRdbObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)382 RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer,
383 uint32_t firstCallerTokenId, uint32_t callerTokenId)
384 : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
385 {
386 }
387 } // namespace OHOS::DataShare
388
389