1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef REMOTE_EXECUTOR_H 17 #define REMOTE_EXECUTOR_H 18 19 #include <deque> 20 #include <queue> 21 22 #include "db_types.h" 23 #include "distributeddb/result_set.h" 24 #include "icommunicator.h" 25 #include "isync_interface.h" 26 #include "message.h" 27 #include "relational_db_sync_interface.h" 28 #include "relational_result_set_impl.h" 29 #include "remote_executor_packet.h" 30 #include "runtime_context.h" 31 32 namespace DistributedDB { 33 class RemoteExecutor : public RefObject { 34 public: 35 enum class Status { 36 WAITING = 0, 37 WORKING 38 }; 39 40 using OnFinished = std::function<void(int32_t, std::shared_ptr<ResultSet>)>; 41 42 struct Task { 43 Status status = Status::WAITING; 44 uint32_t taskId = 0u; 45 uint64_t timeout = 0u; 46 uint32_t targetCount = 0; 47 uint32_t currentCount = 0; 48 uint64_t connectionId = 0u; 49 std::string target; 50 RemoteCondition condition; 51 OnFinished onFinished = nullptr; 52 std::shared_ptr<RelationalResultSetImpl> result; 53 }; 54 55 RemoteExecutor(); 56 57 ~RemoteExecutor() = default; 58 59 int Initialize(ISyncInterface *syncInterface, ICommunicator *communicator); 60 61 int RemoteQuery(const std::string device, const RemoteCondition &condition, 62 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result); 63 64 // receive request and ack, and process in another thread 65 int ReceiveMessage(const std::string &targetDev, Message *inMsg); 66 67 void NotifyDeviceOffline(const std::string &device); 68 69 void NotifyUserChange(); 70 71 void Close(); 72 73 void NotifyConnectionClosed(uint64_t connectionId); 74 75 protected: 76 virtual void ParseOneRequestMessage(const std::string &device, Message *inMsg); 77 78 virtual bool IsPacketValid(uint32_t sessionId); 79 80 void ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId, const std::string &device); 81 82 private: 83 struct SendMessage { 84 uint32_t sessionId; 85 uint32_t sequenceId; 86 bool isLast; 87 SecurityOption option; 88 }; 89 90 void ReceiveMessageInner(const std::string &targetDev, Message *inMsg); 91 92 int ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg); 93 94 int ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg); 95 96 int CheckPermissions(const std::string &device); 97 98 int SendRemoteExecutorData(const std::string &device, const Message *inMsg); 99 100 bool CheckParamValid(const std::string &device, uint64_t timeout) const; 101 102 bool CheckTaskExeStatus(const std::string &device); 103 104 uint32_t GenerateSessionId(); 105 uint32_t GenerateTaskId(); 106 107 int RemoteQueryInner(const Task &task); 108 void TryExecuteTaskInLock(const std::string &device); 109 void DoRollBack(uint32_t sessionId); 110 111 int RequestStart(uint32_t sessionId); 112 113 int ResponseData(RelationalRowDataSet &&dataSet, uint32_t sessionId, uint32_t sequenceId, bool isLast, 114 const std::string &device); 115 116 int ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage, const std::string &device); 117 118 int ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId, 119 const std::string &device); 120 121 void StartTimer(uint64_t timeout, uint32_t sessionId); 122 void RemoveTimer(uint32_t sessionId); 123 int TimeoutCallBack(TimerId timerId); 124 void DoTimeout(TimerId timerId); 125 126 void DoSendFailed(uint32_t sessionId, int errCode); 127 void DoFinished(uint32_t sessionId, int errCode); 128 129 int ClearTaskInfo(uint32_t sessionId, Task &task); 130 void ClearInnerSource(); 131 132 int FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target); 133 134 void ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId, 135 const RemoteExecutorAckPacket *packet); 136 137 void RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList); 138 void RemoveAllTask(int errCode); 139 void RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList); 140 141 int GetPacketSize(const std::string &device, size_t &packetSize); 142 int CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator, const SecurityOption &remoteOption); 143 bool CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption, 144 const SecurityOption &localOption); 145 int ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt, 146 const std::string &device, uint32_t sessionId); 147 148 ICommunicator *GetAndIncCommunicator() const; 149 ISyncInterface *GetAndIncSyncInterface() const; 150 151 std::mutex taskLock_; 152 std::map<std::string, std::deque<uint32_t>> searchTaskQueue_; // key is device, value is sessionId queue 153 std::map<std::string, std::set<uint32_t>> deviceWorkingSet_; // key is device, value is sessionId set 154 std::map<uint32_t, Task> taskMap_; // key is sessionId 155 156 std::mutex timeoutLock_; 157 std::map<TimerId, uint32_t> timeoutMap_; // use to abort task when timeout 158 std::map<uint32_t, TimerId> taskFinishMap_; // use to wake up timer when task finished 159 160 std::mutex msgQueueLock_; 161 std::queue<std::pair<std::string, Message *>> searchMessageQueue_; 162 std::atomic<uint32_t> workingThreadsCount_; 163 164 mutable std::mutex innerSourceLock_; 165 ISyncInterface *syncInterface_; 166 ICommunicator *communicator_; 167 168 uint32_t lastSessionId_; 169 uint32_t lastTaskId_; 170 std::atomic<bool> closed_; 171 172 std::condition_variable clearCV_; // msgQueueLock_ 173 }; 174 } 175 #endif