1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "remote_executor.h"
17
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 namespace {
30 constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31 constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32 constexpr uint32_t MAX_QUEUE_COUNT = 10;
33 constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35 void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36 {
37 delete message;
38 message = nullptr;
39 delete packet;
40 packet = nullptr;
41 }
42 }
43
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45 : workingThreadsCount_(0),
46 syncInterface_(nullptr),
47 communicator_(nullptr),
48 lastSessionId_(0),
49 lastTaskId_(0),
50 closed_(false)
51 {
52 }
53
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56 if (syncInterface == nullptr || communicator == nullptr) {
57 return -E_INVALID_ARGS;
58 }
59 closed_ = false;
60 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61 syncInterface_ = syncInterface;
62 communicator_ = communicator;
63 return E_OK;
64 }
65
RemoteQuery(const std::string device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string device, const RemoteCondition &condition,
67 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69 if (closed_) {
70 return -E_BUSY;
71 }
72 if (!CheckParamValid(device, timeout)) {
73 return -E_INVALID_ARGS;
74 }
75 int errCode = E_OK;
76 int taskErrCode = E_OK;
77 SemaphoreUtils semaphore(0);
78 Task task;
79 task.result = std::make_shared<RelationalResultSetImpl>();
80 task.target = device;
81 task.timeout = timeout;
82 task.condition = condition;
83 task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
84 taskErrCode = retCode;
85 result = taskResult;
86 semaphore.SendSemaphore();
87 };
88 task.connectionId = connectionId;
89 errCode = RemoteQueryInner(task);
90 if (errCode != E_OK) {
91 return errCode;
92 }
93 semaphore.WaitSemaphore();
94 return taskErrCode;
95 }
96
ReceiveMessage(const std::string & targetDev,Message * inMsg)97 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
98 {
99 if (inMsg == nullptr) {
100 return -E_INVALID_ARGS;
101 }
102 if (closed_) {
103 LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
104 delete inMsg;
105 inMsg = nullptr;
106 return -E_BUSY;
107 }
108 RefObject::IncObjRef(this);
109 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
110 ReceiveMessageInner(targetDev, inMsg);
111 RefObject::DecObjRef(this);
112 });
113 if (errCode != E_OK) {
114 RefObject::DecObjRef(this);
115 }
116 return errCode;
117 }
118
NotifyDeviceOffline(const std::string & device)119 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
120 {
121 if (closed_) {
122 return;
123 }
124 LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
125 std::vector<uint32_t> removeList;
126 RemoveTaskByDevice(device, removeList);
127 for (const auto &sessionId : removeList) {
128 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
129 }
130 }
131
NotifyUserChange()132 void RemoteExecutor::NotifyUserChange()
133 {
134 if (closed_) {
135 return;
136 }
137 LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
138 RemoveAllTask(-E_USER_CHANGE);
139 LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
140 }
141
Close()142 void RemoteExecutor::Close()
143 {
144 closed_ = true;
145 LOGD("[RemoteExecutor][Close] close enter");
146 RemoveAllTask(-E_BUSY);
147 ClearInnerSource();
148 {
149 std::unique_lock<std::mutex> lock(msgQueueLock_);
150 clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
151 }
152 LOGD("[RemoteExecutor][Close] close exist");
153 }
154
NotifyConnectionClosed(uint64_t connectionId)155 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
156 {
157 if (closed_) {
158 return;
159 }
160 std::vector<uint32_t> removeList;
161 RemoveTaskByConnection(connectionId, removeList);
162 for (const auto &sessionId : removeList) {
163 DoFinished(sessionId, -E_BUSY);
164 }
165 }
166
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)167 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
168 {
169 LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
170 {
171 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
172 searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
173 if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
174 // message deal in work thread, exist here
175 return -E_NOT_NEED_DELETE_MSG;
176 }
177 workingThreadsCount_++;
178 }
179 bool empty = true;
180 do {
181 std::pair<std::string, Message *> entry;
182 {
183 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
184 empty = searchMessageQueue_.empty();
185 if (empty) {
186 workingThreadsCount_--;
187 continue;
188 }
189 entry = searchMessageQueue_.front();
190 searchMessageQueue_.pop();
191 }
192 ParseOneRequestMessage(entry.first, entry.second);
193 delete entry.second;
194 entry.second = nullptr;
195 } while (!empty);
196 clearCV_.notify_one();
197 return -E_NOT_NEED_DELETE_MSG;
198 }
199
ParseOneRequestMessage(const std::string & device,Message * inMsg)200 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
201 {
202 if (closed_) {
203 LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
204 return;
205 }
206 int errCode = CheckPermissions(device, inMsg);
207 if (errCode != E_OK) {
208 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
209 return;
210 }
211 errCode = SendRemoteExecutorData(device, inMsg);
212 if (errCode != E_OK) {
213 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
214 }
215 }
216
CheckPermissions(const std::string & device,Message * inMsg)217 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
218 {
219 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
220 if (storage == nullptr) {
221 LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
222 return -E_BUSY;
223 }
224 // permission check
225 std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
226 std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
227 std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
228 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
229 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
230 { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
231 if (errCode != E_OK) {
232 LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
233 storage->DecRefCount();
234 return errCode;
235 }
236 const auto *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
237 if (requestPacket == nullptr) {
238 LOGE("[RemoteExecutor] get packet object failed");
239 storage->DecRefCount();
240 return -E_INVALID_ARGS;
241 }
242 errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel());
243 storage->DecRefCount();
244 return errCode;
245 }
246
SendRemoteExecutorData(const std::string & device,const Message * inMsg)247 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
248 {
249 auto *syncInterface = GetAndIncSyncInterface();
250 if (syncInterface == nullptr) {
251 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
252 return -E_INVALID_ARGS;
253 }
254 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
255 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
256 syncInterface->DecRefCount();
257 return -E_NOT_SUPPORT;
258 }
259 RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
260
261 const RemoteExecutorRequestPacket *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
262 if (requestPacket == nullptr) {
263 LOGE("[RemoteExecutor] get packet object failed");
264 storage->DecRefCount();
265 return -E_INVALID_ARGS;
266 }
267
268 int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
269 storage->DecRefCount();
270 return errCode;
271 }
272
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)273 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
274 {
275 auto *packet = inMsg->GetObject<RemoteExecutorAckPacket>();
276 if (packet == nullptr) {
277 return -E_INVALID_ARGS;
278 }
279 int errCode = packet->GetAckCode();
280 uint32_t sessionId = inMsg->GetSessionId();
281 uint32_t sequenceId = inMsg->GetSequenceId();
282 if (!IsPacketValid(sessionId)) {
283 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
284 return -E_INVALID_ARGS;
285 }
286 if (errCode == E_OK) {
287 auto storage = GetAndIncSyncInterface();
288 auto communicator = GetAndIncCommunicator();
289 errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
290 if (storage != nullptr) {
291 storage->DecRefCount();
292 }
293 RefObject::DecObjRef(communicator);
294 }
295 if (errCode != E_OK) {
296 DoFinished(sessionId, errCode);
297 } else {
298 ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
299 }
300 return E_OK;
301 }
302
CheckParamValid(const std::string & device,uint64_t timeout) const303 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
304 {
305 if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
306 LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
307 return false;
308 }
309 if (device.empty()) {
310 LOGD("[RemoteExecutor][CheckParamValid] device is empty");
311 return false;
312 }
313 if (device.length() > DBConstant::MAX_DEV_LENGTH) {
314 LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
315 return false;
316 }
317 ICommunicator *communicator = GetAndIncCommunicator();
318 if (communicator == nullptr) {
319 return false;
320 }
321 std::string localId;
322 int errCode = communicator->GetLocalIdentity(localId);
323 RefObject::DecObjRef(communicator);
324 if (errCode != E_OK) {
325 return false;
326 }
327 if (localId == device) {
328 LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
329 return false;
330 }
331 return true;
332 }
333
CheckTaskExeStatus(const std::string & device)334 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
335 {
336 uint32_t queueCount = 0u;
337 uint32_t exeTaskCount = 0u;
338 uint32_t totalCount = 0u; // waiting task count in all queue
339 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
340 queueCount = searchTaskQueue_.at(device).size();
341 }
342 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
343 exeTaskCount = deviceWorkingSet_.at(device).size();
344 }
345 for (auto &entry : searchTaskQueue_) {
346 int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
347 int currentQueueCount = static_cast<int>(entry.second.size());
348 if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
349 // all task in this queue can execute, no need calculate as waiting task count
350 continue;
351 }
352 totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
353 static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
354 }
355 return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
356 (totalCount + 1 <= MAX_QUEUE_COUNT);
357 }
358
GenerateSessionId()359 uint32_t RemoteExecutor::GenerateSessionId()
360 {
361 uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
362 while (taskMap_.find(sessionId) != taskMap_.end()) {
363 sessionId++;
364 if (sessionId == 0) { // if over flow start with 1
365 sessionId++;
366 }
367 }
368 lastSessionId_ = sessionId;
369 return sessionId;
370 }
371
GenerateTaskId()372 uint32_t RemoteExecutor::GenerateTaskId()
373 {
374 lastTaskId_++;
375 if (lastTaskId_ == 0) { // if over flow start with 1
376 lastTaskId_++;
377 }
378 return lastTaskId_;
379 }
380
RemoteQueryInner(const Task & task)381 int RemoteExecutor::RemoteQueryInner(const Task &task)
382 {
383 uint32_t sessionId = 0u;
384 {
385 // check task count and push into queue in lock
386 std::lock_guard<std::mutex> autoLock(taskLock_);
387 if (!CheckTaskExeStatus(task.target)) {
388 LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
389 return -E_MAX_LIMITS;
390 }
391 sessionId = GenerateSessionId();
392 searchTaskQueue_[task.target].push_back(sessionId);
393 if (taskMap_.find(sessionId) != taskMap_.end()) {
394 LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
395 return -E_INTERNAL_ERROR; // should not happen
396 }
397 taskMap_[sessionId] = task;
398 taskMap_[sessionId].taskId = GenerateTaskId();
399 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
400 taskMap_[sessionId].taskId, task.target.c_str());
401 }
402 std::string device = task.target;
403 RefObject::IncObjRef(this);
404 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
405 TryExecuteTaskInLock(device);
406 RefObject::DecObjRef(this);
407 });
408 if (errCode != E_OK) {
409 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
410 DoRollBack(sessionId);
411 RefObject::DecObjRef(this);
412 }
413 return errCode;
414 }
415
TryExecuteTaskInLock(const std::string & device)416 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
417 {
418 uint32_t sessionId = 0u;
419 {
420 std::lock_guard<std::mutex> autoLock(taskLock_);
421 if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
422 return;
423 }
424 if (searchTaskQueue_[device].empty()) {
425 LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
426 return;
427 }
428 sessionId = searchTaskQueue_[device].front();
429 if (taskMap_.find(sessionId) == taskMap_.end()) {
430 searchTaskQueue_[device].pop_front();
431 LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
432 return;
433 }
434 taskMap_[sessionId].status = Status::WORKING;
435 searchTaskQueue_[device].pop_front();
436 deviceWorkingSet_[device].insert(sessionId);
437 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
438 StartTimer(taskMap_[sessionId].timeout, sessionId);
439 }
440 int errCode = RequestStart(sessionId);
441 if (errCode != E_OK) {
442 DoFinished(sessionId, errCode);
443 }
444 }
445
DoRollBack(uint32_t sessionId)446 void RemoteExecutor::DoRollBack(uint32_t sessionId)
447 {
448 Task task;
449 std::lock_guard<std::mutex> autoLock(taskLock_);
450 if (taskMap_.find(sessionId) == taskMap_.end()) {
451 return;
452 }
453 task = taskMap_[sessionId];
454 if (task.status != Status::WAITING) {
455 // task is execute, abort roll back
456 return;
457 }
458 taskMap_.erase(sessionId);
459
460 auto iter = searchTaskQueue_[task.target].begin();
461 while (iter != searchTaskQueue_[task.target].end()) {
462 if ((*iter) == sessionId) {
463 break;
464 }
465 iter++;
466 }
467 if (iter != searchTaskQueue_[task.target].end()) {
468 searchTaskQueue_[task.target].erase(iter);
469 }
470 // this task should not in workingSet
471 deviceWorkingSet_[task.target].erase(sessionId);
472 }
473
RequestStart(uint32_t sessionId)474 int RemoteExecutor::RequestStart(uint32_t sessionId)
475 {
476 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
477 if (message == nullptr) {
478 LOGE("[RemoteExecutor][RequestStart] new message error");
479 return -E_OUT_OF_MEMORY;
480 }
481 message->SetSessionId(sessionId);
482 message->SetMessageType(TYPE_REQUEST);
483 auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
484 if (packet == nullptr) {
485 LOGE("[RemoteExecutor][RequestStart] new packet error");
486 ReleaseMessageAndPacket(message, nullptr);
487 return -E_OUT_OF_MEMORY;
488 }
489 std::string target;
490 int errCode = FillRequestPacket(packet, sessionId, target);
491 if (errCode != E_OK) {
492 ReleaseMessageAndPacket(message, packet);
493 return errCode;
494 }
495 errCode = message->SetExternalObject(packet);
496 if (errCode != E_OK) {
497 ReleaseMessageAndPacket(message, packet);
498 LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
499 return errCode;
500 }
501 return SendRequestMessage(target, message, sessionId);
502 }
503
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)504 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
505 {
506 auto communicator = GetAndIncCommunicator();
507 auto syncInterface = GetAndIncSyncInterface();
508 if (communicator == nullptr || syncInterface == nullptr) {
509 ReleaseMessageAndPacket(message, nullptr);
510 if (syncInterface != nullptr) {
511 syncInterface->DecRefCount();
512 }
513 RefObject::DecObjRef(communicator);
514 return -E_BUSY;
515 }
516 SendConfig sendConfig;
517 SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
518 RefObject::IncObjRef(this);
519 int errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
520 if (errCode != E_OK) {
521 DoSendFailed(sessionId, errCode);
522 }
523 RefObject::DecObjRef(this);
524 });
525 if (errCode != E_OK) {
526 ReleaseMessageAndPacket(message, nullptr);
527 RefObject::DecObjRef(this);
528 }
529 RefObject::DecObjRef(communicator);
530 syncInterface->DecRefCount();
531 return errCode;
532 }
533
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)534 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
535 const std::string &device)
536 {
537 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
538 if (packet == nullptr) {
539 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
540 return -E_OUT_OF_MEMORY;
541 }
542 packet->SetAckCode(errCode);
543 packet->SetLastAck();
544 return ResponseStart(packet, sessionId, sequenceId, device);
545 }
546
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)547 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
548 const std::string &device)
549 {
550 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
551 if (packet == nullptr) {
552 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
553 return -E_OUT_OF_MEMORY;
554 }
555 packet->SetAckCode(E_OK);
556 if (sendMessage.isLast) {
557 packet->SetLastAck();
558 }
559 packet->SetSecurityOption(sendMessage.option);
560 packet->MoveInRowDataSet(std::move(dataSet));
561 return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
562 }
563
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)564 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
565 const std::string &device)
566 {
567 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
568 if (storage == nullptr) {
569 ReleaseMessageAndPacket(nullptr, packet);
570 LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
571 return -E_BUSY;
572 }
573 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
574 if (message == nullptr) {
575 LOGE("[RemoteExecutor][ResponseStart] new message error");
576 storage->DecRefCount();
577 ReleaseMessageAndPacket(nullptr, packet);
578 return -E_OUT_OF_MEMORY;
579 }
580 packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
581
582 int errCode = message->SetExternalObject(packet);
583 if (errCode != E_OK) {
584 ReleaseMessageAndPacket(message, packet);
585 storage->DecRefCount();
586 LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
587 return errCode;
588 }
589 auto *communicator = GetAndIncCommunicator();
590 if (communicator == nullptr) {
591 ReleaseMessageAndPacket(message, nullptr);
592 storage->DecRefCount();
593 LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
594 return -E_BUSY;
595 }
596
597 message->SetTarget(device);
598 message->SetSessionId(sessionId);
599 message->SetSequenceId(sequenceId);
600 message->SetMessageType(TYPE_RESPONSE);
601 SendConfig sendConfig;
602 SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
603 errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
604 RefObject::DecObjRef(communicator);
605 if (errCode != E_OK) {
606 ReleaseMessageAndPacket(message, nullptr);
607 LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
608 }
609 storage->DecRefCount();
610 return errCode;
611 }
612
StartTimer(uint64_t timeout,uint32_t sessionId)613 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
614 {
615 TimerId timerId = 0u;
616 RefObject::IncObjRef(this);
617 TimerAction timeoutCallBack = std::bind(&RemoteExecutor::TimeoutCallBack, this, std::placeholders::_1);
618 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
619 RefObject::DecObjRef(this);
620 }, timerId);
621 if (errCode != E_OK) {
622 RefObject::DecObjRef(this);
623 LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
624 }
625 LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
626 std::lock_guard<std::mutex> autoLock(timeoutLock_);
627 timeoutMap_[timerId] = sessionId;
628 taskFinishMap_[sessionId] = timerId;
629 }
630
RemoveTimer(uint32_t sessionId)631 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
632 {
633 TimerId timerId = 0u;
634 {
635 std::lock_guard<std::mutex> autoLock(timeoutLock_);
636 if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
637 return;
638 }
639 timerId = taskFinishMap_[sessionId];
640 LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
641 timeoutMap_.erase(timerId);
642 taskFinishMap_.erase(sessionId);
643 }
644 RuntimeContext::GetInstance()->RemoveTimer(timerId);
645 }
646
TimeoutCallBack(TimerId timerId)647 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
648 {
649 RefObject::IncObjRef(this);
650 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
651 DoTimeout(timerId);
652 RefObject::DecObjRef(this);
653 });
654 if (errCode != E_OK) {
655 LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
656 RefObject::DecObjRef(this);
657 }
658 return -E_NO_NEED_TIMER;
659 }
660
DoTimeout(TimerId timerId)661 void RemoteExecutor::DoTimeout(TimerId timerId)
662 {
663 LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
664 uint32_t sessionId = 0u;
665 {
666 std::lock_guard<std::mutex> autoLock(timeoutLock_);
667 if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
668 return;
669 }
670 sessionId = timeoutMap_[timerId];
671 }
672 DoFinished(sessionId, -E_TIMEOUT);
673 }
674
DoSendFailed(uint32_t sessionId,int errCode)675 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
676 {
677 LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
678 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
679 }
680
DoFinished(uint32_t sessionId,int errCode)681 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
682 {
683 Task task;
684 if (ClearTaskInfo(sessionId, task) == E_OK) {
685 LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
686 } else {
687 return;
688 }
689 RefObject::IncObjRef(this);
690 if (task.onFinished != nullptr) {
691 task.onFinished(errCode, task.result);
692 LOGD("[RemoteExecutor][DoFinished] onFinished");
693 }
694 std::string device = task.target;
695 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
696 TryExecuteTaskInLock(device);
697 RefObject::DecObjRef(this);
698 });
699 if (retCode != E_OK) {
700 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
701 RefObject::DecObjRef(this);
702 }
703 }
704
ClearTaskInfo(uint32_t sessionId,Task & task)705 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
706 {
707 {
708 std::lock_guard<std::mutex> autoLock(taskLock_);
709 if (taskMap_.find(sessionId) == taskMap_.end()) {
710 return -E_NOT_FOUND;
711 }
712 task = taskMap_[sessionId];
713 taskMap_.erase(sessionId);
714 deviceWorkingSet_[task.target].erase(sessionId);
715 }
716 RemoveTimer(sessionId);
717 return E_OK;
718 }
719
ClearInnerSource()720 void RemoteExecutor::ClearInnerSource()
721 {
722 {
723 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
724 syncInterface_ = nullptr;
725 communicator_ = nullptr;
726 }
727 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
728 LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
729 while (!searchMessageQueue_.empty()) {
730 auto entry = searchMessageQueue_.front();
731 searchMessageQueue_.pop();
732 delete entry.second;
733 entry.second = nullptr;
734 }
735 }
736
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)737 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
738 {
739 ISyncInterface *storage = GetAndIncSyncInterface();
740 if (storage == nullptr) {
741 return -E_BUSY;
742 }
743 SecurityOption localOption;
744 int errCode = storage->GetSecurityOption(localOption);
745 storage->DecRefCount();
746 storage = nullptr;
747 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
748 return -E_SECURITY_OPTION_CHECK_ERROR;
749 }
750 Task task;
751 {
752 std::lock_guard<std::mutex> autoLock(taskLock_);
753 if (taskMap_.find(sessionId) == taskMap_.end()) {
754 LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
755 return -E_FINISHED;
756 }
757 task = taskMap_[sessionId];
758 }
759 packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
760 packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
761 packet->SetSql(task.condition.sql);
762 packet->SetBindArgs(task.condition.bindArgs);
763 packet->SetNeedResponse();
764 packet->SetSecLabel(errCode == E_NOT_SUPPORT ? NOT_SURPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
765 target = task.target;
766 return E_OK;
767 }
768
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)769 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
770 {
771 int errCode = E_OK;
772 if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
773 DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
774 delete inMsg;
775 inMsg = nullptr;
776 return;
777 }
778 switch (inMsg->GetMessageType()) {
779 case TYPE_REQUEST:
780 errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
781 break;
782 case TYPE_RESPONSE:
783 errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
784 break;
785 default:
786 LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
787 break;
788 }
789 if (errCode != -E_NOT_NEED_DELETE_MSG) {
790 delete inMsg;
791 inMsg = nullptr;
792 }
793 }
794
IsPacketValid(uint32_t sessionId)795 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
796 {
797 std::lock_guard<std::mutex> autoLock(taskLock_);
798 return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
799 }
800
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)801 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
802 const RemoteExecutorAckPacket *packet)
803 {
804 bool isReceiveFinished = false;
805 {
806 std::lock_guard<std::mutex> autoLock(taskLock_);
807 if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
808 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
809 return;
810 }
811 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
812 taskMap_[sessionId].taskId, sequenceId);
813 taskMap_[sessionId].currentCount++;
814 if (packet->IsLastAck()) {
815 taskMap_[sessionId].targetCount = sequenceId;
816 }
817 taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
818 if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
819 isReceiveFinished = true;
820 }
821 }
822 if (isReceiveFinished) {
823 DoFinished(sessionId, E_OK);
824 }
825 }
826
GetAndIncCommunicator() const827 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
828 {
829 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
830 ICommunicator *communicator = communicator_;
831 RefObject::IncObjRef(communicator);
832 return communicator;
833 }
834
GetAndIncSyncInterface() const835 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
836 {
837 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
838 ISyncInterface *syncInterface = syncInterface_;
839 if (syncInterface != nullptr) {
840 syncInterface->IncRefCount();
841 }
842 return syncInterface;
843 }
844
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)845 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
846 {
847 std::lock_guard<std::mutex> autoLock(taskLock_);
848 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
849 for (auto &sessionId : deviceWorkingSet_[device]) {
850 removeList.push_back(sessionId);
851 }
852 }
853 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
854 for (auto &sessionId : searchTaskQueue_[device]) {
855 removeList.push_back(sessionId);
856 }
857 }
858 }
859
RemoveAllTask(int errCode)860 void RemoteExecutor::RemoveAllTask(int errCode)
861 {
862 std::vector<OnFinished> waitToNotify;
863 std::vector<uint32_t> removeTimerList;
864 {
865 std::lock_guard<std::mutex> autoLock(taskLock_);
866 for (auto &taskEntry : taskMap_) {
867 if (taskEntry.second.onFinished != nullptr) {
868 waitToNotify.push_back(taskEntry.second.onFinished);
869 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
870 taskEntry.second.taskId, errCode);
871 }
872 if (taskEntry.second.status == Status::WORKING) {
873 removeTimerList.push_back(taskEntry.first);
874 }
875 }
876 taskMap_.clear();
877 deviceWorkingSet_.clear();
878 searchTaskQueue_.clear();
879 }
880 for (const auto &callBack : waitToNotify) {
881 callBack(errCode, nullptr);
882 }
883 for (const auto &sessionId : removeTimerList) {
884 RemoveTimer(sessionId);
885 }
886 std::lock_guard<std::mutex> autoLock(timeoutLock_);
887 timeoutMap_.clear();
888 taskFinishMap_.clear();
889 }
890
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)891 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
892 {
893 std::lock_guard<std::mutex> autoLock(taskLock_);
894 for (auto &entry : taskMap_) {
895 if (entry.second.connectionId == connectionId) {
896 removeList.push_back(entry.first);
897 }
898 }
899 }
900
GetPacketSize(const std::string & device,size_t & packetSize) const901 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
902 {
903 auto *communicator = GetAndIncCommunicator();
904 if (communicator == nullptr) {
905 LOGD("communicator is nullptr");
906 return -E_BUSY;
907 }
908
909 packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
910 RefObject::DecObjRef(communicator);
911 return E_OK;
912 }
913
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)914 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
915 const SecurityOption &localOption)
916 {
917 bool res = false;
918 if (remoteOption.securityLabel == localOption.securityLabel ||
919 (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
920 localOption.securityLabel == SecurityLabel::NOT_SET)) {
921 res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
922 }
923 if (!res) {
924 LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
925 remoteOption.securityLabel, remoteOption.securityFlag,
926 localOption.securityLabel, localOption.securityFlag);
927 }
928 return res;
929 }
930
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)931 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
932 const std::string &device, uint32_t sessionId)
933 {
934 size_t packetSize = 0u;
935 int errCode = GetPacketSize(device, packetSize);
936 if (errCode != E_OK) {
937 return errCode;
938 }
939 SecurityOption option;
940 errCode = storage->GetSecurityOption(option);
941 if (errCode == -E_NOT_SUPPORT) {
942 option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
943 errCode = E_OK;
944 }
945 if (errCode != E_OK) {
946 LOGD("GetSecurityOption errCode:%d", errCode);
947 return -E_SECURITY_OPTION_CHECK_ERROR;
948 }
949 ContinueToken token = nullptr;
950 uint32_t sequenceId = 1u;
951 do {
952 RelationalRowDataSet dataSet;
953 errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
954 if (errCode != E_OK) {
955 LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
956 break;
957 }
958 SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
959 errCode = ResponseData(std::move(dataSet), sendMessage, device);
960 if (errCode != E_OK) {
961 break;
962 }
963 sequenceId++;
964 } while (token != nullptr);
965 if (token != nullptr) {
966 storage->ReleaseRemoteQueryContinueToken(token);
967 }
968 return errCode;
969 }
970
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)971 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
972 const SecurityOption &remoteOption)
973 {
974 if (storage == nullptr || communicator == nullptr) {
975 return -E_BUSY;
976 }
977 if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
978 return -E_NOT_SUPPORT;
979 }
980 std::string device;
981 communicator->GetLocalIdentity(device);
982 SecurityOption localOption;
983 int errCode = storage->GetSecurityOption(localOption);
984 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
985 return -E_SECURITY_OPTION_CHECK_ERROR;
986 }
987 if (remoteOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
988 return E_OK;
989 }
990 if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
991 errCode = -E_SECURITY_OPTION_CHECK_ERROR;
992 } else {
993 errCode = E_OK;
994 }
995 return errCode;
996 }
997
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel)998 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
999 int32_t remoteSecLabel)
1000 {
1001 SecurityOption localOption;
1002 int errCode = storage->GetSecurityOption(localOption);
1003 if (errCode == -E_NOT_SUPPORT) {
1004 return E_OK;
1005 }
1006 if (errCode != E_OK) {
1007 return -E_SECURITY_OPTION_CHECK_ERROR;
1008 }
1009 if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION) {
1010 return E_OK;
1011 }
1012 if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1013 return E_OK;
1014 }
1015 return -E_SECURITY_OPTION_CHECK_ERROR;
1016 }
1017 }