1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "remote_executor.h"
17
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 namespace {
30 constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31 constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32 constexpr uint32_t MAX_QUEUE_COUNT = 10;
33 constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35 void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36 {
37 if (message != nullptr) {
38 delete message;
39 message = nullptr;
40 }
41 if (packet != nullptr) {
42 delete packet;
43 packet = nullptr;
44 }
45 }
46 }
47
RemoteExecutor()48 RemoteExecutor::RemoteExecutor()
49 : workingThreadsCount_(0),
50 syncInterface_(nullptr),
51 communicator_(nullptr),
52 lastSessionId_(0),
53 lastTaskId_(0),
54 closed_(false)
55 {
56 }
57
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)58 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
59 {
60 if (syncInterface == nullptr || communicator == nullptr) {
61 return -E_INVALID_ARGS;
62 }
63 closed_ = false;
64 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
65 syncInterface_ = syncInterface;
66 communicator_ = communicator;
67 return E_OK;
68 }
69
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)70 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
71 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
72 {
73 if (closed_) {
74 return -E_BUSY;
75 }
76 if (!CheckParamValid(device, timeout)) {
77 return -E_INVALID_ARGS;
78 }
79 int errCode = E_OK;
80 int taskErrCode = E_OK;
81 SemaphoreUtils semaphore(0);
82 Task task;
83 task.result = std::make_shared<RelationalResultSetImpl>();
84 task.target = device;
85 task.timeout = timeout;
86 task.condition = condition;
87 task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
88 taskErrCode = retCode;
89 result = taskResult;
90 semaphore.SendSemaphore();
91 };
92 task.connectionId = connectionId;
93 errCode = RemoteQueryInner(task);
94 if (errCode != E_OK) {
95 return errCode;
96 }
97 semaphore.WaitSemaphore();
98 return taskErrCode;
99 }
100
ReceiveMessage(const std::string & targetDev,Message * inMsg)101 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
102 {
103 if (inMsg == nullptr) {
104 return -E_INVALID_ARGS;
105 }
106 if (closed_) {
107 LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
108 delete inMsg;
109 inMsg = nullptr;
110 return -E_BUSY;
111 }
112 RefObject::IncObjRef(this);
113 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
114 ReceiveMessageInner(targetDev, inMsg);
115 RefObject::DecObjRef(this);
116 });
117 if (errCode != E_OK) {
118 RefObject::DecObjRef(this);
119 }
120 return errCode;
121 }
122
NotifyDeviceOffline(const std::string & device)123 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
124 {
125 if (closed_) {
126 return;
127 }
128 LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
129 std::vector<uint32_t> removeList;
130 RemoveTaskByDevice(device, removeList);
131 for (const auto &sessionId : removeList) {
132 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
133 }
134 }
135
NotifyUserChange()136 void RemoteExecutor::NotifyUserChange()
137 {
138 if (closed_) {
139 return;
140 }
141 LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
142 RemoveAllTask(-E_USER_CHANGE);
143 LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
144 }
145
Close()146 void RemoteExecutor::Close()
147 {
148 closed_ = true;
149 LOGD("[RemoteExecutor][Close] close enter");
150 RemoveAllTask(-E_BUSY);
151 ClearInnerSource();
152 {
153 std::unique_lock<std::mutex> lock(msgQueueLock_);
154 clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
155 }
156 LOGD("[RemoteExecutor][Close] close exist");
157 }
158
NotifyConnectionClosed(uint64_t connectionId)159 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
160 {
161 if (closed_) {
162 return;
163 }
164 std::vector<uint32_t> removeList;
165 RemoveTaskByConnection(connectionId, removeList);
166 for (const auto &sessionId : removeList) {
167 DoFinished(sessionId, -E_BUSY);
168 }
169 }
170
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)171 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
172 {
173 LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
174 {
175 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
176 searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
177 if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
178 // message deal in work thread, exist here
179 return -E_NOT_NEED_DELETE_MSG;
180 }
181 workingThreadsCount_++;
182 }
183 bool empty = true;
184 do {
185 std::pair<std::string, Message *> entry;
186 {
187 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
188 empty = searchMessageQueue_.empty();
189 if (empty) {
190 workingThreadsCount_--;
191 continue;
192 }
193 entry = searchMessageQueue_.front();
194 searchMessageQueue_.pop();
195 }
196 ParseOneRequestMessage(entry.first, entry.second);
197 delete entry.second;
198 entry.second = nullptr;
199 } while (!empty);
200 clearCV_.notify_one();
201 return -E_NOT_NEED_DELETE_MSG;
202 }
203
ParseOneRequestMessage(const std::string & device,Message * inMsg)204 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
205 {
206 if (closed_) {
207 LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
208 return;
209 }
210 int errCode = CheckPermissions(device, inMsg);
211 if (errCode != E_OK) {
212 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
213 return;
214 }
215 errCode = SendRemoteExecutorData(device, inMsg);
216 if (errCode != E_OK) {
217 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
218 }
219 }
220
CheckPermissions(const std::string & device,Message * inMsg)221 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
222 {
223 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
224 if (storage == nullptr) {
225 LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
226 return -E_BUSY;
227 }
228 // permission check
229 std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
230 std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
231 std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
232 std::string subUseId = storage->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
233 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
234 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
235 { userId, appId, storeId, device, subUseId, instanceId }, CHECK_FLAG_SEND);
236 if (errCode != E_OK) {
237 LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
238 storage->DecRefCount();
239 return errCode;
240 }
241 const auto *packet = inMsg->GetObject<ISyncPacket>();
242 if (packet == nullptr) {
243 LOGE("[RemoteExecutor] get packet object failed");
244 storage->DecRefCount();
245 return -E_INVALID_ARGS;
246 }
247 const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
248 errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
249 storage->DecRefCount();
250 return errCode;
251 }
252
SendRemoteExecutorData(const std::string & device,const Message * inMsg)253 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
254 {
255 auto *syncInterface = GetAndIncSyncInterface();
256 if (syncInterface == nullptr) {
257 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
258 return -E_INVALID_ARGS;
259 }
260 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
261 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
262 syncInterface->DecRefCount();
263 return -E_NOT_SUPPORT;
264 }
265 RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
266
267 const auto *packet = inMsg->GetObject<ISyncPacket>();
268 if (packet == nullptr) {
269 LOGE("[RemoteExecutor] get packet object failed");
270 storage->DecRefCount();
271 return -E_INVALID_ARGS;
272 }
273 const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
274 int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
275 storage->DecRefCount();
276 return errCode;
277 }
278
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)279 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
280 {
281 const auto *ack = inMsg->GetObject<ISyncPacket>();
282 if (ack == nullptr) {
283 return -E_INVALID_ARGS;
284 }
285 const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
286 int errCode = packet->GetAckCode();
287 uint32_t sessionId = inMsg->GetSessionId();
288 uint32_t sequenceId = inMsg->GetSequenceId();
289 if (!IsPacketValid(sessionId)) {
290 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
291 return -E_INVALID_ARGS;
292 }
293 if (errCode == E_OK) {
294 auto storage = GetAndIncSyncInterface();
295 auto communicator = GetAndIncCommunicator();
296 errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
297 if (storage != nullptr) {
298 storage->DecRefCount();
299 }
300 RefObject::DecObjRef(communicator);
301 }
302 if (errCode != E_OK) {
303 DoFinished(sessionId, errCode);
304 } else {
305 ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
306 }
307 return E_OK;
308 }
309
CheckParamValid(const std::string & device,uint64_t timeout) const310 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
311 {
312 if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
313 LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
314 return false;
315 }
316 if (device.empty()) {
317 LOGD("[RemoteExecutor][CheckParamValid] device is empty");
318 return false;
319 }
320 if (device.length() > DBConstant::MAX_DEV_LENGTH) {
321 LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
322 return false;
323 }
324 ICommunicator *communicator = GetAndIncCommunicator();
325 if (communicator == nullptr) {
326 return false;
327 }
328 std::string localId;
329 int errCode = communicator->GetLocalIdentity(localId);
330 RefObject::DecObjRef(communicator);
331 if (errCode != E_OK) {
332 return false;
333 }
334 if (localId == device) {
335 LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
336 return false;
337 }
338 return true;
339 }
340
CheckTaskExeStatus(const std::string & device)341 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
342 {
343 uint32_t queueCount = 0u;
344 uint32_t exeTaskCount = 0u;
345 uint32_t totalCount = 0u; // waiting task count in all queue
346 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
347 queueCount = searchTaskQueue_.at(device).size();
348 }
349 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
350 exeTaskCount = deviceWorkingSet_.at(device).size();
351 }
352 for (auto &entry : searchTaskQueue_) {
353 int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
354 int currentQueueCount = static_cast<int>(entry.second.size());
355 if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
356 // all task in this queue can execute, no need calculate as waiting task count
357 continue;
358 }
359 totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
360 static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
361 }
362 return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
363 (totalCount + 1 <= MAX_QUEUE_COUNT);
364 }
365
GenerateSessionId()366 uint32_t RemoteExecutor::GenerateSessionId()
367 {
368 uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
369 while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
370 sessionId++;
371 if (sessionId == 0) { // if over flow start with 1
372 sessionId++;
373 }
374 }
375 lastSessionId_ = sessionId;
376 return sessionId;
377 }
378
GenerateTaskId()379 uint32_t RemoteExecutor::GenerateTaskId()
380 {
381 lastTaskId_++;
382 if (lastTaskId_ == 0) { // if over flow start with 1
383 lastTaskId_++;
384 }
385 return lastTaskId_;
386 }
387
RemoteQueryInner(const Task & task)388 int RemoteExecutor::RemoteQueryInner(const Task &task)
389 {
390 uint32_t sessionId = 0u;
391 {
392 // check task count and push into queue in lock
393 std::lock_guard<std::mutex> autoLock(taskLock_);
394 if (!CheckTaskExeStatus(task.target)) {
395 LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
396 return -E_MAX_LIMITS;
397 }
398 sessionId = GenerateSessionId();
399 searchTaskQueue_[task.target].push_back(sessionId);
400 if (taskMap_.find(sessionId) != taskMap_.end()) {
401 LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
402 return -E_INTERNAL_ERROR; // should not happen
403 }
404 taskMap_[sessionId] = task;
405 taskMap_[sessionId].taskId = GenerateTaskId();
406 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
407 taskMap_[sessionId].taskId, task.target.c_str());
408 }
409 std::string device = task.target;
410 RefObject::IncObjRef(this);
411 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
412 TryExecuteTaskInLock(device);
413 RefObject::DecObjRef(this);
414 });
415 if (errCode != E_OK) {
416 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
417 DoRollBack(sessionId);
418 RefObject::DecObjRef(this);
419 }
420 return errCode;
421 }
422
TryExecuteTaskInLock(const std::string & device)423 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
424 {
425 uint32_t sessionId = 0u;
426 {
427 std::lock_guard<std::mutex> autoLock(taskLock_);
428 if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
429 return;
430 }
431 if (searchTaskQueue_[device].empty()) {
432 LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
433 return;
434 }
435 sessionId = searchTaskQueue_[device].front();
436 if (taskMap_.find(sessionId) == taskMap_.end()) {
437 searchTaskQueue_[device].pop_front();
438 LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
439 return;
440 }
441 taskMap_[sessionId].status = Status::WORKING;
442 searchTaskQueue_[device].pop_front();
443 deviceWorkingSet_[device].insert(sessionId);
444 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
445 StartTimer(taskMap_[sessionId].timeout, sessionId);
446 }
447 int errCode = RequestStart(sessionId);
448 if (errCode != E_OK) {
449 DoFinished(sessionId, errCode);
450 }
451 }
452
DoRollBack(uint32_t sessionId)453 void RemoteExecutor::DoRollBack(uint32_t sessionId)
454 {
455 Task task;
456 std::lock_guard<std::mutex> autoLock(taskLock_);
457 if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
458 return;
459 }
460 task = taskMap_[sessionId];
461 if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
462 // task is execute, abort roll back
463 return;
464 }
465 taskMap_.erase(sessionId);
466
467 auto iter = searchTaskQueue_[task.target].begin();
468 while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
469 if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
470 break;
471 }
472 iter++;
473 }
474 if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
475 searchTaskQueue_[task.target].erase(iter);
476 }
477 // this task should not in workingSet
478 deviceWorkingSet_[task.target].erase(sessionId);
479 }
480
RequestStart(uint32_t sessionId)481 int RemoteExecutor::RequestStart(uint32_t sessionId)
482 {
483 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
484 if (message == nullptr) {
485 LOGE("[RemoteExecutor][RequestStart] new message error");
486 return -E_OUT_OF_MEMORY;
487 }
488 message->SetSessionId(sessionId);
489 message->SetMessageType(TYPE_REQUEST);
490 auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
491 if (packet == nullptr) {
492 LOGE("[RemoteExecutor][RequestStart] new packet error");
493 ReleaseMessageAndPacket(message, nullptr);
494 return -E_OUT_OF_MEMORY;
495 }
496 std::string target;
497 int errCode = FillRequestPacket(packet, sessionId, target);
498 if (errCode != E_OK) {
499 ReleaseMessageAndPacket(message, packet);
500 return errCode;
501 }
502 auto exObj = static_cast<ISyncPacket *>(packet);
503 errCode = message->SetExternalObject(exObj);
504 if (errCode != E_OK) {
505 ReleaseMessageAndPacket(message, packet);
506 LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
507 return errCode;
508 }
509 return SendRequestMessage(target, message, sessionId);
510 }
511
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)512 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
513 {
514 auto communicator = GetAndIncCommunicator();
515 auto syncInterface = GetAndIncSyncInterface();
516 if (communicator == nullptr || syncInterface == nullptr) {
517 ReleaseMessageAndPacket(message, nullptr);
518 if (syncInterface != nullptr) {
519 syncInterface->DecRefCount();
520 }
521 RefObject::DecObjRef(communicator);
522 return -E_BUSY;
523 }
524 SendConfig sendConfig;
525 SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
526 RefObject::IncObjRef(this);
527 int errCode = communicator->SendMessage(target, message, sendConfig,
528 [this, sessionId](int errCode, bool isDirectEnd) {
529 if (errCode != E_OK) {
530 DoSendFailed(sessionId, errCode);
531 }
532 RefObject::DecObjRef(this);
533 });
534 if (errCode != E_OK) {
535 ReleaseMessageAndPacket(message, nullptr);
536 RefObject::DecObjRef(this);
537 }
538 RefObject::DecObjRef(communicator);
539 syncInterface->DecRefCount();
540 return errCode;
541 }
542
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)543 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
544 const std::string &device)
545 {
546 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
547 if (packet == nullptr) {
548 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
549 return -E_OUT_OF_MEMORY;
550 }
551 packet->SetAckCode(errCode);
552 packet->SetLastAck();
553 return ResponseStart(packet, sessionId, sequenceId, device);
554 }
555
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)556 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
557 const std::string &device)
558 {
559 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
560 if (packet == nullptr) {
561 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
562 return -E_OUT_OF_MEMORY;
563 }
564 packet->SetAckCode(E_OK);
565 if (sendMessage.isLast) {
566 packet->SetLastAck();
567 }
568 packet->SetSecurityOption(sendMessage.option);
569 packet->MoveInRowDataSet(std::move(dataSet));
570 return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
571 }
572
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)573 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
574 const std::string &device)
575 {
576 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
577 if (storage == nullptr) {
578 ReleaseMessageAndPacket(nullptr, packet);
579 LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
580 return -E_BUSY;
581 }
582 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
583 if (message == nullptr) {
584 LOGE("[RemoteExecutor][ResponseStart] new message error");
585 storage->DecRefCount();
586 ReleaseMessageAndPacket(nullptr, packet);
587 return -E_OUT_OF_MEMORY;
588 }
589 packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
590 auto exObj = static_cast<ISyncPacket *>(packet);
591 int errCode = message->SetExternalObject(exObj);
592 if (errCode != E_OK) {
593 ReleaseMessageAndPacket(message, packet);
594 storage->DecRefCount();
595 LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
596 return errCode;
597 }
598 auto *communicator = GetAndIncCommunicator();
599 if (communicator == nullptr) {
600 ReleaseMessageAndPacket(message, nullptr);
601 storage->DecRefCount();
602 LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
603 return -E_BUSY;
604 }
605
606 message->SetTarget(device);
607 message->SetSessionId(sessionId);
608 message->SetSequenceId(sequenceId);
609 message->SetMessageType(TYPE_RESPONSE);
610 SendConfig sendConfig;
611 SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
612 errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
613 RefObject::DecObjRef(communicator);
614 if (errCode != E_OK) {
615 ReleaseMessageAndPacket(message, nullptr);
616 LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
617 }
618 storage->DecRefCount();
619 return errCode;
620 }
621
StartTimer(uint64_t timeout,uint32_t sessionId)622 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
623 {
624 TimerId timerId = 0u;
625 RefObject::IncObjRef(this);
626 TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
627 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
628 RefObject::DecObjRef(this);
629 }, timerId);
630 if (errCode != E_OK) {
631 RefObject::DecObjRef(this);
632 LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
633 }
634 LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
635 std::lock_guard<std::mutex> autoLock(timeoutLock_);
636 timeoutMap_[timerId] = sessionId;
637 taskFinishMap_[sessionId] = timerId;
638 }
639
RemoveTimer(uint32_t sessionId)640 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
641 {
642 TimerId timerId = 0u;
643 {
644 std::lock_guard<std::mutex> autoLock(timeoutLock_);
645 if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
646 return;
647 }
648 timerId = taskFinishMap_[sessionId];
649 LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
650 timeoutMap_.erase(timerId);
651 taskFinishMap_.erase(sessionId);
652 }
653 RuntimeContext::GetInstance()->RemoveTimer(timerId);
654 }
655
TimeoutCallBack(TimerId timerId)656 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
657 {
658 RefObject::IncObjRef(this);
659 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
660 DoTimeout(timerId);
661 RefObject::DecObjRef(this);
662 });
663 if (errCode != E_OK) {
664 LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
665 RefObject::DecObjRef(this);
666 }
667 return -E_NO_NEED_TIMER;
668 }
669
DoTimeout(TimerId timerId)670 void RemoteExecutor::DoTimeout(TimerId timerId)
671 {
672 LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
673 uint32_t sessionId = 0u;
674 {
675 std::lock_guard<std::mutex> autoLock(timeoutLock_);
676 if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
677 return;
678 }
679 sessionId = timeoutMap_[timerId];
680 }
681 DoFinished(sessionId, -E_TIMEOUT);
682 }
683
DoSendFailed(uint32_t sessionId,int errCode)684 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
685 {
686 LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
687 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
688 }
689
DoFinished(uint32_t sessionId,int errCode)690 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
691 {
692 Task task;
693 if (ClearTaskInfo(sessionId, task) == E_OK) {
694 LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
695 } else {
696 return;
697 }
698 RefObject::IncObjRef(this);
699 if (task.onFinished != nullptr) {
700 task.onFinished(errCode, task.result);
701 LOGD("[RemoteExecutor][DoFinished] onFinished");
702 }
703 std::string device = task.target;
704 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
705 TryExecuteTaskInLock(device);
706 RefObject::DecObjRef(this);
707 });
708 if (retCode != E_OK) {
709 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
710 RefObject::DecObjRef(this);
711 }
712 }
713
ClearTaskInfo(uint32_t sessionId,Task & task)714 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
715 {
716 {
717 std::lock_guard<std::mutex> autoLock(taskLock_);
718 if (taskMap_.find(sessionId) == taskMap_.end()) {
719 return -E_NOT_FOUND;
720 }
721 task = taskMap_[sessionId];
722 taskMap_.erase(sessionId);
723 deviceWorkingSet_[task.target].erase(sessionId);
724 }
725 RemoveTimer(sessionId);
726 return E_OK;
727 }
728
ClearInnerSource()729 void RemoteExecutor::ClearInnerSource()
730 {
731 {
732 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
733 syncInterface_ = nullptr;
734 communicator_ = nullptr;
735 }
736 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
737 LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
738 while (!searchMessageQueue_.empty()) {
739 auto entry = searchMessageQueue_.front();
740 searchMessageQueue_.pop();
741 delete entry.second;
742 entry.second = nullptr;
743 }
744 }
745
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)746 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
747 {
748 ISyncInterface *storage = GetAndIncSyncInterface();
749 if (storage == nullptr) {
750 return -E_BUSY;
751 }
752 SecurityOption localOption;
753 int errCode = storage->GetSecurityOption(localOption);
754 storage->DecRefCount();
755 storage = nullptr;
756 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
757 return -E_SECURITY_OPTION_CHECK_ERROR;
758 }
759 if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
760 LOGE("[AbilitySync] Local not set security option");
761 return -E_SECURITY_OPTION_CHECK_ERROR;
762 }
763 Task task;
764 {
765 std::lock_guard<std::mutex> autoLock(taskLock_);
766 if (taskMap_.find(sessionId) == taskMap_.end()) {
767 LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
768 return -E_FINISHED;
769 }
770 task = taskMap_[sessionId];
771 }
772 packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
773 packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
774 packet->SetSql(task.condition.sql);
775 packet->SetBindArgs(task.condition.bindArgs);
776 packet->SetNeedResponse();
777 packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
778 target = task.target;
779 return E_OK;
780 }
781
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)782 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
783 {
784 int errCode = E_OK;
785 if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
786 DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
787 delete inMsg;
788 inMsg = nullptr;
789 return;
790 }
791 switch (inMsg->GetMessageType()) {
792 case TYPE_REQUEST:
793 errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
794 break;
795 case TYPE_RESPONSE:
796 errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
797 break;
798 default:
799 LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
800 break;
801 }
802 if (errCode != -E_NOT_NEED_DELETE_MSG) {
803 delete inMsg;
804 inMsg = nullptr;
805 }
806 }
807
IsPacketValid(uint32_t sessionId)808 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
809 {
810 std::lock_guard<std::mutex> autoLock(taskLock_);
811 return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
812 }
813
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)814 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
815 const RemoteExecutorAckPacket *packet)
816 {
817 bool isReceiveFinished = false;
818 {
819 std::lock_guard<std::mutex> autoLock(taskLock_);
820 if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
821 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
822 return;
823 }
824 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
825 taskMap_[sessionId].taskId, sequenceId);
826 taskMap_[sessionId].currentCount++;
827 if (packet->IsLastAck()) {
828 taskMap_[sessionId].targetCount = sequenceId;
829 }
830 taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
831 if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
832 isReceiveFinished = true;
833 }
834 }
835 if (isReceiveFinished) {
836 DoFinished(sessionId, E_OK);
837 }
838 }
839
GetAndIncCommunicator() const840 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
841 {
842 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
843 ICommunicator *communicator = communicator_;
844 RefObject::IncObjRef(communicator);
845 return communicator;
846 }
847
GetAndIncSyncInterface() const848 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
849 {
850 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
851 ISyncInterface *syncInterface = syncInterface_;
852 if (syncInterface != nullptr) {
853 syncInterface->IncRefCount();
854 }
855 return syncInterface;
856 }
857
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)858 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
859 {
860 std::lock_guard<std::mutex> autoLock(taskLock_);
861 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
862 for (auto &sessionId : deviceWorkingSet_[device]) {
863 removeList.push_back(sessionId);
864 }
865 }
866 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
867 for (auto &sessionId : searchTaskQueue_[device]) {
868 removeList.push_back(sessionId);
869 }
870 }
871 }
872
RemoveAllTask(int errCode)873 void RemoteExecutor::RemoveAllTask(int errCode)
874 {
875 std::vector<OnFinished> waitToNotify;
876 std::vector<uint32_t> removeTimerList;
877 {
878 std::lock_guard<std::mutex> autoLock(taskLock_);
879 for (auto &taskEntry : taskMap_) {
880 if (taskEntry.second.onFinished != nullptr) {
881 waitToNotify.push_back(taskEntry.second.onFinished);
882 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
883 taskEntry.second.taskId, errCode);
884 }
885 if (taskEntry.second.status == Status::WORKING) {
886 removeTimerList.push_back(taskEntry.first);
887 }
888 }
889 taskMap_.clear();
890 deviceWorkingSet_.clear();
891 searchTaskQueue_.clear();
892 }
893 for (const auto &callBack : waitToNotify) {
894 callBack(errCode, nullptr);
895 }
896 for (const auto &sessionId : removeTimerList) {
897 RemoveTimer(sessionId);
898 }
899 std::lock_guard<std::mutex> autoLock(timeoutLock_);
900 timeoutMap_.clear();
901 taskFinishMap_.clear();
902 }
903
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)904 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
905 {
906 std::lock_guard<std::mutex> autoLock(taskLock_);
907 for (auto &entry : taskMap_) {
908 if (entry.second.connectionId == connectionId) {
909 removeList.push_back(entry.first);
910 }
911 }
912 }
913
GetPacketSize(const std::string & device,size_t & packetSize) const914 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
915 {
916 auto *communicator = GetAndIncCommunicator();
917 if (communicator == nullptr) {
918 LOGD("communicator is nullptr");
919 return -E_BUSY;
920 }
921
922 packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
923 RefObject::DecObjRef(communicator);
924 return E_OK;
925 }
926
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)927 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
928 const SecurityOption &localOption)
929 {
930 bool res = false;
931 if (remoteOption.securityLabel == localOption.securityLabel ||
932 (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
933 localOption.securityLabel == SecurityLabel::NOT_SET)) {
934 res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
935 }
936 if (!res) {
937 LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
938 remoteOption.securityLabel, remoteOption.securityFlag,
939 localOption.securityLabel, localOption.securityFlag);
940 }
941 return res;
942 }
943
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)944 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
945 const std::string &device, uint32_t sessionId)
946 {
947 size_t packetSize = 0u;
948 int errCode = GetPacketSize(device, packetSize);
949 if (errCode != E_OK) {
950 return errCode;
951 }
952 SecurityOption option;
953 errCode = storage->GetSecurityOption(option);
954 if (errCode == -E_NOT_SUPPORT) {
955 option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
956 errCode = E_OK;
957 }
958 if (errCode != E_OK) {
959 LOGD("GetSecurityOption errCode:%d", errCode);
960 return -E_SECURITY_OPTION_CHECK_ERROR;
961 }
962 ContinueToken token = nullptr;
963 uint32_t sequenceId = 1u;
964 do {
965 RelationalRowDataSet dataSet;
966 errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
967 if (errCode != E_OK) {
968 LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
969 break;
970 }
971 SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
972 errCode = ResponseData(std::move(dataSet), sendMessage, device);
973 if (errCode != E_OK) {
974 break;
975 }
976 sequenceId++;
977 } while (token != nullptr);
978 if (token != nullptr) {
979 storage->ReleaseRemoteQueryContinueToken(token);
980 }
981 return errCode;
982 }
983
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)984 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
985 const SecurityOption &remoteOption)
986 {
987 if (storage == nullptr || communicator == nullptr) {
988 return -E_BUSY;
989 }
990 if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
991 return -E_NOT_SUPPORT;
992 }
993 std::string device;
994 communicator->GetLocalIdentity(device);
995 SecurityOption localOption;
996 int errCode = storage->GetSecurityOption(localOption);
997 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
998 return -E_SECURITY_OPTION_CHECK_ERROR;
999 }
1000 if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
1001 return E_OK;
1002 }
1003 if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
1004 errCode = -E_SECURITY_OPTION_CHECK_ERROR;
1005 } else {
1006 errCode = E_OK;
1007 }
1008 return errCode;
1009 }
1010
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1011 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1012 int32_t remoteSecLabel, uint32_t remoteVersion)
1013 {
1014 SecurityOption localOption;
1015 int errCode = storage->GetSecurityOption(localOption);
1016 LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1017 localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1018 if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1019 return E_OK;
1020 }
1021 if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1022 LOGE("[RemoteExecutor] local security label not set!");
1023 return -E_SECURITY_OPTION_CHECK_ERROR;
1024 }
1025 if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1026 LOGE("[RemoteExecutor] remote security label not set!");
1027 return -E_SECURITY_OPTION_CHECK_ERROR;
1028 }
1029 if (errCode == -E_NOT_SUPPORT) {
1030 return E_OK;
1031 }
1032 if (errCode != E_OK) {
1033 return -E_SECURITY_OPTION_CHECK_ERROR;
1034 }
1035 if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1036 return E_OK;
1037 }
1038 if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1039 return E_OK;
1040 }
1041 return -E_SECURITY_OPTION_CHECK_ERROR;
1042 }
1043 }