• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "ipc_transactors.h"

#include <atomic>

#include "common_defines.h"
#include "common_utilities_hpp.h"
#include "json.hpp"
21 
22 namespace OHOS::uitest {
23     using namespace std;
24     using namespace chrono;
25     using namespace nlohmann;
26 
/**
 * Produces the id for the next outgoing transaction message.
 *
 * Ids start at 0 and grow by one per call (wrapping around at the uint32_t limit).
 * The counter is atomic because ids are requested both from the connection-check task
 * (which calls EmitHandshake) and from ordinary callers such as EmitCall/EmitExit, and
 * those may run on different threads.
 *
 * @return the id to stamp on the next message.
 */
static uint32_t NextMessageId()
{
    static std::atomic<uint32_t> increasingMessageId {0};
    // fetch_add returns the value held before the increment, same as the post-increment did
    return increasingMessageId.fetch_add(1);
}
32 
EmitCall(string_view apiId,string_view caller,string_view params)33     void MessageTransceiver::EmitCall(string_view apiId, string_view caller, string_view params)
34     {
35         TransactionMessage msg = {
36             .apiId_=string(apiId),
37             .callerParcel_=string(caller),
38             .paramsParcel_=string(params)
39         };
40         msg.id_ = NextMessageId();
41         msg.type_ = TransactionType::CALL;
42         EmitMessage(msg);
43     }
44 
EmitReply(const TransactionMessage & request,string_view reply)45     void MessageTransceiver::EmitReply(const TransactionMessage &request, string_view reply)
46     {
47         TransactionMessage msg = request; // keep the calling id
48         msg.resultParcel_ = string(reply);
49         msg.type_ = TransactionType::REPLY;
50         EmitMessage(msg);
51     }
52 
EmitHandshake()53     void MessageTransceiver::EmitHandshake()
54     {
55         TransactionMessage msg = {
56             .id_ = NextMessageId(),
57             .type_=TransactionType::HANDSHAKE
58         };
59         EmitMessage(msg);
60     }
61 
EmitAck(const TransactionMessage & handshake)62     void MessageTransceiver::EmitAck(const TransactionMessage &handshake)
63     {
64         TransactionMessage msg = handshake; // keep the calling id
65         msg.type_ = TransactionType::ACK;
66         EmitMessage(msg);
67     }
68 
EmitExit()69     void MessageTransceiver::EmitExit()
70     {
71         TransactionMessage msg = {
72             .id_ = NextMessageId(),
73             .type_=TransactionType::EXIT
74         };
75         EmitMessage(msg);
76     }
77 
EmitMessage(const TransactionMessage & message)78     void MessageTransceiver::EmitMessage(const TransactionMessage &message)
79     {
80         lastOutgoingMessageMillis_.store(GetCurrentMillisecond());
81         DoEmitMessage(message);
82     }
83 
SetMessageFilter(std::function<bool (TransactionType)> filter)84     void MessageTransceiver::SetMessageFilter(std::function<bool(TransactionType)> filter)
85     {
86         this->messageFilter_ = move(filter);
87     }
88 
PollCallReply(TransactionMessage & out,uint64_t timeoutMs)89     MessageTransceiver::PollStatus MessageTransceiver::PollCallReply(TransactionMessage &out, uint64_t timeoutMs)
90     {
91         const auto timeout = chrono::milliseconds(timeoutMs);
92         static constexpr uint32_t flagSet = FLAG_REQUEST_EXIT | FLAG_CONNECT_DIED;
93         const auto checker = [&]() {
94             return (extraFlags_.load() & flagSet) != 0 || !messageQueue_.empty();
95         };
96         unique_lock<mutex> lock(queueLock_);
97         if (busyCond_.wait_for(lock, timeout, checker)) {
98             if ((extraFlags_.load() & flagSet) != 0) {
99                 if ((extraFlags_.load() & FLAG_REQUEST_EXIT) > 0) {
100                     return ABORT_REQUEST_EXIT;
101                 } else {
102                     return ABORT_CONNECTION_DIED;
103                 }
104             } else {
105                 // copy and pop
106                 out = messageQueue_.front();
107                 messageQueue_.pop();
108                 return SUCCESS;
109             }
110         } else {
111             return ABORT_WAIT_TIMEOUT;
112         }
113     }
114 
OnReceiveMessage(const TransactionMessage & message)115     void MessageTransceiver::OnReceiveMessage(const TransactionMessage &message)
116     {
117         if (message.type_ == TransactionType::INVALID) {
118             return;
119         }
120         if (messageFilter_ != nullptr && !messageFilter_(message.type_)) {
121             return;
122         }
123         lastIncomingMessageMillis_.store(GetCurrentMillisecond());
124         bool doNotification = true;
125         if (message.type_ == CALL || message.type_ == REPLY) {
126             lock_guard lock(queueLock_);
127             messageQueue_.push(message);
128         } else if (message.type_ == EXIT) {
129             extraFlags_.store(extraFlags_.load() | FLAG_REQUEST_EXIT);
130         } else if (message.type_ == HANDSHAKE) {
131             // send ack automatically
132             EmitAck(message);
133             doNotification = false;
134         } else {
135             // handshake and ack are DFX events, won't be enqueued and notified
136             doNotification = false;
137         }
138         if (doNotification) {
139             busyCond_.notify_all();
140         }
141     }
142 
ScheduleCheckConnection(bool emitHandshake)143     void MessageTransceiver::ScheduleCheckConnection(bool emitHandshake)
144     {
145         if (autoHandshaking_.load()) {
146             return;
147         }
148         autoHandshaking_.store(true);
149         lastOutgoingMessageMillis_.store(0);
150         lastIncomingMessageMillis_.store(GetCurrentMillisecond()); // give a reasonable initial value
151         static constexpr uint32_t slices = 100;
152         static constexpr uint64_t secureDurationMs = WATCH_DOG_TIMEOUT_MS * 0.9;
153         constexpr auto interval = chrono::milliseconds(secureDurationMs / slices);
154         future<void> periodWork = async(launch::async, [transceiver = this, interval, emitHandshake]() {
155             while (transceiver != nullptr && transceiver->autoHandshaking_.load()) {
156                 const auto millis = GetCurrentMillisecond();
157                 const auto outgoingIdleTime = millis - transceiver->lastOutgoingMessageMillis_.load();
158                 const auto incomingIdleTime = millis - transceiver->lastIncomingMessageMillis_.load();
159                 if (emitHandshake && outgoingIdleTime > secureDurationMs) {
160                     // emit handshake in secure_duration
161                     transceiver->EmitHandshake();
162                 }
163                 // check connection died in each slice
164                 if (incomingIdleTime > WATCH_DOG_TIMEOUT_MS) {
165                     if (((transceiver->extraFlags_.load()) & FLAG_CONNECT_DIED) == 0) {
166                         // first detected
167                         transceiver->extraFlags_.store(transceiver->extraFlags_.load() | FLAG_CONNECT_DIED);
168                         LOG_D("Connection dead detected");
169                     }
170                     transceiver->busyCond_.notify_all(); // notify the observer immediately
171                 }
172                 this_thread::sleep_for(interval);
173             }
174             LOG_D("Connection check exited");
175         });
176         handshakeFuture_ = move(periodWork);
177         LOG_I("Connection-check scheduled, autoHandshake=%{public}d", emitHandshake);
178     }
179 
EnsureConnectionAlive(uint64_t timeoutMs)180     bool MessageTransceiver::EnsureConnectionAlive(uint64_t timeoutMs)
181     {
182         constexpr uint64_t intervalMs = 20;
183         constexpr auto duration = chrono::milliseconds(intervalMs);
184         const auto prevIncoming = lastIncomingMessageMillis_.load();
185         for (size_t count = 0; count < (timeoutMs / intervalMs); count++) {
186             if (lastIncomingMessageMillis_.load() > prevIncoming) { // newer message came
187                 return true;
188             }
189             EmitHandshake();
190             this_thread::sleep_for(duration);
191         }
192         return false;
193     }
194 
Finalize()195     void MessageTransceiver::Finalize()
196     {
197         if (autoHandshaking_.load() && handshakeFuture_.valid()) {
198             autoHandshaking_.store(false);
199             handshakeFuture_.get();
200         }
201     }
202 
Initialize()203     bool Transactor::Initialize()
204     {
205         auto pTransceiver = CreateTransceiver();
206         DCHECK(pTransceiver != nullptr);
207         transceiver_ = move(pTransceiver);
208         transceiver_->SetMessageFilter(GetMessageFilter());
209         return transceiver_->Initialize();
210     }
211 
Finalize()212     void Transactor::Finalize()
213     {
214         if (transceiver_ != nullptr) {
215             // inject exit message
216             auto terminate = TransactionMessage {.type_ = TransactionType::EXIT};
217             transceiver_->OnReceiveMessage(terminate);
218             transceiver_->Finalize();
219         }
220     }
221 
RunLoop()222     uint32_t TransactionServer::RunLoop()
223     {
224         DCHECK(transceiver_ != nullptr && callFunc_ != nullptr);
225         while (true) {
226             TransactionMessage message;
227             auto status = transceiver_->PollCallReply(message, WAIT_TRANSACTION_MS);
228             string reply;
229             switch (status) {
230                 case MessageTransceiver::PollStatus::SUCCESS:
231                     DCHECK(message.type_ == TransactionType::CALL);
232                     reply = callFunc_(message.apiId_, message.callerParcel_, message.paramsParcel_);
233                     transceiver_->EmitReply(message, reply);
234                     break;
235                 case MessageTransceiver::PollStatus::ABORT_CONNECTION_DIED:
236                     return EXIT_CODE_FAILURE;
237                 case MessageTransceiver::PollStatus::ABORT_REQUEST_EXIT:
238                     return EXIT_CODE_SUCCESS;
239                 default: // continue wait-and-fetch
240                     continue;
241             }
242         }
243     }
244 
SetCallFunction(function<string (string_view,string_view,string_view)> func)245     void TransactionServer::SetCallFunction(function<string(string_view, string_view, string_view)> func)
246     {
247         callFunc_ = std::move(func);
248     }
249 
CreateResultForDiedConnection()250     static string CreateResultForDiedConnection()
251     {
252         json data;
253         json exceptionInfo;
254         exceptionInfo[KEY_CODE] = "INTERNAL_ERROR";
255         exceptionInfo[KEY_MESSAGE] = "connection with uitest_daemon is dead";
256         data[KEY_EXCEPTION] = exceptionInfo;
257         return data.dump();
258     }
259 
CreateResultForConcurrentInvoke(string_view processingApi,string_view incomingApi)260     static string CreateResultForConcurrentInvoke(string_view processingApi, string_view incomingApi)
261     {
262         static constexpr string_view msg = "uitest-api dose not allow calling concurrently, current processing:";
263         json data;
264         json exceptionInfo;
265         exceptionInfo[KEY_CODE] = "USAGE_ERROR";
266         exceptionInfo[KEY_MESSAGE] = string(msg) + string(processingApi) + ", incoming: " + string(incomingApi);
267         data[KEY_EXCEPTION] = exceptionInfo;
268         return data.dump();
269     }
270 
InvokeApi(string_view apiId,string_view caller,string_view params)271     string TransactionClient::InvokeApi(string_view apiId, string_view caller, string_view params)
272     {
273         unique_lock<mutex> stateLock(stateMtx_);
274         // return immediately if the cs-connection has died or concurrent invoking occurred
275         if (transceiver_ == nullptr || connectionDied_) {
276             return CreateResultForDiedConnection();
277         }
278         if (!processingApi_.empty()) {
279             return CreateResultForConcurrentInvoke(processingApi_, apiId);
280         }
281         processingApi_ = apiId;
282         stateLock.unlock(); // unlock, allow reentry, make it possible to check and reject concurrent usage
283         transceiver_->EmitCall(apiId, caller, params);
284         while (true) {
285             TransactionMessage message;
286             auto status = transceiver_->PollCallReply(message, WAIT_TRANSACTION_MS);
287             string reply;
288             switch (status) {
289                 case MessageTransceiver::PollStatus::SUCCESS:
290                     DCHECK(message.type_ == TransactionType::REPLY);
291                     stateLock.lock();
292                     processingApi_.clear();
293                     stateLock.unlock();
294                     return message.resultParcel_;
295                 case MessageTransceiver::PollStatus::ABORT_CONNECTION_DIED:
296                 case MessageTransceiver::PollStatus::ABORT_REQUEST_EXIT:
297                     stateLock.lock();
298                     connectionDied_ = true;
299                     stateLock.unlock();
300                     return CreateResultForDiedConnection();
301                 default: // continue wait-and-fetch
302                     break;
303             }
304         }
305     }
306 
Finalize()307     void TransactionClient::Finalize()
308     {
309         if (transceiver_ != nullptr) {
310             // destroy server side
311             transceiver_->EmitExit();
312             // destroy self side
313             Transactor::Finalize();
314             connectionDied_ = true;
315             LOG_I("CsConnection disposed");
316             transceiver_ = nullptr;
317         }
318     }
319 }