1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "remote_executor.h"
17
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 namespace {
30 constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31 constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32 constexpr uint32_t MAX_QUEUE_COUNT = 10;
33 constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35 void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36 {
37 if (message != nullptr) {
38 delete message;
39 message = nullptr;
40 }
41 if (packet != nullptr) {
42 delete packet;
43 packet = nullptr;
44 }
45 }
46 }
47
RemoteExecutor()48 RemoteExecutor::RemoteExecutor()
49 : workingThreadsCount_(0),
50 syncInterface_(nullptr),
51 communicator_(nullptr),
52 lastSessionId_(0),
53 lastTaskId_(0),
54 closed_(false)
55 {
56 }
57
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)58 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
59 {
60 if (syncInterface == nullptr || communicator == nullptr) {
61 return -E_INVALID_ARGS;
62 }
63 closed_ = false;
64 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
65 syncInterface_ = syncInterface;
66 communicator_ = communicator;
67 return E_OK;
68 }
69
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)70 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
71 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
72 {
73 if (closed_) {
74 return -E_BUSY;
75 }
76 if (!CheckParamValid(device, timeout)) {
77 return -E_INVALID_ARGS;
78 }
79 int errCode = E_OK;
80 int taskErrCode = E_OK;
81 SemaphoreUtils semaphore(0);
82 Task task;
83 task.result = std::make_shared<RelationalResultSetImpl>();
84 task.target = device;
85 task.timeout = timeout;
86 task.condition = condition;
87 task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
88 taskErrCode = retCode;
89 result = taskResult;
90 semaphore.SendSemaphore();
91 };
92 task.connectionId = connectionId;
93 errCode = RemoteQueryInner(task);
94 if (errCode != E_OK) {
95 return errCode;
96 }
97 semaphore.WaitSemaphore();
98 return taskErrCode;
99 }
100
ReceiveMessage(const std::string & targetDev,Message * inMsg)101 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
102 {
103 if (inMsg == nullptr) {
104 return -E_INVALID_ARGS;
105 }
106 if (closed_) {
107 LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
108 return -E_BUSY;
109 }
110 RefObject::IncObjRef(this);
111 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
112 ReceiveMessageInner(targetDev, inMsg);
113 RefObject::DecObjRef(this);
114 });
115 if (errCode != E_OK) {
116 RefObject::DecObjRef(this);
117 }
118 return errCode;
119 }
120
NotifyDeviceOffline(const std::string & device)121 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
122 {
123 if (closed_) {
124 return;
125 }
126 LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
127 std::vector<uint32_t> removeList;
128 RemoveTaskByDevice(device, removeList);
129 for (const auto &sessionId : removeList) {
130 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
131 }
132 }
133
NotifyUserChange()134 void RemoteExecutor::NotifyUserChange()
135 {
136 if (closed_) {
137 return;
138 }
139 LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
140 RemoveAllTask(-E_USER_CHANGE);
141 LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
142 }
143
Close()144 void RemoteExecutor::Close()
145 {
146 closed_ = true;
147 LOGD("[RemoteExecutor][Close] close enter");
148 RemoveAllTask(-E_BUSY);
149 ClearInnerSource();
150 {
151 std::unique_lock<std::mutex> lock(msgQueueLock_);
152 clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
153 }
154 LOGD("[RemoteExecutor][Close] close exist");
155 }
156
NotifyConnectionClosed(uint64_t connectionId)157 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
158 {
159 if (closed_) {
160 return;
161 }
162 std::vector<uint32_t> removeList;
163 RemoveTaskByConnection(connectionId, removeList);
164 for (const auto &sessionId : removeList) {
165 DoFinished(sessionId, -E_BUSY);
166 }
167 }
168
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)169 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
170 {
171 LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
172 {
173 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
174 searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
175 if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
176 // message deal in work thread, exist here
177 return -E_NOT_NEED_DELETE_MSG;
178 }
179 workingThreadsCount_++;
180 }
181 bool empty = true;
182 do {
183 std::pair<std::string, Message *> entry;
184 {
185 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
186 empty = searchMessageQueue_.empty();
187 if (empty) {
188 workingThreadsCount_--;
189 continue;
190 }
191 entry = searchMessageQueue_.front();
192 searchMessageQueue_.pop();
193 }
194 ParseOneRequestMessage(entry.first, entry.second);
195 delete entry.second;
196 entry.second = nullptr;
197 } while (!empty);
198 clearCV_.notify_one();
199 return -E_NOT_NEED_DELETE_MSG;
200 }
201
ParseOneRequestMessage(const std::string & device,Message * inMsg)202 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
203 {
204 if (closed_) {
205 LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
206 return;
207 }
208 int errCode = CheckPermissions(device, inMsg);
209 if (errCode != E_OK) {
210 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
211 return;
212 }
213 errCode = SendRemoteExecutorData(device, inMsg);
214 if (errCode != E_OK) {
215 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
216 }
217 }
218
CheckPermissions(const std::string & device,Message * inMsg)219 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
220 {
221 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
222 if (storage == nullptr) {
223 LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
224 return -E_BUSY;
225 }
226 // permission check
227 std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
228 std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
229 std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
230 std::string subUseId = storage->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
231 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
232 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
233 { userId, appId, storeId, device, subUseId, instanceId }, CHECK_FLAG_SEND);
234 if (errCode != E_OK) {
235 LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
236 storage->DecRefCount();
237 return errCode;
238 }
239 const auto *packet = inMsg->GetObject<ISyncPacket>();
240 if (packet == nullptr) {
241 LOGE("[RemoteExecutor] get packet object failed");
242 storage->DecRefCount();
243 return -E_INVALID_ARGS;
244 }
245 const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
246 errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
247 storage->DecRefCount();
248 return errCode;
249 }
250
SendRemoteExecutorData(const std::string & device,const Message * inMsg)251 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
252 {
253 auto *syncInterface = GetAndIncSyncInterface();
254 if (syncInterface == nullptr) {
255 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
256 return -E_INVALID_ARGS;
257 }
258 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
259 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
260 syncInterface->DecRefCount();
261 return -E_NOT_SUPPORT;
262 }
263 RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
264
265 const auto *packet = inMsg->GetObject<ISyncPacket>();
266 if (packet == nullptr) {
267 LOGE("[RemoteExecutor] get packet object failed");
268 storage->DecRefCount();
269 return -E_INVALID_ARGS;
270 }
271 const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
272 int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
273 storage->DecRefCount();
274 return errCode;
275 }
276
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)277 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
278 {
279 const auto *ack = inMsg->GetObject<ISyncPacket>();
280 if (ack == nullptr) {
281 return -E_INVALID_ARGS;
282 }
283 const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
284 int errCode = packet->GetAckCode();
285 uint32_t sessionId = inMsg->GetSessionId();
286 uint32_t sequenceId = inMsg->GetSequenceId();
287 if (!IsPacketValid(sessionId)) {
288 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
289 return -E_INVALID_ARGS;
290 }
291 if (errCode == E_OK) {
292 auto storage = GetAndIncSyncInterface();
293 auto communicator = GetAndIncCommunicator();
294 errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
295 if (storage != nullptr) {
296 storage->DecRefCount();
297 }
298 RefObject::DecObjRef(communicator);
299 }
300 if (errCode != E_OK) {
301 DoFinished(sessionId, errCode);
302 } else {
303 ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
304 }
305 return E_OK;
306 }
307
CheckParamValid(const std::string & device,uint64_t timeout) const308 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
309 {
310 if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
311 LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
312 return false;
313 }
314 if (device.empty()) {
315 LOGD("[RemoteExecutor][CheckParamValid] device is empty");
316 return false;
317 }
318 if (device.length() > DBConstant::MAX_DEV_LENGTH) {
319 LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
320 return false;
321 }
322 ICommunicator *communicator = GetAndIncCommunicator();
323 if (communicator == nullptr) {
324 return false;
325 }
326 std::string localId;
327 int errCode = communicator->GetLocalIdentity(localId);
328 RefObject::DecObjRef(communicator);
329 if (errCode != E_OK) {
330 return false;
331 }
332 if (localId == device) {
333 LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
334 return false;
335 }
336 return true;
337 }
338
CheckTaskExeStatus(const std::string & device)339 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
340 {
341 uint32_t queueCount = 0u;
342 uint32_t exeTaskCount = 0u;
343 uint32_t totalCount = 0u; // waiting task count in all queue
344 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
345 queueCount = searchTaskQueue_.at(device).size();
346 }
347 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
348 exeTaskCount = deviceWorkingSet_.at(device).size();
349 }
350 for (auto &entry : searchTaskQueue_) {
351 int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
352 int currentQueueCount = static_cast<int>(entry.second.size());
353 if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
354 // all task in this queue can execute, no need calculate as waiting task count
355 continue;
356 }
357 totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
358 static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
359 }
360 return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
361 (totalCount + 1 <= MAX_QUEUE_COUNT);
362 }
363
GenerateSessionId()364 uint32_t RemoteExecutor::GenerateSessionId()
365 {
366 uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
367 while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
368 sessionId++;
369 if (sessionId == 0) { // if over flow start with 1
370 sessionId++;
371 }
372 }
373 lastSessionId_ = sessionId;
374 return sessionId;
375 }
376
GenerateTaskId()377 uint32_t RemoteExecutor::GenerateTaskId()
378 {
379 lastTaskId_++;
380 if (lastTaskId_ == 0) { // if over flow start with 1
381 lastTaskId_++;
382 }
383 return lastTaskId_;
384 }
385
RemoteQueryInner(const Task & task)386 int RemoteExecutor::RemoteQueryInner(const Task &task)
387 {
388 uint32_t sessionId = 0u;
389 {
390 // check task count and push into queue in lock
391 std::lock_guard<std::mutex> autoLock(taskLock_);
392 if (!CheckTaskExeStatus(task.target)) {
393 LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
394 return -E_MAX_LIMITS;
395 }
396 sessionId = GenerateSessionId();
397 searchTaskQueue_[task.target].push_back(sessionId);
398 if (taskMap_.find(sessionId) != taskMap_.end()) {
399 LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
400 return -E_INTERNAL_ERROR; // should not happen
401 }
402 taskMap_[sessionId] = task;
403 taskMap_[sessionId].taskId = GenerateTaskId();
404 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
405 taskMap_[sessionId].taskId, task.target.c_str());
406 }
407 std::string device = task.target;
408 RefObject::IncObjRef(this);
409 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
410 TryExecuteTaskInLock(device);
411 RefObject::DecObjRef(this);
412 });
413 if (errCode != E_OK) {
414 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
415 DoRollBack(sessionId);
416 RefObject::DecObjRef(this);
417 }
418 return errCode;
419 }
420
TryExecuteTaskInLock(const std::string & device)421 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
422 {
423 uint32_t sessionId = 0u;
424 {
425 std::lock_guard<std::mutex> autoLock(taskLock_);
426 if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
427 return;
428 }
429 if (searchTaskQueue_[device].empty()) {
430 LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
431 return;
432 }
433 sessionId = searchTaskQueue_[device].front();
434 if (taskMap_.find(sessionId) == taskMap_.end()) {
435 searchTaskQueue_[device].pop_front();
436 LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
437 return;
438 }
439 taskMap_[sessionId].status = Status::WORKING;
440 searchTaskQueue_[device].pop_front();
441 deviceWorkingSet_[device].insert(sessionId);
442 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
443 StartTimer(taskMap_[sessionId].timeout, sessionId);
444 }
445 int errCode = RequestStart(sessionId);
446 if (errCode != E_OK) {
447 DoFinished(sessionId, errCode);
448 }
449 }
450
DoRollBack(uint32_t sessionId)451 void RemoteExecutor::DoRollBack(uint32_t sessionId)
452 {
453 Task task;
454 std::lock_guard<std::mutex> autoLock(taskLock_);
455 if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
456 return;
457 }
458 task = taskMap_[sessionId];
459 if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
460 // task is execute, abort roll back
461 return;
462 }
463 taskMap_.erase(sessionId);
464
465 auto iter = searchTaskQueue_[task.target].begin();
466 while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
467 if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
468 break;
469 }
470 iter++;
471 }
472 if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
473 searchTaskQueue_[task.target].erase(iter);
474 }
475 // this task should not in workingSet
476 deviceWorkingSet_[task.target].erase(sessionId);
477 }
478
RequestStart(uint32_t sessionId)479 int RemoteExecutor::RequestStart(uint32_t sessionId)
480 {
481 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
482 if (message == nullptr) {
483 LOGE("[RemoteExecutor][RequestStart] new message error");
484 return -E_OUT_OF_MEMORY;
485 }
486 message->SetSessionId(sessionId);
487 message->SetMessageType(TYPE_REQUEST);
488 auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
489 if (packet == nullptr) {
490 LOGE("[RemoteExecutor][RequestStart] new packet error");
491 ReleaseMessageAndPacket(message, nullptr);
492 return -E_OUT_OF_MEMORY;
493 }
494 std::string target;
495 int errCode = FillRequestPacket(packet, sessionId, target);
496 if (errCode != E_OK) {
497 ReleaseMessageAndPacket(message, packet);
498 return errCode;
499 }
500 auto exObj = static_cast<ISyncPacket *>(packet);
501 errCode = message->SetExternalObject(exObj);
502 if (errCode != E_OK) {
503 ReleaseMessageAndPacket(message, packet);
504 LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
505 return errCode;
506 }
507 return SendRequestMessage(target, message, sessionId);
508 }
509
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)510 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
511 {
512 auto communicator = GetAndIncCommunicator();
513 auto syncInterface = GetAndIncSyncInterface();
514 if (communicator == nullptr || syncInterface == nullptr) {
515 ReleaseMessageAndPacket(message, nullptr);
516 if (syncInterface != nullptr) {
517 syncInterface->DecRefCount();
518 }
519 RefObject::DecObjRef(communicator);
520 return -E_BUSY;
521 }
522 SendConfig sendConfig;
523 SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
524 RefObject::IncObjRef(this);
525 int errCode = communicator->SendMessage(target, message, sendConfig,
526 [this, sessionId](int errCode, bool isDirectEnd) {
527 if (errCode != E_OK) {
528 DoSendFailed(sessionId, errCode);
529 }
530 RefObject::DecObjRef(this);
531 });
532 if (errCode != E_OK) {
533 ReleaseMessageAndPacket(message, nullptr);
534 RefObject::DecObjRef(this);
535 }
536 RefObject::DecObjRef(communicator);
537 syncInterface->DecRefCount();
538 return errCode;
539 }
540
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)541 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
542 const std::string &device)
543 {
544 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
545 if (packet == nullptr) {
546 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
547 return -E_OUT_OF_MEMORY;
548 }
549 packet->SetAckCode(errCode);
550 packet->SetLastAck();
551 return ResponseStart(packet, sessionId, sequenceId, device);
552 }
553
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)554 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
555 const std::string &device)
556 {
557 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
558 if (packet == nullptr) {
559 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
560 return -E_OUT_OF_MEMORY;
561 }
562 packet->SetAckCode(E_OK);
563 if (sendMessage.isLast) {
564 packet->SetLastAck();
565 }
566 packet->SetSecurityOption(sendMessage.option);
567 packet->MoveInRowDataSet(std::move(dataSet));
568 return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
569 }
570
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)571 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
572 const std::string &device)
573 {
574 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
575 if (storage == nullptr) {
576 ReleaseMessageAndPacket(nullptr, packet);
577 LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
578 return -E_BUSY;
579 }
580 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
581 if (message == nullptr) {
582 LOGE("[RemoteExecutor][ResponseStart] new message error");
583 storage->DecRefCount();
584 ReleaseMessageAndPacket(nullptr, packet);
585 return -E_OUT_OF_MEMORY;
586 }
587 packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
588 auto exObj = static_cast<ISyncPacket *>(packet);
589 int errCode = message->SetExternalObject(exObj);
590 if (errCode != E_OK) {
591 ReleaseMessageAndPacket(message, packet);
592 storage->DecRefCount();
593 LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
594 return errCode;
595 }
596 auto *communicator = GetAndIncCommunicator();
597 if (communicator == nullptr) {
598 ReleaseMessageAndPacket(message, nullptr);
599 storage->DecRefCount();
600 LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
601 return -E_BUSY;
602 }
603
604 message->SetTarget(device);
605 message->SetSessionId(sessionId);
606 message->SetSequenceId(sequenceId);
607 message->SetMessageType(TYPE_RESPONSE);
608 SendConfig sendConfig;
609 SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
610 errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
611 RefObject::DecObjRef(communicator);
612 if (errCode != E_OK) {
613 ReleaseMessageAndPacket(message, nullptr);
614 LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
615 }
616 storage->DecRefCount();
617 return errCode;
618 }
619
StartTimer(uint64_t timeout,uint32_t sessionId)620 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
621 {
622 TimerId timerId = 0u;
623 RefObject::IncObjRef(this);
624 TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
625 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
626 RefObject::DecObjRef(this);
627 }, timerId);
628 if (errCode != E_OK) {
629 RefObject::DecObjRef(this);
630 LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
631 }
632 LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
633 std::lock_guard<std::mutex> autoLock(timeoutLock_);
634 timeoutMap_[timerId] = sessionId;
635 taskFinishMap_[sessionId] = timerId;
636 }
637
RemoveTimer(uint32_t sessionId)638 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
639 {
640 TimerId timerId = 0u;
641 {
642 std::lock_guard<std::mutex> autoLock(timeoutLock_);
643 if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
644 return;
645 }
646 timerId = taskFinishMap_[sessionId];
647 LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
648 timeoutMap_.erase(timerId);
649 taskFinishMap_.erase(sessionId);
650 }
651 RuntimeContext::GetInstance()->RemoveTimer(timerId);
652 }
653
TimeoutCallBack(TimerId timerId)654 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
655 {
656 RefObject::IncObjRef(this);
657 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
658 DoTimeout(timerId);
659 RefObject::DecObjRef(this);
660 });
661 if (errCode != E_OK) {
662 LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
663 RefObject::DecObjRef(this);
664 }
665 return -E_NO_NEED_TIMER;
666 }
667
DoTimeout(TimerId timerId)668 void RemoteExecutor::DoTimeout(TimerId timerId)
669 {
670 LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
671 uint32_t sessionId = 0u;
672 {
673 std::lock_guard<std::mutex> autoLock(timeoutLock_);
674 if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
675 return;
676 }
677 sessionId = timeoutMap_[timerId];
678 }
679 DoFinished(sessionId, -E_TIMEOUT);
680 }
681
DoSendFailed(uint32_t sessionId,int errCode)682 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
683 {
684 LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
685 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
686 }
687
DoFinished(uint32_t sessionId,int errCode)688 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
689 {
690 Task task;
691 if (ClearTaskInfo(sessionId, task) == E_OK) {
692 LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
693 } else {
694 return;
695 }
696 RefObject::IncObjRef(this);
697 if (task.onFinished != nullptr) {
698 task.onFinished(errCode, task.result);
699 LOGD("[RemoteExecutor][DoFinished] onFinished");
700 }
701 std::string device = task.target;
702 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
703 TryExecuteTaskInLock(device);
704 RefObject::DecObjRef(this);
705 });
706 if (retCode != E_OK) {
707 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
708 RefObject::DecObjRef(this);
709 }
710 }
711
ClearTaskInfo(uint32_t sessionId,Task & task)712 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
713 {
714 {
715 std::lock_guard<std::mutex> autoLock(taskLock_);
716 if (taskMap_.find(sessionId) == taskMap_.end()) {
717 return -E_NOT_FOUND;
718 }
719 task = taskMap_[sessionId];
720 taskMap_.erase(sessionId);
721 deviceWorkingSet_[task.target].erase(sessionId);
722 }
723 RemoveTimer(sessionId);
724 return E_OK;
725 }
726
ClearInnerSource()727 void RemoteExecutor::ClearInnerSource()
728 {
729 {
730 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
731 syncInterface_ = nullptr;
732 communicator_ = nullptr;
733 }
734 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
735 LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
736 while (!searchMessageQueue_.empty()) {
737 auto entry = searchMessageQueue_.front();
738 searchMessageQueue_.pop();
739 delete entry.second;
740 entry.second = nullptr;
741 }
742 }
743
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)744 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
745 {
746 ISyncInterface *storage = GetAndIncSyncInterface();
747 if (storage == nullptr) {
748 return -E_BUSY;
749 }
750 SecurityOption localOption;
751 int errCode = storage->GetSecurityOption(localOption);
752 storage->DecRefCount();
753 storage = nullptr;
754 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
755 return -E_SECURITY_OPTION_CHECK_ERROR;
756 }
757 if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
758 LOGE("[AbilitySync] Local not set security option");
759 return -E_SECURITY_OPTION_CHECK_ERROR;
760 }
761 Task task;
762 {
763 std::lock_guard<std::mutex> autoLock(taskLock_);
764 if (taskMap_.find(sessionId) == taskMap_.end()) {
765 LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
766 return -E_FINISHED;
767 }
768 task = taskMap_[sessionId];
769 }
770 packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
771 packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
772 packet->SetSql(task.condition.sql);
773 packet->SetBindArgs(task.condition.bindArgs);
774 packet->SetNeedResponse();
775 packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
776 target = task.target;
777 return E_OK;
778 }
779
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)780 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
781 {
782 int errCode = E_OK;
783 if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
784 DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
785 delete inMsg;
786 inMsg = nullptr;
787 return;
788 }
789 switch (inMsg->GetMessageType()) {
790 case TYPE_REQUEST:
791 errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
792 break;
793 case TYPE_RESPONSE:
794 errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
795 break;
796 default:
797 LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
798 break;
799 }
800 if (errCode != -E_NOT_NEED_DELETE_MSG) {
801 delete inMsg;
802 inMsg = nullptr;
803 }
804 }
805
IsPacketValid(uint32_t sessionId)806 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
807 {
808 std::lock_guard<std::mutex> autoLock(taskLock_);
809 return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
810 }
811
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)812 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
813 const RemoteExecutorAckPacket *packet)
814 {
815 bool isReceiveFinished = false;
816 {
817 std::lock_guard<std::mutex> autoLock(taskLock_);
818 if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
819 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
820 return;
821 }
822 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
823 taskMap_[sessionId].taskId, sequenceId);
824 taskMap_[sessionId].currentCount++;
825 if (packet->IsLastAck()) {
826 taskMap_[sessionId].targetCount = sequenceId;
827 }
828 taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
829 if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
830 isReceiveFinished = true;
831 }
832 }
833 if (isReceiveFinished) {
834 DoFinished(sessionId, E_OK);
835 }
836 }
837
GetAndIncCommunicator() const838 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
839 {
840 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
841 ICommunicator *communicator = communicator_;
842 RefObject::IncObjRef(communicator);
843 return communicator;
844 }
845
GetAndIncSyncInterface() const846 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
847 {
848 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
849 ISyncInterface *syncInterface = syncInterface_;
850 if (syncInterface != nullptr) {
851 syncInterface->IncRefCount();
852 }
853 return syncInterface;
854 }
855
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)856 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
857 {
858 std::lock_guard<std::mutex> autoLock(taskLock_);
859 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
860 for (auto &sessionId : deviceWorkingSet_[device]) {
861 removeList.push_back(sessionId);
862 }
863 }
864 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
865 for (auto &sessionId : searchTaskQueue_[device]) {
866 removeList.push_back(sessionId);
867 }
868 }
869 }
870
RemoveAllTask(int errCode)871 void RemoteExecutor::RemoveAllTask(int errCode)
872 {
873 std::vector<OnFinished> waitToNotify;
874 std::vector<uint32_t> removeTimerList;
875 {
876 std::lock_guard<std::mutex> autoLock(taskLock_);
877 for (auto &taskEntry : taskMap_) {
878 if (taskEntry.second.onFinished != nullptr) {
879 waitToNotify.push_back(taskEntry.second.onFinished);
880 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
881 taskEntry.second.taskId, errCode);
882 }
883 if (taskEntry.second.status == Status::WORKING) {
884 removeTimerList.push_back(taskEntry.first);
885 }
886 }
887 taskMap_.clear();
888 deviceWorkingSet_.clear();
889 searchTaskQueue_.clear();
890 }
891 for (const auto &callBack : waitToNotify) {
892 callBack(errCode, nullptr);
893 }
894 for (const auto &sessionId : removeTimerList) {
895 RemoveTimer(sessionId);
896 }
897 std::lock_guard<std::mutex> autoLock(timeoutLock_);
898 timeoutMap_.clear();
899 taskFinishMap_.clear();
900 }
901
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)902 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
903 {
904 std::lock_guard<std::mutex> autoLock(taskLock_);
905 for (auto &entry : taskMap_) {
906 if (entry.second.connectionId == connectionId) {
907 removeList.push_back(entry.first);
908 }
909 }
910 }
911
GetPacketSize(const std::string & device,size_t & packetSize) const912 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
913 {
914 auto *communicator = GetAndIncCommunicator();
915 if (communicator == nullptr) {
916 LOGD("communicator is nullptr");
917 return -E_BUSY;
918 }
919
920 packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
921 RefObject::DecObjRef(communicator);
922 return E_OK;
923 }
924
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)925 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
926 const SecurityOption &localOption)
927 {
928 bool res = false;
929 if (remoteOption.securityLabel == localOption.securityLabel ||
930 (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
931 localOption.securityLabel == SecurityLabel::NOT_SET)) {
932 res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
933 }
934 if (!res) {
935 LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
936 remoteOption.securityLabel, remoteOption.securityFlag,
937 localOption.securityLabel, localOption.securityFlag);
938 }
939 return res;
940 }
941
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)942 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
943 const std::string &device, uint32_t sessionId)
944 {
945 size_t packetSize = 0u;
946 int errCode = GetPacketSize(device, packetSize);
947 if (errCode != E_OK) {
948 return errCode;
949 }
950 SecurityOption option;
951 errCode = storage->GetSecurityOption(option);
952 if (errCode == -E_NOT_SUPPORT) {
953 option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
954 errCode = E_OK;
955 }
956 if (errCode != E_OK) {
957 LOGD("GetSecurityOption errCode:%d", errCode);
958 return -E_SECURITY_OPTION_CHECK_ERROR;
959 }
960 ContinueToken token = nullptr;
961 uint32_t sequenceId = 1u;
962 do {
963 RelationalRowDataSet dataSet;
964 errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
965 if (errCode != E_OK) {
966 LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
967 break;
968 }
969 SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
970 errCode = ResponseData(std::move(dataSet), sendMessage, device);
971 if (errCode != E_OK) {
972 break;
973 }
974 sequenceId++;
975 } while (token != nullptr);
976 if (token != nullptr) {
977 storage->ReleaseRemoteQueryContinueToken(token);
978 }
979 return errCode;
980 }
981
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)982 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
983 const SecurityOption &remoteOption)
984 {
985 if (storage == nullptr || communicator == nullptr) {
986 return -E_BUSY;
987 }
988 if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
989 return -E_NOT_SUPPORT;
990 }
991 std::string device;
992 communicator->GetLocalIdentity(device);
993 SecurityOption localOption;
994 int errCode = storage->GetSecurityOption(localOption);
995 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
996 return -E_SECURITY_OPTION_CHECK_ERROR;
997 }
998 if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
999 return E_OK;
1000 }
1001 if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
1002 errCode = -E_SECURITY_OPTION_CHECK_ERROR;
1003 } else {
1004 errCode = E_OK;
1005 }
1006 return errCode;
1007 }
1008
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1009 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1010 int32_t remoteSecLabel, uint32_t remoteVersion)
1011 {
1012 SecurityOption localOption;
1013 int errCode = storage->GetSecurityOption(localOption);
1014 LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1015 localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1016 if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1017 return E_OK;
1018 }
1019 if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1020 LOGE("[RemoteExecutor] local security label not set!");
1021 return -E_SECURITY_OPTION_CHECK_ERROR;
1022 }
1023 if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1024 LOGE("[RemoteExecutor] remote security label not set!");
1025 return -E_SECURITY_OPTION_CHECK_ERROR;
1026 }
1027 if (errCode == -E_NOT_SUPPORT) {
1028 return E_OK;
1029 }
1030 if (errCode != E_OK) {
1031 return -E_SECURITY_OPTION_CHECK_ERROR;
1032 }
1033 if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1034 return E_OK;
1035 }
1036 if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1037 return E_OK;
1038 }
1039 return -E_SECURITY_OPTION_CHECK_ERROR;
1040 }
1041
GetTaskCount() const1042 int32_t RemoteExecutor::GetTaskCount() const
1043 {
1044 std::lock_guard<std::mutex> autoLock(taskLock_);
1045 return static_cast<int32_t>(taskMap_.size()); // max taskMap_ size is 32
1046 }
1047 }