• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "PublishedDataSubscriberManager"
16 
17 #include "published_data_subscriber_manager.h"
18 
19 #include "ipc_skeleton.h"
20 #include "general/load_config_data_info_strategy.h"
21 #include "log_print.h"
22 #include "published_data.h"
23 #include "uri_utils.h"
24 #include "utils/anonymous.h"
25 
26 namespace OHOS::DataShare {
GetInstance()27 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
28 {
29     static PublishedDataSubscriberManager manager;
30     return manager;
31 }
32 
Add(const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer,uint32_t firstCallerTokenId)33 int PublishedDataSubscriberManager::Add(
34     const PublishedDataKey &key, const sptr<IDataProxyPublishedDataObserver> observer, uint32_t firstCallerTokenId)
35 {
36     publishedDataCache_.Compute(
37         key, [&observer, &firstCallerTokenId, this](const PublishedDataKey &key, std::vector<ObserverNode> &value) {
38             ZLOGI("add publish subscriber, uri %{public}s tokenId 0x%{public}x",
39                 DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
40             value.emplace_back(observer, firstCallerTokenId, IPCSkeleton::GetCallingTokenID());
41             return true;
42         });
43     return E_OK;
44 }
45 
Delete(const PublishedDataKey & key,uint32_t firstCallerTokenId)46 int PublishedDataSubscriberManager::Delete(const PublishedDataKey &key, uint32_t firstCallerTokenId)
47 {
48     auto result =
49         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
50             std::vector<ObserverNode> &value) {
51             for (auto it = value.begin(); it != value.end();) {
52                 if (it->firstCallerTokenId == firstCallerTokenId) {
53                     ZLOGI("delete publish subscriber, uri %{public}s tokenId 0x%{public}x",
54                         DistributedData::Anonymous::Change(key.key).c_str(), firstCallerTokenId);
55                     it = value.erase(it);
56                 } else {
57                     it++;
58                 }
59             }
60             return !value.empty();
61         });
62     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
63 }
64 
Delete(uint32_t callerTokenId)65 void PublishedDataSubscriberManager::Delete(uint32_t callerTokenId)
66 {
67     publishedDataCache_.EraseIf([&callerTokenId](const auto &key, std::vector<ObserverNode> &value) {
68         for (auto it = value.begin(); it != value.end();) {
69             if (it->callerTokenId == callerTokenId) {
70                 ZLOGI("erase start, uri is %{public}s, tokenId is 0x%{public}x",
71                     DistributedData::Anonymous::Change(key.key).c_str(), callerTokenId);
72                 it = value.erase(it);
73             } else {
74                 it++;
75             }
76         }
77         return value.empty();
78     });
79 }
80 
Disable(const PublishedDataKey & key,uint32_t firstCallerTokenId)81 int PublishedDataSubscriberManager::Disable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
82 {
83     auto result =
84         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
85             std::vector<ObserverNode> &value) {
86             for (auto it = value.begin(); it != value.end(); it++) {
87                 if (it->firstCallerTokenId == firstCallerTokenId) {
88                     it->enabled = false;
89                     it->isNotifyOnEnabled = false;
90                 }
91             }
92             return true;
93         });
94     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
95 }
96 
Enable(const PublishedDataKey & key,uint32_t firstCallerTokenId)97 int PublishedDataSubscriberManager::Enable(const PublishedDataKey &key, uint32_t firstCallerTokenId)
98 {
99     auto result =
100         publishedDataCache_.ComputeIfPresent(key, [&firstCallerTokenId](const auto &key,
101             std::vector<ObserverNode> &value) {
102             for (auto it = value.begin(); it != value.end(); it++) {
103                 if (it->firstCallerTokenId == firstCallerTokenId) {
104                     it->enabled = true;
105                 }
106             }
107             return true;
108         });
109     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
110 }
111 
Emit(const std::vector<PublishedDataKey> & keys,int32_t userId,const std::string & ownerBundleName,const sptr<IDataProxyPublishedDataObserver> observer)112 void PublishedDataSubscriberManager::Emit(const std::vector<PublishedDataKey> &keys, int32_t userId,
113     const std::string &ownerBundleName, const sptr<IDataProxyPublishedDataObserver> observer)
114 {
115     int32_t status;
116     // key is bundleName, value is change node
117     std::map<PublishedDataKey, PublishedDataNode::Data> publishedResult;
118     std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> callbacks;
119     publishedDataCache_.ForEach([&keys, &status, &observer, &publishedResult, &callbacks, &userId, this](
120         const PublishedDataKey &key, std::vector<ObserverNode> &val) {
121         for (auto &data : keys) {
122             if (key != data || publishedResult.count(key) != 0) {
123                 continue;
124             }
125             status = PublishedData::Query(
126                 Id(PublishedData::GenId(key.key, key.bundleName, key.subscriberId), userId), publishedResult[key]);
127             if (status != E_OK) {
128                 ZLOGE("query fail %{public}s %{public}s %{public}" PRId64, data.bundleName.c_str(), data.key.c_str(),
129                     data.subscriberId);
130                 publishedResult.erase(key);
131                 continue;
132             }
133             PutInto(callbacks, val, key, observer);
134             break;
135         }
136         return false;
137     });
138     PublishedDataChangeNode result;
139     for (auto &[callback, keys] : callbacks) {
140         result.datas_.clear();
141         for (auto &key : keys) {
142             if (publishedResult.count(key) != 0) {
143                 result.datas_.emplace_back(key.key, key.subscriberId, PublishedDataNode::MoveTo(publishedResult[key]));
144             }
145         }
146         if (result.datas_.empty()) {
147             continue;
148         }
149         result.ownerBundleName_ = ownerBundleName;
150         callback->OnChangeFromPublishedData(result);
151     }
152 }
153 
PutInto(std::map<sptr<IDataProxyPublishedDataObserver>,std::vector<PublishedDataKey>> & callbacks,const std::vector<ObserverNode> & val,const PublishedDataKey & key,const sptr<IDataProxyPublishedDataObserver> observer)154 void PublishedDataSubscriberManager::PutInto(
155     std::map<sptr<IDataProxyPublishedDataObserver>, std::vector<PublishedDataKey>> &callbacks,
156     const std::vector<ObserverNode> &val, const PublishedDataKey &key,
157     const sptr<IDataProxyPublishedDataObserver> observer)
158 {
159     for (auto const &callback : val) {
160         if (callback.enabled && callback.observer != nullptr) {
161             // callback the observer, others do not call
162             if (observer != nullptr && callback.observer != observer) {
163                 continue;
164             }
165             callbacks[callback.observer].emplace_back(key);
166         }
167     }
168 }
169 
Clear()170 void PublishedDataSubscriberManager::Clear()
171 {
172     publishedDataCache_.Clear();
173 }
174 
GetCount(const PublishedDataKey & key)175 int PublishedDataSubscriberManager::GetCount(const PublishedDataKey &key)
176 {
177     int count = 0;
178     publishedDataCache_.ComputeIfPresent(key, [&count](const auto &key, std::vector<ObserverNode> &value) {
179         count = static_cast<int>(value.size());
180         return true;
181     });
182     return count;
183 }
184 
IsNotifyOnEnabled(const PublishedDataKey & key,uint32_t callerTokenId)185 bool PublishedDataSubscriberManager::IsNotifyOnEnabled(const PublishedDataKey &key, uint32_t callerTokenId)
186 {
187     auto pair = publishedDataCache_.Find(key);
188     if (!pair.first) {
189         return false;
190     }
191     for (const auto &value : pair.second) {
192         if (value.firstCallerTokenId == callerTokenId && value.isNotifyOnEnabled) {
193             return true;
194         }
195     }
196     return false;
197 }
198 
SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> & keys)199 void PublishedDataSubscriberManager::SetObserversNotifiedOnEnabled(const std::vector<PublishedDataKey> &keys)
200 {
201     for (const auto &pkey : keys) {
202         publishedDataCache_.ComputeIfPresent(pkey, [](const auto &key, std::vector<ObserverNode> &value) {
203             for (auto it = value.begin(); it != value.end(); it++) {
204                 if (!it->enabled) {
205                     it->isNotifyOnEnabled = true;
206                 }
207             }
208             return true;
209         });
210     }
211 }
212 
PublishedDataKey(const std::string & key,const std::string & bundle,const int64_t subscriberId)213 PublishedDataKey::PublishedDataKey(const std::string &key, const std::string &bundle, const int64_t subscriberId)
214     : key(key), bundleName(bundle), subscriberId(subscriberId)
215 {
216     /* private published data can use key as simple uri */
217     /* etc: datashareproxy://{bundleName}/meeting can use meeting replaced */
218     /* if key is normal uri, bundleName is from uri */
219     if (URIUtils::IsDataProxyURI(key)) {
220         URIUtils::GetBundleNameFromProxyURI(key, bundleName);
221     }
222 }
223 
operator <(const PublishedDataKey & rhs) const224 bool PublishedDataKey::operator<(const PublishedDataKey &rhs) const
225 {
226     if (key < rhs.key) {
227         return true;
228     }
229     if (rhs.key < key) {
230         return false;
231     }
232     if (bundleName < rhs.bundleName) {
233         return true;
234     }
235     if (rhs.bundleName < bundleName) {
236         return false;
237     }
238     return subscriberId < rhs.subscriberId;
239 }
240 
operator >(const PublishedDataKey & rhs) const241 bool PublishedDataKey::operator>(const PublishedDataKey &rhs) const
242 {
243     return rhs < *this;
244 }
245 
operator <=(const PublishedDataKey & rhs) const246 bool PublishedDataKey::operator<=(const PublishedDataKey &rhs) const
247 {
248     return !(rhs < *this);
249 }
250 
operator >=(const PublishedDataKey & rhs) const251 bool PublishedDataKey::operator>=(const PublishedDataKey &rhs) const
252 {
253     return !(*this < rhs);
254 }
255 
operator ==(const PublishedDataKey & rhs) const256 bool PublishedDataKey::operator==(const PublishedDataKey &rhs) const
257 {
258     return key == rhs.key && bundleName == rhs.bundleName && subscriberId == rhs.subscriberId;
259 }
260 
operator !=(const PublishedDataKey & rhs) const261 bool PublishedDataKey::operator!=(const PublishedDataKey &rhs) const
262 {
263     return !(rhs == *this);
264 }
265 
ObserverNode(const sptr<IDataProxyPublishedDataObserver> & observer,uint32_t firstCallerTokenId,uint32_t callerTokenId)266 PublishedDataSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyPublishedDataObserver> &observer,
267     uint32_t firstCallerTokenId, uint32_t callerTokenId)
268     : observer(observer), firstCallerTokenId(firstCallerTokenId), callerTokenId(callerTokenId)
269 {
270 }
271 } // namespace OHOS::DataShare
272