1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "remote_executor.h"
17
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 namespace {
// Upper bound checked against workingThreadsCount_ (request worker tasks) and against the
// per-device working set size before another task is started.
constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
// Per-device task threshold used by CheckTaskExeStatus.
constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
// Upper bound on the waiting-task total computed by CheckTaskExeStatus.
constexpr uint32_t MAX_QUEUE_COUNT = 10;
// Timeout value handed to SetSendConfigParam when a request is sent.
constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35 void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36 {
37 delete message;
38 message = nullptr;
39 delete packet;
40 packet = nullptr;
41 }
42 }
43
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45 : workingThreadsCount_(0),
46 syncInterface_(nullptr),
47 communicator_(nullptr),
48 lastSessionId_(0),
49 lastTaskId_(0),
50 closed_(false)
51 {
52 }
53
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56 if (syncInterface == nullptr || communicator == nullptr) {
57 return -E_INVALID_ARGS;
58 }
59 closed_ = false;
60 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61 syncInterface_ = syncInterface;
62 communicator_ = communicator;
63 return E_OK;
64 }
65
RemoteQuery(const std::string device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string device, const RemoteCondition &condition,
67 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69 if (closed_) {
70 return -E_BUSY;
71 }
72 if (!CheckParamValid(device, timeout)) {
73 return -E_INVALID_ARGS;
74 }
75 int errCode = E_OK;
76 SemaphoreUtils semaphore(0);
77 Task task;
78 task.result = std::make_shared<RelationalResultSetImpl>();
79 task.target = device;
80 task.timeout = timeout;
81 task.condition = condition;
82 task.onFinished = [&semaphore, &errCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
83 errCode = retCode;
84 result = taskResult;
85 semaphore.SendSemaphore();
86 };
87 task.connectionId = connectionId;
88 errCode = RemoteQueryInner(task);
89 if (errCode != E_OK) {
90 return errCode;
91 }
92 semaphore.WaitSemaphore();
93 return errCode;
94 }
95
ReceiveMessage(const std::string & targetDev,Message * inMsg)96 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
97 {
98 if (inMsg == nullptr) {
99 return -E_INVALID_ARGS;
100 }
101 if (closed_) {
102 LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
103 delete inMsg;
104 inMsg = nullptr;
105 return -E_BUSY;
106 }
107 RefObject::IncObjRef(this);
108 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
109 ReceiveMessageInner(targetDev, inMsg);
110 RefObject::DecObjRef(this);
111 });
112 if (errCode != E_OK) {
113 RefObject::DecObjRef(this);
114 }
115 return errCode;
116 }
117
NotifyDeviceOffline(const std::string & device)118 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
119 {
120 if (closed_) {
121 return;
122 }
123 LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
124 std::vector<uint32_t> removeList;
125 RemoveTaskByDevice(device, removeList);
126 for (const auto &sessionId : removeList) {
127 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
128 }
129 }
130
NotifyUserChange()131 void RemoteExecutor::NotifyUserChange()
132 {
133 if (closed_) {
134 return;
135 }
136 LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
137 RemoveAllTask(-E_USER_CHANGE);
138 LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
139 }
140
Close()141 void RemoteExecutor::Close()
142 {
143 closed_ = true;
144 LOGD("[RemoteExecutor][Close] close enter");
145 RemoveAllTask(-E_BUSY);
146 ClearInnerSource();
147 {
148 std::unique_lock<std::mutex> lock(msgQueueLock_);
149 clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
150 }
151 LOGD("[RemoteExecutor][Close] close exist");
152 }
153
NotifyConnectionClosed(uint64_t connectionId)154 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
155 {
156 if (closed_) {
157 return;
158 }
159 std::vector<uint32_t> removeList;
160 RemoveTaskByConnection(connectionId, removeList);
161 for (const auto &sessionId : removeList) {
162 DoFinished(sessionId, -E_BUSY);
163 }
164 }
165
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)166 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
167 {
168 LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
169 {
170 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
171 searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
172 if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
173 // message deal in work thread, exist here
174 return -E_NOT_NEED_DELETE_MSG;
175 }
176 workingThreadsCount_++;
177 }
178 RefObject::IncObjRef(this);
179 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
180 bool empty = true;
181 do {
182 std::pair<std::string, Message *> entry;
183 {
184 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
185 empty = searchMessageQueue_.empty();
186 if (empty) {
187 workingThreadsCount_--;
188 continue;
189 }
190 entry = searchMessageQueue_.front();
191 searchMessageQueue_.pop();
192 }
193 ParseOneRequestMessage(entry.first, entry.second);
194 delete entry.second;
195 entry.second = nullptr;
196 } while (!empty);
197 clearCV_.notify_one();
198 RefObject::DecObjRef(this);
199 });
200 if (errCode != E_OK) {
201 workingThreadsCount_--;
202 clearCV_.notify_one();
203 RefObject::DecObjRef(this);
204 } else {
205 errCode = -E_NOT_NEED_DELETE_MSG;
206 }
207 return errCode;
208 }
209
ParseOneRequestMessage(const std::string & device,Message * inMsg)210 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
211 {
212 if (closed_) {
213 LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
214 return;
215 }
216 int errCode = CheckPermissions(device, inMsg);
217 if (errCode != E_OK) {
218 ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
219 return;
220 }
221 errCode = SendRemoteExecutorData(device, inMsg);
222 if (errCode != E_OK) {
223 ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
224 }
225 }
226
CheckPermissions(const std::string & device,Message * inMsg)227 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
228 {
229 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
230 if (storage == nullptr) {
231 LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
232 return -E_BUSY;
233 }
234 // permission check
235 std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
236 std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
237 std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
238 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
239 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
240 { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
241 if (errCode != E_OK) {
242 LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
243 storage->DecRefCount();
244 return errCode;
245 }
246 const auto *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
247 if (requestPacket == nullptr) {
248 LOGE("[RemoteExecutor] get packet object failed");
249 storage->DecRefCount();
250 return -E_INVALID_ARGS;
251 }
252 errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel());
253 storage->DecRefCount();
254 return errCode;
255 }
256
SendRemoteExecutorData(const std::string & device,const Message * inMsg)257 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
258 {
259 auto *syncInterface = GetAndIncSyncInterface();
260 if (syncInterface == nullptr) {
261 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
262 return -E_INVALID_ARGS;
263 }
264 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
265 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
266 syncInterface->DecRefCount();
267 return -E_NOT_SUPPORT;
268 }
269 RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
270
271 const RemoteExecutorRequestPacket *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
272 if (requestPacket == nullptr) {
273 LOGE("[RemoteExecutor] get packet object failed");
274 storage->DecRefCount();
275 return -E_INVALID_ARGS;
276 }
277
278 int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
279 storage->DecRefCount();
280 return errCode;
281 }
282
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)283 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
284 {
285 auto *packet = inMsg->GetObject<RemoteExecutorAckPacket>();
286 if (packet == nullptr) {
287 return -E_INVALID_ARGS;
288 }
289 int errCode = packet->GetAckCode();
290 uint32_t sessionId = inMsg->GetSessionId();
291 uint32_t sequenceId = inMsg->GetSequenceId();
292 if (!IsPacketValid(sessionId)) {
293 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
294 return -E_INVALID_ARGS;
295 }
296 if (errCode == E_OK) {
297 auto storage = GetAndIncSyncInterface();
298 auto communicator = GetAndIncCommunicator();
299 errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
300 if (storage != nullptr) {
301 storage->DecRefCount();
302 }
303 RefObject::DecObjRef(communicator);
304 }
305 if (errCode != E_OK) {
306 DoFinished(sessionId, errCode);
307 } else {
308 ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
309 }
310 return E_OK;
311 }
312
CheckParamValid(const std::string & device,uint64_t timeout) const313 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
314 {
315 if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
316 LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
317 return false;
318 }
319 if (device.empty()) {
320 LOGD("[RemoteExecutor][CheckParamValid] device is empty");
321 return false;
322 }
323 ICommunicator *communicator = GetAndIncCommunicator();
324 if (communicator == nullptr) {
325 return false;
326 }
327 std::string localId;
328 int errCode = communicator->GetLocalIdentity(localId);
329 RefObject::DecObjRef(communicator);
330 if (errCode != E_OK) {
331 return false;
332 }
333 if (localId == device) {
334 LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
335 return false;
336 }
337 return true;
338 }
339
CheckTaskExeStatus(const std::string & device)340 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
341 {
342 uint32_t queueCount = 0u;
343 uint32_t exeTaskCount = 0u;
344 uint32_t totalCount = 0u; // waiting task count in all queue
345 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
346 queueCount = searchTaskQueue_.at(device).size();
347 }
348 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
349 exeTaskCount = deviceWorkingSet_.at(device).size();
350 }
351 for (auto &entry : searchTaskQueue_) {
352 int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
353 int currentQueueCount = static_cast<int>(entry.second.size());
354 if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
355 // all task in this queue can execute, no need calculate as waiting task count
356 continue;
357 }
358 totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
359 static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
360 }
361 return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
362 (totalCount + 1 <= MAX_QUEUE_COUNT);
363 }
364
GenerateSessionId()365 uint32_t RemoteExecutor::GenerateSessionId()
366 {
367 uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
368 while (taskMap_.find(sessionId) != taskMap_.end()) {
369 sessionId++;
370 if (sessionId == 0) { // if over flow start with 1
371 sessionId++;
372 }
373 }
374 lastSessionId_ = sessionId;
375 return sessionId;
376 }
377
GenerateTaskId()378 uint32_t RemoteExecutor::GenerateTaskId()
379 {
380 lastTaskId_++;
381 if (lastTaskId_ == 0) { // if over flow start with 1
382 lastTaskId_++;
383 }
384 return lastTaskId_;
385 }
386
RemoteQueryInner(const Task & task)387 int RemoteExecutor::RemoteQueryInner(const Task &task)
388 {
389 uint32_t sessionId = 0u;
390 {
391 // check task count and push into queue in lock
392 std::lock_guard<std::mutex> autoLock(taskLock_);
393 if (!CheckTaskExeStatus(task.target)) {
394 LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
395 return -E_MAX_LIMITS;
396 }
397 sessionId = GenerateSessionId();
398 searchTaskQueue_[task.target].push_back(sessionId);
399 if (taskMap_.find(sessionId) != taskMap_.end()) {
400 LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
401 return -E_INTERNAL_ERROR; // should not happen
402 }
403 taskMap_[sessionId] = task;
404 taskMap_[sessionId].taskId = GenerateTaskId();
405 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
406 taskMap_[sessionId].taskId, task.target.c_str());
407 }
408 std::string device = task.target;
409 RefObject::IncObjRef(this);
410 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
411 TryExecuteTaskInLock(device);
412 RefObject::DecObjRef(this);
413 });
414 if (errCode != E_OK) {
415 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
416 DoRollBack(sessionId);
417 RefObject::DecObjRef(this);
418 }
419 return errCode;
420 }
421
TryExecuteTaskInLock(const std::string & device)422 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
423 {
424 uint32_t sessionId = 0u;
425 {
426 std::lock_guard<std::mutex> autoLock(taskLock_);
427 if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
428 return;
429 }
430 if (searchTaskQueue_[device].empty()) {
431 LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
432 return;
433 }
434 sessionId = searchTaskQueue_[device].front();
435 if (taskMap_.find(sessionId) == taskMap_.end()) {
436 searchTaskQueue_[device].pop_front();
437 LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
438 return;
439 }
440 taskMap_[sessionId].status = Status::WORKING;
441 searchTaskQueue_[device].pop_front();
442 deviceWorkingSet_[device].insert(sessionId);
443 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
444 StartTimer(taskMap_[sessionId].timeout, sessionId);
445 }
446 int errCode = RequestStart(sessionId);
447 if (errCode != E_OK) {
448 DoFinished(sessionId, errCode);
449 }
450 }
451
DoRollBack(uint32_t sessionId)452 void RemoteExecutor::DoRollBack(uint32_t sessionId)
453 {
454 Task task;
455 std::lock_guard<std::mutex> autoLock(taskLock_);
456 if (taskMap_.find(sessionId) == taskMap_.end()) {
457 return;
458 }
459 task = taskMap_[sessionId];
460 if (task.status != Status::WAITING) {
461 // task is execute, abort roll back
462 return;
463 }
464 taskMap_.erase(sessionId);
465
466 auto iter = searchTaskQueue_[task.target].begin();
467 while (iter != searchTaskQueue_[task.target].end()) {
468 if ((*iter) == sessionId) {
469 break;
470 }
471 iter++;
472 }
473 if (iter != searchTaskQueue_[task.target].end()) {
474 searchTaskQueue_[task.target].erase(iter);
475 }
476 // this task should not in workingSet
477 deviceWorkingSet_[task.target].erase(sessionId);
478 }
479
RequestStart(uint32_t sessionId)480 int RemoteExecutor::RequestStart(uint32_t sessionId)
481 {
482 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
483 if (message == nullptr) {
484 LOGE("[RemoteExecutor][RequestStart] new message error");
485 return -E_OUT_OF_MEMORY;
486 }
487 message->SetSessionId(sessionId);
488 message->SetMessageType(TYPE_REQUEST);
489 auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
490 if (packet == nullptr) {
491 LOGE("[RemoteExecutor][RequestStart] new packet error");
492 ReleaseMessageAndPacket(message, nullptr);
493 return -E_OUT_OF_MEMORY;
494 }
495 std::string target;
496 int errCode = FillRequestPacket(packet, sessionId, target);
497 if (errCode != E_OK) {
498 ReleaseMessageAndPacket(message, packet);
499 return errCode;
500 }
501 errCode = message->SetExternalObject(packet);
502 if (errCode != E_OK) {
503 ReleaseMessageAndPacket(message, packet);
504 LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
505 }
506 auto communicator = GetAndIncCommunicator();
507 auto syncInterface = GetAndIncSyncInterface();
508 if (communicator == nullptr || syncInterface == nullptr) {
509 ReleaseMessageAndPacket(message, nullptr);
510 if (syncInterface != nullptr) {
511 syncInterface->DecRefCount();
512 }
513 RefObject::DecObjRef(communicator);
514 return -E_BUSY;
515 }
516 SendConfig sendConfig;
517 SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
518 RefObject::IncObjRef(this);
519 errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
520 if (errCode != E_OK) {
521 DoSendFailed(sessionId, errCode);
522 }
523 RefObject::DecObjRef(this);
524 });
525 RefObject::DecObjRef(communicator);
526 syncInterface->DecRefCount();
527 return errCode;
528 }
529
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)530 void RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
531 const std::string &device)
532 {
533 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
534 if (packet == nullptr) {
535 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
536 return;
537 }
538 packet->SetAckCode(errCode);
539 packet->SetLastAck();
540 (void)ResponseStart(packet, sessionId, sequenceId, device);
541 }
542
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)543 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
544 const std::string &device)
545 {
546 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
547 if (packet == nullptr) {
548 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
549 return -E_OUT_OF_MEMORY;
550 }
551 packet->SetAckCode(E_OK);
552 if (sendMessage.isLast) {
553 packet->SetLastAck();
554 }
555 packet->SetSecurityOption(sendMessage.option);
556 packet->MoveInRowDataSet(std::move(dataSet));
557 return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
558 }
559
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)560 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
561 const std::string &device)
562 {
563 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
564 if (storage == nullptr) {
565 ReleaseMessageAndPacket(nullptr, packet);
566 LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
567 return -E_BUSY;
568 }
569 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
570 if (message == nullptr) {
571 LOGE("[RemoteExecutor][ResponseStart] new message error");
572 storage->DecRefCount();
573 ReleaseMessageAndPacket(nullptr, packet);
574 return -E_OUT_OF_MEMORY;
575 }
576 packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
577
578 int errCode = message->SetExternalObject(packet);
579 if (errCode != E_OK) {
580 ReleaseMessageAndPacket(message, packet);
581 storage->DecRefCount();
582 LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
583 return errCode;
584 }
585 auto *communicator = GetAndIncCommunicator();
586 if (communicator == nullptr) {
587 ReleaseMessageAndPacket(message, nullptr);
588 storage->DecRefCount();
589 LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
590 return -E_BUSY;
591 }
592
593 message->SetTarget(device);
594 message->SetSessionId(sessionId);
595 message->SetSequenceId(sequenceId);
596 message->SetMessageType(TYPE_RESPONSE);
597 SendConfig sendConfig;
598 SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
599 errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
600 RefObject::DecObjRef(communicator);
601 if (errCode != E_OK) {
602 ReleaseMessageAndPacket(message, nullptr);
603 LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
604 }
605 storage->DecRefCount();
606 return errCode;
607 }
608
StartTimer(uint64_t timeout,uint32_t sessionId)609 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
610 {
611 TimerId timerId = 0u;
612 RefObject::IncObjRef(this);
613 TimerAction timeoutCallBack = std::bind(&RemoteExecutor::TimeoutCallBack, this, std::placeholders::_1);
614 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
615 RefObject::DecObjRef(this);
616 }, timerId);
617 if (errCode != E_OK) {
618 RefObject::DecObjRef(this);
619 LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
620 }
621 LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
622 std::lock_guard<std::mutex> autoLock(timeoutLock_);
623 timeoutMap_[timerId] = sessionId;
624 taskFinishMap_[sessionId] = timerId;
625 }
626
RemoveTimer(uint32_t sessionId)627 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
628 {
629 TimerId timerId = 0u;
630 {
631 std::lock_guard<std::mutex> autoLock(timeoutLock_);
632 if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
633 return;
634 }
635 timerId = taskFinishMap_[sessionId];
636 LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
637 timeoutMap_.erase(timerId);
638 taskFinishMap_.erase(sessionId);
639 }
640 RuntimeContext::GetInstance()->RemoveTimer(timerId);
641 }
642
TimeoutCallBack(TimerId timerId)643 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
644 {
645 RefObject::IncObjRef(this);
646 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
647 DoTimeout(timerId);
648 RefObject::DecObjRef(this);
649 });
650 if (errCode != E_OK) {
651 LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
652 RefObject::DecObjRef(this);
653 }
654 return -E_NO_NEED_TIMER;
655 }
656
DoTimeout(TimerId timerId)657 void RemoteExecutor::DoTimeout(TimerId timerId)
658 {
659 LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
660 uint32_t sessionId = 0u;
661 {
662 std::lock_guard<std::mutex> autoLock(timeoutLock_);
663 if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
664 return;
665 }
666 sessionId = timeoutMap_[timerId];
667 }
668 DoFinished(sessionId, -E_TIMEOUT);
669 }
670
DoSendFailed(uint32_t sessionId,int errCode)671 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
672 {
673 LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
674 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
675 }
676
DoFinished(uint32_t sessionId,int errCode)677 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
678 {
679 Task task;
680 if (ClearTaskInfo(sessionId, task) == E_OK) {
681 LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
682 } else {
683 return;
684 }
685 RefObject::IncObjRef(this);
686 if (task.onFinished != nullptr) {
687 task.onFinished(errCode, task.result);
688 LOGD("[RemoteExecutor][DoFinished] onFinished");
689 }
690 std::string device = task.target;
691 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
692 TryExecuteTaskInLock(device);
693 RefObject::DecObjRef(this);
694 });
695 if (retCode != E_OK) {
696 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
697 RefObject::DecObjRef(this);
698 }
699 }
700
ClearTaskInfo(uint32_t sessionId,Task & task)701 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
702 {
703 {
704 std::lock_guard<std::mutex> autoLock(taskLock_);
705 if (taskMap_.find(sessionId) == taskMap_.end()) {
706 return -E_NOT_FOUND;
707 }
708 task = taskMap_[sessionId];
709 taskMap_.erase(sessionId);
710 deviceWorkingSet_[task.target].erase(sessionId);
711 }
712 RemoveTimer(sessionId);
713 return E_OK;
714 }
715
ClearInnerSource()716 void RemoteExecutor::ClearInnerSource()
717 {
718 {
719 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
720 syncInterface_ = nullptr;
721 communicator_ = nullptr;
722 }
723 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
724 LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
725 while (!searchMessageQueue_.empty()) {
726 auto entry = searchMessageQueue_.front();
727 searchMessageQueue_.pop();
728 delete entry.second;
729 entry.second = nullptr;
730 }
731 }
732
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)733 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
734 {
735 ISyncInterface *storage = GetAndIncSyncInterface();
736 if (storage == nullptr) {
737 return -E_BUSY;
738 }
739 SecurityOption localOption;
740 int errCode = storage->GetSecurityOption(localOption);
741 storage->DecRefCount();
742 storage = nullptr;
743 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
744 return -E_SECURITY_OPTION_CHECK_ERROR;
745 }
746 Task task;
747 {
748 std::lock_guard<std::mutex> autoLock(taskLock_);
749 if (taskMap_.find(sessionId) == taskMap_.end()) {
750 LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
751 return -E_FINISHED;
752 }
753 task = taskMap_[sessionId];
754 }
755 PreparedStmt stmt;
756 stmt.SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
757 stmt.SetSql(task.condition.sql);
758 stmt.SetBindArgs(task.condition.bindArgs);
759 packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
760 packet->SetPreparedStmt(stmt);
761 packet->SetNeedResponse();
762 packet->SetSecLabel(errCode == E_NOT_SUPPORT ? NOT_SURPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
763 target = task.target;
764 return E_OK;
765 }
766
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)767 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
768 {
769 int errCode = E_OK;
770 if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
771 DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
772 delete inMsg;
773 inMsg = nullptr;
774 return;
775 }
776 switch (inMsg->GetMessageType()) {
777 case TYPE_REQUEST:
778 errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
779 break;
780 case TYPE_RESPONSE:
781 errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
782 break;
783 default:
784 LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
785 break;
786 }
787 if (errCode != -E_NOT_NEED_DELETE_MSG) {
788 delete inMsg;
789 inMsg = nullptr;
790 }
791 }
792
IsPacketValid(uint32_t sessionId)793 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
794 {
795 std::lock_guard<std::mutex> autoLock(taskLock_);
796 return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
797 }
798
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)799 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
800 const RemoteExecutorAckPacket *packet)
801 {
802 bool isReceiveFinished = false;
803 {
804 std::lock_guard<std::mutex> autoLock(taskLock_);
805 if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
806 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
807 return;
808 }
809 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
810 taskMap_[sessionId].taskId, sequenceId);
811 taskMap_[sessionId].currentCount++;
812 if (packet->IsLastAck()) {
813 taskMap_[sessionId].targetCount = sequenceId;
814 }
815 taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
816 if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
817 isReceiveFinished = true;
818 }
819 }
820 if (isReceiveFinished) {
821 DoFinished(sessionId, E_OK);
822 }
823 }
824
GetAndIncCommunicator() const825 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
826 {
827 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
828 ICommunicator *communicator = communicator_;
829 RefObject::IncObjRef(communicator);
830 return communicator;
831 }
832
GetAndIncSyncInterface() const833 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
834 {
835 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
836 ISyncInterface *syncInterface = syncInterface_;
837 if (syncInterface != nullptr) {
838 syncInterface->IncRefCount();
839 }
840 return syncInterface;
841 }
842
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)843 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
844 {
845 std::lock_guard<std::mutex> autoLock(taskLock_);
846 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
847 for (auto &sessionId : deviceWorkingSet_[device]) {
848 removeList.push_back(sessionId);
849 }
850 }
851 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
852 for (auto &sessionId : searchTaskQueue_[device]) {
853 removeList.push_back(sessionId);
854 }
855 }
856 }
857
RemoveAllTask(int errCode)858 void RemoteExecutor::RemoveAllTask(int errCode)
859 {
860 std::vector<OnFinished> waitToNotify;
861 std::vector<uint32_t> removeTimerList;
862 {
863 std::lock_guard<std::mutex> autoLock(taskLock_);
864 for (auto &taskEntry : taskMap_) {
865 if (taskEntry.second.onFinished != nullptr) {
866 waitToNotify.push_back(taskEntry.second.onFinished);
867 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
868 taskEntry.second.taskId, errCode);
869 }
870 if (taskEntry.second.status == Status::WORKING) {
871 removeTimerList.push_back(taskEntry.first);
872 }
873 }
874 taskMap_.clear();
875 deviceWorkingSet_.clear();
876 searchTaskQueue_.clear();
877 }
878 for (const auto &callBack : waitToNotify) {
879 callBack(errCode, nullptr);
880 }
881 for (const auto &sessionId : removeTimerList) {
882 RemoveTimer(sessionId);
883 }
884 std::lock_guard<std::mutex> autoLock(timeoutLock_);
885 timeoutMap_.clear();
886 taskFinishMap_.clear();
887 }
888
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)889 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
890 {
891 std::lock_guard<std::mutex> autoLock(taskLock_);
892 for (auto &entry : taskMap_) {
893 if (entry.second.connectionId == connectionId) {
894 removeList.push_back(entry.first);
895 }
896 }
897 }
898
GetPacketSize(const std::string & device,size_t & packetSize)899 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize)
900 {
901 auto *communicator = GetAndIncCommunicator();
902 if (communicator == nullptr) {
903 LOGD("communicator is nullptr");
904 return -E_BUSY;
905 }
906
907 packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
908 RefObject::DecObjRef(communicator);
909 return E_OK;
910 }
911
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)912 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
913 const SecurityOption &localOption)
914 {
915 bool res = false;
916 if (remoteOption.securityLabel == localOption.securityLabel ||
917 (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
918 localOption.securityLabel == SecurityLabel::NOT_SET)) {
919 res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
920 }
921 if (!res) {
922 LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
923 remoteOption.securityLabel, remoteOption.securityFlag,
924 localOption.securityLabel, localOption.securityFlag);
925 }
926 return res;
927 }
928
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)929 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
930 const std::string &device, uint32_t sessionId)
931 {
932 size_t packetSize = 0u;
933 int errCode = GetPacketSize(device, packetSize);
934 if (errCode != E_OK) {
935 return errCode;
936 }
937 SecurityOption option;
938 errCode = storage->GetSecurityOption(option);
939 if (errCode == -E_NOT_SUPPORT) {
940 option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
941 errCode = E_OK;
942 }
943 if (errCode != E_OK) {
944 LOGD("GetSecurityOption errCode:%d", errCode);
945 return -E_SECURITY_OPTION_CHECK_ERROR;
946 }
947 ContinueToken token = nullptr;
948 uint32_t sequenceId = 1u;
949 do {
950 RelationalRowDataSet dataSet;
951 errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
952 if (errCode != E_OK) {
953 LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
954 break;
955 }
956 SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
957 errCode = ResponseData(std::move(dataSet), sendMessage, device);
958 if (errCode != E_OK) {
959 break;
960 }
961 sequenceId++;
962 } while (token != nullptr);
963 if (token != nullptr) {
964 storage->ReleaseRemoteQueryContinueToken(token);
965 }
966 return errCode;
967 }
968
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)969 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
970 const SecurityOption &remoteOption)
971 {
972 if (storage == nullptr || communicator == nullptr) {
973 return -E_BUSY;
974 }
975 if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
976 return -E_NOT_SUPPORT;
977 }
978 std::string device;
979 communicator->GetLocalIdentity(device);
980 SecurityOption localOption;
981 int errCode = storage->GetSecurityOption(localOption);
982 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
983 return -E_SECURITY_OPTION_CHECK_ERROR;
984 }
985 if (remoteOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
986 return E_OK;
987 }
988 if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
989 errCode = -E_SECURITY_OPTION_CHECK_ERROR;
990 } else {
991 errCode = E_OK;
992 }
993 return errCode;
994 }
995
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel)996 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
997 int32_t remoteSecLabel)
998 {
999 SecurityOption localOption;
1000 int errCode = storage->GetSecurityOption(localOption);
1001 if (errCode == -E_NOT_SUPPORT) {
1002 return E_OK;
1003 }
1004 if (errCode != E_OK) {
1005 return -E_SECURITY_OPTION_CHECK_ERROR;
1006 }
1007 if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION) {
1008 return E_OK;
1009 }
1010 if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1011 return E_OK;
1012 }
1013 return -E_SECURITY_OPTION_CHECK_ERROR;
1014 }
1015 }