1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16
17 #include "published_data_subscriber_manager.h"
18
19 #include <cinttypes>
20
21 #include "ipc_skeleton.h"
22 #include "general/load_config_data_info_strategy.h"
23 #include "log_print.h"
24 #include "published_data.h"
25 #include "utils.h"
26 #include "utils/anonymous.h"
27
28 namespace OHOS::DataShare {
GetInstance()29 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
30 {
31 static PublishedDataSubscriberManager manager;
32 return manager;
33 }
34
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId,int32_t userId)35 int PublishedDataSubscriberManager::Add(const PublishedDataKey &key,
36 const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId, int32_t userId)
37 {
38 publishedDataCache_.Compute(
39 key, [&observer, &firstCallerTokenId, userId, this](const PublishedDataKey &key,
40 std::vector<ObserverNode> &value) {
41 ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
42 URIUtils::Anonymous(key.key).c_str(), firstCallerTokenId);
43 value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID(),
44 IPCSkeleton::GetCallingPid(), userId);
45 return true;
46 });
47 return E_OK;
48 }
49
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)50 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
51 {
52 auto result =
53 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
54 std::vector<ObserverNode> &value) {
55 for (auto it = value.begin(); it != value.end();) {
56 if (it->firstCallerTokenId == firstCallerTokenId) {
57 ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
58 URIUtils::Anonymous(key.key).c_str(), firstCallerTokenId);
59 it = value.erase(it);
60 } else {
61 it++;
62 }
63 }
64 return !value.empty();
65 });
66 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
67 }
68
Delete(uint32_t callerTokenId,uint32_t callerPid)69 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId, uint32_t callerPid)
70 {
71 publishedDataCache_.EraseIf([&callerTokenId, &callerPid](const auto &key, std::vector<ObserverNode> &value) {
72 for (auto it = value.begin(); it != value.end();) {
73 if (it->callerTokenId == callerTokenId && it->callerPid == callerPid) {
74 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x, pid is %{public}d",
75 URIUtils::Anonymous(key.key).c_str(), callerTokenId, callerPid);
76 it = value.erase(it);
77 } else {
78 it++;
79 }
80 }
81 return value.empty();
82 });
83 }
84
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)85 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
86 {
87 bool result =
88 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
89 std::vector<ObserverNode> &value) {
90 for (auto it = value.begin(); it != value.end(); it++) {
91 if (it->firstCallerTokenId == firstCallerTokenId) {
92 it->enabled = false;
93 it->isNotifyOnEnabled = false;
94 }
95 }
96 return true;
97 });
98 if (!result) {
99 ZLOGE("disable failed, uri is %{public}s, bundleName is %{public}s, subscriberId is %{public}" PRId64,
100 URIUtils::Anonymous(key.key).c_str(), key.bundleName.c_str(), key.subscriberId);
101 }
102 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
103 }
104
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)105 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
106 {
107 bool result =
108 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
109 std::vector<ObserverNode> &value) {
110 for (auto it = value.begin(); it != value.end(); it++) {
111 if (it->firstCallerTokenId == firstCallerTokenId) {
112 it->enabled = true;
113 }
114 }
115 return true;
116 });
117 if (!result) {
118 ZLOGE("enable failed, uri is %{public}s, bundleName is %{public}s, subscriberId is %{public}" PRId64,
119 URIUtils::Anonymous(key.key).c_str(), key.bundleName.c_str(), key.subscriberId);
120 }
121 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
122 }
123
124 // if arg observer is not null, notify that observer only; otherwise notify all observers
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)125 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
126 const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
127 {
128 int32_t status;
129 // key is bundleName, value is change node
130 std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
131 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
132 publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
133 const PublishedDataKey &key, std::vector<ObserverNode> &val) {
134 for (auto &data : keys) {
135 if (key != data || publishedResult.count(key) != 0) {
136 continue;
137 }
138 status = PublishedData::Query(
139 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
140 if (status != E_OK) {
141 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
142 data.subscriberId);
143 publishedResult.erase(key);
144 continue;
145 }
146 PutInto(callbacks, val, key, observer, userId);
147 break;
148 }
149 return false;
150 });
151 PublishedDataChangeNode result;
152 for (auto &[callback, keys] : callbacks) {
153 result.datas_.clear();
154 for (auto &key : keys) {
155 if (publishedResult.count(key) != 0) {
156 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
157 }
158 }
159 if (result.datas_.empty()) {
160 continue;
161 }
162 result.ownerBundleName_ = ownerBundleName;
163 callback->OnChangeFromPublishedData(result);
164 }
165 }
166
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,int32_t userId)167 void PublishedDataSubscriberManager::PutInto(
168 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
169 const std::vector<ObserverNode> &val, const PublishedDataKey &key,
170 const sptr<IDataProxyPublishedDataObserver> observer, int32_t userId)
171 {
172 for (auto const &callback : val) {
173 if (callback.enabled && callback.observer != nullptr) {
174 // callback the observer, others do not call
175 if (observer != nullptr && callback.observer != observer) {
176 continue;
177 }
178 if (callback.userId != 0 && callback.userId != userId && userId != 0) {
179 ZLOGE("Not across user publish, from %{public}d to %{public}d", userId, callback.userId);
180 continue;
181 }
182 callbacks[callback.observer].emplace_back(key);
183 }
184 }
185 }
186
Clear()187 void PublishedDataSubscriberManager::Clear()
188 {
189 publishedDataCache_.Clear();
190 }
191
GetCount(const PublishedDataKey & key)192 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
193 {
194 int count = 0;
195 publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
196 count = static_cast<int>(value.size());
197 return true;
198 });
199 return count;
200 }
201
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)202 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
203 {
204 auto pair = publishedDataCache_.Find(key);
205 if (!pair.first) {
206 return false;
207 }
208 for (const auto &value : pair.second) {
209 if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
210 return true;
211 }
212 }
213 return false;
214 }
215
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)216 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
217 {
218 for (const auto &pkey : keys) {
219 publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
220 for (auto it = value.begin(); it != value.end(); it++) {
221 if (!it->enabled) {
222 it->isNotifyOnEnabled = true;
223 }
224 }
225 return true;
226 });
227 }
228 }
229
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)230 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
231 : key(key), bundleName(bundle), subscriberId(subscriberId)
232 {
233 /* private published data can use key as simple uri */
234 /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
235 /* if key is normal uri, bundleName is from uri */
236 if (URIUtils::IsDataProxyURI(key)) {
237 URIUtils::GetBundleNameFromProxyURI(key, bundleName);
238 }
239 }
240
operator <(const PublishedDataKey & rhs) const241 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
242 {
243 if (key < rhs.key) {
244 return true;
245 }
246 if (rhs.key < key) {
247 return false;
248 }
249 if (bundleName < rhs.bundleName) {
250 return true;
251 }
252 if (rhs.bundleName < bundleName) {
253 return false;
254 }
255 return subscriberId < rhs.subscriberId;
256 }
257
operator >(const PublishedDataKey & rhs) const258 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
259 {
260 return rhs < *this;
261 }
262
operator <=(const PublishedDataKey & rhs) const263 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
264 {
265 return !(rhs < *this);
266 }
267
operator >=(const PublishedDataKey & rhs) const268 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
269 {
270 return !(*this < rhs);
271 }
272
operator ==(const PublishedDataKey & rhs) const273 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
274 {
275 return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
276 }
277
operator !=(const PublishedDataKey & rhs) const278 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
279 {
280 return !(rhs == *this);
281 }
282
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId,uint32_t callerPid,int32_t userId)283 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
284 uint32_t firstCallerTokenId, uint32_t callerTokenId, uint32_t callerPid, int32_t userId): observer(observer),
285 firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId), callerPid(callerPid), userId(userId)
286 {
287 }
288 } // namespace OHOS::DataShare
289