1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <cinttypes>
17 #include "datashare_connection.h"
18
19 #include "ams_mgr_proxy.h"
20 #include "datashare_common.h"
21 #include "datashare_errno.h"
22 #include "datashare_log.h"
23 #include "datashare_proxy.h"
24 #include "datashare_radar_reporter.h"
25 #include "datashare_string_utils.h"
26
27 namespace OHOS {
28 namespace DataShare {
29 using namespace AppExecFwk;
30 const std::chrono::milliseconds TIME_THRESHOLD = std::chrono::milliseconds(200);
31 /**
32 * @brief This method is called back to receive the connection result after an ability calls the
33 * ConnectAbility method to connect it to an extension ability.
34 *
35 * @param element: Indicates information about the connected extension ability.
36 * @param remote: Indicates the remote proxy object of the extension ability.
37 * @param resultCode: Indicates the connection result code. The value 0 indicates a successful connection, and any
38 * other value indicates a connection failure.
39 */
OnAbilityConnectDone(const AppExecFwk::ElementName & element,const sptr<IRemoteObject> & remoteObject,int resultCode)40 void DataShareConnection::OnAbilityConnectDone(
41 const AppExecFwk::ElementName &element, const sptr<IRemoteObject> &remoteObject, int resultCode)
42 {
43 LOG_INFO("on connect done, req uri:%{public}s, rev uri:%{public}s, ret=%{public}d",
44 DataShareStringUtils::Change(uri_.ToString()).c_str(),
45 DataShareStringUtils::Change(element.GetURI()).c_str(), resultCode);
46 if (remoteObject == nullptr) {
47 LOG_ERROR("remote is nullptr");
48 condition_.condition.notify_all();
49 return;
50 }
51 {
52 std::lock_guard<std::mutex> lock(mutex_);
53 sptr<DataShareProxy> proxy = new (std::nothrow) DataShareProxy(remoteObject);
54 dataShareProxy_ = std::shared_ptr<DataShareProxy>(proxy.GetRefPtr(), [holder = proxy](const auto *) {});
55 condition_.condition.notify_all();
56 }
57 if (isInvalid_.load()) {
58 LOG_ERROR("connect is invalid, req uri:%{public}s, rev uri:%{public}s, ret=%{public}d",
59 DataShareStringUtils::Change(uri_.ToString()).c_str(),
60 DataShareStringUtils::Change(element.GetURI()).c_str(), resultCode);
61 Disconnect();
62 }
63 // re-register when re-connect successed
64 if (isReconnect_.load()) {
65 ReRegisterObserverExtProvider();
66 isReconnect_.store(false);
67 }
68 }
69
70 /**
71 * @brief This method is called back to receive the disconnection result after the connected extension ability crashes
72 * or is killed. If the extension ability exits unexpectedly, all its connections are disconnected, and each ability
73 * previously connected to it will call onAbilityDisconnectDone.
74 *
75 * @param element: Indicates information about the disconnected extension ability.
76 * @param resultCode: Indicates the disconnection result code. The value 0 indicates a successful disconnection, and
77 * any other value indicates a disconnection failure.
78 */
OnAbilityDisconnectDone(const AppExecFwk::ElementName & element,int resultCode)79 void DataShareConnection::OnAbilityDisconnectDone(const AppExecFwk::ElementName &element, int resultCode)
80 {
81 LOG_INFO("on disconnect done, req uri:%{public}s, rev uri:%{public}s, ret=%{public}d",
82 DataShareStringUtils::Change(uri_.ToString()).c_str(),
83 DataShareStringUtils::Change(element.GetURI()).c_str(), resultCode);
84 std::string uri;
85 {
86 std::lock_guard<std::mutex> lock(mutex_);
87 dataShareProxy_ = nullptr;
88 uri = uri_.ToString();
89 }
90 if (uri.empty()) {
91 return;
92 }
93 if (pool_ == nullptr) {
94 std::lock_guard<std::mutex> lock(mutex_);
95 if (pool_ == nullptr) {
96 pool_ = std::make_shared<ExecutorPool>(MAX_THREADS, MIN_THREADS, DATASHARE_EXECUTOR_NAME);
97 }
98 }
99 ReconnectExtAbility(uri);
100 }
101
ReconnectExtAbility(const std::string & uri)102 void DataShareConnection::ReconnectExtAbility(const std::string &uri)
103 {
104 if (reConnects_.count == 0) {
105 AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
106 if (instance == nullptr) {
107 LOG_ERROR("get proxy failed uri:%{public}s", DataShareStringUtils::Change(uri_.ToString()).c_str());
108 return;
109 }
110 ErrCode ret = instance->Connect(uri, this, token_);
111 LOG_INFO("reconnect ability, uri:%{public}s, ret = %{public}d",
112 DataShareStringUtils::Change(uri).c_str(), ret);
113 if (ret == E_OK) {
114 auto curr = std::chrono::system_clock::now().time_since_epoch();
115 reConnects_.count = 1;
116 reConnects_.firstTime = std::chrono::duration_cast<std::chrono::milliseconds>(curr).count();
117 reConnects_.prevTime = std::chrono::duration_cast<std::chrono::milliseconds>(curr).count();
118 // set status true
119 isReconnect_.store(true);
120 }
121 return;
122 }
123 return DelayConnectExtAbility(uri);
124 }
125
DelayConnectExtAbility(const std::string & uri)126 void DataShareConnection::DelayConnectExtAbility(const std::string &uri)
127 {
128 int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
129 std::chrono::system_clock::now().time_since_epoch()).count();
130 if (now - reConnects_.prevTime >= MAX_RECONNECT_TIME_INTERVAL.count()) {
131 reConnects_.count = 0;
132 reConnects_.firstTime = now;
133 reConnects_.prevTime = now;
134 }
135 if (reConnects_.count >= MAX_RECONNECT) {
136 return;
137 }
138 auto delay = RECONNECT_TIME_INTERVAL;
139 if (now - reConnects_.prevTime >= RECONNECT_TIME_INTERVAL.count()) {
140 delay = std::chrono::seconds(0);
141 }
142 std::weak_ptr<DataShareConnection> self = weak_from_this();
143 auto taskid = pool_->Schedule(delay, [uri, self]() {
144 auto selfSharedPtr = self.lock();
145 if (selfSharedPtr) {
146 AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
147 if (instance == nullptr) {
148 LOG_ERROR("get proxy failed uri:%{public}s", DataShareStringUtils::Change(uri).c_str());
149 return;
150 }
151 ErrCode ret = instance->Connect(uri, selfSharedPtr.get(), selfSharedPtr->token_);
152 LOG_INFO("reconnect ability, uri:%{public}s, ret = %{public}d",
153 DataShareStringUtils::Change(uri).c_str(), ret);
154 if (ret == E_OK) {
155 selfSharedPtr->reConnects_.count++;
156 selfSharedPtr->reConnects_.prevTime = std::chrono::duration_cast<std::chrono::milliseconds>(
157 std::chrono::system_clock::now().time_since_epoch()).count();
158 selfSharedPtr->isReconnect_.store(true);
159 }
160 }
161 });
162 if (taskid == ExecutorPool::INVALID_TASK_ID) {
163 LOG_ERROR("create scheduler failed, over the max capacity");
164 return;
165 }
166 LOG_DEBUG("create scheduler success");
167 return;
168 }
169
170 // store observer when it was successfully registered
UpdateObserverExtsProviderMap(const Uri & uri,const sptr<AAFwk::IDataAbilityObserver> & dataObserver,bool isDescendants)171 void DataShareConnection::UpdateObserverExtsProviderMap(const Uri &uri,
172 const sptr<AAFwk::IDataAbilityObserver> &dataObserver, bool isDescendants)
173 {
174 observerExtsProvider_.Compute(dataObserver, [&uri, isDescendants](const auto &key, auto &value) {
175 value.emplace_back(uri, isDescendants);
176 return true;
177 });
178 }
179
180 // remove observer when it was successfully unregistered
DeleteObserverExtsProviderMap(const Uri & uri,const sptr<AAFwk::IDataAbilityObserver> & dataObserver)181 void DataShareConnection::DeleteObserverExtsProviderMap(const Uri &uri,
182 const sptr<AAFwk::IDataAbilityObserver> &dataObserver)
183 {
184 observerExtsProvider_.Compute(dataObserver, [&uri](const auto &key, auto &value) {
185 value.remove_if([&uri](const auto ¶m) {
186 return uri == param.uri;
187 });
188 return !value.empty();
189 });
190 }
191
192 // re-register observer by dataShareProxy
ReRegisterObserverExtProvider()193 void DataShareConnection::ReRegisterObserverExtProvider()
194 {
195 LOG_INFO("ReRegisterObserverExtProvider start");
196 decltype(observerExtsProvider_) observerExtsProvider(std::move(observerExtsProvider_));
197 observerExtsProvider_.Clear();
198 observerExtsProvider.ForEach([this](const auto &key, const auto &value) {
199 for (const auto ¶m : value) {
200 auto ret = dataShareProxy_->RegisterObserverExtProvider(param.uri, key, param.isDescendants, { true });
201 if (ret != E_OK) {
202 LOG_ERROR(
203 "RegisterObserverExt failed, param.uri:%{public}s, ret:%{public}d, param.isDescendants:%{public}d",
204 DataShareStringUtils::Anonymous(param.uri.ToString()).c_str(), ret, param.isDescendants
205 );
206 } else {
207 UpdateObserverExtsProviderMap(param.uri, key, param.isDescendants);
208 }
209 }
210 return false;
211 });
212 }
213 /**
214 * @brief connect remote ability of DataShareExtAbility.
215 */
ConnectDataShareExtAbility(const Uri & uri,const sptr<IRemoteObject> & token)216 std::shared_ptr<DataShareProxy> DataShareConnection::ConnectDataShareExtAbility(const Uri &uri,
217 const sptr<IRemoteObject> &token)
218 {
219 std::string reqUri;
220 {
221 std::lock_guard<std::mutex> lock(mutex_);
222 if (dataShareProxy_ != nullptr) {
223 return dataShareProxy_;
224 }
225 reqUri = uri_.ToString().empty() ? uri.ToString() : uri_.ToString();
226 }
227
228 AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
229 if (instance == nullptr) {
230 LOG_ERROR("get proxy failed uri:%{public}s", DataShareStringUtils::Change(reqUri).c_str());
231 return nullptr;
232 }
233 ErrCode ret = instance->Connect(reqUri, this, token);
234 LOG_INFO("connect ability, uri = %{public}s. ret = %{public}d", DataShareStringUtils::Change(reqUri).c_str(), ret);
235 if (ret != ERR_OK) {
236 return nullptr;
237 }
238 std::unique_lock<std::mutex> condLock(mutex_);
239 std::shared_ptr<DataShareProxy> proxy = dataShareProxy_;
240 if (proxy != nullptr) {
241 return proxy;
242 }
243 auto start = std::chrono::steady_clock::now();
244 if (condition_.condition.wait_for(condLock, std::chrono::seconds(waitTime_),
245 [this] { return dataShareProxy_ != nullptr; })) {
246 auto finish = std::chrono::steady_clock::now();
247 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
248 if (duration >= TIME_THRESHOLD) {
249 int64_t milliseconds = duration.count();
250 LOG_WARN("over time connecting ability, uri:%{public}s, time:%{public}" PRIi64 "ms",
251 DataShareStringUtils::Change(reqUri).c_str(), milliseconds);
252 }
253 LOG_DEBUG("connect ability ended successfully uri:%{public}s", DataShareStringUtils::Change(reqUri).c_str());
254 } else {
255 LOG_WARN("connect timeout uri:%{public}s", DataShareStringUtils::Change(reqUri).c_str());
256 }
257 return dataShareProxy_;
258 }
259
260 /**
261 * @brief disconnect remote ability of DataShareExtAbility.
262 */
DisconnectDataShareExtAbility()263 void DataShareConnection::DisconnectDataShareExtAbility()
264 {
265 std::string uri;
266 {
267 std::lock_guard<std::mutex> lock(mutex_);
268 uri = uri_.ToString();
269 uri_ = Uri("");
270 if (dataShareProxy_ == nullptr) {
271 return;
272 }
273 }
274
275 ErrCode ret = Disconnect();
276 LOG_INFO("disconnect uri:%{public}s, ret = %{public}d", DataShareStringUtils::Change(uri).c_str(), ret);
277 if (ret == E_OK) {
278 return;
279 }
280 }
281
~DataShareConnection()282 DataShareConnection::~DataShareConnection()
283 {
284 }
285
GetDataShareProxy(const Uri & uri,const sptr<IRemoteObject> & token)286 std::shared_ptr<DataShareProxy> DataShareConnection::GetDataShareProxy(const Uri &uri,
287 const sptr<IRemoteObject> &token)
288 {
289 return ConnectDataShareExtAbility(uri, token);
290 }
291
SetConnectInvalid()292 void DataShareConnection::SetConnectInvalid()
293 {
294 isInvalid_.store(true);
295 }
296
Disconnect()297 ErrCode DataShareConnection::Disconnect()
298 {
299 AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
300 if (instance == nullptr) {
301 return -1;
302 }
303 return instance->DisConnect(this);
304 }
305
GetDataShareProxy()306 std::shared_ptr<DataShareProxy> DataShareConnection::GetDataShareProxy()
307 {
308 std::lock_guard<std::mutex> lock(mutex_);
309 return dataShareProxy_;
310 }
311 } // namespace DataShare
312 } // namespace OHOS