1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16
17 #include "published_data_subscriber_manager.h"
18
19 #include "ipc_skeleton.h"
20 #include "general/load_config_data_info_strategy.h"
21 #include "log_print.h"
22 #include "published_data.h"
23 #include "uri_utils.h"
24 #include "utils/anonymous.h"
25
26 namespace OHOS::DataShare {
GetInstance()27 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
28 {
29 static PublishedDataSubscriberManager manager;
30 return manager;
31 }
32
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId)33 int PublishedDataSubscriberManager::Add(
34 const PublishedDataKey &key, const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId)
35 {
36 publishedDataCache_.Compute(
37 key, [&observer, &firstCallerTokenId, this](const PublishedDataKey &key, std::vector<ObserverNode> &value) {
38 ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
39 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
40 value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID());
41 return true;
42 });
43 return E_OK;
44 }
45
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)46 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
47 {
48 auto result =
49 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
50 std::vector<ObserverNode> &value) {
51 for (auto it = value.begin(); it != value.end();) {
52 if (it->firstCallerTokenId == firstCallerTokenId) {
53 ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
54 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
55 it = value.erase(it);
56 } else {
57 it++;
58 }
59 }
60 return !value.empty();
61 });
62 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
63 }
64
Delete(uint32_t callerTokenId)65 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId)
66 {
67 publishedDataCache_.EraseIf([&callerTokenId](const auto &key, std::vector<ObserverNode> &value) {
68 for (auto it = value.begin(); it != value.end();) {
69 if (it->callerTokenId == callerTokenId) {
70 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x",
71 DistributedData::Anonymous::Change(key.key).c_str(), callerTokenId);
72 it = value.erase(it);
73 } else {
74 it++;
75 }
76 }
77 return value.empty();
78 });
79 }
80
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)81 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
82 {
83 auto result =
84 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
85 std::vector<ObserverNode> &value) {
86 for (auto it = value.begin(); it != value.end(); it++) {
87 if (it->firstCallerTokenId == firstCallerTokenId) {
88 it->enabled = false;
89 it->isNotifyOnEnabled = false;
90 }
91 }
92 return true;
93 });
94 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
95 }
96
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)97 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
98 {
99 auto result =
100 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
101 std::vector<ObserverNode> &value) {
102 for (auto it = value.begin(); it != value.end(); it++) {
103 if (it->firstCallerTokenId == firstCallerTokenId) {
104 it->enabled = true;
105 }
106 }
107 return true;
108 });
109 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
110 }
111
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)112 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
113 const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
114 {
115 int32_t status;
116 // key is bundleName, value is change node
117 std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
118 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
119 publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
120 const PublishedDataKey &key, std::vector<ObserverNode> &val) {
121 for (auto &data : keys) {
122 if (key != data || publishedResult.count(key) != 0) {
123 continue;
124 }
125 status = PublishedData::Query(
126 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
127 if (status != E_OK) {
128 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
129 data.subscriberId);
130 publishedResult.erase(key);
131 continue;
132 }
133 PutInto(callbacks, val, key, observer);
134 break;
135 }
136 return false;
137 });
138 PublishedDataChangeNode result;
139 for (auto &[callback, keys] : callbacks) {
140 result.datas_.clear();
141 for (auto &key : keys) {
142 if (publishedResult.count(key) != 0) {
143 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
144 }
145 }
146 if (result.datas_.empty()) {
147 continue;
148 }
149 result.ownerBundleName_ = ownerBundleName;
150 callback->OnChangeFromPublishedData(result);
151 }
152 }
153
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer)154 void PublishedDataSubscriberManager::PutInto(
155 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
156 const std::vector<ObserverNode> &val, const PublishedDataKey &key,
157 const sptr<IDataProxyPublishedDataObserver> observer)
158 {
159 for (auto const &callback : val) {
160 if (callback.enabled && callback.observer != nullptr) {
161 // callback the observer, others do not call
162 if (observer != nullptr && callback.observer != observer) {
163 continue;
164 }
165 callbacks[callback.observer].emplace_back(key);
166 }
167 }
168 }
169
Clear()170 void PublishedDataSubscriberManager::Clear()
171 {
172 publishedDataCache_.Clear();
173 }
174
GetCount(const PublishedDataKey & key)175 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
176 {
177 int count = 0;
178 publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
179 count = static_cast<int>(value.size());
180 return true;
181 });
182 return count;
183 }
184
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)185 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
186 {
187 auto pair = publishedDataCache_.Find(key);
188 if (!pair.first) {
189 return false;
190 }
191 for (const auto &value : pair.second) {
192 if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
193 return true;
194 }
195 }
196 return false;
197 }
198
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)199 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
200 {
201 for (const auto &pkey : keys) {
202 publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
203 for (auto it = value.begin(); it != value.end(); it++) {
204 if (!it->enabled) {
205 it->isNotifyOnEnabled = true;
206 }
207 }
208 return true;
209 });
210 }
211 }
212
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)213 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
214 : key(key), bundleName(bundle), subscriberId(subscriberId)
215 {
216 /* private published data can use key as simple uri */
217 /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
218 /* if key is normal uri, bundleName is from uri */
219 if (URIUtils::IsDataProxyURI(key)) {
220 URIUtils::GetBundleNameFromProxyURI(key, bundleName);
221 }
222 }
223
operator <(const PublishedDataKey & rhs) const224 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
225 {
226 if (key < rhs.key) {
227 return true;
228 }
229 if (rhs.key < key) {
230 return false;
231 }
232 if (bundleName < rhs.bundleName) {
233 return true;
234 }
235 if (rhs.bundleName < bundleName) {
236 return false;
237 }
238 return subscriberId < rhs.subscriberId;
239 }
240
operator >(const PublishedDataKey & rhs) const241 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
242 {
243 return rhs < *this;
244 }
245
operator <=(const PublishedDataKey & rhs) const246 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
247 {
248 return !(rhs < *this);
249 }
250
operator >=(const PublishedDataKey & rhs) const251 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
252 {
253 return !(*this < rhs);
254 }
255
operator ==(const PublishedDataKey & rhs) const256 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
257 {
258 return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
259 }
260
operator !=(const PublishedDataKey & rhs) const261 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
262 {
263 return !(rhs == *this);
264 }
265
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)266 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
267 uint32_t firstCallerTokenId, uint32_t callerTokenId)
268 : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
269 {
270 }
271 } // namespace OHOS::DataShare
272