1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16
17 #include "published_data_subscriber_manager.h"
18
19 #include <cinttypes>
20
21 #include "ipc_skeleton.h"
22 #include "general/load_config_data_info_strategy.h"
23 #include "log_print.h"
24 #include "published_data.h"
25 #include "uri_utils.h"
26 #include "utils/anonymous.h"
27
28 namespace OHOS::DataShare {
GetInstance()29 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
30 {
31 static PublishedDataSubscriberManager manager;
32 return manager;
33 }
34
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId)35 int PublishedDataSubscriberManager::Add(
36 const PublishedDataKey &key, const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId)
37 {
38 publishedDataCache_.Compute(
39 key, [&observer, &firstCallerTokenId, this](const PublishedDataKey &key, std::vector<ObserverNode> &value) {
40 ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
41 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
42 value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID());
43 return true;
44 });
45 return E_OK;
46 }
47
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)48 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
49 {
50 auto result =
51 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
52 std::vector<ObserverNode> &value) {
53 for (auto it = value.begin(); it != value.end();) {
54 if (it->firstCallerTokenId == firstCallerTokenId) {
55 ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
56 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
57 it = value.erase(it);
58 } else {
59 it++;
60 }
61 }
62 return !value.empty();
63 });
64 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
65 }
66
Delete(uint32_t callerTokenId)67 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId)
68 {
69 publishedDataCache_.EraseIf([&callerTokenId](const auto &key, std::vector<ObserverNode> &value) {
70 for (auto it = value.begin(); it != value.end();) {
71 if (it->callerTokenId == callerTokenId) {
72 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x",
73 DistributedData::Anonymous::Change(key.key).c_str(), callerTokenId);
74 it = value.erase(it);
75 } else {
76 it++;
77 }
78 }
79 return value.empty();
80 });
81 }
82
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)83 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
84 {
85 auto result =
86 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
87 std::vector<ObserverNode> &value) {
88 for (auto it = value.begin(); it != value.end(); it++) {
89 if (it->firstCallerTokenId == firstCallerTokenId) {
90 it->enabled = false;
91 it->isNotifyOnEnabled = false;
92 }
93 }
94 return true;
95 });
96 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
97 }
98
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)99 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
100 {
101 auto result =
102 publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
103 std::vector<ObserverNode> &value) {
104 for (auto it = value.begin(); it != value.end(); it++) {
105 if (it->firstCallerTokenId == firstCallerTokenId) {
106 it->enabled = true;
107 }
108 }
109 return true;
110 });
111 return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
112 }
113
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)114 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
115 const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
116 {
117 int32_t status;
118 // key is bundleName, value is change node
119 std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
120 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
121 publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
122 const PublishedDataKey &key, std::vector<ObserverNode> &val) {
123 for (auto &data : keys) {
124 if (key != data || publishedResult.count(key) != 0) {
125 continue;
126 }
127 status = PublishedData::Query(
128 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
129 if (status != E_OK) {
130 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
131 data.subscriberId);
132 publishedResult.erase(key);
133 continue;
134 }
135 PutInto(callbacks, val, key, observer);
136 break;
137 }
138 return false;
139 });
140 PublishedDataChangeNode result;
141 for (auto &[callback, keys] : callbacks) {
142 result.datas_.clear();
143 for (auto &key : keys) {
144 if (publishedResult.count(key) != 0) {
145 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
146 }
147 }
148 if (result.datas_.empty()) {
149 continue;
150 }
151 result.ownerBundleName_ = ownerBundleName;
152 callback->OnChangeFromPublishedData(result);
153 }
154 }
155
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer)156 void PublishedDataSubscriberManager::PutInto(
157 std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
158 const std::vector<ObserverNode> &val, const PublishedDataKey &key,
159 const sptr<IDataProxyPublishedDataObserver> observer)
160 {
161 for (auto const &callback : val) {
162 if (callback.enabled && callback.observer != nullptr) {
163 // callback the observer, others do not call
164 if (observer != nullptr && callback.observer != observer) {
165 continue;
166 }
167 callbacks[callback.observer].emplace_back(key);
168 }
169 }
170 }
171
Clear()172 void PublishedDataSubscriberManager::Clear()
173 {
174 publishedDataCache_.Clear();
175 }
176
GetCount(const PublishedDataKey & key)177 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
178 {
179 int count = 0;
180 publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
181 count = static_cast<int>(value.size());
182 return true;
183 });
184 return count;
185 }
186
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)187 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
188 {
189 auto pair = publishedDataCache_.Find(key);
190 if (!pair.first) {
191 return false;
192 }
193 for (const auto &value : pair.second) {
194 if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
195 return true;
196 }
197 }
198 return false;
199 }
200
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)201 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
202 {
203 for (const auto &pkey : keys) {
204 publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
205 for (auto it = value.begin(); it != value.end(); it++) {
206 if (!it->enabled) {
207 it->isNotifyOnEnabled = true;
208 }
209 }
210 return true;
211 });
212 }
213 }
214
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)215 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
216 : key(key), bundleName(bundle), subscriberId(subscriberId)
217 {
218 /* private published data can use key as simple uri */
219 /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
220 /* if key is normal uri, bundleName is from uri */
221 if (URIUtils::IsDataProxyURI(key)) {
222 URIUtils::GetBundleNameFromProxyURI(key, bundleName);
223 }
224 }
225
operator <(const PublishedDataKey & rhs) const226 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
227 {
228 if (key < rhs.key) {
229 return true;
230 }
231 if (rhs.key < key) {
232 return false;
233 }
234 if (bundleName < rhs.bundleName) {
235 return true;
236 }
237 if (rhs.bundleName < bundleName) {
238 return false;
239 }
240 return subscriberId < rhs.subscriberId;
241 }
242
operator >(const PublishedDataKey & rhs) const243 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
244 {
245 return rhs < *this;
246 }
247
operator <=(const PublishedDataKey & rhs) const248 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
249 {
250 return !(rhs < *this);
251 }
252
operator >=(const PublishedDataKey & rhs) const253 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
254 {
255 return !(*this < rhs);
256 }
257
operator ==(const PublishedDataKey & rhs) const258 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
259 {
260 return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
261 }
262
operator !=(const PublishedDataKey & rhs) const263 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
264 {
265 return !(rhs == *this);
266 }
267
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)268 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
269 uint32_t firstCallerTokenId, uint32_t callerTokenId)
270 : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
271 {
272 }
273 } // namespace OHOS::DataShare
274