• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         delete message;
38         message = nullptr;
39         delete packet;
40         packet = nullptr;
41     }
42 }
43 
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45     : workingThreadsCount_(0),
46       syncInterface_(nullptr),
47       communicator_(nullptr),
48       lastSessionId_(0),
49       lastTaskId_(0),
50       closed_(false)
51 {
52 }
53 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56     if (syncInterface == nullptr || communicator == nullptr) {
57         return -E_INVALID_ARGS;
58     }
59     closed_ = false;
60     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61     syncInterface_ = syncInterface;
62     communicator_ = communicator;
63     return E_OK;
64 }
65 
RemoteQuery(const std::string device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string device, const RemoteCondition &condition,
67     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69     if (closed_) {
70         return -E_BUSY;
71     }
72     if (!CheckParamValid(device, timeout)) {
73         return -E_INVALID_ARGS;
74     }
75     int errCode = E_OK;
76     SemaphoreUtils semaphore(0);
77     Task task;
78     task.result = std::make_shared<RelationalResultSetImpl>();
79     task.target = device;
80     task.timeout = timeout;
81     task.condition = condition;
82     task.onFinished = [&semaphore, &errCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
83         errCode = retCode;
84         result = taskResult;
85         semaphore.SendSemaphore();
86     };
87     task.connectionId = connectionId;
88     errCode = RemoteQueryInner(task);
89     if (errCode != E_OK) {
90         return errCode;
91     }
92     semaphore.WaitSemaphore();
93     return errCode;
94 }
95 
ReceiveMessage(const std::string & targetDev,Message * inMsg)96 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
97 {
98     if (inMsg == nullptr) {
99         return -E_INVALID_ARGS;
100     }
101     if (closed_) {
102         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
103         delete inMsg;
104         inMsg = nullptr;
105         return -E_BUSY;
106     }
107     RefObject::IncObjRef(this);
108     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
109         ReceiveMessageInner(targetDev, inMsg);
110         RefObject::DecObjRef(this);
111     });
112     if (errCode != E_OK) {
113         RefObject::DecObjRef(this);
114     }
115     return errCode;
116 }
117 
NotifyDeviceOffline(const std::string & device)118 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
119 {
120     if (closed_) {
121         return;
122     }
123     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
124     std::vector<uint32_t> removeList;
125     RemoveTaskByDevice(device, removeList);
126     for (const auto &sessionId : removeList) {
127         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
128     }
129 }
130 
NotifyUserChange()131 void RemoteExecutor::NotifyUserChange()
132 {
133     if (closed_) {
134         return;
135     }
136     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
137     RemoveAllTask(-E_USER_CHANGE);
138     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
139 }
140 
Close()141 void RemoteExecutor::Close()
142 {
143     closed_ = true;
144     LOGD("[RemoteExecutor][Close] close enter");
145     RemoveAllTask(-E_BUSY);
146     ClearInnerSource();
147     {
148         std::unique_lock<std::mutex> lock(msgQueueLock_);
149         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
150     }
151     LOGD("[RemoteExecutor][Close] close exist");
152 }
153 
NotifyConnectionClosed(uint64_t connectionId)154 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
155 {
156     if (closed_) {
157         return;
158     }
159     std::vector<uint32_t> removeList;
160     RemoveTaskByConnection(connectionId, removeList);
161     for (const auto &sessionId : removeList) {
162         DoFinished(sessionId, -E_BUSY);
163     }
164 }
165 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)166 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
167 {
168     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
169     {
170         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
171         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
172         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
173             // message deal in work thread, exist here
174             return -E_NOT_NEED_DELETE_MSG;
175         }
176         workingThreadsCount_++;
177     }
178     RefObject::IncObjRef(this);
179     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
180         bool empty = true;
181         do {
182             std::pair<std::string, Message *> entry;
183             {
184                 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
185                 empty = searchMessageQueue_.empty();
186                 if (empty) {
187                     workingThreadsCount_--;
188                     continue;
189                 }
190                 entry = searchMessageQueue_.front();
191                 searchMessageQueue_.pop();
192             }
193             ParseOneRequestMessage(entry.first, entry.second);
194             delete entry.second;
195             entry.second = nullptr;
196         } while (!empty);
197         clearCV_.notify_one();
198         RefObject::DecObjRef(this);
199     });
200     if (errCode != E_OK) {
201         workingThreadsCount_--;
202         clearCV_.notify_one();
203         RefObject::DecObjRef(this);
204     } else {
205         errCode = -E_NOT_NEED_DELETE_MSG;
206     }
207     return errCode;
208 }
209 
ParseOneRequestMessage(const std::string & device,Message * inMsg)210 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
211 {
212     if (closed_) {
213         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
214         return;
215     }
216     int errCode = CheckPermissions(device);
217     if (errCode != E_OK) {
218         ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
219         return;
220     }
221     errCode = SendRemoteExecutorData(device, inMsg);
222     if (errCode != E_OK) {
223         ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
224     }
225 }
226 
CheckPermissions(const std::string & device)227 int RemoteExecutor::CheckPermissions(const std::string &device)
228 {
229     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
230     if (storage == nullptr) {
231         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
232         return -E_BUSY;
233     }
234     // permission check
235     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
236     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
237     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
238     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
239     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
240         { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
241     if (errCode != E_OK) {
242         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
243     }
244     storage->DecRefCount();
245     return errCode;
246 }
247 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)248 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
249 {
250     auto *syncInterface = GetAndIncSyncInterface();
251     if (syncInterface == nullptr) {
252         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
253         return -E_INVALID_ARGS;
254     }
255     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
256         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
257         syncInterface->DecRefCount();
258         return -E_NOT_SUPPORT;
259     }
260     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
261 
262     const RemoteExecutorRequestPacket *requestPacket = inMsg->GetObject<RemoteExecutorRequestPacket>();
263     if (requestPacket == nullptr) {
264         LOGE("[RemoteExecutor] get packet object failed");
265         storage->DecRefCount();
266         return -E_INVALID_ARGS;
267     }
268 
269     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
270     storage->DecRefCount();
271     return errCode;
272 }
273 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)274 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
275 {
276     auto *packet = inMsg->GetObject<RemoteExecutorAckPacket>();
277     if (packet == nullptr) {
278         return -E_INVALID_ARGS;
279     }
280     int errCode = packet->GetAckCode();
281     uint32_t sessionId = inMsg->GetSessionId();
282     uint32_t sequenceId = inMsg->GetSequenceId();
283     if (!IsPacketValid(sessionId)) {
284         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
285         return -E_INVALID_ARGS;
286     }
287     if (errCode == E_OK) {
288         auto storage = GetAndIncSyncInterface();
289         auto communicator = GetAndIncCommunicator();
290         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
291         if (storage != nullptr) {
292             storage->DecRefCount();
293         }
294         RefObject::DecObjRef(communicator);
295     }
296     if (errCode != E_OK) {
297         DoFinished(sessionId, errCode);
298     } else {
299         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
300     }
301     return E_OK;
302 }
303 
CheckParamValid(const std::string & device,uint64_t timeout) const304 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
305 {
306     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
307         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
308         return false;
309     }
310     if (device.empty()) {
311         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
312         return false;
313     }
314     ICommunicator *communicator = GetAndIncCommunicator();
315     if (communicator == nullptr) {
316         return false;
317     }
318     std::string localId;
319     int errCode = communicator->GetLocalIdentity(localId);
320     RefObject::DecObjRef(communicator);
321     if (errCode != E_OK) {
322         return false;
323     }
324     if (localId == device) {
325         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
326         return false;
327     }
328     return true;
329 }
330 
CheckTaskExeStatus(const std::string & device)331 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
332 {
333     uint32_t queueCount = 0u;
334     uint32_t exeTaskCount = 0u;
335     uint32_t totalCount = 0u; // waiting task count in all queue
336     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
337         queueCount = searchTaskQueue_.at(device).size();
338     }
339     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
340         exeTaskCount = deviceWorkingSet_.at(device).size();
341     }
342     for (auto &entry : searchTaskQueue_) {
343         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
344         int currentQueueCount = static_cast<int>(entry.second.size());
345         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
346             // all task in this queue can execute, no need calculate as waiting task count
347             continue;
348         }
349         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
350             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
351     }
352     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
353         (totalCount + 1 <= MAX_QUEUE_COUNT);
354 }
355 
GenerateSessionId()356 uint32_t RemoteExecutor::GenerateSessionId()
357 {
358     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
359     while (taskMap_.find(sessionId) != taskMap_.end()) {
360         sessionId++;
361         if (sessionId == 0) { // if over flow start with 1
362             sessionId++;
363         }
364     }
365     lastSessionId_ = sessionId;
366     return sessionId;
367 }
368 
GenerateTaskId()369 uint32_t RemoteExecutor::GenerateTaskId()
370 {
371     lastTaskId_++;
372     if (lastTaskId_ == 0) { // if over flow start with 1
373         lastTaskId_++;
374     }
375     return lastTaskId_;
376 }
377 
RemoteQueryInner(const Task & task)378 int RemoteExecutor::RemoteQueryInner(const Task &task)
379 {
380     uint32_t sessionId = 0u;
381     {
382         // check task count and push into queue in lock
383         std::lock_guard<std::mutex> autoLock(taskLock_);
384         if (!CheckTaskExeStatus(task.target)) {
385             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
386             return -E_MAX_LIMITS;
387         }
388         sessionId = GenerateSessionId();
389         searchTaskQueue_[task.target].push_back(sessionId);
390         if (taskMap_.find(sessionId) != taskMap_.end()) {
391             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
392             return -E_INTERNAL_ERROR; // should not happen
393         }
394         taskMap_[sessionId] = task;
395         taskMap_[sessionId].taskId = GenerateTaskId();
396         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
397             taskMap_[sessionId].taskId, task.target.c_str());
398     }
399     std::string device = task.target;
400     RefObject::IncObjRef(this);
401     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
402         TryExecuteTaskInLock(device);
403         RefObject::DecObjRef(this);
404     });
405     if (errCode != E_OK) {
406         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
407         DoRollBack(sessionId);
408         RefObject::DecObjRef(this);
409     }
410     return errCode;
411 }
412 
TryExecuteTaskInLock(const std::string & device)413 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
414 {
415     uint32_t sessionId = 0u;
416     {
417         std::lock_guard<std::mutex> autoLock(taskLock_);
418         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
419             return;
420         }
421         if (searchTaskQueue_[device].empty()) {
422             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
423             return;
424         }
425         sessionId = searchTaskQueue_[device].front();
426         if (taskMap_.find(sessionId) == taskMap_.end()) {
427             searchTaskQueue_[device].pop_front();
428             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
429             return;
430         }
431         taskMap_[sessionId].status = Status::WORKING;
432         searchTaskQueue_[device].pop_front();
433         deviceWorkingSet_[device].insert(sessionId);
434         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
435         StartTimer(taskMap_[sessionId].timeout, sessionId);
436     }
437     int errCode = RequestStart(sessionId);
438     if (errCode != E_OK) {
439         DoFinished(sessionId, errCode);
440     }
441 }
442 
DoRollBack(uint32_t sessionId)443 void RemoteExecutor::DoRollBack(uint32_t sessionId)
444 {
445     Task task;
446     std::lock_guard<std::mutex> autoLock(taskLock_);
447     if (taskMap_.find(sessionId) == taskMap_.end()) {
448         return;
449     }
450     task = taskMap_[sessionId];
451     if (task.status != Status::WAITING) {
452         // task is execute, abort roll back
453         return;
454     }
455     taskMap_.erase(sessionId);
456 
457     auto iter = searchTaskQueue_[task.target].begin();
458     while (iter != searchTaskQueue_[task.target].end()) {
459         if ((*iter) == sessionId) {
460             break;
461         }
462         iter++;
463     }
464     if (iter != searchTaskQueue_[task.target].end()) {
465         searchTaskQueue_[task.target].erase(iter);
466     }
467     // this task should not in workingSet
468     deviceWorkingSet_[task.target].erase(sessionId);
469 }
470 
RequestStart(uint32_t sessionId)471 int RemoteExecutor::RequestStart(uint32_t sessionId)
472 {
473     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
474     if (message == nullptr) {
475         LOGE("[RemoteExecutor][RequestStart] new message error");
476         return -E_OUT_OF_MEMORY;
477     }
478     message->SetSessionId(sessionId);
479     message->SetMessageType(TYPE_REQUEST);
480     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
481     if (packet == nullptr) {
482         LOGE("[RemoteExecutor][RequestStart] new packet error");
483         ReleaseMessageAndPacket(message, nullptr);
484         return -E_OUT_OF_MEMORY;
485     }
486     std::string target;
487     int errCode = FillRequestPacket(packet, sessionId, target);
488     if (errCode != E_OK) {
489         ReleaseMessageAndPacket(message, packet);
490         return errCode;
491     }
492     errCode = message->SetExternalObject(packet);
493     if (errCode != E_OK) {
494         ReleaseMessageAndPacket(message, packet);
495         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
496     }
497     auto communicator = GetAndIncCommunicator();
498     auto syncInterface = GetAndIncSyncInterface();
499     if (communicator == nullptr || syncInterface == nullptr) {
500         ReleaseMessageAndPacket(message, nullptr);
501         if (syncInterface != nullptr) {
502             syncInterface->DecRefCount();
503         }
504         RefObject::DecObjRef(communicator);
505         return -E_BUSY;
506     }
507     SendConfig sendConfig;
508     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
509     RefObject::IncObjRef(this);
510     errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
511         if (errCode != E_OK) {
512             DoSendFailed(sessionId, errCode);
513         }
514         RefObject::DecObjRef(this);
515     });
516     RefObject::DecObjRef(communicator);
517     syncInterface->DecRefCount();
518     return errCode;
519 }
520 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)521 void RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
522     const std::string &device)
523 {
524     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
525     if (packet == nullptr) {
526         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
527         return;
528     }
529     packet->SetAckCode(errCode);
530     packet->SetLastAck();
531     (void)ResponseStart(packet, sessionId, sequenceId, device);
532 }
533 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)534 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
535     const std::string &device)
536 {
537     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
538     if (packet == nullptr) {
539         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
540         return -E_OUT_OF_MEMORY;
541     }
542     packet->SetAckCode(E_OK);
543     if (sendMessage.isLast) {
544         packet->SetLastAck();
545     }
546     packet->SetSecurityOption(sendMessage.option);
547     packet->MoveInRowDataSet(std::move(dataSet));
548     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
549 }
550 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)551 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
552     const std::string &device)
553 {
554     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
555     if (storage == nullptr) {
556         ReleaseMessageAndPacket(nullptr, packet);
557         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
558         return -E_BUSY;
559     }
560     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
561     if (message == nullptr) {
562         LOGE("[RemoteExecutor][ResponseStart] new message error");
563         storage->DecRefCount();
564         ReleaseMessageAndPacket(nullptr, packet);
565         return -E_OUT_OF_MEMORY;
566     }
567     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
568 
569     int errCode = message->SetExternalObject(packet);
570     if (errCode != E_OK) {
571         ReleaseMessageAndPacket(message, packet);
572         storage->DecRefCount();
573         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
574         return errCode;
575     }
576     auto *communicator = GetAndIncCommunicator();
577     if (communicator == nullptr) {
578         ReleaseMessageAndPacket(message, nullptr);
579         storage->DecRefCount();
580         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
581         return -E_BUSY;
582     }
583 
584     message->SetTarget(device);
585     message->SetSessionId(sessionId);
586     message->SetSequenceId(sequenceId);
587     message->SetMessageType(TYPE_RESPONSE);
588     SendConfig sendConfig;
589     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
590     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
591     RefObject::DecObjRef(communicator);
592     if (errCode != E_OK) {
593         ReleaseMessageAndPacket(message, nullptr);
594         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
595     }
596     storage->DecRefCount();
597     return errCode;
598 }
599 
StartTimer(uint64_t timeout,uint32_t sessionId)600 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
601 {
602     TimerId timerId = 0u;
603     RefObject::IncObjRef(this);
604     TimerAction timeoutCallBack = std::bind(&RemoteExecutor::TimeoutCallBack, this, std::placeholders::_1);
605     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
606         RefObject::DecObjRef(this);
607     }, timerId);
608     if (errCode != E_OK) {
609         RefObject::DecObjRef(this);
610         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
611     }
612     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
613     std::lock_guard<std::mutex> autoLock(timeoutLock_);
614     timeoutMap_[timerId] = sessionId;
615     taskFinishMap_[sessionId] = timerId;
616 }
617 
RemoveTimer(uint32_t sessionId)618 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
619 {
620     TimerId timerId = 0u;
621     {
622         std::lock_guard<std::mutex> autoLock(timeoutLock_);
623         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
624             return;
625         }
626         timerId = taskFinishMap_[sessionId];
627         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
628         timeoutMap_.erase(timerId);
629         taskFinishMap_.erase(sessionId);
630     }
631     RuntimeContext::GetInstance()->RemoveTimer(timerId);
632 }
633 
TimeoutCallBack(TimerId timerId)634 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
635 {
636     RefObject::IncObjRef(this);
637     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
638         DoTimeout(timerId);
639         RefObject::DecObjRef(this);
640     });
641     if (errCode != E_OK) {
642         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
643         RefObject::DecObjRef(this);
644     }
645     return -E_NO_NEED_TIMER;
646 }
647 
DoTimeout(TimerId timerId)648 void RemoteExecutor::DoTimeout(TimerId timerId)
649 {
650     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
651     uint32_t sessionId = 0u;
652     {
653         std::lock_guard<std::mutex> autoLock(timeoutLock_);
654         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
655             return;
656         }
657         sessionId = timeoutMap_[timerId];
658     }
659     DoFinished(sessionId, -E_TIMEOUT);
660 }
661 
DoSendFailed(uint32_t sessionId,int errCode)662 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
663 {
664     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
665     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
666 }
667 
DoFinished(uint32_t sessionId,int errCode)668 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
669 {
670     Task task;
671     if (ClearTaskInfo(sessionId, task) == E_OK) {
672         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
673     } else {
674         return;
675     }
676     RefObject::IncObjRef(this);
677     if (task.onFinished != nullptr) {
678         task.onFinished(errCode, task.result);
679         LOGD("[RemoteExecutor][DoFinished] onFinished");
680     }
681     std::string device = task.target;
682     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
683         TryExecuteTaskInLock(device);
684         RefObject::DecObjRef(this);
685     });
686     if (retCode != E_OK) {
687         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
688         RefObject::DecObjRef(this);
689     }
690 }
691 
ClearTaskInfo(uint32_t sessionId,Task & task)692 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
693 {
694     {
695         std::lock_guard<std::mutex> autoLock(taskLock_);
696         if (taskMap_.find(sessionId) == taskMap_.end()) {
697             return -E_NOT_FOUND;
698         }
699         task = taskMap_[sessionId];
700         taskMap_.erase(sessionId);
701         deviceWorkingSet_[task.target].erase(sessionId);
702     }
703     RemoveTimer(sessionId);
704     return E_OK;
705 }
706 
ClearInnerSource()707 void RemoteExecutor::ClearInnerSource()
708 {
709     {
710         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
711         syncInterface_ = nullptr;
712         communicator_ = nullptr;
713     }
714     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
715     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
716     while (!searchMessageQueue_.empty()) {
717         auto entry = searchMessageQueue_.front();
718         searchMessageQueue_.pop();
719         delete entry.second;
720         entry.second = nullptr;
721     }
722 }
723 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)724 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
725 {
726     Task task;
727     {
728         std::lock_guard<std::mutex> autoLock(taskLock_);
729         if (taskMap_.find(sessionId) == taskMap_.end()) {
730             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
731             return -E_FINISHED;
732         }
733         task = taskMap_[sessionId];
734     }
735     PreparedStmt stmt;
736     stmt.SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
737     stmt.SetSql(task.condition.sql);
738     stmt.SetBindArgs(task.condition.bindArgs);
739     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
740     packet->SetPreparedStmt(stmt);
741     packet->SetNeedResponse();
742     target = task.target;
743     return E_OK;
744 }
745 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)746 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
747 {
748     int errCode = E_OK;
749     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
750         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
751         delete inMsg;
752         inMsg = nullptr;
753         return;
754     }
755     switch (inMsg->GetMessageType()) {
756         case TYPE_REQUEST:
757             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
758             break;
759         case TYPE_RESPONSE:
760             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
761             break;
762         default:
763             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
764             break;
765     }
766     if (errCode != -E_NOT_NEED_DELETE_MSG) {
767         delete inMsg;
768         inMsg = nullptr;
769     }
770 }
771 
IsPacketValid(uint32_t sessionId)772 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
773 {
774     std::lock_guard<std::mutex> autoLock(taskLock_);
775     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
776 }
777 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)778 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
779     const RemoteExecutorAckPacket *packet)
780 {
781     bool isReceiveFinished = false;
782     {
783         std::lock_guard<std::mutex> autoLock(taskLock_);
784         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
785             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
786             return;
787         }
788         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
789             taskMap_[sessionId].taskId, sequenceId);
790         taskMap_[sessionId].currentCount++;
791         if (packet->IsLastAck()) {
792             taskMap_[sessionId].targetCount = sequenceId;
793         }
794         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
795         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
796             isReceiveFinished = true;
797         }
798     }
799     if (isReceiveFinished) {
800         DoFinished(sessionId, E_OK);
801     }
802 }
803 
GetAndIncCommunicator() const804 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
805 {
806     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
807     ICommunicator *communicator = communicator_;
808     RefObject::IncObjRef(communicator);
809     return communicator;
810 }
811 
GetAndIncSyncInterface() const812 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
813 {
814     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
815     ISyncInterface *syncInterface = syncInterface_;
816     if (syncInterface != nullptr) {
817         syncInterface->IncRefCount();
818     }
819     return syncInterface;
820 }
821 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)822 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
823 {
824     std::lock_guard<std::mutex> autoLock(taskLock_);
825     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
826         for (auto &sessionId : deviceWorkingSet_[device]) {
827             removeList.push_back(sessionId);
828         }
829     }
830     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
831         for (auto &sessionId : searchTaskQueue_[device]) {
832             removeList.push_back(sessionId);
833         }
834     }
835 }
836 
RemoveAllTask(int errCode)837 void RemoteExecutor::RemoveAllTask(int errCode)
838 {
839     std::vector<OnFinished> waitToNotify;
840     std::vector<uint32_t> removeTimerList;
841     {
842         std::lock_guard<std::mutex> autoLock(taskLock_);
843         for (auto &taskEntry : taskMap_) {
844             if (taskEntry.second.onFinished != nullptr) {
845                 waitToNotify.push_back(taskEntry.second.onFinished);
846                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
847                     taskEntry.second.taskId, errCode);
848             }
849             if (taskEntry.second.status == Status::WORKING) {
850                 removeTimerList.push_back(taskEntry.first);
851             }
852         }
853         taskMap_.clear();
854         deviceWorkingSet_.clear();
855         searchTaskQueue_.clear();
856     }
857     for (const auto &callBack : waitToNotify) {
858         callBack(errCode, nullptr);
859     }
860     for (const auto &sessionId : removeTimerList) {
861         RemoveTimer(sessionId);
862     }
863     std::lock_guard<std::mutex> autoLock(timeoutLock_);
864     timeoutMap_.clear();
865     taskFinishMap_.clear();
866 }
867 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)868 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
869 {
870     std::lock_guard<std::mutex> autoLock(taskLock_);
871     for (auto &entry : taskMap_) {
872         if (entry.second.connectionId == connectionId) {
873             removeList.push_back(entry.first);
874         }
875     }
876 }
877 
GetPacketSize(const std::string & device,size_t & packetSize)878 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize)
879 {
880     auto *communicator = GetAndIncCommunicator();
881     if (communicator == nullptr) {
882         LOGD("communicator is nullptr");
883         return -E_BUSY;
884     }
885 
886     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
887     RefObject::DecObjRef(communicator);
888     return E_OK;
889 }
890 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)891 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
892     const SecurityOption &localOption)
893 {
894     bool res = false;
895     if (remoteOption.securityLabel == localOption.securityLabel ||
896         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
897         localOption.securityLabel == SecurityLabel::NOT_SET)) {
898         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
899     }
900     if (!res) {
901         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
902             remoteOption.securityLabel, remoteOption.securityFlag,
903             localOption.securityLabel, localOption.securityFlag);
904     }
905     return res;
906 }
907 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)908 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
909     const std::string &device, uint32_t sessionId)
910 {
911     size_t packetSize = 0u;
912     int errCode = GetPacketSize(device, packetSize);
913     if (errCode != E_OK) {
914         return errCode;
915     }
916     SecurityOption option;
917     errCode = storage->GetSecurityOption(option);
918     if (errCode == -E_NOT_SUPPORT) {
919         option.securityLabel = NOT_SURPPORT_SEC_CLASSIFICATION;
920         errCode = E_OK;
921     }
922     if (errCode != E_OK) {
923         LOGD("GetSecurityOption errCode:%d", errCode);
924         return -E_SECURITY_OPTION_CHECK_ERROR;
925     }
926     ContinueToken token = nullptr;
927     uint32_t sequenceId = 1u;
928     do {
929         RelationalRowDataSet dataSet;
930         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
931         if (errCode != E_OK) {
932             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
933             break;
934         }
935         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
936         errCode = ResponseData(std::move(dataSet), sendMessage, device);
937         if (errCode != E_OK) {
938             break;
939         }
940         sequenceId++;
941     } while (token != nullptr);
942     if (token != nullptr) {
943         storage->ReleaseRemoteQueryContinueToken(token);
944     }
945     return errCode;
946 }
947 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)948 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
949     const SecurityOption &remoteOption)
950 {
951     if (storage == nullptr || communicator == nullptr) {
952         return -E_BUSY;
953     }
954     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
955         return -E_NOT_SUPPORT;
956     }
957     std::string device;
958     communicator->GetLocalIdentity(device);
959     SecurityOption localOption;
960     int errCode = static_cast<SyncGenericInterface *>(storage)->GetSecurityOption(localOption);
961     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
962         return -E_SECURITY_OPTION_CHECK_ERROR;
963     }
964     if (remoteOption.securityLabel == NOT_SURPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
965         return E_OK;
966     }
967     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
968         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
969     } else {
970         errCode = E_OK;
971     }
972     return errCode;
973 }
974 }