1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbSubscriberManager"
16
17 #include "rdb_subscriber_manager.h"
18
19 #include "ipc_skeleton.h"
20 #include "general/load_config_data_info_strategy.h"
21 #include "log_print.h"
22 #include "scheduler_manager.h"
23 #include "template_data.h"
24 #include "uri_utils.h"
25 #include "utils/anonymous.h"
26
27 namespace OHOS::DataShare {
Get(const Key & key,int32_t userId,Template & tpl)28 bool TemplateManager::Get(const Key &key, int32_t userId, Template &tpl)
29 {
30 return TemplateData::Query(Id(TemplateData::GenId(key.uri, key.bundleName, key.subscriberId), userId), tpl) == E_OK;
31 }
32
Add(const Key & key,int32_t userId,const Template & tpl)33 int32_t TemplateManager::Add(const Key &key, int32_t userId, const Template &tpl)
34 {
35 auto status = TemplateData::Add(key.uri, userId, key.bundleName, key.subscriberId, tpl);
36 if (!status) {
37 ZLOGE("Add failed, %{public}d", status);
38 return E_ERROR;
39 }
40 return E_OK;
41 }
42
Delete(const Key & key,int32_t userId)43 int32_t TemplateManager::Delete(const Key &key, int32_t userId)
44 {
45 auto status = TemplateData::Delete(key.uri, userId, key.bundleName, key.subscriberId);
46 if (!status) {
47 ZLOGE("Delete failed, %{public}d", status);
48 return E_ERROR;
49 }
50 SchedulerManager::GetInstance().RemoveTimer(key);
51 return E_OK;
52 }
53
Key(const std::string & uri,int64_t subscriberId,const std::string & bundleName)54 Key::Key(const std::string &uri, int64_t subscriberId, const std::string &bundleName)
55 : uri(uri), subscriberId(subscriberId), bundleName(bundleName)
56 {
57 }
58
operator ==(const Key & rhs) const59 bool Key::operator==(const Key &rhs) const
60 {
61 return uri == rhs.uri && subscriberId == rhs.subscriberId && bundleName == rhs.bundleName;
62 }
63
operator !=(const Key & rhs) const64 bool Key::operator!=(const Key &rhs) const
65 {
66 return !(rhs == *this);
67 }
operator <(const Key & rhs) const68 bool Key::operator<(const Key &rhs) const
69 {
70 if (uri < rhs.uri) {
71 return true;
72 }
73 if (rhs.uri < uri) {
74 return false;
75 }
76 if (subscriberId < rhs.subscriberId) {
77 return true;
78 }
79 if (rhs.subscriberId < subscriberId) {
80 return false;
81 }
82 return bundleName < rhs.bundleName;
83 }
operator >(const Key & rhs) const84 bool Key::operator>(const Key &rhs) const
85 {
86 return rhs < *this;
87 }
operator <=(const Key & rhs) const88 bool Key::operator<=(const Key &rhs) const
89 {
90 return !(rhs < *this);
91 }
operator >=(const Key & rhs) const92 bool Key::operator>=(const Key &rhs) const
93 {
94 return !(*this < rhs);
95 }
96
TemplateManager()97 TemplateManager::TemplateManager() {}
98
GetInstance()99 TemplateManager &TemplateManager::GetInstance()
100 {
101 static TemplateManager manager;
102 return manager;
103 }
104
GetInstance()105 RdbSubscriberManager &RdbSubscriberManager::GetInstance()
106 {
107 static RdbSubscriberManager manager;
108 return manager;
109 }
110
Add(const Key & key,const sptr<IDataProxyRdbObserver> observer,std::shared_ptr<Context> context,std::shared_ptr<ExecutorPool> executorPool)111 int RdbSubscriberManager::Add(const Key &key, const sptr<IDataProxyRdbObserver> observer,
112 std::shared_ptr<Context> context, std::shared_ptr<ExecutorPool> executorPool)
113 {
114 int result = E_OK;
115 rdbCache_.Compute(key, [&observer, &context, executorPool, this](const auto &key, auto &value) {
116 ZLOGI("add subscriber, uri %{private}s tokenId 0x%{public}x", key.uri.c_str(), context->callerTokenId);
117 auto callerTokenId = IPCSkeleton::GetCallingTokenID();
118 value.emplace_back(observer, context->callerTokenId, callerTokenId);
119 std::vector<ObserverNode> node;
120 node.emplace_back(observer, context->callerTokenId, callerTokenId);
121 ExecutorPool::Task task = [key, node, context, this]() {
122 LoadConfigDataInfoStrategy loadDataInfo;
123 if (!loadDataInfo(context)) {
124 ZLOGE("loadDataInfo failed, uri %{public}s tokenId 0x%{public}x",
125 DistributedData::Anonymous::Change(key.uri).c_str(), context->callerTokenId);
126 return;
127 }
128 Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
129 if (GetEnableObserverCount(key) == 1) {
130 SchedulerManager::GetInstance().Execute(
131 key, context->currentUserId, context->calledSourceDir, context->version);
132 }
133 };
134 executorPool->Execute(task);
135 return true;
136 });
137 return result;
138 }
139
Delete(const Key & key,uint32_t firstCallerTokenId)140 int RdbSubscriberManager::Delete(const Key &key, uint32_t firstCallerTokenId)
141 {
142 auto result =
143 rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
144 std::vector<ObserverNode> &value) {
145 ZLOGI("delete subscriber, uri %{public}s tokenId 0x%{public}x",
146 DistributedData::Anonymous::Change(key.uri).c_str(), firstCallerTokenId);
147 for (auto it = value.begin(); it != value.end();) {
148 if (it->firstCallerTokenId == firstCallerTokenId) {
149 ZLOGI("erase start");
150 it = value.erase(it);
151 } else {
152 it++;
153 }
154 }
155 if (GetEnableObserverCount(key) == 0) {
156 SchedulerManager::GetInstance().RemoveTimer(key);
157 }
158 return !value.empty();
159 });
160 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
161 }
162
Delete(uint32_t callerTokenId)163 void RdbSubscriberManager::Delete(uint32_t callerTokenId)
164 {
165 rdbCache_.EraseIf([&callerTokenId, this](const auto &key, std::vector<ObserverNode> &value) {
166 for (auto it = value.begin(); it != value.end();) {
167 if (it->callerTokenId == callerTokenId) {
168 ZLOGI("erase start, uri is %{public}s, tokenId 0x%{public}x",
169 DistributedData::Anonymous::Change(key.uri).c_str(), callerTokenId);
170 it = value.erase(it);
171 } else {
172 it++;
173 }
174 }
175 if (GetEnableObserverCount(key) == 0) {
176 SchedulerManager::GetInstance().RemoveTimer(key);
177 }
178 return value.empty();
179 });
180 }
181
Disable(const Key & key,uint32_t firstCallerTokenId)182 int RdbSubscriberManager::Disable(const Key &key, uint32_t firstCallerTokenId)
183 {
184 auto result =
185 rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
186 std::vector<ObserverNode> &value) {
187 for (auto it = value.begin(); it != value.end(); it++) {
188 if (it->firstCallerTokenId == firstCallerTokenId) {
189 it->enabled = false;
190 it->isNotifyOnEnabled = false;
191 }
192 }
193 return true;
194 });
195 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
196 }
197
Enable(const Key & key,std::shared_ptr<Context> context)198 int RdbSubscriberManager::Enable(const Key &key, std::shared_ptr<Context> context)
199 {
200 auto result = rdbCache_.ComputeIfPresent(key, [&context, this](const auto &key, std::vector<ObserverNode> &value) {
201 for (auto it = value.begin(); it != value.end(); it++) {
202 if (it->firstCallerTokenId != context->callerTokenId) {
203 continue;
204 }
205 it->enabled = true;
206 if (it->isNotifyOnEnabled) {
207 std::vector<ObserverNode> node;
208 node.emplace_back(it->observer, context->callerTokenId);
209 LoadConfigDataInfoStrategy loadDataInfo;
210 if (loadDataInfo(context)) {
211 Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
212 }
213 }
214 }
215 return true;
216 });
217 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
218 }
219
Emit(const std::string & uri,std::shared_ptr<Context> context)220 void RdbSubscriberManager::Emit(const std::string &uri, std::shared_ptr<Context> context)
221 {
222 if (!URIUtils::IsDataProxyURI(uri)) {
223 return;
224 }
225 if (context->calledSourceDir.empty()) {
226 LoadConfigDataInfoStrategy loadDataInfo;
227 loadDataInfo(context);
228 }
229 rdbCache_.ForEach([&uri, &context, this](const Key &key, std::vector<ObserverNode> &val) {
230 if (key.uri != uri) {
231 return false;
232 }
233 Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
234 SetObserverNotifyOnEnabled(val);
235 return false;
236 });
237 SchedulerManager::GetInstance().Execute(
238 uri, context->currentUserId, context->calledSourceDir, context->version);
239 }
240
SetObserverNotifyOnEnabled(std::vector<ObserverNode> & nodes)241 void RdbSubscriberManager::SetObserverNotifyOnEnabled(std::vector<ObserverNode> &nodes)
242 {
243 for (auto &node : nodes) {
244 if (!node.enabled) {
245 node.isNotifyOnEnabled = true;
246 }
247 }
248 }
249
GetKeysByUri(const std::string & uri)250 std::vector<Key> RdbSubscriberManager::GetKeysByUri(const std::string &uri)
251 {
252 std::vector<Key> results;
253 rdbCache_.ForEach([&uri, &results](const Key &key, std::vector<ObserverNode> &val) {
254 if (key.uri != uri) {
255 return false;
256 }
257 results.emplace_back(key);
258 return false;
259 });
260 return results;
261 }
262
EmitByKey(const Key & key,int32_t userId,const std::string & rdbPath,int version)263 void RdbSubscriberManager::EmitByKey(const Key &key, int32_t userId, const std::string &rdbPath, int version)
264 {
265 if (!URIUtils::IsDataProxyURI(key.uri)) {
266 return;
267 }
268 rdbCache_.ComputeIfPresent(key, [&rdbPath, &version, &userId, this](const Key &key, auto &val) {
269 Notify(key, userId, val, rdbPath, version);
270 SetObserverNotifyOnEnabled(val);
271 return true;
272 });
273 }
274
GetEnableObserverCount(const Key & key)275 int RdbSubscriberManager::GetEnableObserverCount(const Key &key)
276 {
277 auto pair = rdbCache_.Find(key);
278 if (!pair.first) {
279 return 0;
280 }
281 int count = 0;
282 for (const auto &observer : pair.second) {
283 if (observer.enabled) {
284 count++;
285 }
286 }
287 return count;
288 }
289
Notify(const Key & key,int32_t userId,const std::vector<ObserverNode> & val,const std::string & rdbDir,int rdbVersion)290 int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vector<ObserverNode> &val,
291 const std::string &rdbDir, int rdbVersion)
292 {
293 Template tpl;
294 if (!TemplateManager::GetInstance().Get(key, userId, tpl)) {
295 ZLOGE("template undefined, %{public}s, %{public}" PRId64 ", %{public}s",
296 DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
297 return E_TEMPLATE_NOT_EXIST;
298 }
299 auto delegate = DBDelegate::Create(rdbDir, rdbVersion, true);
300 if (delegate == nullptr) {
301 ZLOGE("Create fail %{public}s %{public}s", DistributedData::Anonymous::Change(key.uri).c_str(),
302 key.bundleName.c_str());
303 return E_ERROR;
304 }
305 RdbChangeNode changeNode;
306 changeNode.uri_ = key.uri;
307 changeNode.templateId_.subscriberId_ = key.subscriberId;
308 changeNode.templateId_.bundleName_ = key.bundleName;
309 for (const auto &predicate : tpl.predicates_) {
310 std::string result = delegate->Query(predicate.selectSql_);
311 if (result.empty()) {
312 continue;
313 }
314 changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
315 }
316
317 ZLOGI("emit, size %{public}zu %{private}s", val.size(), changeNode.uri_.c_str());
318 for (const auto &callback : val) {
319 if (callback.enabled && callback.observer != nullptr) {
320 callback.observer->OnChangeFromRdb(changeNode);
321 }
322 }
323 return E_OK;
324 }
325
Clear()326 void RdbSubscriberManager::Clear()
327 {
328 rdbCache_.Clear();
329 }
330
Emit(const std::string & uri,int64_t subscriberId,std::shared_ptr<Context> context)331 void RdbSubscriberManager::Emit(const std::string &uri, int64_t subscriberId, std::shared_ptr<Context> context)
332 {
333 if (!URIUtils::IsDataProxyURI(uri)) {
334 return;
335 }
336 if (context->calledSourceDir.empty()) {
337 LoadConfigDataInfoStrategy loadDataInfo;
338 loadDataInfo(context);
339 }
340 rdbCache_.ForEach([&uri, &context, &subscriberId, this](const Key &key, std::vector<ObserverNode> &val) {
341 if (key.uri != uri || key.subscriberId != subscriberId) {
342 return false;
343 }
344 Notify(key, context->currentUserId, val, context->calledSourceDir, context->version);
345 SetObserverNotifyOnEnabled(val);
346 return false;
347 });
348 SchedulerManager::GetInstance().Execute(
349 uri, context->currentUserId, context->calledSourceDir, context->version);
350 }
ObserverNode(const sptr<IDataProxyRdbObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)351 RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer,
352 uint32_t firstCallerTokenId, uint32_t callerTokenId)
353 : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
354 {
355 }
356 } // namespace OHOS::DataShare