• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         if (message != nullptr) {
38             delete message;
39             message = nullptr;
40         }
41         if (packet != nullptr) {
42             delete packet;
43             packet = nullptr;
44         }
45     }
46 }
47 
RemoteExecutor()48 RemoteExecutor::RemoteExecutor()
49     : workingThreadsCount_(0),
50       syncInterface_(nullptr),
51       communicator_(nullptr),
52       lastSessionId_(0),
53       lastTaskId_(0),
54       closed_(false)
55 {
56 }
57 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)58 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
59 {
60     if (syncInterface == nullptr || communicator == nullptr) {
61         return -E_INVALID_ARGS;
62     }
63     closed_ = false;
64     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
65     syncInterface_ = syncInterface;
66     communicator_ = communicator;
67     return E_OK;
68 }
69 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)70 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
71     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
72 {
73     if (closed_) {
74         return -E_BUSY;
75     }
76     if (!CheckParamValid(device, timeout)) {
77         return -E_INVALID_ARGS;
78     }
79     int errCode = E_OK;
80     int taskErrCode = E_OK;
81     SemaphoreUtils semaphore(0);
82     Task task;
83     task.result = std::make_shared<RelationalResultSetImpl>();
84     task.target = device;
85     task.timeout = timeout;
86     task.condition = condition;
87     task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
88         taskErrCode = retCode;
89         result = taskResult;
90         semaphore.SendSemaphore();
91     };
92     task.connectionId = connectionId;
93     errCode = RemoteQueryInner(task);
94     if (errCode != E_OK) {
95         return errCode;
96     }
97     semaphore.WaitSemaphore();
98     return taskErrCode;
99 }
100 
ReceiveMessage(const std::string & targetDev,Message * inMsg)101 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
102 {
103     if (inMsg == nullptr) {
104         return -E_INVALID_ARGS;
105     }
106     if (closed_) {
107         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
108         return -E_BUSY;
109     }
110     RefObject::IncObjRef(this);
111     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
112         ReceiveMessageInner(targetDev, inMsg);
113         RefObject::DecObjRef(this);
114     });
115     if (errCode != E_OK) {
116         RefObject::DecObjRef(this);
117     }
118     return errCode;
119 }
120 
NotifyDeviceOffline(const std::string & device)121 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
122 {
123     if (closed_) {
124         return;
125     }
126     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
127     std::vector<uint32_t> removeList;
128     RemoveTaskByDevice(device, removeList);
129     for (const auto &sessionId : removeList) {
130         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
131     }
132 }
133 
NotifyUserChange()134 void RemoteExecutor::NotifyUserChange()
135 {
136     if (closed_) {
137         return;
138     }
139     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
140     RemoveAllTask(-E_USER_CHANGE);
141     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
142 }
143 
Close()144 void RemoteExecutor::Close()
145 {
146     closed_ = true;
147     LOGD("[RemoteExecutor][Close] close enter");
148     RemoveAllTask(-E_BUSY);
149     ClearInnerSource();
150     {
151         std::unique_lock<std::mutex> lock(msgQueueLock_);
152         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
153     }
154     LOGD("[RemoteExecutor][Close] close exist");
155 }
156 
NotifyConnectionClosed(uint64_t connectionId)157 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
158 {
159     if (closed_) {
160         return;
161     }
162     std::vector<uint32_t> removeList;
163     RemoveTaskByConnection(connectionId, removeList);
164     for (const auto &sessionId : removeList) {
165         DoFinished(sessionId, -E_BUSY);
166     }
167 }
168 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)169 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
170 {
171     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
172     {
173         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
174         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
175         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
176             // message deal in work thread, exist here
177             return -E_NOT_NEED_DELETE_MSG;
178         }
179         workingThreadsCount_++;
180     }
181     bool empty = true;
182     do {
183         std::pair<std::string, Message *> entry;
184         {
185             std::lock_guard<std::mutex> autoLock(msgQueueLock_);
186             empty = searchMessageQueue_.empty();
187             if (empty) {
188                 workingThreadsCount_--;
189                 continue;
190             }
191             entry = searchMessageQueue_.front();
192             searchMessageQueue_.pop();
193         }
194         ParseOneRequestMessage(entry.first, entry.second);
195         delete entry.second;
196         entry.second = nullptr;
197     } while (!empty);
198     clearCV_.notify_one();
199     return -E_NOT_NEED_DELETE_MSG;
200 }
201 
ParseOneRequestMessage(const std::string & device,Message * inMsg)202 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
203 {
204     if (closed_) {
205         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
206         return;
207     }
208     int errCode = CheckPermissions(device, inMsg);
209     if (errCode != E_OK) {
210         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
211         return;
212     }
213     errCode = SendRemoteExecutorData(device, inMsg);
214     if (errCode != E_OK) {
215         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
216     }
217 }
218 
CheckPermissions(const std::string & device,Message * inMsg)219 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
220 {
221     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
222     if (storage == nullptr) {
223         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
224         return -E_BUSY;
225     }
226     // permission check
227     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
228     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
229     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
230     std::string subUseId = storage->GetDbProperties().GetStringProp(DBProperties::SUB_USER, "");
231     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
232     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
233         { userId, appId, storeId, device, subUseId, instanceId }, CHECK_FLAG_SEND);
234     if (errCode != E_OK) {
235         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
236         storage->DecRefCount();
237         return errCode;
238     }
239     const auto *packet = inMsg->GetObject<ISyncPacket>();
240     if (packet == nullptr) {
241         LOGE("[RemoteExecutor] get packet object failed");
242         storage->DecRefCount();
243         return -E_INVALID_ARGS;
244     }
245     const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
246     errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
247     storage->DecRefCount();
248     return errCode;
249 }
250 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)251 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
252 {
253     auto *syncInterface = GetAndIncSyncInterface();
254     if (syncInterface == nullptr) {
255         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
256         return -E_INVALID_ARGS;
257     }
258     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
259         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
260         syncInterface->DecRefCount();
261         return -E_NOT_SUPPORT;
262     }
263     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
264 
265     const auto *packet = inMsg->GetObject<ISyncPacket>();
266     if (packet == nullptr) {
267         LOGE("[RemoteExecutor] get packet object failed");
268         storage->DecRefCount();
269         return -E_INVALID_ARGS;
270     }
271     const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
272     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
273     storage->DecRefCount();
274     return errCode;
275 }
276 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)277 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
278 {
279     const auto *ack = inMsg->GetObject<ISyncPacket>();
280     if (ack == nullptr) {
281         return -E_INVALID_ARGS;
282     }
283     const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
284     int errCode = packet->GetAckCode();
285     uint32_t sessionId = inMsg->GetSessionId();
286     uint32_t sequenceId = inMsg->GetSequenceId();
287     if (!IsPacketValid(sessionId)) {
288         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
289         return -E_INVALID_ARGS;
290     }
291     if (errCode == E_OK) {
292         auto storage = GetAndIncSyncInterface();
293         auto communicator = GetAndIncCommunicator();
294         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
295         if (storage != nullptr) {
296             storage->DecRefCount();
297         }
298         RefObject::DecObjRef(communicator);
299     }
300     if (errCode != E_OK) {
301         DoFinished(sessionId, errCode);
302     } else {
303         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
304     }
305     return E_OK;
306 }
307 
CheckParamValid(const std::string & device,uint64_t timeout) const308 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
309 {
310     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
311         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
312         return false;
313     }
314     if (device.empty()) {
315         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
316         return false;
317     }
318     if (device.length() > DBConstant::MAX_DEV_LENGTH) {
319         LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
320         return false;
321     }
322     ICommunicator *communicator = GetAndIncCommunicator();
323     if (communicator == nullptr) {
324         return false;
325     }
326     std::string localId;
327     int errCode = communicator->GetLocalIdentity(localId);
328     RefObject::DecObjRef(communicator);
329     if (errCode != E_OK) {
330         return false;
331     }
332     if (localId == device) {
333         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
334         return false;
335     }
336     return true;
337 }
338 
CheckTaskExeStatus(const std::string & device)339 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
340 {
341     uint32_t queueCount = 0u;
342     uint32_t exeTaskCount = 0u;
343     uint32_t totalCount = 0u; // waiting task count in all queue
344     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
345         queueCount = searchTaskQueue_.at(device).size();
346     }
347     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
348         exeTaskCount = deviceWorkingSet_.at(device).size();
349     }
350     for (auto &entry : searchTaskQueue_) {
351         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
352         int currentQueueCount = static_cast<int>(entry.second.size());
353         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
354             // all task in this queue can execute, no need calculate as waiting task count
355             continue;
356         }
357         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
358             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
359     }
360     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
361         (totalCount + 1 <= MAX_QUEUE_COUNT);
362 }
363 
GenerateSessionId()364 uint32_t RemoteExecutor::GenerateSessionId()
365 {
366     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
367     while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
368         sessionId++;
369         if (sessionId == 0) { // if over flow start with 1
370             sessionId++;
371         }
372     }
373     lastSessionId_ = sessionId;
374     return sessionId;
375 }
376 
GenerateTaskId()377 uint32_t RemoteExecutor::GenerateTaskId()
378 {
379     lastTaskId_++;
380     if (lastTaskId_ == 0) { // if over flow start with 1
381         lastTaskId_++;
382     }
383     return lastTaskId_;
384 }
385 
RemoteQueryInner(const Task & task)386 int RemoteExecutor::RemoteQueryInner(const Task &task)
387 {
388     uint32_t sessionId = 0u;
389     {
390         // check task count and push into queue in lock
391         std::lock_guard<std::mutex> autoLock(taskLock_);
392         if (!CheckTaskExeStatus(task.target)) {
393             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
394             return -E_MAX_LIMITS;
395         }
396         sessionId = GenerateSessionId();
397         searchTaskQueue_[task.target].push_back(sessionId);
398         if (taskMap_.find(sessionId) != taskMap_.end()) {
399             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
400             return -E_INTERNAL_ERROR; // should not happen
401         }
402         taskMap_[sessionId] = task;
403         taskMap_[sessionId].taskId = GenerateTaskId();
404         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
405             taskMap_[sessionId].taskId, task.target.c_str());
406     }
407     std::string device = task.target;
408     RefObject::IncObjRef(this);
409     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
410         TryExecuteTaskInLock(device);
411         RefObject::DecObjRef(this);
412     });
413     if (errCode != E_OK) {
414         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
415         DoRollBack(sessionId);
416         RefObject::DecObjRef(this);
417     }
418     return errCode;
419 }
420 
TryExecuteTaskInLock(const std::string & device)421 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
422 {
423     uint32_t sessionId = 0u;
424     {
425         std::lock_guard<std::mutex> autoLock(taskLock_);
426         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
427             return;
428         }
429         if (searchTaskQueue_[device].empty()) {
430             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
431             return;
432         }
433         sessionId = searchTaskQueue_[device].front();
434         if (taskMap_.find(sessionId) == taskMap_.end()) {
435             searchTaskQueue_[device].pop_front();
436             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
437             return;
438         }
439         taskMap_[sessionId].status = Status::WORKING;
440         searchTaskQueue_[device].pop_front();
441         deviceWorkingSet_[device].insert(sessionId);
442         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
443         StartTimer(taskMap_[sessionId].timeout, sessionId);
444     }
445     int errCode = RequestStart(sessionId);
446     if (errCode != E_OK) {
447         DoFinished(sessionId, errCode);
448     }
449 }
450 
DoRollBack(uint32_t sessionId)451 void RemoteExecutor::DoRollBack(uint32_t sessionId)
452 {
453     Task task;
454     std::lock_guard<std::mutex> autoLock(taskLock_);
455     if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
456         return;
457     }
458     task = taskMap_[sessionId];
459     if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
460         // task is execute, abort roll back
461         return;
462     }
463     taskMap_.erase(sessionId);
464 
465     auto iter = searchTaskQueue_[task.target].begin();
466     while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
467         if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
468             break;
469         }
470         iter++;
471     }
472     if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
473         searchTaskQueue_[task.target].erase(iter);
474     }
475     // this task should not in workingSet
476     deviceWorkingSet_[task.target].erase(sessionId);
477 }
478 
RequestStart(uint32_t sessionId)479 int RemoteExecutor::RequestStart(uint32_t sessionId)
480 {
481     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
482     if (message == nullptr) {
483         LOGE("[RemoteExecutor][RequestStart] new message error");
484         return -E_OUT_OF_MEMORY;
485     }
486     message->SetSessionId(sessionId);
487     message->SetMessageType(TYPE_REQUEST);
488     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
489     if (packet == nullptr) {
490         LOGE("[RemoteExecutor][RequestStart] new packet error");
491         ReleaseMessageAndPacket(message, nullptr);
492         return -E_OUT_OF_MEMORY;
493     }
494     std::string target;
495     int errCode = FillRequestPacket(packet, sessionId, target);
496     if (errCode != E_OK) {
497         ReleaseMessageAndPacket(message, packet);
498         return errCode;
499     }
500     auto exObj = static_cast<ISyncPacket *>(packet);
501     errCode = message->SetExternalObject(exObj);
502     if (errCode != E_OK) {
503         ReleaseMessageAndPacket(message, packet);
504         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
505         return errCode;
506     }
507     return SendRequestMessage(target, message, sessionId);
508 }
509 
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)510 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
511 {
512     auto communicator = GetAndIncCommunicator();
513     auto syncInterface = GetAndIncSyncInterface();
514     if (communicator == nullptr || syncInterface == nullptr) {
515         ReleaseMessageAndPacket(message, nullptr);
516         if (syncInterface != nullptr) {
517             syncInterface->DecRefCount();
518         }
519         RefObject::DecObjRef(communicator);
520         return -E_BUSY;
521     }
522     SendConfig sendConfig;
523     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
524     RefObject::IncObjRef(this);
525     int errCode = communicator->SendMessage(target, message, sendConfig,
526         [this, sessionId](int errCode, bool isDirectEnd) {
527         if (errCode != E_OK) {
528             DoSendFailed(sessionId, errCode);
529         }
530         RefObject::DecObjRef(this);
531     });
532     if (errCode != E_OK) {
533         ReleaseMessageAndPacket(message, nullptr);
534         RefObject::DecObjRef(this);
535     }
536     RefObject::DecObjRef(communicator);
537     syncInterface->DecRefCount();
538     return errCode;
539 }
540 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)541 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
542     const std::string &device)
543 {
544     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
545     if (packet == nullptr) {
546         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
547         return -E_OUT_OF_MEMORY;
548     }
549     packet->SetAckCode(errCode);
550     packet->SetLastAck();
551     return ResponseStart(packet, sessionId, sequenceId, device);
552 }
553 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)554 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
555     const std::string &device)
556 {
557     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
558     if (packet == nullptr) {
559         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
560         return -E_OUT_OF_MEMORY;
561     }
562     packet->SetAckCode(E_OK);
563     if (sendMessage.isLast) {
564         packet->SetLastAck();
565     }
566     packet->SetSecurityOption(sendMessage.option);
567     packet->MoveInRowDataSet(std::move(dataSet));
568     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
569 }
570 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)571 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
572     const std::string &device)
573 {
574     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
575     if (storage == nullptr) {
576         ReleaseMessageAndPacket(nullptr, packet);
577         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
578         return -E_BUSY;
579     }
580     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
581     if (message == nullptr) {
582         LOGE("[RemoteExecutor][ResponseStart] new message error");
583         storage->DecRefCount();
584         ReleaseMessageAndPacket(nullptr, packet);
585         return -E_OUT_OF_MEMORY;
586     }
587     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
588     auto exObj = static_cast<ISyncPacket *>(packet);
589     int errCode = message->SetExternalObject(exObj);
590     if (errCode != E_OK) {
591         ReleaseMessageAndPacket(message, packet);
592         storage->DecRefCount();
593         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
594         return errCode;
595     }
596     auto *communicator = GetAndIncCommunicator();
597     if (communicator == nullptr) {
598         ReleaseMessageAndPacket(message, nullptr);
599         storage->DecRefCount();
600         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
601         return -E_BUSY;
602     }
603 
604     message->SetTarget(device);
605     message->SetSessionId(sessionId);
606     message->SetSequenceId(sequenceId);
607     message->SetMessageType(TYPE_RESPONSE);
608     SendConfig sendConfig;
609     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
610     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
611     RefObject::DecObjRef(communicator);
612     if (errCode != E_OK) {
613         ReleaseMessageAndPacket(message, nullptr);
614         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
615     }
616     storage->DecRefCount();
617     return errCode;
618 }
619 
StartTimer(uint64_t timeout,uint32_t sessionId)620 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
621 {
622     TimerId timerId = 0u;
623     RefObject::IncObjRef(this);
624     TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
625     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
626         RefObject::DecObjRef(this);
627     }, timerId);
628     if (errCode != E_OK) {
629         RefObject::DecObjRef(this);
630         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
631     }
632     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
633     std::lock_guard<std::mutex> autoLock(timeoutLock_);
634     timeoutMap_[timerId] = sessionId;
635     taskFinishMap_[sessionId] = timerId;
636 }
637 
RemoveTimer(uint32_t sessionId)638 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
639 {
640     TimerId timerId = 0u;
641     {
642         std::lock_guard<std::mutex> autoLock(timeoutLock_);
643         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
644             return;
645         }
646         timerId = taskFinishMap_[sessionId];
647         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
648         timeoutMap_.erase(timerId);
649         taskFinishMap_.erase(sessionId);
650     }
651     RuntimeContext::GetInstance()->RemoveTimer(timerId);
652 }
653 
TimeoutCallBack(TimerId timerId)654 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
655 {
656     RefObject::IncObjRef(this);
657     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
658         DoTimeout(timerId);
659         RefObject::DecObjRef(this);
660     });
661     if (errCode != E_OK) {
662         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
663         RefObject::DecObjRef(this);
664     }
665     return -E_NO_NEED_TIMER;
666 }
667 
DoTimeout(TimerId timerId)668 void RemoteExecutor::DoTimeout(TimerId timerId)
669 {
670     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
671     uint32_t sessionId = 0u;
672     {
673         std::lock_guard<std::mutex> autoLock(timeoutLock_);
674         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
675             return;
676         }
677         sessionId = timeoutMap_[timerId];
678     }
679     DoFinished(sessionId, -E_TIMEOUT);
680 }
681 
DoSendFailed(uint32_t sessionId,int errCode)682 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
683 {
684     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
685     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
686 }
687 
DoFinished(uint32_t sessionId,int errCode)688 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
689 {
690     Task task;
691     if (ClearTaskInfo(sessionId, task) == E_OK) {
692         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
693     } else {
694         return;
695     }
696     RefObject::IncObjRef(this);
697     if (task.onFinished != nullptr) {
698         task.onFinished(errCode, task.result);
699         LOGD("[RemoteExecutor][DoFinished] onFinished");
700     }
701     std::string device = task.target;
702     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
703         TryExecuteTaskInLock(device);
704         RefObject::DecObjRef(this);
705     });
706     if (retCode != E_OK) {
707         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
708         RefObject::DecObjRef(this);
709     }
710 }
711 
ClearTaskInfo(uint32_t sessionId,Task & task)712 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
713 {
714     {
715         std::lock_guard<std::mutex> autoLock(taskLock_);
716         if (taskMap_.find(sessionId) == taskMap_.end()) {
717             return -E_NOT_FOUND;
718         }
719         task = taskMap_[sessionId];
720         taskMap_.erase(sessionId);
721         deviceWorkingSet_[task.target].erase(sessionId);
722     }
723     RemoveTimer(sessionId);
724     return E_OK;
725 }
726 
ClearInnerSource()727 void RemoteExecutor::ClearInnerSource()
728 {
729     {
730         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
731         syncInterface_ = nullptr;
732         communicator_ = nullptr;
733     }
734     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
735     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
736     while (!searchMessageQueue_.empty()) {
737         auto entry = searchMessageQueue_.front();
738         searchMessageQueue_.pop();
739         delete entry.second;
740         entry.second = nullptr;
741     }
742 }
743 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)744 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
745 {
746     ISyncInterface *storage = GetAndIncSyncInterface();
747     if (storage == nullptr) {
748         return -E_BUSY;
749     }
750     SecurityOption localOption;
751     int errCode = storage->GetSecurityOption(localOption);
752     storage->DecRefCount();
753     storage = nullptr;
754     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
755         return -E_SECURITY_OPTION_CHECK_ERROR;
756     }
757     if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
758         LOGE("[AbilitySync] Local not set security option");
759         return -E_SECURITY_OPTION_CHECK_ERROR;
760     }
761     Task task;
762     {
763         std::lock_guard<std::mutex> autoLock(taskLock_);
764         if (taskMap_.find(sessionId) == taskMap_.end()) {
765             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
766             return -E_FINISHED;
767         }
768         task = taskMap_[sessionId];
769     }
770     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
771     packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
772     packet->SetSql(task.condition.sql);
773     packet->SetBindArgs(task.condition.bindArgs);
774     packet->SetNeedResponse();
775     packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
776     target = task.target;
777     return E_OK;
778 }
779 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)780 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
781 {
782     int errCode = E_OK;
783     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
784         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
785         delete inMsg;
786         inMsg = nullptr;
787         return;
788     }
789     switch (inMsg->GetMessageType()) {
790         case TYPE_REQUEST:
791             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
792             break;
793         case TYPE_RESPONSE:
794             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
795             break;
796         default:
797             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
798             break;
799     }
800     if (errCode != -E_NOT_NEED_DELETE_MSG) {
801         delete inMsg;
802         inMsg = nullptr;
803     }
804 }
805 
IsPacketValid(uint32_t sessionId)806 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
807 {
808     std::lock_guard<std::mutex> autoLock(taskLock_);
809     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
810 }
811 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)812 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
813     const RemoteExecutorAckPacket *packet)
814 {
815     bool isReceiveFinished = false;
816     {
817         std::lock_guard<std::mutex> autoLock(taskLock_);
818         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
819             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
820             return;
821         }
822         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
823             taskMap_[sessionId].taskId, sequenceId);
824         taskMap_[sessionId].currentCount++;
825         if (packet->IsLastAck()) {
826             taskMap_[sessionId].targetCount = sequenceId;
827         }
828         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
829         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
830             isReceiveFinished = true;
831         }
832     }
833     if (isReceiveFinished) {
834         DoFinished(sessionId, E_OK);
835     }
836 }
837 
GetAndIncCommunicator() const838 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
839 {
840     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
841     ICommunicator *communicator = communicator_;
842     RefObject::IncObjRef(communicator);
843     return communicator;
844 }
845 
GetAndIncSyncInterface() const846 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
847 {
848     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
849     ISyncInterface *syncInterface = syncInterface_;
850     if (syncInterface != nullptr) {
851         syncInterface->IncRefCount();
852     }
853     return syncInterface;
854 }
855 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)856 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
857 {
858     std::lock_guard<std::mutex> autoLock(taskLock_);
859     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
860         for (auto &sessionId : deviceWorkingSet_[device]) {
861             removeList.push_back(sessionId);
862         }
863     }
864     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
865         for (auto &sessionId : searchTaskQueue_[device]) {
866             removeList.push_back(sessionId);
867         }
868     }
869 }
870 
RemoveAllTask(int errCode)871 void RemoteExecutor::RemoveAllTask(int errCode)
872 {
873     std::vector<OnFinished> waitToNotify;
874     std::vector<uint32_t> removeTimerList;
875     {
876         std::lock_guard<std::mutex> autoLock(taskLock_);
877         for (auto &taskEntry : taskMap_) {
878             if (taskEntry.second.onFinished != nullptr) {
879                 waitToNotify.push_back(taskEntry.second.onFinished);
880                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
881                     taskEntry.second.taskId, errCode);
882             }
883             if (taskEntry.second.status == Status::WORKING) {
884                 removeTimerList.push_back(taskEntry.first);
885             }
886         }
887         taskMap_.clear();
888         deviceWorkingSet_.clear();
889         searchTaskQueue_.clear();
890     }
891     for (const auto &callBack : waitToNotify) {
892         callBack(errCode, nullptr);
893     }
894     for (const auto &sessionId : removeTimerList) {
895         RemoveTimer(sessionId);
896     }
897     std::lock_guard<std::mutex> autoLock(timeoutLock_);
898     timeoutMap_.clear();
899     taskFinishMap_.clear();
900 }
901 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)902 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
903 {
904     std::lock_guard<std::mutex> autoLock(taskLock_);
905     for (auto &entry : taskMap_) {
906         if (entry.second.connectionId == connectionId) {
907             removeList.push_back(entry.first);
908         }
909     }
910 }
911 
GetPacketSize(const std::string & device,size_t & packetSize) const912 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
913 {
914     auto *communicator = GetAndIncCommunicator();
915     if (communicator == nullptr) {
916         LOGD("communicator is nullptr");
917         return -E_BUSY;
918     }
919 
920     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
921     RefObject::DecObjRef(communicator);
922     return E_OK;
923 }
924 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)925 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
926     const SecurityOption &localOption)
927 {
928     bool res = false;
929     if (remoteOption.securityLabel == localOption.securityLabel ||
930         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
931         localOption.securityLabel == SecurityLabel::NOT_SET)) {
932         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
933     }
934     if (!res) {
935         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
936             remoteOption.securityLabel, remoteOption.securityFlag,
937             localOption.securityLabel, localOption.securityFlag);
938     }
939     return res;
940 }
941 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)942 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
943     const std::string &device, uint32_t sessionId)
944 {
945     size_t packetSize = 0u;
946     int errCode = GetPacketSize(device, packetSize);
947     if (errCode != E_OK) {
948         return errCode;
949     }
950     SecurityOption option;
951     errCode = storage->GetSecurityOption(option);
952     if (errCode == -E_NOT_SUPPORT) {
953         option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
954         errCode = E_OK;
955     }
956     if (errCode != E_OK) {
957         LOGD("GetSecurityOption errCode:%d", errCode);
958         return -E_SECURITY_OPTION_CHECK_ERROR;
959     }
960     ContinueToken token = nullptr;
961     uint32_t sequenceId = 1u;
962     do {
963         RelationalRowDataSet dataSet;
964         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
965         if (errCode != E_OK) {
966             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
967             break;
968         }
969         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
970         errCode = ResponseData(std::move(dataSet), sendMessage, device);
971         if (errCode != E_OK) {
972             break;
973         }
974         sequenceId++;
975     } while (token != nullptr);
976     if (token != nullptr) {
977         storage->ReleaseRemoteQueryContinueToken(token);
978     }
979     return errCode;
980 }
981 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)982 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
983     const SecurityOption &remoteOption)
984 {
985     if (storage == nullptr || communicator == nullptr) {
986         return -E_BUSY;
987     }
988     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
989         return -E_NOT_SUPPORT;
990     }
991     std::string device;
992     communicator->GetLocalIdentity(device);
993     SecurityOption localOption;
994     int errCode = storage->GetSecurityOption(localOption);
995     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
996         return -E_SECURITY_OPTION_CHECK_ERROR;
997     }
998     if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
999         return E_OK;
1000     }
1001     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
1002         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
1003     } else {
1004         errCode = E_OK;
1005     }
1006     return errCode;
1007 }
1008 
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1009 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1010     int32_t remoteSecLabel, uint32_t remoteVersion)
1011 {
1012     SecurityOption localOption;
1013     int errCode = storage->GetSecurityOption(localOption);
1014     LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1015          localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1016     if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1017         return E_OK;
1018     }
1019     if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1020         LOGE("[RemoteExecutor] local security label not set!");
1021         return -E_SECURITY_OPTION_CHECK_ERROR;
1022     }
1023     if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1024         LOGE("[RemoteExecutor] remote security label not set!");
1025         return -E_SECURITY_OPTION_CHECK_ERROR;
1026     }
1027     if (errCode == -E_NOT_SUPPORT) {
1028         return E_OK;
1029     }
1030     if (errCode != E_OK) {
1031         return -E_SECURITY_OPTION_CHECK_ERROR;
1032     }
1033     if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1034         return E_OK;
1035     }
1036     if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1037         return E_OK;
1038     }
1039     return -E_SECURITY_OPTION_CHECK_ERROR;
1040 }
1041 
GetTaskCount() const1042 int32_t RemoteExecutor::GetTaskCount() const
1043 {
1044     std::lock_guard<std::mutex> autoLock(taskLock_);
1045     return static_cast<int32_t>(taskMap_.size()); // max taskMap_ size is 32
1046 }
1047 }