• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "proxy_data_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "dataproxy_handle_common.h"
22 #include "datashare_log.h"
23 #include "datashare_string_utils.h"
24 
25 namespace OHOS {
26 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,const ProxyDataCallback & callback)27 std::vector<DataProxyResult> ProxyDataSubscriberManager::AddObservers(void *subscriber,
28     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris,
29     const ProxyDataCallback &callback)
30 {
31     std::vector<DataProxyResult> result = {};
32     if (proxy == nullptr) {
33         LOG_ERROR("proxy is nullptr");
34         return result;
35     }
36     std::vector<Key> keys;
37     std::for_each(uris.begin(), uris.end(), [&keys](auto &uri) {
38         keys.emplace_back(uri);
39     });
40     return BaseCallbacks::AddObservers(
41         keys, subscriber, std::make_shared<Observer>(callback),
42         [&proxy, subscriber, this](const std::vector<Key> &firstAddKeys,
43             const std::shared_ptr<Observer> observer, std::vector<DataProxyResult> &opResult) {
44             std::vector<std::string> firstAddUris;
45             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
46                 firstAddUris.emplace_back(result);
47             });
48             if (firstAddUris.empty()) {
49                 return;
50             }
51 
52             auto subResults = proxy->SubscribeProxyData(firstAddUris, serviceCallback_);
53             std::vector<Key> failedKeys;
54             for (auto &subResult : subResults) {
55                 opResult.emplace_back(subResult);
56                 if (subResult.result_ != SUCCESS) {
57                     failedKeys.emplace_back(subResult.uri_);
58                     LOG_WARN("registered failed, uri is %{public}s, errCode",
59                         DataShareStringUtils::Anonymous(subResult.uri_).c_str());
60                 }
61             }
62 
63             if (failedKeys.size() > 0) {
64                 BaseCallbacks::DelProxyDataObservers(failedKeys, subscriber);
65             }
66         });
67 }
68 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)69 std::vector<DataProxyResult> ProxyDataSubscriberManager::DelObservers(void *subscriber,
70     std::shared_ptr<DataShareServiceProxy> proxy)
71 {
72     std::vector<DataProxyResult> result = {};
73     if (proxy == nullptr) {
74         LOG_ERROR("proxy is nullptr");
75         return result;
76     }
77     return BaseCallbacks::DelObservers(subscriber,
78         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<DataProxyResult> &opResult) {
79             // delete all obs by subscriber
80             std::vector<std::string> uris;
81             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&uris](const Key &key) {
82                 uris.emplace_back(key.uri_);
83             });
84             auto results = proxy->UnsubscribeProxyData(uris);
85             opResult.insert(opResult.end(), results.begin(), results.end());
86         });
87 }
88 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris)89 std::vector<DataProxyResult> ProxyDataSubscriberManager::DelObservers(void *subscriber,
90     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris)
91 {
92     std::vector<DataProxyResult> result = {};
93     if (proxy == nullptr) {
94         LOG_ERROR("proxy is nullptr");
95         return result;
96     }
97     if (uris.empty()) {
98         return DelObservers(subscriber, proxy);
99     }
100 
101     std::vector<Key> keys;
102     std::for_each(uris.begin(), uris.end(), [&keys](auto &uri) {
103         keys.emplace_back(uri);
104     });
105     return BaseCallbacks::DelProxyDataObservers(keys, subscriber,
106         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<DataProxyResult> &opResult) {
107             std::vector<std::string> lastDelUris;
108             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
109                 lastDelUris.emplace_back(result);
110             });
111             if (lastDelUris.empty()) {
112                 return;
113             }
114             auto unsubResult = proxy->UnsubscribeProxyData(lastDelUris);
115             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
116         });
117 }
118 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)119 void ProxyDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
120 {
121     if (proxy == nullptr) {
122         LOG_ERROR("proxy is nullptr");
123         return;
124     }
125 
126     std::vector<std::string> uris;
127     std::vector<Key> keys = CallbacksManager::GetKeys();
128     for (const auto& key : keys) {
129         uris.emplace_back(key.uri_);
130     }
131     auto results = proxy->SubscribeProxyData(uris, serviceCallback_);
132     for (const auto& result : results) {
133         if (result.result_ != SUCCESS) {
134             LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
135                 DataShareStringUtils::Anonymous(result.uri_).c_str(), result.result_);
136         }
137     }
138 }
139 
Emit(std::vector<DataProxyChangeInfo> & changeInfo)140 void ProxyDataSubscriberManager::Emit(std::vector<DataProxyChangeInfo> &changeInfo)
141 {
142     std::map<std::shared_ptr<Observer>, std::vector<DataProxyChangeInfo>> results;
143     for (auto &data : changeInfo) {
144         ProxyDataObserverMapKey key(data.uri_);
145         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
146         for (auto const &obs : callbacks) {
147             results[obs].emplace_back(data.changeType_, data.uri_, data.value_);
148         }
149     }
150     for (auto &[callback, node] : results) {
151         callback->OnChange(node);
152     }
153 }
154 
GetInstance()155 ProxyDataSubscriberManager &ProxyDataSubscriberManager::GetInstance()
156 {
157     static ProxyDataSubscriberManager manager;
158     return manager;
159 }
160 
ProxyDataSubscriberManager()161 ProxyDataSubscriberManager::ProxyDataSubscriberManager()
162 {
163     serviceCallback_ = new ProxyDataObserverStub([this](std::vector<DataProxyChangeInfo> &changeInfo) {
164         Emit(changeInfo);
165     });
166 }
167 
ProxyDataObserver(const ProxyDataCallback & callback)168 ProxyDataObserver::ProxyDataObserver(const ProxyDataCallback &callback) : callback_(callback) {}
169 
OnChange(std::vector<DataProxyChangeInfo> & changeInfo)170 void ProxyDataObserver::OnChange(std::vector<DataProxyChangeInfo> &changeInfo)
171 {
172     callback_(changeInfo);
173 }
174 
operator ==(const ProxyDataObserver & rhs) const175 bool ProxyDataObserver::operator==(const ProxyDataObserver &rhs) const
176 {
177     return false;
178 }
179 
operator !=(const ProxyDataObserver & rhs) const180 bool ProxyDataObserver::operator!=(const ProxyDataObserver &rhs) const
181 {
182     return !(rhs == *this);
183 }
184 } // namespace DataShare
185 } // namespace OHOS
186