1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "remote_executor.h"
17
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 namespace {
30 constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31 constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32 constexpr uint32_t MAX_QUEUE_COUNT = 10;
33 constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35 void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36 {
37 delete message;
38 message = nullptr;
39 delete packet;
40 packet = nullptr;
41 }
42 }
43
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45 : workingThreadsCount_(0),
46 syncInterface_(nullptr),
47 communicator_(nullptr),
48 lastSessionId_(0),
49 lastTaskId_(0),
50 closed_(false)
51 {
52 }
53
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56 if (syncInterface == nullptr || communicator == nullptr) {
57 return -E_INVALID_ARGS;
58 }
59 closed_ = false;
60 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61 syncInterface_ = syncInterface;
62 communicator_ = communicator;
63 return E_OK;
64 }
65
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
67 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69 if (closed_) {
70 return -E_BUSY;
71 }
72 if (!CheckParamValid(device, timeout)) {
73 return -E_INVALID_ARGS;
74 }
75 int errCode = E_OK;
76 int taskErrCode = E_OK;
77 SemaphoreUtils semaphore(0);
78 Task task;
79 task.result = std::make_shared<RelationalResultSetImpl>();
80 task.target = device;
81 task.timeout = timeout;
82 task.condition = condition;
83 task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
84 taskErrCode = retCode;
85 result = taskResult;
86 semaphore.SendSemaphore();
87 };
88 task.connectionId = connectionId;
89 errCode = RemoteQueryInner(task);
90 if (errCode != E_OK) {
91 return errCode;
92 }
93 semaphore.WaitSemaphore();
94 return taskErrCode;
95 }
96
ReceiveMessage(const std::string & targetDev,Message * inMsg)97 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
98 {
99 if (inMsg == nullptr) {
100 return -E_INVALID_ARGS;
101 }
102 if (closed_) {
103 LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
104 delete inMsg;
105 inMsg = nullptr;
106 return -E_BUSY;
107 }
108 RefObject::IncObjRef(this);
109 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
110 ReceiveMessageInner(targetDev, inMsg);
111 RefObject::DecObjRef(this);
112 });
113 if (errCode != E_OK) {
114 RefObject::DecObjRef(this);
115 }
116 return errCode;
117 }
118
NotifyDeviceOffline(const std::string & device)119 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
120 {
121 if (closed_) {
122 return;
123 }
124 LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
125 std::vector<uint32_t> removeList;
126 RemoveTaskByDevice(device, removeList);
127 for (const auto &sessionId : removeList) {
128 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
129 }
130 }
131
NotifyUserChange()132 void RemoteExecutor::NotifyUserChange()
133 {
134 if (closed_) {
135 return;
136 }
137 LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
138 RemoveAllTask(-E_USER_CHANGE);
139 LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
140 }
141
Close()142 void RemoteExecutor::Close()
143 {
144 closed_ = true;
145 LOGD("[RemoteExecutor][Close] close enter");
146 RemoveAllTask(-E_BUSY);
147 ClearInnerSource();
148 {
149 std::unique_lock<std::mutex> lock(msgQueueLock_);
150 clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
151 }
152 LOGD("[RemoteExecutor][Close] close exist");
153 }
154
NotifyConnectionClosed(uint64_t connectionId)155 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
156 {
157 if (closed_) {
158 return;
159 }
160 std::vector<uint32_t> removeList;
161 RemoveTaskByConnection(connectionId, removeList);
162 for (const auto &sessionId : removeList) {
163 DoFinished(sessionId, -E_BUSY);
164 }
165 }
166
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)167 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
168 {
169 LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
170 {
171 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
172 searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
173 if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
174 // message deal in work thread, exist here
175 return -E_NOT_NEED_DELETE_MSG;
176 }
177 workingThreadsCount_++;
178 }
179 bool empty = true;
180 do {
181 std::pair<std::string, Message *> entry;
182 {
183 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
184 empty = searchMessageQueue_.empty();
185 if (empty) {
186 workingThreadsCount_--;
187 continue;
188 }
189 entry = searchMessageQueue_.front();
190 searchMessageQueue_.pop();
191 }
192 ParseOneRequestMessage(entry.first, entry.second);
193 delete entry.second;
194 entry.second = nullptr;
195 } while (!empty);
196 clearCV_.notify_one();
197 return -E_NOT_NEED_DELETE_MSG;
198 }
199
ParseOneRequestMessage(const std::string & device,Message * inMsg)200 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
201 {
202 if (closed_) {
203 LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
204 return;
205 }
206 int errCode = CheckPermissions(device, inMsg);
207 if (errCode != E_OK) {
208 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
209 return;
210 }
211 errCode = SendRemoteExecutorData(device, inMsg);
212 if (errCode != E_OK) {
213 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
214 }
215 }
216
CheckPermissions(const std::string & device,Message * inMsg)217 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
218 {
219 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
220 if (storage == nullptr) {
221 LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
222 return -E_BUSY;
223 }
224 // permission check
225 std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
226 std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
227 std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
228 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
229 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
230 { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
231 if (errCode != E_OK) {
232 LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
233 storage->DecRefCount();
234 return errCode;
235 }
236 const auto *packet = inMsg->GetObject<ISyncPacket>();
237 if (packet == nullptr) {
238 LOGE("[RemoteExecutor] get packet object failed");
239 storage->DecRefCount();
240 return -E_INVALID_ARGS;
241 }
242 const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
243 errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
244 storage->DecRefCount();
245 return errCode;
246 }
247
SendRemoteExecutorData(const std::string & device,const Message * inMsg)248 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
249 {
250 auto *syncInterface = GetAndIncSyncInterface();
251 if (syncInterface == nullptr) {
252 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
253 return -E_INVALID_ARGS;
254 }
255 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
256 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
257 syncInterface->DecRefCount();
258 return -E_NOT_SUPPORT;
259 }
260 RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
261
262 const auto *packet = inMsg->GetObject<ISyncPacket>();
263 if (packet == nullptr) {
264 LOGE("[RemoteExecutor] get packet object failed");
265 storage->DecRefCount();
266 return -E_INVALID_ARGS;
267 }
268 const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
269 int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
270 storage->DecRefCount();
271 return errCode;
272 }
273
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)274 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
275 {
276 const auto *ack = inMsg->GetObject<ISyncPacket>();
277 if (ack == nullptr) {
278 return -E_INVALID_ARGS;
279 }
280 const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
281 int errCode = packet->GetAckCode();
282 uint32_t sessionId = inMsg->GetSessionId();
283 uint32_t sequenceId = inMsg->GetSequenceId();
284 if (!IsPacketValid(sessionId)) {
285 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
286 return -E_INVALID_ARGS;
287 }
288 if (errCode == E_OK) {
289 auto storage = GetAndIncSyncInterface();
290 auto communicator = GetAndIncCommunicator();
291 errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
292 if (storage != nullptr) {
293 storage->DecRefCount();
294 }
295 RefObject::DecObjRef(communicator);
296 }
297 if (errCode != E_OK) {
298 DoFinished(sessionId, errCode);
299 } else {
300 ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
301 }
302 return E_OK;
303 }
304
CheckParamValid(const std::string & device,uint64_t timeout) const305 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
306 {
307 if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
308 LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
309 return false;
310 }
311 if (device.empty()) {
312 LOGD("[RemoteExecutor][CheckParamValid] device is empty");
313 return false;
314 }
315 if (device.length() > DBConstant::MAX_DEV_LENGTH) {
316 LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
317 return false;
318 }
319 ICommunicator *communicator = GetAndIncCommunicator();
320 if (communicator == nullptr) {
321 return false;
322 }
323 std::string localId;
324 int errCode = communicator->GetLocalIdentity(localId);
325 RefObject::DecObjRef(communicator);
326 if (errCode != E_OK) {
327 return false;
328 }
329 if (localId == device) {
330 LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
331 return false;
332 }
333 return true;
334 }
335
CheckTaskExeStatus(const std::string & device)336 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
337 {
338 uint32_t queueCount = 0u;
339 uint32_t exeTaskCount = 0u;
340 uint32_t totalCount = 0u; // waiting task count in all queue
341 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
342 queueCount = searchTaskQueue_.at(device).size();
343 }
344 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
345 exeTaskCount = deviceWorkingSet_.at(device).size();
346 }
347 for (auto &entry : searchTaskQueue_) {
348 int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
349 int currentQueueCount = static_cast<int>(entry.second.size());
350 if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
351 // all task in this queue can execute, no need calculate as waiting task count
352 continue;
353 }
354 totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
355 static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
356 }
357 return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
358 (totalCount + 1 <= MAX_QUEUE_COUNT);
359 }
360
GenerateSessionId()361 uint32_t RemoteExecutor::GenerateSessionId()
362 {
363 uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
364 while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
365 sessionId++;
366 if (sessionId == 0) { // if over flow start with 1
367 sessionId++;
368 }
369 }
370 lastSessionId_ = sessionId;
371 return sessionId;
372 }
373
GenerateTaskId()374 uint32_t RemoteExecutor::GenerateTaskId()
375 {
376 lastTaskId_++;
377 if (lastTaskId_ == 0) { // if over flow start with 1
378 lastTaskId_++;
379 }
380 return lastTaskId_;
381 }
382
RemoteQueryInner(const Task & task)383 int RemoteExecutor::RemoteQueryInner(const Task &task)
384 {
385 uint32_t sessionId = 0u;
386 {
387 // check task count and push into queue in lock
388 std::lock_guard<std::mutex> autoLock(taskLock_);
389 if (!CheckTaskExeStatus(task.target)) {
390 LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
391 return -E_MAX_LIMITS;
392 }
393 sessionId = GenerateSessionId();
394 searchTaskQueue_[task.target].push_back(sessionId);
395 if (taskMap_.find(sessionId) != taskMap_.end()) {
396 LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
397 return -E_INTERNAL_ERROR; // should not happen
398 }
399 taskMap_[sessionId] = task;
400 taskMap_[sessionId].taskId = GenerateTaskId();
401 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
402 taskMap_[sessionId].taskId, task.target.c_str());
403 }
404 std::string device = task.target;
405 RefObject::IncObjRef(this);
406 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
407 TryExecuteTaskInLock(device);
408 RefObject::DecObjRef(this);
409 });
410 if (errCode != E_OK) {
411 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
412 DoRollBack(sessionId);
413 RefObject::DecObjRef(this);
414 }
415 return errCode;
416 }
417
TryExecuteTaskInLock(const std::string & device)418 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
419 {
420 uint32_t sessionId = 0u;
421 {
422 std::lock_guard<std::mutex> autoLock(taskLock_);
423 if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
424 return;
425 }
426 if (searchTaskQueue_[device].empty()) {
427 LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
428 return;
429 }
430 sessionId = searchTaskQueue_[device].front();
431 if (taskMap_.find(sessionId) == taskMap_.end()) {
432 searchTaskQueue_[device].pop_front();
433 LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
434 return;
435 }
436 taskMap_[sessionId].status = Status::WORKING;
437 searchTaskQueue_[device].pop_front();
438 deviceWorkingSet_[device].insert(sessionId);
439 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
440 StartTimer(taskMap_[sessionId].timeout, sessionId);
441 }
442 int errCode = RequestStart(sessionId);
443 if (errCode != E_OK) {
444 DoFinished(sessionId, errCode);
445 }
446 }
447
DoRollBack(uint32_t sessionId)448 void RemoteExecutor::DoRollBack(uint32_t sessionId)
449 {
450 Task task;
451 std::lock_guard<std::mutex> autoLock(taskLock_);
452 if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
453 return;
454 }
455 task = taskMap_[sessionId];
456 if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
457 // task is execute, abort roll back
458 return;
459 }
460 taskMap_.erase(sessionId);
461
462 auto iter = searchTaskQueue_[task.target].begin();
463 while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
464 if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
465 break;
466 }
467 iter++;
468 }
469 if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
470 searchTaskQueue_[task.target].erase(iter);
471 }
472 // this task should not in workingSet
473 deviceWorkingSet_[task.target].erase(sessionId);
474 }
475
RequestStart(uint32_t sessionId)476 int RemoteExecutor::RequestStart(uint32_t sessionId)
477 {
478 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
479 if (message == nullptr) {
480 LOGE("[RemoteExecutor][RequestStart] new message error");
481 return -E_OUT_OF_MEMORY;
482 }
483 message->SetSessionId(sessionId);
484 message->SetMessageType(TYPE_REQUEST);
485 auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
486 if (packet == nullptr) {
487 LOGE("[RemoteExecutor][RequestStart] new packet error");
488 ReleaseMessageAndPacket(message, nullptr);
489 return -E_OUT_OF_MEMORY;
490 }
491 std::string target;
492 int errCode = FillRequestPacket(packet, sessionId, target);
493 if (errCode != E_OK) {
494 ReleaseMessageAndPacket(message, packet);
495 return errCode;
496 }
497 auto exObj = static_cast<ISyncPacket *>(packet);
498 errCode = message->SetExternalObject(exObj);
499 if (errCode != E_OK) {
500 ReleaseMessageAndPacket(message, packet);
501 LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
502 return errCode;
503 }
504 return SendRequestMessage(target, message, sessionId);
505 }
506
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)507 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
508 {
509 auto communicator = GetAndIncCommunicator();
510 auto syncInterface = GetAndIncSyncInterface();
511 if (communicator == nullptr || syncInterface == nullptr) {
512 ReleaseMessageAndPacket(message, nullptr);
513 if (syncInterface != nullptr) {
514 syncInterface->DecRefCount();
515 }
516 RefObject::DecObjRef(communicator);
517 return -E_BUSY;
518 }
519 SendConfig sendConfig;
520 SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
521 RefObject::IncObjRef(this);
522 int errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
523 if (errCode != E_OK) {
524 DoSendFailed(sessionId, errCode);
525 }
526 RefObject::DecObjRef(this);
527 });
528 if (errCode != E_OK) {
529 ReleaseMessageAndPacket(message, nullptr);
530 RefObject::DecObjRef(this);
531 }
532 RefObject::DecObjRef(communicator);
533 syncInterface->DecRefCount();
534 return errCode;
535 }
536
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)537 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
538 const std::string &device)
539 {
540 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
541 if (packet == nullptr) {
542 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
543 return -E_OUT_OF_MEMORY;
544 }
545 packet->SetAckCode(errCode);
546 packet->SetLastAck();
547 return ResponseStart(packet, sessionId, sequenceId, device);
548 }
549
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)550 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
551 const std::string &device)
552 {
553 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
554 if (packet == nullptr) {
555 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
556 return -E_OUT_OF_MEMORY;
557 }
558 packet->SetAckCode(E_OK);
559 if (sendMessage.isLast) {
560 packet->SetLastAck();
561 }
562 packet->SetSecurityOption(sendMessage.option);
563 packet->MoveInRowDataSet(std::move(dataSet));
564 return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
565 }
566
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)567 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
568 const std::string &device)
569 {
570 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
571 if (storage == nullptr) {
572 ReleaseMessageAndPacket(nullptr, packet);
573 LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
574 return -E_BUSY;
575 }
576 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
577 if (message == nullptr) {
578 LOGE("[RemoteExecutor][ResponseStart] new message error");
579 storage->DecRefCount();
580 ReleaseMessageAndPacket(nullptr, packet);
581 return -E_OUT_OF_MEMORY;
582 }
583 packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
584 auto exObj = static_cast<ISyncPacket *>(packet);
585 int errCode = message->SetExternalObject(exObj);
586 if (errCode != E_OK) {
587 ReleaseMessageAndPacket(message, packet);
588 storage->DecRefCount();
589 LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
590 return errCode;
591 }
592 auto *communicator = GetAndIncCommunicator();
593 if (communicator == nullptr) {
594 ReleaseMessageAndPacket(message, nullptr);
595 storage->DecRefCount();
596 LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
597 return -E_BUSY;
598 }
599
600 message->SetTarget(device);
601 message->SetSessionId(sessionId);
602 message->SetSequenceId(sequenceId);
603 message->SetMessageType(TYPE_RESPONSE);
604 SendConfig sendConfig;
605 SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
606 errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
607 RefObject::DecObjRef(communicator);
608 if (errCode != E_OK) {
609 ReleaseMessageAndPacket(message, nullptr);
610 LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
611 }
612 storage->DecRefCount();
613 return errCode;
614 }
615
StartTimer(uint64_t timeout,uint32_t sessionId)616 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
617 {
618 TimerId timerId = 0u;
619 RefObject::IncObjRef(this);
620 TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
621 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
622 RefObject::DecObjRef(this);
623 }, timerId);
624 if (errCode != E_OK) {
625 RefObject::DecObjRef(this);
626 LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
627 }
628 LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
629 std::lock_guard<std::mutex> autoLock(timeoutLock_);
630 timeoutMap_[timerId] = sessionId;
631 taskFinishMap_[sessionId] = timerId;
632 }
633
RemoveTimer(uint32_t sessionId)634 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
635 {
636 TimerId timerId = 0u;
637 {
638 std::lock_guard<std::mutex> autoLock(timeoutLock_);
639 if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
640 return;
641 }
642 timerId = taskFinishMap_[sessionId];
643 LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
644 timeoutMap_.erase(timerId);
645 taskFinishMap_.erase(sessionId);
646 }
647 RuntimeContext::GetInstance()->RemoveTimer(timerId);
648 }
649
TimeoutCallBack(TimerId timerId)650 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
651 {
652 RefObject::IncObjRef(this);
653 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
654 DoTimeout(timerId);
655 RefObject::DecObjRef(this);
656 });
657 if (errCode != E_OK) {
658 LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
659 RefObject::DecObjRef(this);
660 }
661 return -E_NO_NEED_TIMER;
662 }
663
DoTimeout(TimerId timerId)664 void RemoteExecutor::DoTimeout(TimerId timerId)
665 {
666 LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
667 uint32_t sessionId = 0u;
668 {
669 std::lock_guard<std::mutex> autoLock(timeoutLock_);
670 if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
671 return;
672 }
673 sessionId = timeoutMap_[timerId];
674 }
675 DoFinished(sessionId, -E_TIMEOUT);
676 }
677
DoSendFailed(uint32_t sessionId,int errCode)678 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
679 {
680 LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
681 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
682 }
683
DoFinished(uint32_t sessionId,int errCode)684 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
685 {
686 Task task;
687 if (ClearTaskInfo(sessionId, task) == E_OK) {
688 LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
689 } else {
690 return;
691 }
692 RefObject::IncObjRef(this);
693 if (task.onFinished != nullptr) {
694 task.onFinished(errCode, task.result);
695 LOGD("[RemoteExecutor][DoFinished] onFinished");
696 }
697 std::string device = task.target;
698 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
699 TryExecuteTaskInLock(device);
700 RefObject::DecObjRef(this);
701 });
702 if (retCode != E_OK) {
703 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
704 RefObject::DecObjRef(this);
705 }
706 }
707
ClearTaskInfo(uint32_t sessionId,Task & task)708 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
709 {
710 {
711 std::lock_guard<std::mutex> autoLock(taskLock_);
712 if (taskMap_.find(sessionId) == taskMap_.end()) {
713 return -E_NOT_FOUND;
714 }
715 task = taskMap_[sessionId];
716 taskMap_.erase(sessionId);
717 deviceWorkingSet_[task.target].erase(sessionId);
718 }
719 RemoveTimer(sessionId);
720 return E_OK;
721 }
722
ClearInnerSource()723 void RemoteExecutor::ClearInnerSource()
724 {
725 {
726 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
727 syncInterface_ = nullptr;
728 communicator_ = nullptr;
729 }
730 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
731 LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
732 while (!searchMessageQueue_.empty()) {
733 auto entry = searchMessageQueue_.front();
734 searchMessageQueue_.pop();
735 delete entry.second;
736 entry.second = nullptr;
737 }
738 }
739
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)740 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
741 {
742 ISyncInterface *storage = GetAndIncSyncInterface();
743 if (storage == nullptr) {
744 return -E_BUSY;
745 }
746 SecurityOption localOption;
747 int errCode = storage->GetSecurityOption(localOption);
748 storage->DecRefCount();
749 storage = nullptr;
750 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
751 return -E_SECURITY_OPTION_CHECK_ERROR;
752 }
753 if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
754 LOGE("[AbilitySync] Local not set security option");
755 return -E_SECURITY_OPTION_CHECK_ERROR;
756 }
757 Task task;
758 {
759 std::lock_guard<std::mutex> autoLock(taskLock_);
760 if (taskMap_.find(sessionId) == taskMap_.end()) {
761 LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
762 return -E_FINISHED;
763 }
764 task = taskMap_[sessionId];
765 }
766 packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
767 packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
768 packet->SetSql(task.condition.sql);
769 packet->SetBindArgs(task.condition.bindArgs);
770 packet->SetNeedResponse();
771 packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
772 target = task.target;
773 return E_OK;
774 }
775
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)776 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
777 {
778 int errCode = E_OK;
779 if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
780 DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
781 delete inMsg;
782 inMsg = nullptr;
783 return;
784 }
785 switch (inMsg->GetMessageType()) {
786 case TYPE_REQUEST:
787 errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
788 break;
789 case TYPE_RESPONSE:
790 errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
791 break;
792 default:
793 LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
794 break;
795 }
796 if (errCode != -E_NOT_NEED_DELETE_MSG) {
797 delete inMsg;
798 inMsg = nullptr;
799 }
800 }
801
IsPacketValid(uint32_t sessionId)802 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
803 {
804 std::lock_guard<std::mutex> autoLock(taskLock_);
805 return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
806 }
807
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)808 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
809 const RemoteExecutorAckPacket *packet)
810 {
811 bool isReceiveFinished = false;
812 {
813 std::lock_guard<std::mutex> autoLock(taskLock_);
814 if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
815 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
816 return;
817 }
818 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
819 taskMap_[sessionId].taskId, sequenceId);
820 taskMap_[sessionId].currentCount++;
821 if (packet->IsLastAck()) {
822 taskMap_[sessionId].targetCount = sequenceId;
823 }
824 taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
825 if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
826 isReceiveFinished = true;
827 }
828 }
829 if (isReceiveFinished) {
830 DoFinished(sessionId, E_OK);
831 }
832 }
833
GetAndIncCommunicator() const834 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
835 {
836 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
837 ICommunicator *communicator = communicator_;
838 RefObject::IncObjRef(communicator);
839 return communicator;
840 }
841
GetAndIncSyncInterface() const842 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
843 {
844 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
845 ISyncInterface *syncInterface = syncInterface_;
846 if (syncInterface != nullptr) {
847 syncInterface->IncRefCount();
848 }
849 return syncInterface;
850 }
851
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)852 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
853 {
854 std::lock_guard<std::mutex> autoLock(taskLock_);
855 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
856 for (auto &sessionId : deviceWorkingSet_[device]) {
857 removeList.push_back(sessionId);
858 }
859 }
860 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
861 for (auto &sessionId : searchTaskQueue_[device]) {
862 removeList.push_back(sessionId);
863 }
864 }
865 }
866
RemoveAllTask(int errCode)867 void RemoteExecutor::RemoveAllTask(int errCode)
868 {
869 std::vector<OnFinished> waitToNotify;
870 std::vector<uint32_t> removeTimerList;
871 {
872 std::lock_guard<std::mutex> autoLock(taskLock_);
873 for (auto &taskEntry : taskMap_) {
874 if (taskEntry.second.onFinished != nullptr) {
875 waitToNotify.push_back(taskEntry.second.onFinished);
876 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
877 taskEntry.second.taskId, errCode);
878 }
879 if (taskEntry.second.status == Status::WORKING) {
880 removeTimerList.push_back(taskEntry.first);
881 }
882 }
883 taskMap_.clear();
884 deviceWorkingSet_.clear();
885 searchTaskQueue_.clear();
886 }
887 for (const auto &callBack : waitToNotify) {
888 callBack(errCode, nullptr);
889 }
890 for (const auto &sessionId : removeTimerList) {
891 RemoveTimer(sessionId);
892 }
893 std::lock_guard<std::mutex> autoLock(timeoutLock_);
894 timeoutMap_.clear();
895 taskFinishMap_.clear();
896 }
897
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)898 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
899 {
900 std::lock_guard<std::mutex> autoLock(taskLock_);
901 for (auto &entry : taskMap_) {
902 if (entry.second.connectionId == connectionId) {
903 removeList.push_back(entry.first);
904 }
905 }
906 }
907
GetPacketSize(const std::string & device,size_t & packetSize) const908 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
909 {
910 auto *communicator = GetAndIncCommunicator();
911 if (communicator == nullptr) {
912 LOGD("communicator is nullptr");
913 return -E_BUSY;
914 }
915
916 packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
917 RefObject::DecObjRef(communicator);
918 return E_OK;
919 }
920
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)921 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
922 const SecurityOption &localOption)
923 {
924 bool res = false;
925 if (remoteOption.securityLabel == localOption.securityLabel ||
926 (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
927 localOption.securityLabel == SecurityLabel::NOT_SET)) {
928 res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
929 }
930 if (!res) {
931 LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
932 remoteOption.securityLabel, remoteOption.securityFlag,
933 localOption.securityLabel, localOption.securityFlag);
934 }
935 return res;
936 }
937
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)938 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
939 const std::string &device, uint32_t sessionId)
940 {
941 size_t packetSize = 0u;
942 int errCode = GetPacketSize(device, packetSize);
943 if (errCode != E_OK) {
944 return errCode;
945 }
946 SecurityOption option;
947 errCode = storage->GetSecurityOption(option);
948 if (errCode == -E_NOT_SUPPORT) {
949 option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
950 errCode = E_OK;
951 }
952 if (errCode != E_OK) {
953 LOGD("GetSecurityOption errCode:%d", errCode);
954 return -E_SECURITY_OPTION_CHECK_ERROR;
955 }
956 ContinueToken token = nullptr;
957 uint32_t sequenceId = 1u;
958 do {
959 RelationalRowDataSet dataSet;
960 errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
961 if (errCode != E_OK) {
962 LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
963 break;
964 }
965 SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
966 errCode = ResponseData(std::move(dataSet), sendMessage, device);
967 if (errCode != E_OK) {
968 break;
969 }
970 sequenceId++;
971 } while (token != nullptr);
972 if (token != nullptr) {
973 storage->ReleaseRemoteQueryContinueToken(token);
974 }
975 return errCode;
976 }
977
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)978 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
979 const SecurityOption &remoteOption)
980 {
981 if (storage == nullptr || communicator == nullptr) {
982 return -E_BUSY;
983 }
984 if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
985 return -E_NOT_SUPPORT;
986 }
987 std::string device;
988 communicator->GetLocalIdentity(device);
989 SecurityOption localOption;
990 int errCode = storage->GetSecurityOption(localOption);
991 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
992 return -E_SECURITY_OPTION_CHECK_ERROR;
993 }
994 if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
995 return E_OK;
996 }
997 if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
998 errCode = -E_SECURITY_OPTION_CHECK_ERROR;
999 } else {
1000 errCode = E_OK;
1001 }
1002 return errCode;
1003 }
1004
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1005 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1006 int32_t remoteSecLabel, uint32_t remoteVersion)
1007 {
1008 SecurityOption localOption;
1009 int errCode = storage->GetSecurityOption(localOption);
1010 LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1011 localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1012 if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1013 return E_OK;
1014 }
1015 if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1016 LOGE("[RemoteExecutor] local security label not set!");
1017 return -E_SECURITY_OPTION_CHECK_ERROR;
1018 }
1019 if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1020 LOGE("[RemoteExecutor] remote security label not set!");
1021 return -E_SECURITY_OPTION_CHECK_ERROR;
1022 }
1023 if (errCode == -E_NOT_SUPPORT) {
1024 return E_OK;
1025 }
1026 if (errCode != E_OK) {
1027 return -E_SECURITY_OPTION_CHECK_ERROR;
1028 }
1029 if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1030 return E_OK;
1031 }
1032 if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1033 return E_OK;
1034 }
1035 return -E_SECURITY_OPTION_CHECK_ERROR;
1036 }
1037 }