• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         delete message;
38         message = nullptr;
39         delete packet;
40         packet = nullptr;
41     }
42 }
43 
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45     : workingThreadsCount_(0),
46       syncInterface_(nullptr),
47       communicator_(nullptr),
48       lastSessionId_(0),
49       lastTaskId_(0),
50       closed_(false)
51 {
52 }
53 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56     if (syncInterface == nullptr || communicator == nullptr) {
57         return -E_INVALID_ARGS;
58     }
59     closed_ = false;
60     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61     syncInterface_ = syncInterface;
62     communicator_ = communicator;
63     return E_OK;
64 }
65 
RemoteQuery(const std::string device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string device, const RemoteCondition &condition,
67     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69     if (closed_) {
70         return -E_BUSY;
71     }
72     if (!CheckParamValid(device, timeout)) {
73         return -E_INVALID_ARGS;
74     }
75     int errCode = E_OK;
76     SemaphoreUtils semaphore(0);
77     Task task;
78     task.result = std::make_shared<RelationalResultSetImpl>();
79     task.target = device;
80     task.timeout = timeout;
81     task.condition = condition;
82     task.onFinished = [&semaphore, &errCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
83         errCode = retCode;
84         result = taskResult;
85         semaphore.SendSemaphore();
86     };
87     task.connectionId = connectionId;
88     errCode = RemoteQueryInner(task);
89     if (errCode != E_OK) {
90         return errCode;
91     }
92     semaphore.WaitSemaphore();
93     return errCode;
94 }
95 
ReceiveMessage(const std::string & targetDev,Message * inMsg)96 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
97 {
98     if (inMsg == nullptr) {
99         return -E_INVALID_ARGS;
100     }
101     if (closed_) {
102         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
103         delete inMsg;
104         inMsg = nullptr;
105         return -E_BUSY;
106     }
107     RefObject::IncObjRef(this);
108     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
109         ReceiveMessageInner(targetDev, inMsg);
110         RefObject::DecObjRef(this);
111     });
112     if (errCode != E_OK) {
113         RefObject::DecObjRef(this);
114     }
115     return errCode;
116 }
117 
NotifyDeviceOffline(const std::string & device)118 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
119 {
120     if (closed_) {
121         return;
122     }
123     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
124     std::vector<uint32_t> removeList;
125     RemoveTaskByDevice(device, removeList);
126     for (const auto &sessionId : removeList) {
127         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
128     }
129 }
130 
NotifyUserChange()131 void RemoteExecutor::NotifyUserChange()
132 {
133     if (closed_) {
134         return;
135     }
136     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
137     RemoveAllTask(-E_USER_CHANGE);
138     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
139 }
140 
Close()141 void RemoteExecutor::Close()
142 {
143     closed_ = true;
144     LOGD("[RemoteExecutor][Close] close enter");
145     RemoveAllTask(-E_BUSY);
146     ClearInnerSource();
147     {
148         std::unique_lock<std::mutex> lock(msgQueueLock_);
149         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
150     }
151     LOGD("[RemoteExecutor][Close] close exist");
152 }
153 
NotifyConnectionClosed(uint64_t connectionId)154 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
155 {
156     if (closed_) {
157         return;
158     }
159     std::vector<uint32_t> removeList;
160     RemoveTaskByConnection(connectionId, removeList);
161     for (const auto &sessionId : removeList) {
162         DoFinished(sessionId, -E_BUSY);
163     }
164 }
165 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)166 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
167 {
168     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
169     {
170         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
171         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
172         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
173             // message deal in work thread, exist here
174             return -E_NOT_NEED_DELETE_MSG;
175         }
176         workingThreadsCount_++;
177     }
178     RefObject::IncObjRef(this);
179     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
180         bool empty = true;
181         do {
182             std::pair<std::string, Message *> entry;
183             {
184                 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
185                 empty = searchMessageQueue_.empty();
186                 if (empty) {
187                     workingThreadsCount_--;
188                     continue;
189                 }
190                 entry = searchMessageQueue_.front();
191                 searchMessageQueue_.pop();
192             }
193             ParseOneRequestMessage(entry.first, entry.second);
194             delete entry.second;
195             entry.second = nullptr;
196         } while (!empty);
197         clearCV_.notify_one();
198         RefObject::DecObjRef(this);
199     });
200     if (errCode != E_OK) {
201         workingThreadsCount_--;
202         clearCV_.notify_one();
203         RefObject::DecObjRef(this);
204     } else {
205         errCode = -E_NOT_NEED_DELETE_MSG;
206     }
207     return errCode;
208 }
209 
ParseOneRequestMessage(const std::string & device,Message * inMsg)210 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
211 {
212     if (closed_) {
213         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
214         return;
215     }
216     int errCode = CheckPermissions(device, inMsg);
217     if (errCode != E_OK) {
218         ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
219         return;
220     }
221     errCode = SendRemoteExecutorData(device, inMsg);
222     if (errCode != E_OK) {
223         ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
224     }
225 }
226 
CheckPermissions(const std::string & device,Message * inMsg)227 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
228 {
229     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
230     if (storage == nullptr) {
231         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
232         return -E_BUSY;
233     }
234     // permission check
235     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
236     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
237     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
238     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
239     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
240         { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
241     if (errCode != E_OK) {
242         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
243         storage->DecRefCount();
244         return errCode;
245     }
246     const auto *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
247     if (requestPacket == nullptr) {
248         LOGE("[RemoteExecutor] get packet object failed");
249         storage->DecRefCount();
250         return -E_INVALID_ARGS;
251     }
252     errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel());
253     storage->DecRefCount();
254     return errCode;
255 }
256 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)257 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
258 {
259     auto *syncInterface = GetAndIncSyncInterface();
260     if (syncInterface == nullptr) {
261         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
262         return -E_INVALID_ARGS;
263     }
264     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
265         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
266         syncInterface->DecRefCount();
267         return -E_NOT_SUPPORT;
268     }
269     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
270 
271     const RemoteExecutorRequestPacket *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
272     if (requestPacket == nullptr) {
273         LOGE("[RemoteExecutor] get packet object failed");
274         storage->DecRefCount();
275         return -E_INVALID_ARGS;
276     }
277 
278     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
279     storage->DecRefCount();
280     return errCode;
281 }
282 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)283 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
284 {
285     auto *packet = inMsg->GetObject<RemoteExecutorAckPacket>();
286     if (packet == nullptr) {
287         return -E_INVALID_ARGS;
288     }
289     int errCode = packet->GetAckCode();
290     uint32_t sessionId = inMsg->GetSessionId();
291     uint32_t sequenceId = inMsg->GetSequenceId();
292     if (!IsPacketValid(sessionId)) {
293         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
294         return -E_INVALID_ARGS;
295     }
296     if (errCode == E_OK) {
297         auto storage = GetAndIncSyncInterface();
298         auto communicator = GetAndIncCommunicator();
299         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
300         if (storage != nullptr) {
301             storage->DecRefCount();
302         }
303         RefObject::DecObjRef(communicator);
304     }
305     if (errCode != E_OK) {
306         DoFinished(sessionId, errCode);
307     } else {
308         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
309     }
310     return E_OK;
311 }
312 
CheckParamValid(const std::string & device,uint64_t timeout) const313 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
314 {
315     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
316         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
317         return false;
318     }
319     if (device.empty()) {
320         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
321         return false;
322     }
323     ICommunicator *communicator = GetAndIncCommunicator();
324     if (communicator == nullptr) {
325         return false;
326     }
327     std::string localId;
328     int errCode = communicator->GetLocalIdentity(localId);
329     RefObject::DecObjRef(communicator);
330     if (errCode != E_OK) {
331         return false;
332     }
333     if (localId == device) {
334         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
335         return false;
336     }
337     return true;
338 }
339 
CheckTaskExeStatus(const std::string & device)340 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
341 {
342     uint32_t queueCount = 0u;
343     uint32_t exeTaskCount = 0u;
344     uint32_t totalCount = 0u; // waiting task count in all queue
345     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
346         queueCount = searchTaskQueue_.at(device).size();
347     }
348     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
349         exeTaskCount = deviceWorkingSet_.at(device).size();
350     }
351     for (auto &entry : searchTaskQueue_) {
352         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
353         int currentQueueCount = static_cast<int>(entry.second.size());
354         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
355             // all task in this queue can execute, no need calculate as waiting task count
356             continue;
357         }
358         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
359             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
360     }
361     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
362         (totalCount + 1 <= MAX_QUEUE_COUNT);
363 }
364 
GenerateSessionId()365 uint32_t RemoteExecutor::GenerateSessionId()
366 {
367     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
368     while (taskMap_.find(sessionId) != taskMap_.end()) {
369         sessionId++;
370         if (sessionId == 0) { // if over flow start with 1
371             sessionId++;
372         }
373     }
374     lastSessionId_ = sessionId;
375     return sessionId;
376 }
377 
GenerateTaskId()378 uint32_t RemoteExecutor::GenerateTaskId()
379 {
380     lastTaskId_++;
381     if (lastTaskId_ == 0) { // if over flow start with 1
382         lastTaskId_++;
383     }
384     return lastTaskId_;
385 }
386 
RemoteQueryInner(const Task & task)387 int RemoteExecutor::RemoteQueryInner(const Task &task)
388 {
389     uint32_t sessionId = 0u;
390     {
391         // check task count and push into queue in lock
392         std::lock_guard<std::mutex> autoLock(taskLock_);
393         if (!CheckTaskExeStatus(task.target)) {
394             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
395             return -E_MAX_LIMITS;
396         }
397         sessionId = GenerateSessionId();
398         searchTaskQueue_[task.target].push_back(sessionId);
399         if (taskMap_.find(sessionId) != taskMap_.end()) {
400             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
401             return -E_INTERNAL_ERROR; // should not happen
402         }
403         taskMap_[sessionId] = task;
404         taskMap_[sessionId].taskId = GenerateTaskId();
405         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
406             taskMap_[sessionId].taskId, task.target.c_str());
407     }
408     std::string device = task.target;
409     RefObject::IncObjRef(this);
410     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
411         TryExecuteTaskInLock(device);
412         RefObject::DecObjRef(this);
413     });
414     if (errCode != E_OK) {
415         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
416         DoRollBack(sessionId);
417         RefObject::DecObjRef(this);
418     }
419     return errCode;
420 }
421 
TryExecuteTaskInLock(const std::string & device)422 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
423 {
424     uint32_t sessionId = 0u;
425     {
426         std::lock_guard<std::mutex> autoLock(taskLock_);
427         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
428             return;
429         }
430         if (searchTaskQueue_[device].empty()) {
431             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
432             return;
433         }
434         sessionId = searchTaskQueue_[device].front();
435         if (taskMap_.find(sessionId) == taskMap_.end()) {
436             searchTaskQueue_[device].pop_front();
437             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
438             return;
439         }
440         taskMap_[sessionId].status = Status::WORKING;
441         searchTaskQueue_[device].pop_front();
442         deviceWorkingSet_[device].insert(sessionId);
443         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
444         StartTimer(taskMap_[sessionId].timeout, sessionId);
445     }
446     int errCode = RequestStart(sessionId);
447     if (errCode != E_OK) {
448         DoFinished(sessionId, errCode);
449     }
450 }
451 
DoRollBack(uint32_t sessionId)452 void RemoteExecutor::DoRollBack(uint32_t sessionId)
453 {
454     Task task;
455     std::lock_guard<std::mutex> autoLock(taskLock_);
456     if (taskMap_.find(sessionId) == taskMap_.end()) {
457         return;
458     }
459     task = taskMap_[sessionId];
460     if (task.status != Status::WAITING) {
461         // task is execute, abort roll back
462         return;
463     }
464     taskMap_.erase(sessionId);
465 
466     auto iter = searchTaskQueue_[task.target].begin();
467     while (iter != searchTaskQueue_[task.target].end()) {
468         if ((*iter) == sessionId) {
469             break;
470         }
471         iter++;
472     }
473     if (iter != searchTaskQueue_[task.target].end()) {
474         searchTaskQueue_[task.target].erase(iter);
475     }
476     // this task should not in workingSet
477     deviceWorkingSet_[task.target].erase(sessionId);
478 }
479 
RequestStart(uint32_t sessionId)480 int RemoteExecutor::RequestStart(uint32_t sessionId)
481 {
482     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
483     if (message == nullptr) {
484         LOGE("[RemoteExecutor][RequestStart] new message error");
485         return -E_OUT_OF_MEMORY;
486     }
487     message->SetSessionId(sessionId);
488     message->SetMessageType(TYPE_REQUEST);
489     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
490     if (packet == nullptr) {
491         LOGE("[RemoteExecutor][RequestStart] new packet error");
492         ReleaseMessageAndPacket(message, nullptr);
493         return -E_OUT_OF_MEMORY;
494     }
495     std::string target;
496     int errCode = FillRequestPacket(packet, sessionId, target);
497     if (errCode != E_OK) {
498         ReleaseMessageAndPacket(message, packet);
499         return errCode;
500     }
501     errCode = message->SetExternalObject(packet);
502     if (errCode != E_OK) {
503         ReleaseMessageAndPacket(message, packet);
504         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
505     }
506     auto communicator = GetAndIncCommunicator();
507     auto syncInterface = GetAndIncSyncInterface();
508     if (communicator == nullptr || syncInterface == nullptr) {
509         ReleaseMessageAndPacket(message, nullptr);
510         if (syncInterface != nullptr) {
511             syncInterface->DecRefCount();
512         }
513         RefObject::DecObjRef(communicator);
514         return -E_BUSY;
515     }
516     SendConfig sendConfig;
517     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
518     RefObject::IncObjRef(this);
519     errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
520         if (errCode != E_OK) {
521             DoSendFailed(sessionId, errCode);
522         }
523         RefObject::DecObjRef(this);
524     });
525     RefObject::DecObjRef(communicator);
526     syncInterface->DecRefCount();
527     return errCode;
528 }
529 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)530 void RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
531     const std::string &device)
532 {
533     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
534     if (packet == nullptr) {
535         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
536         return;
537     }
538     packet->SetAckCode(errCode);
539     packet->SetLastAck();
540     (void)ResponseStart(packet, sessionId, sequenceId, device);
541 }
542 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)543 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
544     const std::string &device)
545 {
546     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
547     if (packet == nullptr) {
548         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
549         return -E_OUT_OF_MEMORY;
550     }
551     packet->SetAckCode(E_OK);
552     if (sendMessage.isLast) {
553         packet->SetLastAck();
554     }
555     packet->SetSecurityOption(sendMessage.option);
556     packet->MoveInRowDataSet(std::move(dataSet));
557     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
558 }
559 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)560 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
561     const std::string &device)
562 {
563     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
564     if (storage == nullptr) {
565         ReleaseMessageAndPacket(nullptr, packet);
566         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
567         return -E_BUSY;
568     }
569     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
570     if (message == nullptr) {
571         LOGE("[RemoteExecutor][ResponseStart] new message error");
572         storage->DecRefCount();
573         ReleaseMessageAndPacket(nullptr, packet);
574         return -E_OUT_OF_MEMORY;
575     }
576     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
577 
578     int errCode = message->SetExternalObject(packet);
579     if (errCode != E_OK) {
580         ReleaseMessageAndPacket(message, packet);
581         storage->DecRefCount();
582         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
583         return errCode;
584     }
585     auto *communicator = GetAndIncCommunicator();
586     if (communicator == nullptr) {
587         ReleaseMessageAndPacket(message, nullptr);
588         storage->DecRefCount();
589         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
590         return -E_BUSY;
591     }
592 
593     message->SetTarget(device);
594     message->SetSessionId(sessionId);
595     message->SetSequenceId(sequenceId);
596     message->SetMessageType(TYPE_RESPONSE);
597     SendConfig sendConfig;
598     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
599     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
600     RefObject::DecObjRef(communicator);
601     if (errCode != E_OK) {
602         ReleaseMessageAndPacket(message, nullptr);
603         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
604     }
605     storage->DecRefCount();
606     return errCode;
607 }
608 
StartTimer(uint64_t timeout,uint32_t sessionId)609 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
610 {
611     TimerId timerId = 0u;
612     RefObject::IncObjRef(this);
613     TimerAction timeoutCallBack = std::bind(&RemoteExecutor::TimeoutCallBack, this, std::placeholders::_1);
614     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
615         RefObject::DecObjRef(this);
616     }, timerId);
617     if (errCode != E_OK) {
618         RefObject::DecObjRef(this);
619         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
620     }
621     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
622     std::lock_guard<std::mutex> autoLock(timeoutLock_);
623     timeoutMap_[timerId] = sessionId;
624     taskFinishMap_[sessionId] = timerId;
625 }
626 
RemoveTimer(uint32_t sessionId)627 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
628 {
629     TimerId timerId = 0u;
630     {
631         std::lock_guard<std::mutex> autoLock(timeoutLock_);
632         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
633             return;
634         }
635         timerId = taskFinishMap_[sessionId];
636         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
637         timeoutMap_.erase(timerId);
638         taskFinishMap_.erase(sessionId);
639     }
640     RuntimeContext::GetInstance()->RemoveTimer(timerId);
641 }
642 
TimeoutCallBack(TimerId timerId)643 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
644 {
645     RefObject::IncObjRef(this);
646     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
647         DoTimeout(timerId);
648         RefObject::DecObjRef(this);
649     });
650     if (errCode != E_OK) {
651         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
652         RefObject::DecObjRef(this);
653     }
654     return -E_NO_NEED_TIMER;
655 }
656 
DoTimeout(TimerId timerId)657 void RemoteExecutor::DoTimeout(TimerId timerId)
658 {
659     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
660     uint32_t sessionId = 0u;
661     {
662         std::lock_guard<std::mutex> autoLock(timeoutLock_);
663         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
664             return;
665         }
666         sessionId = timeoutMap_[timerId];
667     }
668     DoFinished(sessionId, -E_TIMEOUT);
669 }
670 
DoSendFailed(uint32_t sessionId,int errCode)671 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
672 {
673     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
674     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
675 }
676 
DoFinished(uint32_t sessionId,int errCode)677 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
678 {
679     Task task;
680     if (ClearTaskInfo(sessionId, task) == E_OK) {
681         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
682     } else {
683         return;
684     }
685     RefObject::IncObjRef(this);
686     if (task.onFinished != nullptr) {
687         task.onFinished(errCode, task.result);
688         LOGD("[RemoteExecutor][DoFinished] onFinished");
689     }
690     std::string device = task.target;
691     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
692         TryExecuteTaskInLock(device);
693         RefObject::DecObjRef(this);
694     });
695     if (retCode != E_OK) {
696         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
697         RefObject::DecObjRef(this);
698     }
699 }
700 
ClearTaskInfo(uint32_t sessionId,Task & task)701 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
702 {
703     {
704         std::lock_guard<std::mutex> autoLock(taskLock_);
705         if (taskMap_.find(sessionId) == taskMap_.end()) {
706             return -E_NOT_FOUND;
707         }
708         task = taskMap_[sessionId];
709         taskMap_.erase(sessionId);
710         deviceWorkingSet_[task.target].erase(sessionId);
711     }
712     RemoveTimer(sessionId);
713     return E_OK;
714 }
715 
ClearInnerSource()716 void RemoteExecutor::ClearInnerSource()
717 {
718     {
719         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
720         syncInterface_ = nullptr;
721         communicator_ = nullptr;
722     }
723     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
724     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
725     while (!searchMessageQueue_.empty()) {
726         auto entry = searchMessageQueue_.front();
727         searchMessageQueue_.pop();
728         delete entry.second;
729         entry.second = nullptr;
730     }
731 }
732 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)733 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
734 {
735     ISyncInterface *storage = GetAndIncSyncInterface();
736     if (storage == nullptr) {
737         return -E_BUSY;
738     }
739     SecurityOption localOption;
740     int errCode = storage->GetSecurityOption(localOption);
741     storage->DecRefCount();
742     storage = nullptr;
743     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
744         return -E_SECURITY_OPTION_CHECK_ERROR;
745     }
746     Task task;
747     {
748         std::lock_guard<std::mutex> autoLock(taskLock_);
749         if (taskMap_.find(sessionId) == taskMap_.end()) {
750             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
751             return -E_FINISHED;
752         }
753         task = taskMap_[sessionId];
754     }
755     PreparedStmt stmt;
756     stmt.SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
757     stmt.SetSql(task.condition.sql);
758     stmt.SetBindArgs(task.condition.bindArgs);
759     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
760     packet->SetPreparedStmt(stmt);
761     packet->SetNeedResponse();
762     packet->SetSecLabel(errCode == E_NOT_SUPPORT ? NOT_SURPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
763     target = task.target;
764     return E_OK;
765 }
766 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)767 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
768 {
769     int errCode = E_OK;
770     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
771         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
772         delete inMsg;
773         inMsg = nullptr;
774         return;
775     }
776     switch (inMsg->GetMessageType()) {
777         case TYPE_REQUEST:
778             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
779             break;
780         case TYPE_RESPONSE:
781             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
782             break;
783         default:
784             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
785             break;
786     }
787     if (errCode != -E_NOT_NEED_DELETE_MSG) {
788         delete inMsg;
789         inMsg = nullptr;
790     }
791 }
792 
IsPacketValid(uint32_t sessionId)793 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
794 {
795     std::lock_guard<std::mutex> autoLock(taskLock_);
796     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
797 }
798 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)799 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
800     const RemoteExecutorAckPacket *packet)
801 {
802     bool isReceiveFinished = false;
803     {
804         std::lock_guard<std::mutex> autoLock(taskLock_);
805         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
806             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
807             return;
808         }
809         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
810             taskMap_[sessionId].taskId, sequenceId);
811         taskMap_[sessionId].currentCount++;
812         if (packet->IsLastAck()) {
813             taskMap_[sessionId].targetCount = sequenceId;
814         }
815         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
816         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
817             isReceiveFinished = true;
818         }
819     }
820     if (isReceiveFinished) {
821         DoFinished(sessionId, E_OK);
822     }
823 }
824 
GetAndIncCommunicator() const825 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
826 {
827     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
828     ICommunicator *communicator = communicator_;
829     RefObject::IncObjRef(communicator);
830     return communicator;
831 }
832 
GetAndIncSyncInterface() const833 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
834 {
835     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
836     ISyncInterface *syncInterface = syncInterface_;
837     if (syncInterface != nullptr) {
838         syncInterface->IncRefCount();
839     }
840     return syncInterface;
841 }
842 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)843 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
844 {
845     std::lock_guard<std::mutex> autoLock(taskLock_);
846     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
847         for (auto &sessionId : deviceWorkingSet_[device]) {
848             removeList.push_back(sessionId);
849         }
850     }
851     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
852         for (auto &sessionId : searchTaskQueue_[device]) {
853             removeList.push_back(sessionId);
854         }
855     }
856 }
857 
RemoveAllTask(int errCode)858 void RemoteExecutor::RemoveAllTask(int errCode)
859 {
860     std::vector<OnFinished> waitToNotify;
861     std::vector<uint32_t> removeTimerList;
862     {
863         std::lock_guard<std::mutex> autoLock(taskLock_);
864         for (auto &taskEntry : taskMap_) {
865             if (taskEntry.second.onFinished != nullptr) {
866                 waitToNotify.push_back(taskEntry.second.onFinished);
867                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
868                     taskEntry.second.taskId, errCode);
869             }
870             if (taskEntry.second.status == Status::WORKING) {
871                 removeTimerList.push_back(taskEntry.first);
872             }
873         }
874         taskMap_.clear();
875         deviceWorkingSet_.clear();
876         searchTaskQueue_.clear();
877     }
878     for (const auto &callBack : waitToNotify) {
879         callBack(errCode, nullptr);
880     }
881     for (const auto &sessionId : removeTimerList) {
882         RemoveTimer(sessionId);
883     }
884     std::lock_guard<std::mutex> autoLock(timeoutLock_);
885     timeoutMap_.clear();
886     taskFinishMap_.clear();
887 }
888 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)889 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
890 {
891     std::lock_guard<std::mutex> autoLock(taskLock_);
892     for (auto &entry : taskMap_) {
893         if (entry.second.connectionId == connectionId) {
894             removeList.push_back(entry.first);
895         }
896     }
897 }
898 
GetPacketSize(const std::string & device,size_t & packetSize)899 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize)
900 {
901     auto *communicator = GetAndIncCommunicator();
902     if (communicator == nullptr) {
903         LOGD("communicator is nullptr");
904         return -E_BUSY;
905     }
906 
907     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
908     RefObject::DecObjRef(communicator);
909     return E_OK;
910 }
911 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)912 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
913     const SecurityOption &localOption)
914 {
915     bool res = false;
916     if (remoteOption.securityLabel == localOption.securityLabel ||
917         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
918         localOption.securityLabel == SecurityLabel::NOT_SET)) {
919         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
920     }
921     if (!res) {
922         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
923             remoteOption.securityLabel, remoteOption.securityFlag,
924             localOption.securityLabel, localOption.securityFlag);
925     }
926     return res;
927 }
928 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)929 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
930     const std::string &device, uint32_t sessionId)
931 {
932     size_t packetSize = 0u;
933     int errCode = GetPacketSize(device, packetSize);
934     if (errCode != E_OK) {
935         return errCode;
936     }
937     SecurityOption option;
938     errCode = storage->GetSecurityOption(option);
939     if (errCode == -E_NOT_SUPPORT) {
940         option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
941         errCode = E_OK;
942     }
943     if (errCode != E_OK) {
944         LOGD("GetSecurityOption errCode:%d", errCode);
945         return -E_SECURITY_OPTION_CHECK_ERROR;
946     }
947     ContinueToken token = nullptr;
948     uint32_t sequenceId = 1u;
949     do {
950         RelationalRowDataSet dataSet;
951         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
952         if (errCode != E_OK) {
953             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
954             break;
955         }
956         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
957         errCode = ResponseData(std::move(dataSet), sendMessage, device);
958         if (errCode != E_OK) {
959             break;
960         }
961         sequenceId++;
962     } while (token != nullptr);
963     if (token != nullptr) {
964         storage->ReleaseRemoteQueryContinueToken(token);
965     }
966     return errCode;
967 }
968 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)969 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
970     const SecurityOption &remoteOption)
971 {
972     if (storage == nullptr || communicator == nullptr) {
973         return -E_BUSY;
974     }
975     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
976         return -E_NOT_SUPPORT;
977     }
978     std::string device;
979     communicator->GetLocalIdentity(device);
980     SecurityOption localOption;
981     int errCode = storage->GetSecurityOption(localOption);
982     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
983         return -E_SECURITY_OPTION_CHECK_ERROR;
984     }
985     if (remoteOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
986         return E_OK;
987     }
988     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
989         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
990     } else {
991         errCode = E_OK;
992     }
993     return errCode;
994 }
995 
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel)996 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
997     int32_t remoteSecLabel)
998 {
999     SecurityOption localOption;
1000     int errCode = storage->GetSecurityOption(localOption);
1001     if (errCode == -E_NOT_SUPPORT) {
1002         return E_OK;
1003     }
1004     if (errCode != E_OK) {
1005         return -E_SECURITY_OPTION_CHECK_ERROR;
1006     }
1007     if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SURPPORT_SEC_CLASSIFICATION) {
1008         return E_OK;
1009     }
1010     if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1011         return E_OK;
1012     }
1013     return -E_SECURITY_OPTION_CHECK_ERROR;
1014 }
1015 }