• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         if (message != nullptr) {
38             delete message;
39             message = nullptr;
40         }
41         if (packet != nullptr) {
42             delete packet;
43             packet = nullptr;
44         }
45     }
46 }
47 
RemoteExecutor()48 RemoteExecutor::RemoteExecutor()
49     : workingThreadsCount_(0),
50       syncInterface_(nullptr),
51       communicator_(nullptr),
52       lastSessionId_(0),
53       lastTaskId_(0),
54       closed_(false)
55 {
56 }
57 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)58 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
59 {
60     if (syncInterface == nullptr || communicator == nullptr) {
61         return -E_INVALID_ARGS;
62     }
63     closed_ = false;
64     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
65     syncInterface_ = syncInterface;
66     communicator_ = communicator;
67     return E_OK;
68 }
69 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)70 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
71     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
72 {
73     if (closed_) {
74         return -E_BUSY;
75     }
76     if (!CheckParamValid(device, timeout)) {
77         return -E_INVALID_ARGS;
78     }
79     int errCode = E_OK;
80     int taskErrCode = E_OK;
81     SemaphoreUtils semaphore(0);
82     Task task;
83     task.result = std::make_shared<RelationalResultSetImpl>();
84     task.target = device;
85     task.timeout = timeout;
86     task.condition = condition;
87     task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
88         taskErrCode = retCode;
89         result = taskResult;
90         semaphore.SendSemaphore();
91     };
92     task.connectionId = connectionId;
93     errCode = RemoteQueryInner(task);
94     if (errCode != E_OK) {
95         return errCode;
96     }
97     semaphore.WaitSemaphore();
98     return taskErrCode;
99 }
100 
ReceiveMessage(const std::string & targetDev,Message * inMsg)101 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
102 {
103     if (inMsg == nullptr) {
104         return -E_INVALID_ARGS;
105     }
106     if (closed_) {
107         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
108         delete inMsg;
109         inMsg = nullptr;
110         return -E_BUSY;
111     }
112     RefObject::IncObjRef(this);
113     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
114         ReceiveMessageInner(targetDev, inMsg);
115         RefObject::DecObjRef(this);
116     });
117     if (errCode != E_OK) {
118         RefObject::DecObjRef(this);
119     }
120     return errCode;
121 }
122 
NotifyDeviceOffline(const std::string & device)123 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
124 {
125     if (closed_) {
126         return;
127     }
128     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
129     std::vector<uint32_t> removeList;
130     RemoveTaskByDevice(device, removeList);
131     for (const auto &sessionId : removeList) {
132         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
133     }
134 }
135 
NotifyUserChange()136 void RemoteExecutor::NotifyUserChange()
137 {
138     if (closed_) {
139         return;
140     }
141     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
142     RemoveAllTask(-E_USER_CHANGE);
143     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
144 }
145 
Close()146 void RemoteExecutor::Close()
147 {
148     closed_ = true;
149     LOGD("[RemoteExecutor][Close] close enter");
150     RemoveAllTask(-E_BUSY);
151     ClearInnerSource();
152     {
153         std::unique_lock<std::mutex> lock(msgQueueLock_);
154         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
155     }
156     LOGD("[RemoteExecutor][Close] close exist");
157 }
158 
NotifyConnectionClosed(uint64_t connectionId)159 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
160 {
161     if (closed_) {
162         return;
163     }
164     std::vector<uint32_t> removeList;
165     RemoveTaskByConnection(connectionId, removeList);
166     for (const auto &sessionId : removeList) {
167         DoFinished(sessionId, -E_BUSY);
168     }
169 }
170 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)171 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
172 {
173     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
174     {
175         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
176         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
177         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
178             // message deal in work thread, exist here
179             return -E_NOT_NEED_DELETE_MSG;
180         }
181         workingThreadsCount_++;
182     }
183     bool empty = true;
184     do {
185         std::pair<std::string, Message *> entry;
186         {
187             std::lock_guard<std::mutex> autoLock(msgQueueLock_);
188             empty = searchMessageQueue_.empty();
189             if (empty) {
190                 workingThreadsCount_--;
191                 continue;
192             }
193             entry = searchMessageQueue_.front();
194             searchMessageQueue_.pop();
195         }
196         ParseOneRequestMessage(entry.first, entry.second);
197         delete entry.second;
198         entry.second = nullptr;
199     } while (!empty);
200     clearCV_.notify_one();
201     return -E_NOT_NEED_DELETE_MSG;
202 }
203 
ParseOneRequestMessage(const std::string & device,Message * inMsg)204 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
205 {
206     if (closed_) {
207         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
208         return;
209     }
210     int errCode = CheckPermissions(device, inMsg);
211     if (errCode != E_OK) {
212         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
213         return;
214     }
215     errCode = SendRemoteExecutorData(device, inMsg);
216     if (errCode != E_OK) {
217         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
218     }
219 }
220 
CheckPermissions(const std::string & device,Message * inMsg)221 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
222 {
223     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
224     if (storage == nullptr) {
225         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
226         return -E_BUSY;
227     }
228     // permission check
229     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
230     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
231     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
232     std::string subUseId = storage->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
233     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
234     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
235         { userId, appId, storeId, device, subUseId, instanceId }, CHECK_FLAG_SEND);
236     if (errCode != E_OK) {
237         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
238         storage->DecRefCount();
239         return errCode;
240     }
241     const auto *packet = inMsg->GetObject<ISyncPacket>();
242     if (packet == nullptr) {
243         LOGE("[RemoteExecutor] get packet object failed");
244         storage->DecRefCount();
245         return -E_INVALID_ARGS;
246     }
247     const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
248     errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
249     storage->DecRefCount();
250     return errCode;
251 }
252 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)253 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
254 {
255     auto *syncInterface = GetAndIncSyncInterface();
256     if (syncInterface == nullptr) {
257         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
258         return -E_INVALID_ARGS;
259     }
260     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
261         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
262         syncInterface->DecRefCount();
263         return -E_NOT_SUPPORT;
264     }
265     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
266 
267     const auto *packet = inMsg->GetObject<ISyncPacket>();
268     if (packet == nullptr) {
269         LOGE("[RemoteExecutor] get packet object failed");
270         storage->DecRefCount();
271         return -E_INVALID_ARGS;
272     }
273     const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
274     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
275     storage->DecRefCount();
276     return errCode;
277 }
278 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)279 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
280 {
281     const auto *ack = inMsg->GetObject<ISyncPacket>();
282     if (ack == nullptr) {
283         return -E_INVALID_ARGS;
284     }
285     const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
286     int errCode = packet->GetAckCode();
287     uint32_t sessionId = inMsg->GetSessionId();
288     uint32_t sequenceId = inMsg->GetSequenceId();
289     if (!IsPacketValid(sessionId)) {
290         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
291         return -E_INVALID_ARGS;
292     }
293     if (errCode == E_OK) {
294         auto storage = GetAndIncSyncInterface();
295         auto communicator = GetAndIncCommunicator();
296         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
297         if (storage != nullptr) {
298             storage->DecRefCount();
299         }
300         RefObject::DecObjRef(communicator);
301     }
302     if (errCode != E_OK) {
303         DoFinished(sessionId, errCode);
304     } else {
305         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
306     }
307     return E_OK;
308 }
309 
CheckParamValid(const std::string & device,uint64_t timeout) const310 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
311 {
312     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
313         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
314         return false;
315     }
316     if (device.empty()) {
317         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
318         return false;
319     }
320     if (device.length() > DBConstant::MAX_DEV_LENGTH) {
321         LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
322         return false;
323     }
324     ICommunicator *communicator = GetAndIncCommunicator();
325     if (communicator == nullptr) {
326         return false;
327     }
328     std::string localId;
329     int errCode = communicator->GetLocalIdentity(localId);
330     RefObject::DecObjRef(communicator);
331     if (errCode != E_OK) {
332         return false;
333     }
334     if (localId == device) {
335         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
336         return false;
337     }
338     return true;
339 }
340 
CheckTaskExeStatus(const std::string & device)341 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
342 {
343     uint32_t queueCount = 0u;
344     uint32_t exeTaskCount = 0u;
345     uint32_t totalCount = 0u; // waiting task count in all queue
346     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
347         queueCount = searchTaskQueue_.at(device).size();
348     }
349     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
350         exeTaskCount = deviceWorkingSet_.at(device).size();
351     }
352     for (auto &entry : searchTaskQueue_) {
353         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
354         int currentQueueCount = static_cast<int>(entry.second.size());
355         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
356             // all task in this queue can execute, no need calculate as waiting task count
357             continue;
358         }
359         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
360             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
361     }
362     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
363         (totalCount + 1 <= MAX_QUEUE_COUNT);
364 }
365 
GenerateSessionId()366 uint32_t RemoteExecutor::GenerateSessionId()
367 {
368     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
369     while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
370         sessionId++;
371         if (sessionId == 0) { // if over flow start with 1
372             sessionId++;
373         }
374     }
375     lastSessionId_ = sessionId;
376     return sessionId;
377 }
378 
GenerateTaskId()379 uint32_t RemoteExecutor::GenerateTaskId()
380 {
381     lastTaskId_++;
382     if (lastTaskId_ == 0) { // if over flow start with 1
383         lastTaskId_++;
384     }
385     return lastTaskId_;
386 }
387 
RemoteQueryInner(const Task & task)388 int RemoteExecutor::RemoteQueryInner(const Task &task)
389 {
390     uint32_t sessionId = 0u;
391     {
392         // check task count and push into queue in lock
393         std::lock_guard<std::mutex> autoLock(taskLock_);
394         if (!CheckTaskExeStatus(task.target)) {
395             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
396             return -E_MAX_LIMITS;
397         }
398         sessionId = GenerateSessionId();
399         searchTaskQueue_[task.target].push_back(sessionId);
400         if (taskMap_.find(sessionId) != taskMap_.end()) {
401             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
402             return -E_INTERNAL_ERROR; // should not happen
403         }
404         taskMap_[sessionId] = task;
405         taskMap_[sessionId].taskId = GenerateTaskId();
406         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
407             taskMap_[sessionId].taskId, task.target.c_str());
408     }
409     std::string device = task.target;
410     RefObject::IncObjRef(this);
411     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
412         TryExecuteTaskInLock(device);
413         RefObject::DecObjRef(this);
414     });
415     if (errCode != E_OK) {
416         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
417         DoRollBack(sessionId);
418         RefObject::DecObjRef(this);
419     }
420     return errCode;
421 }
422 
TryExecuteTaskInLock(const std::string & device)423 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
424 {
425     uint32_t sessionId = 0u;
426     {
427         std::lock_guard<std::mutex> autoLock(taskLock_);
428         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
429             return;
430         }
431         if (searchTaskQueue_[device].empty()) {
432             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
433             return;
434         }
435         sessionId = searchTaskQueue_[device].front();
436         if (taskMap_.find(sessionId) == taskMap_.end()) {
437             searchTaskQueue_[device].pop_front();
438             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
439             return;
440         }
441         taskMap_[sessionId].status = Status::WORKING;
442         searchTaskQueue_[device].pop_front();
443         deviceWorkingSet_[device].insert(sessionId);
444         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
445         StartTimer(taskMap_[sessionId].timeout, sessionId);
446     }
447     int errCode = RequestStart(sessionId);
448     if (errCode != E_OK) {
449         DoFinished(sessionId, errCode);
450     }
451 }
452 
DoRollBack(uint32_t sessionId)453 void RemoteExecutor::DoRollBack(uint32_t sessionId)
454 {
455     Task task;
456     std::lock_guard<std::mutex> autoLock(taskLock_);
457     if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
458         return;
459     }
460     task = taskMap_[sessionId];
461     if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
462         // task is execute, abort roll back
463         return;
464     }
465     taskMap_.erase(sessionId);
466 
467     auto iter = searchTaskQueue_[task.target].begin();
468     while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
469         if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
470             break;
471         }
472         iter++;
473     }
474     if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
475         searchTaskQueue_[task.target].erase(iter);
476     }
477     // this task should not in workingSet
478     deviceWorkingSet_[task.target].erase(sessionId);
479 }
480 
RequestStart(uint32_t sessionId)481 int RemoteExecutor::RequestStart(uint32_t sessionId)
482 {
483     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
484     if (message == nullptr) {
485         LOGE("[RemoteExecutor][RequestStart] new message error");
486         return -E_OUT_OF_MEMORY;
487     }
488     message->SetSessionId(sessionId);
489     message->SetMessageType(TYPE_REQUEST);
490     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
491     if (packet == nullptr) {
492         LOGE("[RemoteExecutor][RequestStart] new packet error");
493         ReleaseMessageAndPacket(message, nullptr);
494         return -E_OUT_OF_MEMORY;
495     }
496     std::string target;
497     int errCode = FillRequestPacket(packet, sessionId, target);
498     if (errCode != E_OK) {
499         ReleaseMessageAndPacket(message, packet);
500         return errCode;
501     }
502     auto exObj = static_cast<ISyncPacket *>(packet);
503     errCode = message->SetExternalObject(exObj);
504     if (errCode != E_OK) {
505         ReleaseMessageAndPacket(message, packet);
506         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
507         return errCode;
508     }
509     return SendRequestMessage(target, message, sessionId);
510 }
511 
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)512 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
513 {
514     auto communicator = GetAndIncCommunicator();
515     auto syncInterface = GetAndIncSyncInterface();
516     if (communicator == nullptr || syncInterface == nullptr) {
517         ReleaseMessageAndPacket(message, nullptr);
518         if (syncInterface != nullptr) {
519             syncInterface->DecRefCount();
520         }
521         RefObject::DecObjRef(communicator);
522         return -E_BUSY;
523     }
524     SendConfig sendConfig;
525     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
526     RefObject::IncObjRef(this);
527     int errCode = communicator->SendMessage(target, message, sendConfig,
528         [this, sessionId](int errCode, bool isDirectEnd) {
529         if (errCode != E_OK) {
530             DoSendFailed(sessionId, errCode);
531         }
532         RefObject::DecObjRef(this);
533     });
534     if (errCode != E_OK) {
535         ReleaseMessageAndPacket(message, nullptr);
536         RefObject::DecObjRef(this);
537     }
538     RefObject::DecObjRef(communicator);
539     syncInterface->DecRefCount();
540     return errCode;
541 }
542 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)543 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
544     const std::string &device)
545 {
546     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
547     if (packet == nullptr) {
548         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
549         return -E_OUT_OF_MEMORY;
550     }
551     packet->SetAckCode(errCode);
552     packet->SetLastAck();
553     return ResponseStart(packet, sessionId, sequenceId, device);
554 }
555 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)556 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
557     const std::string &device)
558 {
559     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
560     if (packet == nullptr) {
561         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
562         return -E_OUT_OF_MEMORY;
563     }
564     packet->SetAckCode(E_OK);
565     if (sendMessage.isLast) {
566         packet->SetLastAck();
567     }
568     packet->SetSecurityOption(sendMessage.option);
569     packet->MoveInRowDataSet(std::move(dataSet));
570     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
571 }
572 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)573 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
574     const std::string &device)
575 {
576     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
577     if (storage == nullptr) {
578         ReleaseMessageAndPacket(nullptr, packet);
579         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
580         return -E_BUSY;
581     }
582     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
583     if (message == nullptr) {
584         LOGE("[RemoteExecutor][ResponseStart] new message error");
585         storage->DecRefCount();
586         ReleaseMessageAndPacket(nullptr, packet);
587         return -E_OUT_OF_MEMORY;
588     }
589     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
590     auto exObj = static_cast<ISyncPacket *>(packet);
591     int errCode = message->SetExternalObject(exObj);
592     if (errCode != E_OK) {
593         ReleaseMessageAndPacket(message, packet);
594         storage->DecRefCount();
595         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
596         return errCode;
597     }
598     auto *communicator = GetAndIncCommunicator();
599     if (communicator == nullptr) {
600         ReleaseMessageAndPacket(message, nullptr);
601         storage->DecRefCount();
602         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
603         return -E_BUSY;
604     }
605 
606     message->SetTarget(device);
607     message->SetSessionId(sessionId);
608     message->SetSequenceId(sequenceId);
609     message->SetMessageType(TYPE_RESPONSE);
610     SendConfig sendConfig;
611     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
612     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
613     RefObject::DecObjRef(communicator);
614     if (errCode != E_OK) {
615         ReleaseMessageAndPacket(message, nullptr);
616         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
617     }
618     storage->DecRefCount();
619     return errCode;
620 }
621 
StartTimer(uint64_t timeout,uint32_t sessionId)622 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
623 {
624     TimerId timerId = 0u;
625     RefObject::IncObjRef(this);
626     TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
627     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
628         RefObject::DecObjRef(this);
629     }, timerId);
630     if (errCode != E_OK) {
631         RefObject::DecObjRef(this);
632         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
633     }
634     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
635     std::lock_guard<std::mutex> autoLock(timeoutLock_);
636     timeoutMap_[timerId] = sessionId;
637     taskFinishMap_[sessionId] = timerId;
638 }
639 
RemoveTimer(uint32_t sessionId)640 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
641 {
642     TimerId timerId = 0u;
643     {
644         std::lock_guard<std::mutex> autoLock(timeoutLock_);
645         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
646             return;
647         }
648         timerId = taskFinishMap_[sessionId];
649         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
650         timeoutMap_.erase(timerId);
651         taskFinishMap_.erase(sessionId);
652     }
653     RuntimeContext::GetInstance()->RemoveTimer(timerId);
654 }
655 
TimeoutCallBack(TimerId timerId)656 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
657 {
658     RefObject::IncObjRef(this);
659     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
660         DoTimeout(timerId);
661         RefObject::DecObjRef(this);
662     });
663     if (errCode != E_OK) {
664         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
665         RefObject::DecObjRef(this);
666     }
667     return -E_NO_NEED_TIMER;
668 }
669 
DoTimeout(TimerId timerId)670 void RemoteExecutor::DoTimeout(TimerId timerId)
671 {
672     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
673     uint32_t sessionId = 0u;
674     {
675         std::lock_guard<std::mutex> autoLock(timeoutLock_);
676         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
677             return;
678         }
679         sessionId = timeoutMap_[timerId];
680     }
681     DoFinished(sessionId, -E_TIMEOUT);
682 }
683 
DoSendFailed(uint32_t sessionId,int errCode)684 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
685 {
686     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
687     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
688 }
689 
DoFinished(uint32_t sessionId,int errCode)690 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
691 {
692     Task task;
693     if (ClearTaskInfo(sessionId, task) == E_OK) {
694         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
695     } else {
696         return;
697     }
698     RefObject::IncObjRef(this);
699     if (task.onFinished != nullptr) {
700         task.onFinished(errCode, task.result);
701         LOGD("[RemoteExecutor][DoFinished] onFinished");
702     }
703     std::string device = task.target;
704     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
705         TryExecuteTaskInLock(device);
706         RefObject::DecObjRef(this);
707     });
708     if (retCode != E_OK) {
709         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
710         RefObject::DecObjRef(this);
711     }
712 }
713 
ClearTaskInfo(uint32_t sessionId,Task & task)714 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
715 {
716     {
717         std::lock_guard<std::mutex> autoLock(taskLock_);
718         if (taskMap_.find(sessionId) == taskMap_.end()) {
719             return -E_NOT_FOUND;
720         }
721         task = taskMap_[sessionId];
722         taskMap_.erase(sessionId);
723         deviceWorkingSet_[task.target].erase(sessionId);
724     }
725     RemoveTimer(sessionId);
726     return E_OK;
727 }
728 
ClearInnerSource()729 void RemoteExecutor::ClearInnerSource()
730 {
731     {
732         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
733         syncInterface_ = nullptr;
734         communicator_ = nullptr;
735     }
736     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
737     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
738     while (!searchMessageQueue_.empty()) {
739         auto entry = searchMessageQueue_.front();
740         searchMessageQueue_.pop();
741         delete entry.second;
742         entry.second = nullptr;
743     }
744 }
745 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)746 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
747 {
748     ISyncInterface *storage = GetAndIncSyncInterface();
749     if (storage == nullptr) {
750         return -E_BUSY;
751     }
752     SecurityOption localOption;
753     int errCode = storage->GetSecurityOption(localOption);
754     storage->DecRefCount();
755     storage = nullptr;
756     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
757         return -E_SECURITY_OPTION_CHECK_ERROR;
758     }
759     if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
760         LOGE("[AbilitySync] Local not set security option");
761         return -E_SECURITY_OPTION_CHECK_ERROR;
762     }
763     Task task;
764     {
765         std::lock_guard<std::mutex> autoLock(taskLock_);
766         if (taskMap_.find(sessionId) == taskMap_.end()) {
767             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
768             return -E_FINISHED;
769         }
770         task = taskMap_[sessionId];
771     }
772     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
773     packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
774     packet->SetSql(task.condition.sql);
775     packet->SetBindArgs(task.condition.bindArgs);
776     packet->SetNeedResponse();
777     packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
778     target = task.target;
779     return E_OK;
780 }
781 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)782 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
783 {
784     int errCode = E_OK;
785     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
786         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
787         delete inMsg;
788         inMsg = nullptr;
789         return;
790     }
791     switch (inMsg->GetMessageType()) {
792         case TYPE_REQUEST:
793             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
794             break;
795         case TYPE_RESPONSE:
796             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
797             break;
798         default:
799             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
800             break;
801     }
802     if (errCode != -E_NOT_NEED_DELETE_MSG) {
803         delete inMsg;
804         inMsg = nullptr;
805     }
806 }
807 
IsPacketValid(uint32_t sessionId)808 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
809 {
810     std::lock_guard<std::mutex> autoLock(taskLock_);
811     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
812 }
813 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)814 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
815     const RemoteExecutorAckPacket *packet)
816 {
817     bool isReceiveFinished = false;
818     {
819         std::lock_guard<std::mutex> autoLock(taskLock_);
820         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
821             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
822             return;
823         }
824         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
825             taskMap_[sessionId].taskId, sequenceId);
826         taskMap_[sessionId].currentCount++;
827         if (packet->IsLastAck()) {
828             taskMap_[sessionId].targetCount = sequenceId;
829         }
830         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
831         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
832             isReceiveFinished = true;
833         }
834     }
835     if (isReceiveFinished) {
836         DoFinished(sessionId, E_OK);
837     }
838 }
839 
GetAndIncCommunicator() const840 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
841 {
842     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
843     ICommunicator *communicator = communicator_;
844     RefObject::IncObjRef(communicator);
845     return communicator;
846 }
847 
GetAndIncSyncInterface() const848 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
849 {
850     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
851     ISyncInterface *syncInterface = syncInterface_;
852     if (syncInterface != nullptr) {
853         syncInterface->IncRefCount();
854     }
855     return syncInterface;
856 }
857 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)858 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
859 {
860     std::lock_guard<std::mutex> autoLock(taskLock_);
861     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
862         for (auto &sessionId : deviceWorkingSet_[device]) {
863             removeList.push_back(sessionId);
864         }
865     }
866     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
867         for (auto &sessionId : searchTaskQueue_[device]) {
868             removeList.push_back(sessionId);
869         }
870     }
871 }
872 
RemoveAllTask(int errCode)873 void RemoteExecutor::RemoveAllTask(int errCode)
874 {
875     std::vector<OnFinished> waitToNotify;
876     std::vector<uint32_t> removeTimerList;
877     {
878         std::lock_guard<std::mutex> autoLock(taskLock_);
879         for (auto &taskEntry : taskMap_) {
880             if (taskEntry.second.onFinished != nullptr) {
881                 waitToNotify.push_back(taskEntry.second.onFinished);
882                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
883                     taskEntry.second.taskId, errCode);
884             }
885             if (taskEntry.second.status == Status::WORKING) {
886                 removeTimerList.push_back(taskEntry.first);
887             }
888         }
889         taskMap_.clear();
890         deviceWorkingSet_.clear();
891         searchTaskQueue_.clear();
892     }
893     for (const auto &callBack : waitToNotify) {
894         callBack(errCode, nullptr);
895     }
896     for (const auto &sessionId : removeTimerList) {
897         RemoveTimer(sessionId);
898     }
899     std::lock_guard<std::mutex> autoLock(timeoutLock_);
900     timeoutMap_.clear();
901     taskFinishMap_.clear();
902 }
903 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)904 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
905 {
906     std::lock_guard<std::mutex> autoLock(taskLock_);
907     for (auto &entry : taskMap_) {
908         if (entry.second.connectionId == connectionId) {
909             removeList.push_back(entry.first);
910         }
911     }
912 }
913 
GetPacketSize(const std::string & device,size_t & packetSize) const914 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
915 {
916     auto *communicator = GetAndIncCommunicator();
917     if (communicator == nullptr) {
918         LOGD("communicator is nullptr");
919         return -E_BUSY;
920     }
921 
922     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
923     RefObject::DecObjRef(communicator);
924     return E_OK;
925 }
926 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)927 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
928     const SecurityOption &localOption)
929 {
930     bool res = false;
931     if (remoteOption.securityLabel == localOption.securityLabel ||
932         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
933         localOption.securityLabel == SecurityLabel::NOT_SET)) {
934         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
935     }
936     if (!res) {
937         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
938             remoteOption.securityLabel, remoteOption.securityFlag,
939             localOption.securityLabel, localOption.securityFlag);
940     }
941     return res;
942 }
943 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)944 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
945     const std::string &device, uint32_t sessionId)
946 {
947     size_t packetSize = 0u;
948     int errCode = GetPacketSize(device, packetSize);
949     if (errCode != E_OK) {
950         return errCode;
951     }
952     SecurityOption option;
953     errCode = storage->GetSecurityOption(option);
954     if (errCode == -E_NOT_SUPPORT) {
955         option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
956         errCode = E_OK;
957     }
958     if (errCode != E_OK) {
959         LOGD("GetSecurityOption errCode:%d", errCode);
960         return -E_SECURITY_OPTION_CHECK_ERROR;
961     }
962     ContinueToken token = nullptr;
963     uint32_t sequenceId = 1u;
964     do {
965         RelationalRowDataSet dataSet;
966         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
967         if (errCode != E_OK) {
968             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
969             break;
970         }
971         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
972         errCode = ResponseData(std::move(dataSet), sendMessage, device);
973         if (errCode != E_OK) {
974             break;
975         }
976         sequenceId++;
977     } while (token != nullptr);
978     if (token != nullptr) {
979         storage->ReleaseRemoteQueryContinueToken(token);
980     }
981     return errCode;
982 }
983 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)984 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
985     const SecurityOption &remoteOption)
986 {
987     if (storage == nullptr || communicator == nullptr) {
988         return -E_BUSY;
989     }
990     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
991         return -E_NOT_SUPPORT;
992     }
993     std::string device;
994     communicator->GetLocalIdentity(device);
995     SecurityOption localOption;
996     int errCode = storage->GetSecurityOption(localOption);
997     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
998         return -E_SECURITY_OPTION_CHECK_ERROR;
999     }
1000     if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
1001         return E_OK;
1002     }
1003     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
1004         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
1005     } else {
1006         errCode = E_OK;
1007     }
1008     return errCode;
1009 }
1010 
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1011 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1012     int32_t remoteSecLabel, uint32_t remoteVersion)
1013 {
1014     SecurityOption localOption;
1015     int errCode = storage->GetSecurityOption(localOption);
1016     LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1017          localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1018     if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1019         return E_OK;
1020     }
1021     if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1022         LOGE("[RemoteExecutor] local security label not set!");
1023         return -E_SECURITY_OPTION_CHECK_ERROR;
1024     }
1025     if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1026         LOGE("[RemoteExecutor] remote security label not set!");
1027         return -E_SECURITY_OPTION_CHECK_ERROR;
1028     }
1029     if (errCode == -E_NOT_SUPPORT) {
1030         return E_OK;
1031     }
1032     if (errCode != E_OK) {
1033         return -E_SECURITY_OPTION_CHECK_ERROR;
1034     }
1035     if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1036         return E_OK;
1037     }
1038     if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1039         return E_OK;
1040     }
1041     return -E_SECURITY_OPTION_CHECK_ERROR;
1042 }
1043 }