• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         delete message;
38         message = nullptr;
39         delete packet;
40         packet = nullptr;
41     }
42 }
43 
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45     : workingThreadsCount_(0),
46       syncInterface_(nullptr),
47       communicator_(nullptr),
48       lastSessionId_(0),
49       lastTaskId_(0),
50       closed_(false)
51 {
52 }
53 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56     if (syncInterface == nullptr || communicator == nullptr) {
57         return -E_INVALID_ARGS;
58     }
59     closed_ = false;
60     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61     syncInterface_ = syncInterface;
62     communicator_ = communicator;
63     return E_OK;
64 }
65 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
67     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69     if (closed_) {
70         return -E_BUSY;
71     }
72     if (!CheckParamValid(device, timeout)) {
73         return -E_INVALID_ARGS;
74     }
75     int errCode = E_OK;
76     int taskErrCode = E_OK;
77     SemaphoreUtils semaphore(0);
78     Task task;
79     task.result = std::make_shared<RelationalResultSetImpl>();
80     task.target = device;
81     task.timeout = timeout;
82     task.condition = condition;
83     task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
84         taskErrCode = retCode;
85         result = taskResult;
86         semaphore.SendSemaphore();
87     };
88     task.connectionId = connectionId;
89     errCode = RemoteQueryInner(task);
90     if (errCode != E_OK) {
91         return errCode;
92     }
93     semaphore.WaitSemaphore();
94     return taskErrCode;
95 }
96 
ReceiveMessage(const std::string & targetDev,Message * inMsg)97 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
98 {
99     if (inMsg == nullptr) {
100         return -E_INVALID_ARGS;
101     }
102     if (closed_) {
103         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
104         delete inMsg;
105         inMsg = nullptr;
106         return -E_BUSY;
107     }
108     RefObject::IncObjRef(this);
109     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
110         ReceiveMessageInner(targetDev, inMsg);
111         RefObject::DecObjRef(this);
112     });
113     if (errCode != E_OK) {
114         RefObject::DecObjRef(this);
115     }
116     return errCode;
117 }
118 
NotifyDeviceOffline(const std::string & device)119 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
120 {
121     if (closed_) {
122         return;
123     }
124     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
125     std::vector<uint32_t> removeList;
126     RemoveTaskByDevice(device, removeList);
127     for (const auto &sessionId : removeList) {
128         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
129     }
130 }
131 
NotifyUserChange()132 void RemoteExecutor::NotifyUserChange()
133 {
134     if (closed_) {
135         return;
136     }
137     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
138     RemoveAllTask(-E_USER_CHANGE);
139     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
140 }
141 
Close()142 void RemoteExecutor::Close()
143 {
144     closed_ = true;
145     LOGD("[RemoteExecutor][Close] close enter");
146     RemoveAllTask(-E_BUSY);
147     ClearInnerSource();
148     {
149         std::unique_lock<std::mutex> lock(msgQueueLock_);
150         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
151     }
152     LOGD("[RemoteExecutor][Close] close exist");
153 }
154 
NotifyConnectionClosed(uint64_t connectionId)155 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
156 {
157     if (closed_) {
158         return;
159     }
160     std::vector<uint32_t> removeList;
161     RemoveTaskByConnection(connectionId, removeList);
162     for (const auto &sessionId : removeList) {
163         DoFinished(sessionId, -E_BUSY);
164     }
165 }
166 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)167 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
168 {
169     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
170     {
171         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
172         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
173         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
174             // message deal in work thread, exist here
175             return -E_NOT_NEED_DELETE_MSG;
176         }
177         workingThreadsCount_++;
178     }
179     bool empty = true;
180     do {
181         std::pair<std::string, Message *> entry;
182         {
183             std::lock_guard<std::mutex> autoLock(msgQueueLock_);
184             empty = searchMessageQueue_.empty();
185             if (empty) {
186                 workingThreadsCount_--;
187                 continue;
188             }
189             entry = searchMessageQueue_.front();
190             searchMessageQueue_.pop();
191         }
192         ParseOneRequestMessage(entry.first, entry.second);
193         delete entry.second;
194         entry.second = nullptr;
195     } while (!empty);
196     clearCV_.notify_one();
197     return -E_NOT_NEED_DELETE_MSG;
198 }
199 
ParseOneRequestMessage(const std::string & device,Message * inMsg)200 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
201 {
202     if (closed_) {
203         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
204         return;
205     }
206     int errCode = CheckPermissions(device, inMsg);
207     if (errCode != E_OK) {
208         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
209         return;
210     }
211     errCode = SendRemoteExecutorData(device, inMsg);
212     if (errCode != E_OK) {
213         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
214     }
215 }
216 
CheckPermissions(const std::string & device,Message * inMsg)217 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
218 {
219     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
220     if (storage == nullptr) {
221         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
222         return -E_BUSY;
223     }
224     // permission check
225     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
226     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
227     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
228     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
229     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
230         { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
231     if (errCode != E_OK) {
232         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
233         storage->DecRefCount();
234         return errCode;
235     }
236     const auto *packet = inMsg->GetObject<ISyncPacket>();
237     if (packet == nullptr) {
238         LOGE("[RemoteExecutor] get packet object failed");
239         storage->DecRefCount();
240         return -E_INVALID_ARGS;
241     }
242     const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
243     errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
244     storage->DecRefCount();
245     return errCode;
246 }
247 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)248 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
249 {
250     auto *syncInterface = GetAndIncSyncInterface();
251     if (syncInterface == nullptr) {
252         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
253         return -E_INVALID_ARGS;
254     }
255     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
256         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
257         syncInterface->DecRefCount();
258         return -E_NOT_SUPPORT;
259     }
260     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
261 
262     const auto *packet = inMsg->GetObject<ISyncPacket>();
263     if (packet == nullptr) {
264         LOGE("[RemoteExecutor] get packet object failed");
265         storage->DecRefCount();
266         return -E_INVALID_ARGS;
267     }
268     const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
269     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
270     storage->DecRefCount();
271     return errCode;
272 }
273 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)274 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
275 {
276     const auto *ack = inMsg->GetObject<ISyncPacket>();
277     if (ack == nullptr) {
278         return -E_INVALID_ARGS;
279     }
280     const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
281     int errCode = packet->GetAckCode();
282     uint32_t sessionId = inMsg->GetSessionId();
283     uint32_t sequenceId = inMsg->GetSequenceId();
284     if (!IsPacketValid(sessionId)) {
285         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
286         return -E_INVALID_ARGS;
287     }
288     if (errCode == E_OK) {
289         auto storage = GetAndIncSyncInterface();
290         auto communicator = GetAndIncCommunicator();
291         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
292         if (storage != nullptr) {
293             storage->DecRefCount();
294         }
295         RefObject::DecObjRef(communicator);
296     }
297     if (errCode != E_OK) {
298         DoFinished(sessionId, errCode);
299     } else {
300         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
301     }
302     return E_OK;
303 }
304 
CheckParamValid(const std::string & device,uint64_t timeout) const305 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
306 {
307     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
308         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
309         return false;
310     }
311     if (device.empty()) {
312         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
313         return false;
314     }
315     if (device.length() > DBConstant::MAX_DEV_LENGTH) {
316         LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
317         return false;
318     }
319     ICommunicator *communicator = GetAndIncCommunicator();
320     if (communicator == nullptr) {
321         return false;
322     }
323     std::string localId;
324     int errCode = communicator->GetLocalIdentity(localId);
325     RefObject::DecObjRef(communicator);
326     if (errCode != E_OK) {
327         return false;
328     }
329     if (localId == device) {
330         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
331         return false;
332     }
333     return true;
334 }
335 
CheckTaskExeStatus(const std::string & device)336 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
337 {
338     uint32_t queueCount = 0u;
339     uint32_t exeTaskCount = 0u;
340     uint32_t totalCount = 0u; // waiting task count in all queue
341     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
342         queueCount = searchTaskQueue_.at(device).size();
343     }
344     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
345         exeTaskCount = deviceWorkingSet_.at(device).size();
346     }
347     for (auto &entry : searchTaskQueue_) {
348         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
349         int currentQueueCount = static_cast<int>(entry.second.size());
350         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
351             // all task in this queue can execute, no need calculate as waiting task count
352             continue;
353         }
354         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
355             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
356     }
357     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
358         (totalCount + 1 <= MAX_QUEUE_COUNT);
359 }
360 
GenerateSessionId()361 uint32_t RemoteExecutor::GenerateSessionId()
362 {
363     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
364     while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
365         sessionId++;
366         if (sessionId == 0) { // if over flow start with 1
367             sessionId++;
368         }
369     }
370     lastSessionId_ = sessionId;
371     return sessionId;
372 }
373 
GenerateTaskId()374 uint32_t RemoteExecutor::GenerateTaskId()
375 {
376     lastTaskId_++;
377     if (lastTaskId_ == 0) { // if over flow start with 1
378         lastTaskId_++;
379     }
380     return lastTaskId_;
381 }
382 
RemoteQueryInner(const Task & task)383 int RemoteExecutor::RemoteQueryInner(const Task &task)
384 {
385     uint32_t sessionId = 0u;
386     {
387         // check task count and push into queue in lock
388         std::lock_guard<std::mutex> autoLock(taskLock_);
389         if (!CheckTaskExeStatus(task.target)) {
390             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
391             return -E_MAX_LIMITS;
392         }
393         sessionId = GenerateSessionId();
394         searchTaskQueue_[task.target].push_back(sessionId);
395         if (taskMap_.find(sessionId) != taskMap_.end()) {
396             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
397             return -E_INTERNAL_ERROR; // should not happen
398         }
399         taskMap_[sessionId] = task;
400         taskMap_[sessionId].taskId = GenerateTaskId();
401         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
402             taskMap_[sessionId].taskId, task.target.c_str());
403     }
404     std::string device = task.target;
405     RefObject::IncObjRef(this);
406     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
407         TryExecuteTaskInLock(device);
408         RefObject::DecObjRef(this);
409     });
410     if (errCode != E_OK) {
411         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
412         DoRollBack(sessionId);
413         RefObject::DecObjRef(this);
414     }
415     return errCode;
416 }
417 
TryExecuteTaskInLock(const std::string & device)418 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
419 {
420     uint32_t sessionId = 0u;
421     {
422         std::lock_guard<std::mutex> autoLock(taskLock_);
423         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
424             return;
425         }
426         if (searchTaskQueue_[device].empty()) {
427             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
428             return;
429         }
430         sessionId = searchTaskQueue_[device].front();
431         if (taskMap_.find(sessionId) == taskMap_.end()) {
432             searchTaskQueue_[device].pop_front();
433             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
434             return;
435         }
436         taskMap_[sessionId].status = Status::WORKING;
437         searchTaskQueue_[device].pop_front();
438         deviceWorkingSet_[device].insert(sessionId);
439         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
440         StartTimer(taskMap_[sessionId].timeout, sessionId);
441     }
442     int errCode = RequestStart(sessionId);
443     if (errCode != E_OK) {
444         DoFinished(sessionId, errCode);
445     }
446 }
447 
DoRollBack(uint32_t sessionId)448 void RemoteExecutor::DoRollBack(uint32_t sessionId)
449 {
450     Task task;
451     std::lock_guard<std::mutex> autoLock(taskLock_);
452     if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
453         return;
454     }
455     task = taskMap_[sessionId];
456     if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
457         // task is execute, abort roll back
458         return;
459     }
460     taskMap_.erase(sessionId);
461 
462     auto iter = searchTaskQueue_[task.target].begin();
463     while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
464         if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
465             break;
466         }
467         iter++;
468     }
469     if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
470         searchTaskQueue_[task.target].erase(iter);
471     }
472     // this task should not in workingSet
473     deviceWorkingSet_[task.target].erase(sessionId);
474 }
475 
RequestStart(uint32_t sessionId)476 int RemoteExecutor::RequestStart(uint32_t sessionId)
477 {
478     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
479     if (message == nullptr) {
480         LOGE("[RemoteExecutor][RequestStart] new message error");
481         return -E_OUT_OF_MEMORY;
482     }
483     message->SetSessionId(sessionId);
484     message->SetMessageType(TYPE_REQUEST);
485     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
486     if (packet == nullptr) {
487         LOGE("[RemoteExecutor][RequestStart] new packet error");
488         ReleaseMessageAndPacket(message, nullptr);
489         return -E_OUT_OF_MEMORY;
490     }
491     std::string target;
492     int errCode = FillRequestPacket(packet, sessionId, target);
493     if (errCode != E_OK) {
494         ReleaseMessageAndPacket(message, packet);
495         return errCode;
496     }
497     auto exObj = static_cast<ISyncPacket *>(packet);
498     errCode = message->SetExternalObject(exObj);
499     if (errCode != E_OK) {
500         ReleaseMessageAndPacket(message, packet);
501         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
502         return errCode;
503     }
504     return SendRequestMessage(target, message, sessionId);
505 }
506 
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)507 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
508 {
509     auto communicator = GetAndIncCommunicator();
510     auto syncInterface = GetAndIncSyncInterface();
511     if (communicator == nullptr || syncInterface == nullptr) {
512         ReleaseMessageAndPacket(message, nullptr);
513         if (syncInterface != nullptr) {
514             syncInterface->DecRefCount();
515         }
516         RefObject::DecObjRef(communicator);
517         return -E_BUSY;
518     }
519     SendConfig sendConfig;
520     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
521     RefObject::IncObjRef(this);
522     int errCode = communicator->SendMessage(target, message, sendConfig, [this, sessionId](int errCode) {
523         if (errCode != E_OK) {
524             DoSendFailed(sessionId, errCode);
525         }
526         RefObject::DecObjRef(this);
527     });
528     if (errCode != E_OK) {
529         ReleaseMessageAndPacket(message, nullptr);
530         RefObject::DecObjRef(this);
531     }
532     RefObject::DecObjRef(communicator);
533     syncInterface->DecRefCount();
534     return errCode;
535 }
536 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)537 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
538     const std::string &device)
539 {
540     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
541     if (packet == nullptr) {
542         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
543         return -E_OUT_OF_MEMORY;
544     }
545     packet->SetAckCode(errCode);
546     packet->SetLastAck();
547     return ResponseStart(packet, sessionId, sequenceId, device);
548 }
549 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)550 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
551     const std::string &device)
552 {
553     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
554     if (packet == nullptr) {
555         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
556         return -E_OUT_OF_MEMORY;
557     }
558     packet->SetAckCode(E_OK);
559     if (sendMessage.isLast) {
560         packet->SetLastAck();
561     }
562     packet->SetSecurityOption(sendMessage.option);
563     packet->MoveInRowDataSet(std::move(dataSet));
564     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
565 }
566 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)567 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
568     const std::string &device)
569 {
570     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
571     if (storage == nullptr) {
572         ReleaseMessageAndPacket(nullptr, packet);
573         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
574         return -E_BUSY;
575     }
576     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
577     if (message == nullptr) {
578         LOGE("[RemoteExecutor][ResponseStart] new message error");
579         storage->DecRefCount();
580         ReleaseMessageAndPacket(nullptr, packet);
581         return -E_OUT_OF_MEMORY;
582     }
583     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
584     auto exObj = static_cast<ISyncPacket *>(packet);
585     int errCode = message->SetExternalObject(exObj);
586     if (errCode != E_OK) {
587         ReleaseMessageAndPacket(message, packet);
588         storage->DecRefCount();
589         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
590         return errCode;
591     }
592     auto *communicator = GetAndIncCommunicator();
593     if (communicator == nullptr) {
594         ReleaseMessageAndPacket(message, nullptr);
595         storage->DecRefCount();
596         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
597         return -E_BUSY;
598     }
599 
600     message->SetTarget(device);
601     message->SetSessionId(sessionId);
602     message->SetSequenceId(sequenceId);
603     message->SetMessageType(TYPE_RESPONSE);
604     SendConfig sendConfig;
605     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
606     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
607     RefObject::DecObjRef(communicator);
608     if (errCode != E_OK) {
609         ReleaseMessageAndPacket(message, nullptr);
610         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
611     }
612     storage->DecRefCount();
613     return errCode;
614 }
615 
StartTimer(uint64_t timeout,uint32_t sessionId)616 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
617 {
618     TimerId timerId = 0u;
619     RefObject::IncObjRef(this);
620     TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
621     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
622         RefObject::DecObjRef(this);
623     }, timerId);
624     if (errCode != E_OK) {
625         RefObject::DecObjRef(this);
626         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
627     }
628     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
629     std::lock_guard<std::mutex> autoLock(timeoutLock_);
630     timeoutMap_[timerId] = sessionId;
631     taskFinishMap_[sessionId] = timerId;
632 }
633 
RemoveTimer(uint32_t sessionId)634 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
635 {
636     TimerId timerId = 0u;
637     {
638         std::lock_guard<std::mutex> autoLock(timeoutLock_);
639         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
640             return;
641         }
642         timerId = taskFinishMap_[sessionId];
643         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
644         timeoutMap_.erase(timerId);
645         taskFinishMap_.erase(sessionId);
646     }
647     RuntimeContext::GetInstance()->RemoveTimer(timerId);
648 }
649 
TimeoutCallBack(TimerId timerId)650 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
651 {
652     RefObject::IncObjRef(this);
653     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
654         DoTimeout(timerId);
655         RefObject::DecObjRef(this);
656     });
657     if (errCode != E_OK) {
658         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
659         RefObject::DecObjRef(this);
660     }
661     return -E_NO_NEED_TIMER;
662 }
663 
DoTimeout(TimerId timerId)664 void RemoteExecutor::DoTimeout(TimerId timerId)
665 {
666     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
667     uint32_t sessionId = 0u;
668     {
669         std::lock_guard<std::mutex> autoLock(timeoutLock_);
670         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
671             return;
672         }
673         sessionId = timeoutMap_[timerId];
674     }
675     DoFinished(sessionId, -E_TIMEOUT);
676 }
677 
DoSendFailed(uint32_t sessionId,int errCode)678 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
679 {
680     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
681     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
682 }
683 
DoFinished(uint32_t sessionId,int errCode)684 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
685 {
686     Task task;
687     if (ClearTaskInfo(sessionId, task) == E_OK) {
688         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
689     } else {
690         return;
691     }
692     RefObject::IncObjRef(this);
693     if (task.onFinished != nullptr) {
694         task.onFinished(errCode, task.result);
695         LOGD("[RemoteExecutor][DoFinished] onFinished");
696     }
697     std::string device = task.target;
698     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
699         TryExecuteTaskInLock(device);
700         RefObject::DecObjRef(this);
701     });
702     if (retCode != E_OK) {
703         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
704         RefObject::DecObjRef(this);
705     }
706 }
707 
ClearTaskInfo(uint32_t sessionId,Task & task)708 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
709 {
710     {
711         std::lock_guard<std::mutex> autoLock(taskLock_);
712         if (taskMap_.find(sessionId) == taskMap_.end()) {
713             return -E_NOT_FOUND;
714         }
715         task = taskMap_[sessionId];
716         taskMap_.erase(sessionId);
717         deviceWorkingSet_[task.target].erase(sessionId);
718     }
719     RemoveTimer(sessionId);
720     return E_OK;
721 }
722 
ClearInnerSource()723 void RemoteExecutor::ClearInnerSource()
724 {
725     {
726         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
727         syncInterface_ = nullptr;
728         communicator_ = nullptr;
729     }
730     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
731     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
732     while (!searchMessageQueue_.empty()) {
733         auto entry = searchMessageQueue_.front();
734         searchMessageQueue_.pop();
735         delete entry.second;
736         entry.second = nullptr;
737     }
738 }
739 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)740 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
741 {
742     ISyncInterface *storage = GetAndIncSyncInterface();
743     if (storage == nullptr) {
744         return -E_BUSY;
745     }
746     SecurityOption localOption;
747     int errCode = storage->GetSecurityOption(localOption);
748     storage->DecRefCount();
749     storage = nullptr;
750     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
751         return -E_SECURITY_OPTION_CHECK_ERROR;
752     }
753     if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
754         LOGE("[AbilitySync] Local not set security option");
755         return -E_SECURITY_OPTION_CHECK_ERROR;
756     }
757     Task task;
758     {
759         std::lock_guard<std::mutex> autoLock(taskLock_);
760         if (taskMap_.find(sessionId) == taskMap_.end()) {
761             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
762             return -E_FINISHED;
763         }
764         task = taskMap_[sessionId];
765     }
766     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
767     packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
768     packet->SetSql(task.condition.sql);
769     packet->SetBindArgs(task.condition.bindArgs);
770     packet->SetNeedResponse();
771     packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
772     target = task.target;
773     return E_OK;
774 }
775 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)776 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
777 {
778     int errCode = E_OK;
779     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
780         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
781         delete inMsg;
782         inMsg = nullptr;
783         return;
784     }
785     switch (inMsg->GetMessageType()) {
786         case TYPE_REQUEST:
787             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
788             break;
789         case TYPE_RESPONSE:
790             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
791             break;
792         default:
793             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
794             break;
795     }
796     if (errCode != -E_NOT_NEED_DELETE_MSG) {
797         delete inMsg;
798         inMsg = nullptr;
799     }
800 }
801 
IsPacketValid(uint32_t sessionId)802 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
803 {
804     std::lock_guard<std::mutex> autoLock(taskLock_);
805     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
806 }
807 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)808 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
809     const RemoteExecutorAckPacket *packet)
810 {
811     bool isReceiveFinished = false;
812     {
813         std::lock_guard<std::mutex> autoLock(taskLock_);
814         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
815             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
816             return;
817         }
818         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
819             taskMap_[sessionId].taskId, sequenceId);
820         taskMap_[sessionId].currentCount++;
821         if (packet->IsLastAck()) {
822             taskMap_[sessionId].targetCount = sequenceId;
823         }
824         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
825         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
826             isReceiveFinished = true;
827         }
828     }
829     if (isReceiveFinished) {
830         DoFinished(sessionId, E_OK);
831     }
832 }
833 
GetAndIncCommunicator() const834 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
835 {
836     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
837     ICommunicator *communicator = communicator_;
838     RefObject::IncObjRef(communicator);
839     return communicator;
840 }
841 
GetAndIncSyncInterface() const842 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
843 {
844     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
845     ISyncInterface *syncInterface = syncInterface_;
846     if (syncInterface != nullptr) {
847         syncInterface->IncRefCount();
848     }
849     return syncInterface;
850 }
851 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)852 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
853 {
854     std::lock_guard<std::mutex> autoLock(taskLock_);
855     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
856         for (auto &sessionId : deviceWorkingSet_[device]) {
857             removeList.push_back(sessionId);
858         }
859     }
860     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
861         for (auto &sessionId : searchTaskQueue_[device]) {
862             removeList.push_back(sessionId);
863         }
864     }
865 }
866 
RemoveAllTask(int errCode)867 void RemoteExecutor::RemoveAllTask(int errCode)
868 {
869     std::vector<OnFinished> waitToNotify;
870     std::vector<uint32_t> removeTimerList;
871     {
872         std::lock_guard<std::mutex> autoLock(taskLock_);
873         for (auto &taskEntry : taskMap_) {
874             if (taskEntry.second.onFinished != nullptr) {
875                 waitToNotify.push_back(taskEntry.second.onFinished);
876                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
877                     taskEntry.second.taskId, errCode);
878             }
879             if (taskEntry.second.status == Status::WORKING) {
880                 removeTimerList.push_back(taskEntry.first);
881             }
882         }
883         taskMap_.clear();
884         deviceWorkingSet_.clear();
885         searchTaskQueue_.clear();
886     }
887     for (const auto &callBack : waitToNotify) {
888         callBack(errCode, nullptr);
889     }
890     for (const auto &sessionId : removeTimerList) {
891         RemoveTimer(sessionId);
892     }
893     std::lock_guard<std::mutex> autoLock(timeoutLock_);
894     timeoutMap_.clear();
895     taskFinishMap_.clear();
896 }
897 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)898 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
899 {
900     std::lock_guard<std::mutex> autoLock(taskLock_);
901     for (auto &entry : taskMap_) {
902         if (entry.second.connectionId == connectionId) {
903             removeList.push_back(entry.first);
904         }
905     }
906 }
907 
GetPacketSize(const std::string & device,size_t & packetSize) const908 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
909 {
910     auto *communicator = GetAndIncCommunicator();
911     if (communicator == nullptr) {
912         LOGD("communicator is nullptr");
913         return -E_BUSY;
914     }
915 
916     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
917     RefObject::DecObjRef(communicator);
918     return E_OK;
919 }
920 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)921 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
922     const SecurityOption &localOption)
923 {
924     bool res = false;
925     if (remoteOption.securityLabel == localOption.securityLabel ||
926         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
927         localOption.securityLabel == SecurityLabel::NOT_SET)) {
928         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
929     }
930     if (!res) {
931         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
932             remoteOption.securityLabel, remoteOption.securityFlag,
933             localOption.securityLabel, localOption.securityFlag);
934     }
935     return res;
936 }
937 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)938 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
939     const std::string &device, uint32_t sessionId)
940 {
941     size_t packetSize = 0u;
942     int errCode = GetPacketSize(device, packetSize);
943     if (errCode != E_OK) {
944         return errCode;
945     }
946     SecurityOption option;
947     errCode = storage->GetSecurityOption(option);
948     if (errCode == -E_NOT_SUPPORT) {
949         option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
950         errCode = E_OK;
951     }
952     if (errCode != E_OK) {
953         LOGD("GetSecurityOption errCode:%d", errCode);
954         return -E_SECURITY_OPTION_CHECK_ERROR;
955     }
956     ContinueToken token = nullptr;
957     uint32_t sequenceId = 1u;
958     do {
959         RelationalRowDataSet dataSet;
960         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
961         if (errCode != E_OK) {
962             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
963             break;
964         }
965         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
966         errCode = ResponseData(std::move(dataSet), sendMessage, device);
967         if (errCode != E_OK) {
968             break;
969         }
970         sequenceId++;
971     } while (token != nullptr);
972     if (token != nullptr) {
973         storage->ReleaseRemoteQueryContinueToken(token);
974     }
975     return errCode;
976 }
977 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)978 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
979     const SecurityOption &remoteOption)
980 {
981     if (storage == nullptr || communicator == nullptr) {
982         return -E_BUSY;
983     }
984     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
985         return -E_NOT_SUPPORT;
986     }
987     std::string device;
988     communicator->GetLocalIdentity(device);
989     SecurityOption localOption;
990     int errCode = storage->GetSecurityOption(localOption);
991     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
992         return -E_SECURITY_OPTION_CHECK_ERROR;
993     }
994     if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
995         return E_OK;
996     }
997     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
998         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
999     } else {
1000         errCode = E_OK;
1001     }
1002     return errCode;
1003 }
1004 
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1005 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1006     int32_t remoteSecLabel, uint32_t remoteVersion)
1007 {
1008     SecurityOption localOption;
1009     int errCode = storage->GetSecurityOption(localOption);
1010     LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1011          localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1012     if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1013         return E_OK;
1014     }
1015     if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1016         LOGE("[RemoteExecutor] local security label not set!");
1017         return -E_SECURITY_OPTION_CHECK_ERROR;
1018     }
1019     if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1020         LOGE("[RemoteExecutor] remote security label not set!");
1021         return -E_SECURITY_OPTION_CHECK_ERROR;
1022     }
1023     if (errCode == -E_NOT_SUPPORT) {
1024         return E_OK;
1025     }
1026     if (errCode != E_OK) {
1027         return -E_SECURITY_OPTION_CHECK_ERROR;
1028     }
1029     if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1030         return E_OK;
1031     }
1032     if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1033         return E_OK;
1034     }
1035     return -E_SECURITY_OPTION_CHECK_ERROR;
1036 }
1037 }