1 /* 2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "ipc_transactors.h" 17 18 #include "common_defines.h" 19 #include "common_utilities_hpp.h" 20 #include "json.hpp" 21 22 namespace OHOS::uitest { 23 using namespace std; 24 using namespace chrono; 25 using namespace nlohmann; 26 NextMessageId()27 static uint32_t NextMessageId() 28 { 29 static uint32_t increasingMessageId = 0; 30 return increasingMessageId++; 31 } 32 EmitCall(string_view apiId,string_view caller,string_view params)33 void MessageTransceiver::EmitCall(string_view apiId, string_view caller, string_view params) 34 { 35 TransactionMessage msg = { 36 .apiId_=string(apiId), 37 .callerParcel_=string(caller), 38 .paramsParcel_=string(params) 39 }; 40 msg.id_ = NextMessageId(); 41 msg.type_ = TransactionType::CALL; 42 EmitMessage(msg); 43 } 44 EmitReply(const TransactionMessage & request,string_view reply)45 void MessageTransceiver::EmitReply(const TransactionMessage &request, string_view reply) 46 { 47 TransactionMessage msg = request; // keep the calling id 48 msg.resultParcel_ = string(reply); 49 msg.type_ = TransactionType::REPLY; 50 EmitMessage(msg); 51 } 52 EmitHandshake()53 void MessageTransceiver::EmitHandshake() 54 { 55 TransactionMessage msg = { 56 .id_ = NextMessageId(), 57 .type_=TransactionType::HANDSHAKE 58 }; 59 EmitMessage(msg); 60 } 61 EmitAck(const TransactionMessage & handshake)62 void MessageTransceiver::EmitAck(const TransactionMessage &handshake) 63 { 64 TransactionMessage msg = handshake; // keep the calling id 65 msg.type_ = TransactionType::ACK; 66 EmitMessage(msg); 67 } 68 EmitExit()69 void MessageTransceiver::EmitExit() 70 { 71 TransactionMessage msg = { 72 .id_ = NextMessageId(), 73 .type_=TransactionType::EXIT 74 }; 75 EmitMessage(msg); 76 } 77 EmitMessage(const TransactionMessage & message)78 void MessageTransceiver::EmitMessage(const TransactionMessage &message) 79 { 80 lastOutgoingMessageMillis_.store(GetCurrentMillisecond()); 81 DoEmitMessage(message); 82 } 83 SetMessageFilter(std::function<bool (TransactionType)> filter)84 void MessageTransceiver::SetMessageFilter(std::function<bool(TransactionType)> filter) 85 { 86 this->messageFilter_ = move(filter); 87 } 88 PollCallReply(TransactionMessage & out,uint64_t timeoutMs)89 MessageTransceiver::PollStatus MessageTransceiver::PollCallReply(TransactionMessage &out, uint64_t timeoutMs) 90 { 91 const auto timeout = chrono::milliseconds(timeoutMs); 92 static constexpr uint32_t flagSet = FLAG_REQUEST_EXIT | FLAG_CONNECT_DIED; 93 const auto checker = [&]() { 94 return (extraFlags_.load() & flagSet) != 0 || !messageQueue_.empty(); 95 }; 96 unique_lock<mutex> lock(queueLock_); 97 if (busyCond_.wait_for(lock, timeout, checker)) { 98 if ((extraFlags_.load() & flagSet) != 0) { 99 if ((extraFlags_.load() & FLAG_REQUEST_EXIT) > 0) { 100 return ABORT_REQUEST_EXIT; 101 } else { 102 return ABORT_CONNECTION_DIED; 103 } 104 } else { 105 // copy and pop 106 out = messageQueue_.front(); 107 messageQueue_.pop(); 108 return SUCCESS; 109 } 110 } else { 111 return ABORT_WAIT_TIMEOUT; 112 } 113 } 114 OnReceiveMessage(const TransactionMessage & message)115 void MessageTransceiver::OnReceiveMessage(const TransactionMessage &message) 116 { 117 if (message.type_ == TransactionType::INVALID) { 118 return; 119 } 120 if (messageFilter_ != nullptr && !messageFilter_(message.type_)) { 121 return; 122 } 123 lastIncomingMessageMillis_.store(GetCurrentMillisecond()); 124 bool doNotification = true; 125 if (message.type_ == CALL || message.type_ == REPLY) { 126 lock_guard lock(queueLock_); 127 messageQueue_.push(message); 128 } else if (message.type_ == EXIT) { 129 extraFlags_.store(extraFlags_.load() | FLAG_REQUEST_EXIT); 130 } else if (message.type_ == HANDSHAKE) { 131 // send ack automatically 132 EmitAck(message); 133 doNotification = false; 134 } else { 135 // handshake and ack are DFX events, won't be enqueued and notified 136 doNotification = false; 137 } 138 if (doNotification) { 139 busyCond_.notify_all(); 140 } 141 } 142 ScheduleCheckConnection(bool emitHandshake)143 void MessageTransceiver::ScheduleCheckConnection(bool emitHandshake) 144 { 145 if (autoHandshaking_.load()) { 146 return; 147 } 148 autoHandshaking_.store(true); 149 lastOutgoingMessageMillis_.store(0); 150 lastIncomingMessageMillis_.store(GetCurrentMillisecond()); // give a reasonable initial value 151 static constexpr uint32_t slices = 100; 152 static constexpr uint64_t secureDurationMs = WATCH_DOG_TIMEOUT_MS * 0.9; 153 constexpr auto interval = chrono::milliseconds(secureDurationMs / slices); 154 future<void> periodWork = async(launch::async, [transceiver = this, interval, emitHandshake]() { 155 while (transceiver != nullptr && transceiver->autoHandshaking_.load()) { 156 const auto millis = GetCurrentMillisecond(); 157 const auto outgoingIdleTime = millis - transceiver->lastOutgoingMessageMillis_.load(); 158 const auto incomingIdleTime = millis - transceiver->lastIncomingMessageMillis_.load(); 159 if (emitHandshake && outgoingIdleTime > secureDurationMs) { 160 // emit handshake in secure_duration 161 transceiver->EmitHandshake(); 162 } 163 // check connection died in each slice 164 if (incomingIdleTime > WATCH_DOG_TIMEOUT_MS) { 165 if (((transceiver->extraFlags_.load()) & FLAG_CONNECT_DIED) == 0) { 166 // first detected 167 transceiver->extraFlags_.store(transceiver->extraFlags_.load() | FLAG_CONNECT_DIED); 168 LOG_D("Connection dead detected"); 169 } 170 transceiver->busyCond_.notify_all(); // notify the observer immediately 171 } 172 this_thread::sleep_for(interval); 173 } 174 LOG_D("Connection check exited"); 175 }); 176 handshakeFuture_ = move(periodWork); 177 LOG_I("Connection-check scheduled, autoHandshake=%{public}d", emitHandshake); 178 } 179 EnsureConnectionAlive(uint64_t timeoutMs)180 bool MessageTransceiver::EnsureConnectionAlive(uint64_t timeoutMs) 181 { 182 constexpr uint64_t intervalMs = 20; 183 constexpr auto duration = chrono::milliseconds(intervalMs); 184 const auto prevIncoming = lastIncomingMessageMillis_.load(); 185 for (size_t count = 0; count < (timeoutMs / intervalMs); count++) { 186 if (lastIncomingMessageMillis_.load() > prevIncoming) { // newer message came 187 return true; 188 } 189 EmitHandshake(); 190 this_thread::sleep_for(duration); 191 } 192 return false; 193 } 194 Finalize()195 void MessageTransceiver::Finalize() 196 { 197 if (autoHandshaking_.load() && handshakeFuture_.valid()) { 198 autoHandshaking_.store(false); 199 handshakeFuture_.get(); 200 } 201 } 202 Initialize()203 bool Transactor::Initialize() 204 { 205 auto pTransceiver = CreateTransceiver(); 206 DCHECK(pTransceiver != nullptr); 207 transceiver_ = move(pTransceiver); 208 transceiver_->SetMessageFilter(GetMessageFilter()); 209 return transceiver_->Initialize(); 210 } 211 Finalize()212 void Transactor::Finalize() 213 { 214 if (transceiver_ != nullptr) { 215 // inject exit message 216 auto terminate = TransactionMessage {.type_ = TransactionType::EXIT}; 217 transceiver_->OnReceiveMessage(terminate); 218 transceiver_->Finalize(); 219 } 220 } 221 RunLoop()222 uint32_t TransactionServer::RunLoop() 223 { 224 DCHECK(transceiver_ != nullptr && callFunc_ != nullptr); 225 while (true) { 226 TransactionMessage message; 227 auto status = transceiver_->PollCallReply(message, WAIT_TRANSACTION_MS); 228 string reply; 229 switch (status) { 230 case MessageTransceiver::PollStatus::SUCCESS: 231 DCHECK(message.type_ == TransactionType::CALL); 232 reply = callFunc_(message.apiId_, message.callerParcel_, message.paramsParcel_); 233 transceiver_->EmitReply(message, reply); 234 break; 235 case MessageTransceiver::PollStatus::ABORT_CONNECTION_DIED: 236 return EXIT_CODE_FAILURE; 237 case MessageTransceiver::PollStatus::ABORT_REQUEST_EXIT: 238 return EXIT_CODE_SUCCESS; 239 default: // continue wait-and-fetch 240 continue; 241 } 242 } 243 } 244 SetCallFunction(function<string (string_view,string_view,string_view)> func)245 void TransactionServer::SetCallFunction(function<string(string_view, string_view, string_view)> func) 246 { 247 callFunc_ = std::move(func); 248 } 249 CreateResultForDiedConnection()250 static string CreateResultForDiedConnection() 251 { 252 json data; 253 json exceptionInfo; 254 exceptionInfo[KEY_CODE] = "INTERNAL_ERROR"; 255 exceptionInfo[KEY_MESSAGE] = "connection with uitest_daemon is dead"; 256 data[KEY_EXCEPTION] = exceptionInfo; 257 return data.dump(); 258 } 259 CreateResultForConcurrentInvoke(string_view processingApi,string_view incomingApi)260 static string CreateResultForConcurrentInvoke(string_view processingApi, string_view incomingApi) 261 { 262 static constexpr string_view msg = "uitest-api dose not allow calling concurrently, current processing:"; 263 json data; 264 json exceptionInfo; 265 exceptionInfo[KEY_CODE] = "USAGE_ERROR"; 266 exceptionInfo[KEY_MESSAGE] = string(msg) + string(processingApi) + ", incoming: " + string(incomingApi); 267 data[KEY_EXCEPTION] = exceptionInfo; 268 return data.dump(); 269 } 270 InvokeApi(string_view apiId,string_view caller,string_view params)271 string TransactionClient::InvokeApi(string_view apiId, string_view caller, string_view params) 272 { 273 unique_lock<mutex> stateLock(stateMtx_); 274 // return immediately if the cs-connection has died or concurrent invoking occurred 275 if (transceiver_ == nullptr || connectionDied_) { 276 return CreateResultForDiedConnection(); 277 } 278 if (!processingApi_.empty()) { 279 return CreateResultForConcurrentInvoke(processingApi_, apiId); 280 } 281 processingApi_ = apiId; 282 stateLock.unlock(); // unlock, allow reentry, make it possible to check and reject concurrent usage 283 transceiver_->EmitCall(apiId, caller, params); 284 while (true) { 285 TransactionMessage message; 286 auto status = transceiver_->PollCallReply(message, WAIT_TRANSACTION_MS); 287 string reply; 288 switch (status) { 289 case MessageTransceiver::PollStatus::SUCCESS: 290 DCHECK(message.type_ == TransactionType::REPLY); 291 stateLock.lock(); 292 processingApi_.clear(); 293 stateLock.unlock(); 294 return message.resultParcel_; 295 case MessageTransceiver::PollStatus::ABORT_CONNECTION_DIED: 296 case MessageTransceiver::PollStatus::ABORT_REQUEST_EXIT: 297 stateLock.lock(); 298 connectionDied_ = true; 299 stateLock.unlock(); 300 return CreateResultForDiedConnection(); 301 default: // continue wait-and-fetch 302 break; 303 } 304 } 305 } 306 Finalize()307 void TransactionClient::Finalize() 308 { 309 if (transceiver_ != nullptr) { 310 // destroy server side 311 transceiver_->EmitExit(); 312 // destroy self side 313 Transactor::Finalize(); 314 connectionDied_ = true; 315 LOG_I("CsConnection disposed"); 316 transceiver_ = nullptr; 317 } 318 } 319 }