• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         delete message;
38         message = nullptr;
39         delete packet;
40         packet = nullptr;
41     }
42 }
43 
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45     : workingThreadsCount_(0),
46       syncInterface_(nullptr),
47       communicator_(nullptr),
48       lastSessionId_(0),
49       lastTaskId_(0),
50       closed_(false)
51 {
52 }
53 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56     if (syncInterface == nullptr || communicator == nullptr) {
57         return -E_INVALID_ARGS;
58     }
59     closed_ = false;
60     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61     syncInterface_ = syncInterface;
62     communicator_ = communicator;
63     return E_OK;
64 }
65 
RemoteQuery(const std::string device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string device, const RemoteCondition &condition,
67     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69     if (closed_) {
70         return -E_BUSY;
71     }
72     if (!CheckParamValid(device, timeout)) {
73         return -E_INVALID_ARGS;
74     }
75     int errCode = E_OK;
76     int taskErrCode = E_OK;
77     SemaphoreUtils semaphore(0);
78     Task task;
79     task.result = std::make_shared<RelationalResultSetImpl>();
80     task.target = device;
81     task.timeout = timeout;
82     task.condition = condition;
83     task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
84         taskErrCode = retCode;
85         result = taskResult;
86         semaphore.SendSemaphore();
87     };
88     task.connectionId = connectionId;
89     errCode = RemoteQueryInner(task);
90     if (errCode != E_OK) {
91         return errCode;
92     }
93     semaphore.WaitSemaphore();
94     return taskErrCode;
95 }
96 
ReceiveMessage(const std::string & targetDev,Message * inMsg)97 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
98 {
99     if (inMsg == nullptr) {
100         return -E_INVALID_ARGS;
101     }
102     if (closed_) {
103         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
104         delete inMsg;
105         inMsg = nullptr;
106         return -E_BUSY;
107     }
108     RefObject::IncObjRef(this);
109     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
110         ReceiveMessageInner(targetDev, inMsg);
111         RefObject::DecObjRef(this);
112     });
113     if (errCode != E_OK) {
114         RefObject::DecObjRef(this);
115     }
116     return errCode;
117 }
118 
NotifyDeviceOffline(const std::string & device)119 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
120 {
121     if (closed_) {
122         return;
123     }
124     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
125     std::vector<uint32_t> removeList;
126     RemoveTaskByDevice(device, removeList);
127     for (const auto &sessionId : removeList) {
128         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
129     }
130 }
131 
NotifyUserChange()132 void RemoteExecutor::NotifyUserChange()
133 {
134     if (closed_) {
135         return;
136     }
137     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
138     RemoveAllTask(-E_USER_CHANGE);
139     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
140 }
141 
Close()142 void RemoteExecutor::Close()
143 {
144     closed_ = true;
145     LOGD("[RemoteExecutor][Close] close enter");
146     RemoveAllTask(-E_BUSY);
147     ClearInnerSource();
148     {
149         std::unique_lock<std::mutex> lock(msgQueueLock_);
150         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
151     }
152     LOGD("[RemoteExecutor][Close] close exist");
153 }
154 
NotifyConnectionClosed(uint64_t connectionId)155 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
156 {
157     if (closed_) {
158         return;
159     }
160     std::vector<uint32_t> removeList;
161     RemoveTaskByConnection(connectionId, removeList);
162     for (const auto &sessionId : removeList) {
163         DoFinished(sessionId, -E_BUSY);
164     }
165 }
166 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)167 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
168 {
169     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
170     {
171         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
172         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
173         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
174             // message deal in work thread, exist here
175             return -E_NOT_NEED_DELETE_MSG;
176         }
177         workingThreadsCount_++;
178     }
179     bool empty = true;
180     do {
181         std::pair<std::string, Message *> entry;
182         {
183             std::lock_guard<std::mutex> autoLock(msgQueueLock_);
184             empty = searchMessageQueue_.empty();
185             if (empty) {
186                 workingThreadsCount_--;
187                 continue;
188             }
189             entry = searchMessageQueue_.front();
190             searchMessageQueue_.pop();
191         }
192         ParseOneRequestMessage(entry.first, entry.second);
193         delete entry.second;
194         entry.second = nullptr;
195     } while (!empty);
196     clearCV_.notify_one();
197     return -E_NOT_NEED_DELETE_MSG;
198 }
199 
ParseOneRequestMessage(const std::string & device,Message * inMsg)200 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
201 {
202     if (closed_) {
203         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
204         return;
205     }
206     int errCode = CheckPermissions(device, inMsg);
207     if (errCode != E_OK) {
208         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
209         return;
210     }
211     errCode = SendRemoteExecutorData(device, inMsg);
212     if (errCode != E_OK) {
213         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
214     }
215 }
216 
CheckPermissions(const std::string & device,Message * inMsg)217 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
218 {
219     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
220     if (storage == nullptr) {
221         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
222         return -E_BUSY;
223     }
224     // permission check
225     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
226     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
227     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
228     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
229     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
230         { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
231     if (errCode != E_OK) {
232         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
233         storage->DecRefCount();
234         return errCode;
235     }
236     const auto *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
237     if (requestPacket == nullptr) {
238         LOGE("[RemoteExecutor] get packet object failed");
239         storage->DecRefCount();
240         return -E_INVALID_ARGS;
241     }
242     errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel());
243     storage->DecRefCount();
244     return errCode;
245 }
246 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)247 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
248 {
249     auto *syncInterface = GetAndIncSyncInterface();
250     if (syncInterface == nullptr) {
251         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
252         return -E_INVALID_ARGS;
253     }
254     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
255         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
256         syncInterface->DecRefCount();
257         return -E_NOT_SUPPORT;
258     }
259     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
260 
261     const RemoteExecutorRequestPacket *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
262     if (requestPacket == nullptr) {
263         LOGE("[RemoteExecutor] get packet object failed");
264         storage->DecRefCount();
265         return -E_INVALID_ARGS;
266     }
267 
268     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
269     storage->DecRefCount();
270     return errCode;
271 }
272 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)273 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
274 {
275     auto *packet = inMsg->GetObject<RemoteExecutorAckPacket>();
276     if (packet == nullptr) {
277         return -E_INVALID_ARGS;
278     }
279     int errCode = packet->GetAckCode();
280     uint32_t sessionId = inMsg->GetSessionId();
281     uint32_t sequenceId = inMsg->GetSequenceId();
282     if (!IsPacketValid(sessionId)) {
283         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
284         return -E_INVALID_ARGS;
285     }
286     if (errCode == E_OK) {
287         auto storage = GetAndIncSyncInterface();
288         auto communicator = GetAndIncCommunicator();
289         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
290         if (storage != nullptr) {
291             storage->DecRefCount();
292         }
293         RefObject::DecObjRef(communicator);
294     }
295     if (errCode != E_OK) {
296         DoFinished(sessionId, errCode);
297     } else {
298         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
299     }
300     return E_OK;
301 }
302 
CheckParamValid(const std::string & device,uint64_t timeout) const303 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
304 {
305     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
306         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
307         return false;
308     }
309     if (device.empty()) {
310         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
311         return false;
312     }
313     if (device.length() > DBConstant::MAX_DEV_LENGTH) {
314         LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
315         return false;
316     }
317     ICommunicator *communicator = GetAndIncCommunicator();
318     if (communicator == nullptr) {
319         return false;
320     }
321     std::string localId;
322     int errCode = communicator->GetLocalIdentity(localId);
323     RefObject::DecObjRef(communicator);
324     if (errCode != E_OK) {
325         return false;
326     }
327     if (localId == device) {
328         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
329         return false;
330     }
331     return true;
332 }
333 
CheckTaskExeStatus(const std::string & device)334 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
335 {
336     uint32_t queueCount = 0u;
337     uint32_t exeTaskCount = 0u;
338     uint32_t totalCount = 0u; // waiting task count in all queue
339     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
340         queueCount = searchTaskQueue_.at(device).size();
341     }
342     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
343         exeTaskCount = deviceWorkingSet_.at(device).size();
344     }
345     for (auto &entry : searchTaskQueue_) {
346         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
347         int currentQueueCount = static_cast<int>(entry.second.size());
348         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
349             // all task in this queue can execute, no need calculate as waiting task count
350             continue;
351         }
352         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
353             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
354     }
355     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
356         (totalCount + 1 <= MAX_QUEUE_COUNT);
357 }
358 
GenerateSessionId()359 uint32_t RemoteExecutor::GenerateSessionId()
360 {
361     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
362     while (taskMap_.find(sessionId) != taskMap_.end()) {
363         sessionId++;
364         if (sessionId == 0) { // if over flow start with 1
365             sessionId++;
366         }
367     }
368     lastSessionId_ = sessionId;
369     return sessionId;
370 }
371 
GenerateTaskId()372 uint32_t RemoteExecutor::GenerateTaskId()
373 {
374     lastTaskId_++;
375     if (lastTaskId_ == 0) { // if over flow start with 1
376         lastTaskId_++;
377     }
378     return lastTaskId_;
379 }
380 
RemoteQueryInner(const Task & task)381 int RemoteExecutor::RemoteQueryInner(const Task &task)
382 {
383     uint32_t sessionId = 0u;
384     {
385         // check task count and push into queue in lock
386         std::lock_guard<std::mutex> autoLock(taskLock_);
387         if (!CheckTaskExeStatus(task.target)) {
388             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
389             return -E_MAX_LIMITS;
390         }
391         sessionId = GenerateSessionId();
392         searchTaskQueue_[task.target].push_back(sessionId);
393         if (taskMap_.find(sessionId) != taskMap_.end()) {
394             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
395             return -E_INTERNAL_ERROR; // should not happen
396         }
397         taskMap_[sessionId] = task;
398         taskMap_[sessionId].taskId = GenerateTaskId();
399         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
400             taskMap_[sessionId].taskId, task.target.c_str());
401     }
402     std::string device = task.target;
403     RefObject::IncObjRef(this);
404     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
405         TryExecuteTaskInLock(device);
406         RefObject::DecObjRef(this);
407     });
408     if (errCode != E_OK) {
409         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
410         DoRollBack(sessionId);
411         RefObject::DecObjRef(this);
412     }
413     return errCode;
414 }
415 
TryExecuteTaskInLock(const std::string & device)416 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
417 {
418     uint32_t sessionId = 0u;
419     {
420         std::lock_guard<std::mutex> autoLock(taskLock_);
421         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
422             return;
423         }
424         if (searchTaskQueue_[device].empty()) {
425             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
426             return;
427         }
428         sessionId = searchTaskQueue_[device].front();
429         if (taskMap_.find(sessionId) == taskMap_.end()) {
430             searchTaskQueue_[device].pop_front();
431             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
432             return;
433         }
434         taskMap_[sessionId].status = Status::WORKING;
435         searchTaskQueue_[device].pop_front();
436         deviceWorkingSet_[device].insert(sessionId);
437         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
438         StartTimer(taskMap_[sessionId].timeout, sessionId);
439     }
440     int errCode = RequestStart(sessionId);
441     if (errCode != E_OK) {
442         DoFinished(sessionId, errCode);
443     }
444 }
445 
DoRollBack(uint32_t sessionId)446 void RemoteExecutor::DoRollBack(uint32_t sessionId)
447 {
448     Task task;
449     std::lock_guard<std::mutex> autoLock(taskLock_);
450     if (taskMap_.find(sessionId) == taskMap_.end()) {
451         return;
452     }
453     task = taskMap_[sessionId];
454     if (task.status != Status::WAITING) {
455         // task is execute, abort roll back
456         return;
457     }
458     taskMap_.erase(sessionId);
459 
460     auto iter = searchTaskQueue_[task.target].begin();
461     while (iter != searchTaskQueue_[task.target].end()) {
462         if ((*iter) == sessionId) {
463             break;
464         }
465         iter++;
466     }
467     if (iter != searchTaskQueue_[task.target].end()) {
468         searchTaskQueue_[task.target].erase(iter);
469     }
470     // this task should not in workingSet
471     deviceWorkingSet_[task.target].erase(sessionId);
472 }
473 
RequestStart(uint32_t sessionId)474 int RemoteExecutor::RequestStart(uint32_t sessionId)
475 {
476     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
477     if (message == nullptr) {
478         LOGE("[RemoteExecutor][RequestStart] new message error");
479         return -E_OUT_OF_MEMORY;
480     }
481     message->SetSessionId(sessionId);
482     message->SetMessageType(TYPE_REQUEST);
483     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
484     if (packet == nullptr) {
485         LOGE("[RemoteExecutor][RequestStart] new packet error");
486         ReleaseMessageAndPacket(message, nullptr);
487         return -E_OUT_OF_MEMORY;
488     }
489     std::string target;
490     int errCode = FillRequestPacket(packet, sessionId, target);
491     if (errCode != E_OK) {
492         ReleaseMessageAndPacket(message, packet);
493         return errCode;
494     }
495     errCode = message->SetExternalObject(packet);
496     if (errCode != E_OK) {
497         ReleaseMessageAndPacket(message, packet);
498         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
499         return errCode;
500     }
501     return SendRequestMessage(target, message, sessionId);
502 }
503 
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)504 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
505 {
506     auto communicator = GetAndIncCommunicator();
507     auto syncInterface = GetAndIncSyncInterface();
508     if (communicator == nullptr || syncInterface == nullptr) {
509         ReleaseMessageAndPacket(message, nullptr);
510         if (syncInterface != nullptr) {
511             syncInterface->DecRefCount();
512         }
513         RefObject::DecObjRef(communicator);
514         return -E_BUSY;
515     }
516     SendConfig sendConfig;
517     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
518     RefObject::IncObjRef(this);
519     int errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
520         if (errCode != E_OK) {
521             DoSendFailed(sessionId, errCode);
522         }
523         RefObject::DecObjRef(this);
524     });
525     if (errCode != E_OK) {
526         ReleaseMessageAndPacket(message, nullptr);
527         RefObject::DecObjRef(this);
528     }
529     RefObject::DecObjRef(communicator);
530     syncInterface->DecRefCount();
531     return errCode;
532 }
533 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)534 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
535     const std::string &device)
536 {
537     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
538     if (packet == nullptr) {
539         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
540         return -E_OUT_OF_MEMORY;
541     }
542     packet->SetAckCode(errCode);
543     packet->SetLastAck();
544     return ResponseStart(packet, sessionId, sequenceId, device);
545 }
546 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)547 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
548     const std::string &device)
549 {
550     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
551     if (packet == nullptr) {
552         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
553         return -E_OUT_OF_MEMORY;
554     }
555     packet->SetAckCode(E_OK);
556     if (sendMessage.isLast) {
557         packet->SetLastAck();
558     }
559     packet->SetSecurityOption(sendMessage.option);
560     packet->MoveInRowDataSet(std::move(dataSet));
561     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
562 }
563 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)564 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
565     const std::string &device)
566 {
567     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
568     if (storage == nullptr) {
569         ReleaseMessageAndPacket(nullptr, packet);
570         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
571         return -E_BUSY;
572     }
573     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
574     if (message == nullptr) {
575         LOGE("[RemoteExecutor][ResponseStart] new message error");
576         storage->DecRefCount();
577         ReleaseMessageAndPacket(nullptr, packet);
578         return -E_OUT_OF_MEMORY;
579     }
580     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
581 
582     int errCode = message->SetExternalObject(packet);
583     if (errCode != E_OK) {
584         ReleaseMessageAndPacket(message, packet);
585         storage->DecRefCount();
586         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
587         return errCode;
588     }
589     auto *communicator = GetAndIncCommunicator();
590     if (communicator == nullptr) {
591         ReleaseMessageAndPacket(message, nullptr);
592         storage->DecRefCount();
593         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
594         return -E_BUSY;
595     }
596 
597     message->SetTarget(device);
598     message->SetSessionId(sessionId);
599     message->SetSequenceId(sequenceId);
600     message->SetMessageType(TYPE_RESPONSE);
601     SendConfig sendConfig;
602     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
603     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
604     RefObject::DecObjRef(communicator);
605     if (errCode != E_OK) {
606         ReleaseMessageAndPacket(message, nullptr);
607         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
608     }
609     storage->DecRefCount();
610     return errCode;
611 }
612 
StartTimer(uint64_t timeout,uint32_t sessionId)613 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
614 {
615     TimerId timerId = 0u;
616     RefObject::IncObjRef(this);
617     TimerAction timeoutCallBack = std::bind(&RemoteExecutor::TimeoutCallBack, this, std::placeholders::_1);
618     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
619         RefObject::DecObjRef(this);
620     }, timerId);
621     if (errCode != E_OK) {
622         RefObject::DecObjRef(this);
623         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
624     }
625     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
626     std::lock_guard<std::mutex> autoLock(timeoutLock_);
627     timeoutMap_[timerId] = sessionId;
628     taskFinishMap_[sessionId] = timerId;
629 }
630 
RemoveTimer(uint32_t sessionId)631 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
632 {
633     TimerId timerId = 0u;
634     {
635         std::lock_guard<std::mutex> autoLock(timeoutLock_);
636         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
637             return;
638         }
639         timerId = taskFinishMap_[sessionId];
640         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
641         timeoutMap_.erase(timerId);
642         taskFinishMap_.erase(sessionId);
643     }
644     RuntimeContext::GetInstance()->RemoveTimer(timerId);
645 }
646 
TimeoutCallBack(TimerId timerId)647 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
648 {
649     RefObject::IncObjRef(this);
650     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
651         DoTimeout(timerId);
652         RefObject::DecObjRef(this);
653     });
654     if (errCode != E_OK) {
655         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
656         RefObject::DecObjRef(this);
657     }
658     return -E_NO_NEED_TIMER;
659 }
660 
DoTimeout(TimerId timerId)661 void RemoteExecutor::DoTimeout(TimerId timerId)
662 {
663     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
664     uint32_t sessionId = 0u;
665     {
666         std::lock_guard<std::mutex> autoLock(timeoutLock_);
667         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
668             return;
669         }
670         sessionId = timeoutMap_[timerId];
671     }
672     DoFinished(sessionId, -E_TIMEOUT);
673 }
674 
DoSendFailed(uint32_t sessionId,int errCode)675 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
676 {
677     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
678     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
679 }
680 
DoFinished(uint32_t sessionId,int errCode)681 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
682 {
683     Task task;
684     if (ClearTaskInfo(sessionId, task) == E_OK) {
685         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
686     } else {
687         return;
688     }
689     RefObject::IncObjRef(this);
690     if (task.onFinished != nullptr) {
691         task.onFinished(errCode, task.result);
692         LOGD("[RemoteExecutor][DoFinished] onFinished");
693     }
694     std::string device = task.target;
695     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
696         TryExecuteTaskInLock(device);
697         RefObject::DecObjRef(this);
698     });
699     if (retCode != E_OK) {
700         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
701         RefObject::DecObjRef(this);
702     }
703 }
704 
ClearTaskInfo(uint32_t sessionId,Task & task)705 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
706 {
707     {
708         std::lock_guard<std::mutex> autoLock(taskLock_);
709         if (taskMap_.find(sessionId) == taskMap_.end()) {
710             return -E_NOT_FOUND;
711         }
712         task = taskMap_[sessionId];
713         taskMap_.erase(sessionId);
714         deviceWorkingSet_[task.target].erase(sessionId);
715     }
716     RemoveTimer(sessionId);
717     return E_OK;
718 }
719 
ClearInnerSource()720 void RemoteExecutor::ClearInnerSource()
721 {
722     {
723         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
724         syncInterface_ = nullptr;
725         communicator_ = nullptr;
726     }
727     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
728     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
729     while (!searchMessageQueue_.empty()) {
730         auto entry = searchMessageQueue_.front();
731         searchMessageQueue_.pop();
732         delete entry.second;
733         entry.second = nullptr;
734     }
735 }
736 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)737 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
738 {
739     ISyncInterface *storage = GetAndIncSyncInterface();
740     if (storage == nullptr) {
741         return -E_BUSY;
742     }
743     SecurityOption localOption;
744     int errCode = storage->GetSecurityOption(localOption);
745     storage->DecRefCount();
746     storage = nullptr;
747     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
748         return -E_SECURITY_OPTION_CHECK_ERROR;
749     }
750     Task task;
751     {
752         std::lock_guard<std::mutex> autoLock(taskLock_);
753         if (taskMap_.find(sessionId) == taskMap_.end()) {
754             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
755             return -E_FINISHED;
756         }
757         task = taskMap_[sessionId];
758     }
759     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
760     packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
761     packet->SetSql(task.condition.sql);
762     packet->SetBindArgs(task.condition.bindArgs);
763     packet->SetNeedResponse();
764     packet->SetSecLabel(errCode == E_NOT_SUPPORT ? NOT_SURPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
765     target = task.target;
766     return E_OK;
767 }
768 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)769 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
770 {
771     int errCode = E_OK;
772     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
773         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
774         delete inMsg;
775         inMsg = nullptr;
776         return;
777     }
778     switch (inMsg->GetMessageType()) {
779         case TYPE_REQUEST:
780             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
781             break;
782         case TYPE_RESPONSE:
783             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
784             break;
785         default:
786             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
787             break;
788     }
789     if (errCode != -E_NOT_NEED_DELETE_MSG) {
790         delete inMsg;
791         inMsg = nullptr;
792     }
793 }
794 
IsPacketValid(uint32_t sessionId)795 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
796 {
797     std::lock_guard<std::mutex> autoLock(taskLock_);
798     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
799 }
800 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)801 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
802     const RemoteExecutorAckPacket *packet)
803 {
804     bool isReceiveFinished = false;
805     {
806         std::lock_guard<std::mutex> autoLock(taskLock_);
807         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
808             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
809             return;
810         }
811         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
812             taskMap_[sessionId].taskId, sequenceId);
813         taskMap_[sessionId].currentCount++;
814         if (packet->IsLastAck()) {
815             taskMap_[sessionId].targetCount = sequenceId;
816         }
817         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
818         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
819             isReceiveFinished = true;
820         }
821     }
822     if (isReceiveFinished) {
823         DoFinished(sessionId, E_OK);
824     }
825 }
826 
GetAndIncCommunicator() const827 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
828 {
829     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
830     ICommunicator *communicator = communicator_;
831     RefObject::IncObjRef(communicator);
832     return communicator;
833 }
834 
GetAndIncSyncInterface() const835 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
836 {
837     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
838     ISyncInterface *syncInterface = syncInterface_;
839     if (syncInterface != nullptr) {
840         syncInterface->IncRefCount();
841     }
842     return syncInterface;
843 }
844 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)845 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
846 {
847     std::lock_guard<std::mutex> autoLock(taskLock_);
848     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
849         for (auto &sessionId : deviceWorkingSet_[device]) {
850             removeList.push_back(sessionId);
851         }
852     }
853     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
854         for (auto &sessionId : searchTaskQueue_[device]) {
855             removeList.push_back(sessionId);
856         }
857     }
858 }
859 
RemoveAllTask(int errCode)860 void RemoteExecutor::RemoveAllTask(int errCode)
861 {
862     std::vector<OnFinished> waitToNotify;
863     std::vector<uint32_t> removeTimerList;
864     {
865         std::lock_guard<std::mutex> autoLock(taskLock_);
866         for (auto &taskEntry : taskMap_) {
867             if (taskEntry.second.onFinished != nullptr) {
868                 waitToNotify.push_back(taskEntry.second.onFinished);
869                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
870                     taskEntry.second.taskId, errCode);
871             }
872             if (taskEntry.second.status == Status::WORKING) {
873                 removeTimerList.push_back(taskEntry.first);
874             }
875         }
876         taskMap_.clear();
877         deviceWorkingSet_.clear();
878         searchTaskQueue_.clear();
879     }
880     for (const auto &callBack : waitToNotify) {
881         callBack(errCode, nullptr);
882     }
883     for (const auto &sessionId : removeTimerList) {
884         RemoveTimer(sessionId);
885     }
886     std::lock_guard<std::mutex> autoLock(timeoutLock_);
887     timeoutMap_.clear();
888     taskFinishMap_.clear();
889 }
890 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)891 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
892 {
893     std::lock_guard<std::mutex> autoLock(taskLock_);
894     for (auto &entry : taskMap_) {
895         if (entry.second.connectionId == connectionId) {
896             removeList.push_back(entry.first);
897         }
898     }
899 }
900 
GetPacketSize(const std::string & device,size_t & packetSize) const901 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
902 {
903     auto *communicator = GetAndIncCommunicator();
904     if (communicator == nullptr) {
905         LOGD("communicator is nullptr");
906         return -E_BUSY;
907     }
908 
909     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
910     RefObject::DecObjRef(communicator);
911     return E_OK;
912 }
913 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)914 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
915     const SecurityOption &localOption)
916 {
917     bool res = false;
918     if (remoteOption.securityLabel == localOption.securityLabel ||
919         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
920         localOption.securityLabel == SecurityLabel::NOT_SET)) {
921         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
922     }
923     if (!res) {
924         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
925             remoteOption.securityLabel, remoteOption.securityFlag,
926             localOption.securityLabel, localOption.securityFlag);
927     }
928     return res;
929 }
930 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)931 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
932     const std::string &device, uint32_t sessionId)
933 {
934     size_t packetSize = 0u;
935     int errCode = GetPacketSize(device, packetSize);
936     if (errCode != E_OK) {
937         return errCode;
938     }
939     SecurityOption option;
940     errCode = storage->GetSecurityOption(option);
941     if (errCode == -E_NOT_SUPPORT) {
942         option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
943         errCode = E_OK;
944     }
945     if (errCode != E_OK) {
946         LOGD("GetSecurityOption errCode:%d", errCode);
947         return -E_SECURITY_OPTION_CHECK_ERROR;
948     }
949     ContinueToken token = nullptr;
950     uint32_t sequenceId = 1u;
951     do {
952         RelationalRowDataSet dataSet;
953         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
954         if (errCode != E_OK) {
955             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
956             break;
957         }
958         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
959         errCode = ResponseData(std::move(dataSet), sendMessage, device);
960         if (errCode != E_OK) {
961             break;
962         }
963         sequenceId++;
964     } while (token != nullptr);
965     if (token != nullptr) {
966         storage->ReleaseRemoteQueryContinueToken(token);
967     }
968     return errCode;
969 }
970 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)971 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
972     const SecurityOption &remoteOption)
973 {
974     if (storage == nullptr || communicator == nullptr) {
975         return -E_BUSY;
976     }
977     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
978         return -E_NOT_SUPPORT;
979     }
980     std::string device;
981     communicator->GetLocalIdentity(device);
982     SecurityOption localOption;
983     int errCode = storage->GetSecurityOption(localOption);
984     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
985         return -E_SECURITY_OPTION_CHECK_ERROR;
986     }
987     if (remoteOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
988         return E_OK;
989     }
990     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
991         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
992     } else {
993         errCode = E_OK;
994     }
995     return errCode;
996 }
997 
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel)998 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
999     int32_t remoteSecLabel)
1000 {
1001     SecurityOption localOption;
1002     int errCode = storage->GetSecurityOption(localOption);
1003     if (errCode == -E_NOT_SUPPORT) {
1004         return E_OK;
1005     }
1006     if (errCode != E_OK) {
1007         return -E_SECURITY_OPTION_CHECK_ERROR;
1008     }
1009     if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION) {
1010         return E_OK;
1011     }
1012     if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1013         return E_OK;
1014     }
1015     return -E_SECURITY_OPTION_CHECK_ERROR;
1016 }
1017 }