• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cinttypes>
17 #include "datashare_connection.h"
18 
19 #include "ams_mgr_proxy.h"
20 #include "datashare_common.h"
21 #include "datashare_errno.h"
22 #include "datashare_log.h"
23 #include "datashare_proxy.h"
24 #include "datashare_radar_reporter.h"
25 #include "datashare_string_utils.h"
26 
27 namespace OHOS {
28 namespace DataShare {
29 using namespace AppExecFwk;
30 const std::chrono::milliseconds TIME_THRESHOLD = std::chrono::milliseconds(200);
31 /**
32  * @brief This method is called back to receive the connection result after an ability calls the
33  * ConnectAbility method to connect it to an extension ability.
34  *
35  * @param element: Indicates information about the connected extension ability.
36  * @param remote: Indicates the remote proxy object of the extension ability.
37  * @param resultCode: Indicates the connection result code. The value 0 indicates a successful connection, and any
38  * other value indicates a connection failure.
39  */
OnAbilityConnectDone(const AppExecFwk::ElementName & element,const sptr<IRemoteObject> & remoteObject,int resultCode)40 void DataShareConnection::OnAbilityConnectDone(
41     const AppExecFwk::ElementName &element, const sptr<IRemoteObject> &remoteObject, int resultCode)
42 {
43     LOG_INFO("on connect done, req uri:%{public}s, rev uri:%{public}s, ret=%{public}d",
44         DataShareStringUtils::Change(uri_.ToString()).c_str(),
45         DataShareStringUtils::Change(element.GetURI()).c_str(), resultCode);
46     if (remoteObject == nullptr) {
47         LOG_ERROR("remote is nullptr");
48         condition_.condition.notify_all();
49         return;
50     }
51     {
52         std::lock_guard<std::mutex> lock(mutex_);
53         sptr<DataShareProxy> proxy = new (std::nothrow) DataShareProxy(remoteObject);
54         dataShareProxy_ = std::shared_ptr<DataShareProxy>(proxy.GetRefPtr(), [holder = proxy](const auto *) {});
55         condition_.condition.notify_all();
56     }
57     if (isInvalid_.load()) {
58         LOG_ERROR("connect is invalid, req uri:%{public}s, rev uri:%{public}s, ret=%{public}d",
59             DataShareStringUtils::Change(uri_.ToString()).c_str(),
60             DataShareStringUtils::Change(element.GetURI()).c_str(), resultCode);
61         Disconnect();
62     }
63     // re-register when re-connect successed
64     if (isReconnect_.load()) {
65         ReRegisterObserverExtProvider();
66         isReconnect_.store(false);
67     }
68 }
69 
70 /**
71  * @brief This method is called back to receive the disconnection result after the connected extension ability crashes
72  * or is killed. If the extension ability exits unexpectedly, all its connections are disconnected, and each ability
73  * previously connected to it will call onAbilityDisconnectDone.
74  *
75  * @param element: Indicates information about the disconnected extension ability.
76  * @param resultCode: Indicates the disconnection result code. The value 0 indicates a successful disconnection, and
77  * any other value indicates a disconnection failure.
78  */
OnAbilityDisconnectDone(const AppExecFwk::ElementName & element,int resultCode)79 void DataShareConnection::OnAbilityDisconnectDone(const AppExecFwk::ElementName &element, int resultCode)
80 {
81     LOG_INFO("on disconnect done, req uri:%{public}s, rev uri:%{public}s, ret=%{public}d",
82         DataShareStringUtils::Change(uri_.ToString()).c_str(),
83         DataShareStringUtils::Change(element.GetURI()).c_str(), resultCode);
84     std::string uri;
85     {
86         std::lock_guard<std::mutex> lock(mutex_);
87         dataShareProxy_ = nullptr;
88         uri = uri_.ToString();
89     }
90     if (uri.empty()) {
91         return;
92     }
93     if (pool_ == nullptr) {
94         std::lock_guard<std::mutex> lock(mutex_);
95         if (pool_ == nullptr) {
96             pool_ = std::make_shared<ExecutorPool>(MAX_THREADS, MIN_THREADS, DATASHARE_EXECUTOR_NAME);
97         }
98     }
99     ReconnectExtAbility(uri);
100 }
101 
ReconnectExtAbility(const std::string & uri)102 void DataShareConnection::ReconnectExtAbility(const std::string &uri)
103 {
104     if (reConnects_.count == 0) {
105         AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
106         if (instance == nullptr) {
107             LOG_ERROR("get proxy failed uri:%{public}s", DataShareStringUtils::Change(uri_.ToString()).c_str());
108             return;
109         }
110         ErrCode ret = instance->Connect(uri, this, token_);
111         LOG_INFO("reconnect ability, uri:%{public}s, ret = %{public}d",
112             DataShareStringUtils::Change(uri).c_str(), ret);
113         if (ret == E_OK) {
114             auto curr = std::chrono::system_clock::now().time_since_epoch();
115             reConnects_.count = 1;
116             reConnects_.firstTime = std::chrono::duration_cast<std::chrono::milliseconds>(curr).count();
117             reConnects_.prevTime = std::chrono::duration_cast<std::chrono::milliseconds>(curr).count();
118             // set status true
119             isReconnect_.store(true);
120         }
121         return;
122     }
123     return DelayConnectExtAbility(uri);
124 }
125 
DelayConnectExtAbility(const std::string & uri)126 void DataShareConnection::DelayConnectExtAbility(const std::string &uri)
127 {
128     int64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
129         std::chrono::system_clock::now().time_since_epoch()).count();
130     if (now - reConnects_.prevTime >= MAX_RECONNECT_TIME_INTERVAL.count()) {
131         reConnects_.count = 0;
132         reConnects_.firstTime = now;
133         reConnects_.prevTime = now;
134     }
135     if (reConnects_.count >= MAX_RECONNECT) {
136         return;
137     }
138     auto delay = RECONNECT_TIME_INTERVAL;
139     if (now - reConnects_.prevTime >= RECONNECT_TIME_INTERVAL.count()) {
140         delay = std::chrono::seconds(0);
141     }
142     std::weak_ptr<DataShareConnection> self = weak_from_this();
143     auto taskid = pool_->Schedule(delay, [uri, self]() {
144         auto selfSharedPtr = self.lock();
145         if (selfSharedPtr) {
146             AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
147             if (instance == nullptr) {
148                 LOG_ERROR("get proxy failed uri:%{public}s", DataShareStringUtils::Change(uri).c_str());
149                 return;
150             }
151             ErrCode ret = instance->Connect(uri, selfSharedPtr.get(), selfSharedPtr->token_);
152             LOG_INFO("reconnect ability, uri:%{public}s, ret = %{public}d",
153                 DataShareStringUtils::Change(uri).c_str(), ret);
154             if (ret == E_OK) {
155                 selfSharedPtr->reConnects_.count++;
156                 selfSharedPtr->reConnects_.prevTime = std::chrono::duration_cast<std::chrono::milliseconds>(
157                     std::chrono::system_clock::now().time_since_epoch()).count();
158                 selfSharedPtr->isReconnect_.store(true);
159             }
160         }
161     });
162     if (taskid == ExecutorPool::INVALID_TASK_ID) {
163         LOG_ERROR("create scheduler failed, over the max capacity");
164         return;
165     }
166     LOG_DEBUG("create scheduler success");
167     return;
168 }
169 
170 // store observer when it was successfully registered
UpdateObserverExtsProviderMap(const Uri & uri,const sptr<AAFwk::IDataAbilityObserver> & dataObserver,bool isDescendants)171 void DataShareConnection::UpdateObserverExtsProviderMap(const Uri &uri,
172     const sptr<AAFwk::IDataAbilityObserver> &dataObserver, bool isDescendants)
173 {
174     observerExtsProvider_.Compute(dataObserver, [&uri, isDescendants](const auto &key, auto &value) {
175         value.emplace_back(uri, isDescendants);
176         return true;
177     });
178 }
179 
180 // remove observer when it was successfully unregistered
DeleteObserverExtsProviderMap(const Uri & uri,const sptr<AAFwk::IDataAbilityObserver> & dataObserver)181 void DataShareConnection::DeleteObserverExtsProviderMap(const Uri &uri,
182     const sptr<AAFwk::IDataAbilityObserver> &dataObserver)
183 {
184     observerExtsProvider_.Compute(dataObserver, [&uri](const auto &key, auto &value) {
185         value.remove_if([&uri](const auto &param) {
186             return uri == param.uri;
187         });
188         return !value.empty();
189     });
190 }
191 
192 // re-register observer by dataShareProxy
ReRegisterObserverExtProvider()193 void DataShareConnection::ReRegisterObserverExtProvider()
194 {
195     LOG_INFO("ReRegisterObserverExtProvider start");
196     decltype(observerExtsProvider_) observerExtsProvider(std::move(observerExtsProvider_));
197     observerExtsProvider_.Clear();
198     observerExtsProvider.ForEach([this](const auto &key, const auto &value) {
199         for (const auto &param : value) {
200             auto ret = dataShareProxy_->RegisterObserverExtProvider(param.uri, key, param.isDescendants, { true });
201             if (ret != E_OK) {
202                 LOG_ERROR(
203                     "RegisterObserverExt failed, param.uri:%{public}s, ret:%{public}d, param.isDescendants:%{public}d",
204                     DataShareStringUtils::Anonymous(param.uri.ToString()).c_str(), ret, param.isDescendants
205                 );
206             } else {
207                 UpdateObserverExtsProviderMap(param.uri, key, param.isDescendants);
208             }
209         }
210         return false;
211     });
212 }
213 /**
214  * @brief connect remote ability of DataShareExtAbility.
215  */
ConnectDataShareExtAbility(const Uri & uri,const sptr<IRemoteObject> & token)216 std::shared_ptr<DataShareProxy> DataShareConnection::ConnectDataShareExtAbility(const Uri &uri,
217     const sptr<IRemoteObject> &token)
218 {
219     std::string reqUri;
220     {
221         std::lock_guard<std::mutex> lock(mutex_);
222         if (dataShareProxy_ != nullptr) {
223             return dataShareProxy_;
224         }
225         reqUri = uri_.ToString().empty() ? uri.ToString() : uri_.ToString();
226     }
227 
228     AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
229     if (instance == nullptr) {
230         LOG_ERROR("get proxy failed uri:%{public}s", DataShareStringUtils::Change(reqUri).c_str());
231         return nullptr;
232     }
233     ErrCode ret = instance->Connect(reqUri, this, token);
234     LOG_INFO("connect ability, uri = %{public}s. ret = %{public}d", DataShareStringUtils::Change(reqUri).c_str(), ret);
235     if (ret != ERR_OK) {
236         return nullptr;
237     }
238     std::unique_lock<std::mutex> condLock(mutex_);
239     std::shared_ptr<DataShareProxy> proxy = dataShareProxy_;
240     if (proxy != nullptr) {
241         return proxy;
242     }
243     auto start = std::chrono::steady_clock::now();
244     if (condition_.condition.wait_for(condLock, std::chrono::seconds(waitTime_),
245         [this] { return dataShareProxy_ != nullptr; })) {
246         auto finish = std::chrono::steady_clock::now();
247         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
248         if (duration >= TIME_THRESHOLD) {
249             int64_t milliseconds = duration.count();
250             LOG_WARN("over time connecting ability, uri:%{public}s, time:%{public}" PRIi64 "ms",
251                 DataShareStringUtils::Change(reqUri).c_str(), milliseconds);
252         }
253         LOG_DEBUG("connect ability ended successfully uri:%{public}s", DataShareStringUtils::Change(reqUri).c_str());
254     } else {
255         LOG_WARN("connect timeout uri:%{public}s", DataShareStringUtils::Change(reqUri).c_str());
256     }
257     return dataShareProxy_;
258 }
259 
260 /**
261  * @brief disconnect remote ability of DataShareExtAbility.
262  */
DisconnectDataShareExtAbility()263 void DataShareConnection::DisconnectDataShareExtAbility()
264 {
265     std::string uri;
266     {
267         std::lock_guard<std::mutex> lock(mutex_);
268         uri = uri_.ToString();
269         uri_ = Uri("");
270         if (dataShareProxy_ == nullptr) {
271             return;
272         }
273     }
274 
275     ErrCode ret = Disconnect();
276     LOG_INFO("disconnect uri:%{public}s, ret = %{public}d", DataShareStringUtils::Change(uri).c_str(), ret);
277     if (ret == E_OK) {
278         return;
279     }
280 }
281 
~DataShareConnection()282 DataShareConnection::~DataShareConnection()
283 {
284 }
285 
GetDataShareProxy(const Uri & uri,const sptr<IRemoteObject> & token)286 std::shared_ptr<DataShareProxy> DataShareConnection::GetDataShareProxy(const Uri &uri,
287     const sptr<IRemoteObject> &token)
288 {
289     return ConnectDataShareExtAbility(uri, token);
290 }
291 
SetConnectInvalid()292 void DataShareConnection::SetConnectInvalid()
293 {
294     isInvalid_.store(true);
295 }
296 
Disconnect()297 ErrCode DataShareConnection::Disconnect()
298 {
299     AmsMgrProxy* instance = AmsMgrProxy::GetInstance();
300     if (instance == nullptr) {
301         return -1;
302     }
303     return instance->DisConnect(this);
304 }
305 
GetDataShareProxy()306 std::shared_ptr<DataShareProxy> DataShareConnection::GetDataShareProxy()
307 {
308     std::lock_guard<std::mutex> lock(mutex_);
309     return dataShareProxy_;
310 }
311 }  // namespace DataShare
312 }  // namespace OHOS