• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16 
17 #include "published_data_subscriber_manager.h"
18 
19 #include <cinttypes>
20 
21 #include "ipc_skeleton.h"
22 #include "general/load_config_data_info_strategy.h"
23 #include "log_print.h"
24 #include "published_data.h"
25 #include "uri_utils.h"
26 #include "utils/anonymous.h"
27 
28 namespace OHOS::DataShare {
GetInstance()29 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
30 {
31     static PublishedDataSubscriberManager manager;
32     return manager;
33 }
34 
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId)35 int PublishedDataSubscriberManager::Add(
36     const PublishedDataKey &key, const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId)
37 {
38     publishedDataCache_.Compute(
39         key, [&observer, &firstCallerTokenId, this](const PublishedDataKey &key, std::vector<ObserverNode> &value) {
40             ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
41                 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
42             value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID());
43             return true;
44         });
45     return E_OK;
46 }
47 
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)48 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
49 {
50     auto result =
51         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
52             std::vector<ObserverNode> &value) {
53             for (auto it = value.begin(); it != value.end();) {
54                 if (it->firstCallerTokenId == firstCallerTokenId) {
55                     ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
56                         DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
57                     it = value.erase(it);
58                 } else {
59                     it++;
60                 }
61             }
62             return !value.empty();
63         });
64     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
65 }
66 
Delete(uint32_t callerTokenId)67 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId)
68 {
69     publishedDataCache_.EraseIf([&callerTokenId](const auto &key, std::vector<ObserverNode> &value) {
70         for (auto it = value.begin(); it != value.end();) {
71             if (it->callerTokenId == callerTokenId) {
72                 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x",
73                     DistributedData::Anonymous::Change(key.key).c_str(), callerTokenId);
74                 it = value.erase(it);
75             } else {
76                 it++;
77             }
78         }
79         return value.empty();
80     });
81 }
82 
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)83 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
84 {
85     auto result =
86         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
87             std::vector<ObserverNode> &value) {
88             for (auto it = value.begin(); it != value.end(); it++) {
89                 if (it->firstCallerTokenId == firstCallerTokenId) {
90                     it->enabled = false;
91                     it->isNotifyOnEnabled = false;
92                 }
93             }
94             return true;
95         });
96     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
97 }
98 
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)99 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
100 {
101     auto result =
102         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
103             std::vector<ObserverNode> &value) {
104             for (auto it = value.begin(); it != value.end(); it++) {
105                 if (it->firstCallerTokenId == firstCallerTokenId) {
106                     it->enabled = true;
107                 }
108             }
109             return true;
110         });
111     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
112 }
113 
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)114 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
115     const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
116 {
117     int32_t status;
118     // key is bundleName, value is change node
119     std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
120     std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
121     publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
122         const PublishedDataKey &key, std::vector<ObserverNode> &val) {
123         for (auto &data : keys) {
124             if (key != data || publishedResult.count(key) != 0) {
125                 continue;
126             }
127             status = PublishedData::Query(
128                 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
129             if (status != E_OK) {
130                 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
131                     data.subscriberId);
132                 publishedResult.erase(key);
133                 continue;
134             }
135             PutInto(callbacks, val, key, observer);
136             break;
137         }
138         return false;
139     });
140     PublishedDataChangeNode result;
141     for (auto &[callback, keys] : callbacks) {
142         result.datas_.clear();
143         for (auto &key : keys) {
144             if (publishedResult.count(key) != 0) {
145                 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
146             }
147         }
148         if (result.datas_.empty()) {
149             continue;
150         }
151         result.ownerBundleName_ = ownerBundleName;
152         callback->OnChangeFromPublishedData(result);
153     }
154 }
155 
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer)156 void PublishedDataSubscriberManager::PutInto(
157     std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
158     const std::vector<ObserverNode> &val, const PublishedDataKey &key,
159     const sptr<IDataProxyPublishedDataObserver> observer)
160 {
161     for (auto const &callback : val) {
162         if (callback.enabled && callback.observer != nullptr) {
163             // callback the observer, others do not call
164             if (observer != nullptr && callback.observer != observer) {
165                 continue;
166             }
167             callbacks[callback.observer].emplace_back(key);
168         }
169     }
170 }
171 
Clear()172 void PublishedDataSubscriberManager::Clear()
173 {
174     publishedDataCache_.Clear();
175 }
176 
GetCount(const PublishedDataKey & key)177 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
178 {
179     int count = 0;
180     publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
181         count = static_cast<int>(value.size());
182         return true;
183     });
184     return count;
185 }
186 
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)187 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
188 {
189     auto pair = publishedDataCache_.Find(key);
190     if (!pair.first) {
191         return false;
192     }
193     for (const auto &value : pair.second) {
194         if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
195             return true;
196         }
197     }
198     return false;
199 }
200 
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)201 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
202 {
203     for (const auto &pkey : keys) {
204         publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
205             for (auto it = value.begin(); it != value.end(); it++) {
206                 if (!it->enabled) {
207                     it->isNotifyOnEnabled = true;
208                 }
209             }
210             return true;
211         });
212     }
213 }
214 
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)215 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
216     : key(key), bundleName(bundle), subscriberId(subscriberId)
217 {
218     /* private published data can use key as simple uri */
219     /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
220     /* if key is normal uri, bundleName is from uri */
221     if (URIUtils::IsDataProxyURI(key)) {
222         URIUtils::GetBundleNameFromProxyURI(key, bundleName);
223     }
224 }
225 
operator <(const PublishedDataKey & rhs) const226 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
227 {
228     if (key < rhs.key) {
229         return true;
230     }
231     if (rhs.key < key) {
232         return false;
233     }
234     if (bundleName < rhs.bundleName) {
235         return true;
236     }
237     if (rhs.bundleName < bundleName) {
238         return false;
239     }
240     return subscriberId < rhs.subscriberId;
241 }
242 
operator >(const PublishedDataKey & rhs) const243 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
244 {
245     return rhs < *this;
246 }
247 
operator <=(const PublishedDataKey & rhs) const248 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
249 {
250     return !(rhs < *this);
251 }
252 
operator >=(const PublishedDataKey & rhs) const253 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
254 {
255     return !(*this < rhs);
256 }
257 
operator ==(const PublishedDataKey & rhs) const258 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
259 {
260     return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
261 }
262 
operator !=(const PublishedDataKey & rhs) const263 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
264 {
265     return !(rhs == *this);
266 }
267 
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)268 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
269     uint32_t firstCallerTokenId, uint32_t callerTokenId)
270     : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
271 {
272 }
273 } // namespace OHOS::DataShare
274