1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H 17 #define OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H 18 #include "ichannel_listener.h" 19 #include "channel_common_definition.h" 20 #include "data_sender_receiver.h" 21 #include "dms_interface_structure.h" 22 #include "single_instance.h" 23 #include "socket.h" 24 #include <map> 25 #include <memory> 26 #include <mutex> 27 #include <optional> 28 #include <shared_mutex> 29 #include <string> 30 #include <vector> 31 #include <thread> 32 #include <atomic> 33 34 namespace OHOS { 35 namespace DistributedCollab { 36 class ChannelManager final { 37 DECLARE_SINGLE_INSTANCE_BASE(ChannelManager); 38 39 public: 40 int32_t Init(const std::string& ownerName); 41 void DeInit(); 42 43 int32_t GetVersion(); 44 int32_t CreateServerChannel(const std::string& channelName, 45 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo); 46 int32_t CreateClientChannel(const std::string& channelName, 47 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo); 48 bool isValidChannelId(const int32_t channelId); 49 int32_t DeleteChannel(const int32_t channelId); 50 void ClearSendTask(int32_t channelId); 51 int32_t RegisterChannelListener(const int32_t channelId, 52 const std::shared_ptr<IChannelListener> listener); 53 int32_t ConnectChannel(const int32_t channelId); 54 55 int32_t SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 56 int32_t SendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data); 57 int32_t SendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 58 int32_t SendFile(const int32_t channelId, const std::vector<std::string>& sFiles, 59 const std::vector<std::string>& dFiles); 60 61 void OnSocketError(int32_t socketId, const int32_t errorCode); 62 void OnSocketConnected(int32_t socketId, const PeerSocketInfo& info); 63 void OnSocketClosed(int32_t socketId, const ShutdownReason& reason); 64 void OnBytesReceived(int32_t socketId, const void* data, const uint32_t dataLen); 65 void OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen); 66 void OnStreamReceived(int32_t socketId, const StreamData* data, 67 const StreamData* ext, const StreamFrameInfo* param); 68 void OnFileEventReceived(int32_t socketId, FileEvent *event); 69 const char* GetRecvPathFromUser(); 70 71 private: 72 static constexpr int32_t VERSION_ = 0; 73 static constexpr int32_t CHANNEL_ID_GAP = 1000; 74 static constexpr int32_t MESSAGE_START_ID = 1001; 75 static constexpr int32_t BYTES_START_ID = MESSAGE_START_ID + CHANNEL_ID_GAP; 76 static constexpr int32_t STREAM_START_ID = BYTES_START_ID + CHANNEL_ID_GAP; 77 static constexpr int32_t FILE_START_ID = STREAM_START_ID + CHANNEL_ID_GAP; 78 static constexpr int32_t MAX_CHANNEL_NAME_LENGTH = 64; 79 static constexpr int32_t CHANNEL_NAME_PREFIX_LENGTH = 64; 80 static constexpr int32_t RETRY_TIME_GAP = 1000; 81 static constexpr int32_t MAX_FILE_COUNT = 500; 82 83 private: 84 int32_t serverSocketId_ = -1; 85 std::string ownerName_; 86 87 // for all channel res 88 std::shared_mutex channelMutex_; 89 // name2ids 90 std::map<std::string, std::vector<int32_t>> channelIdMap_; 91 // id2info 92 std::map<int32_t, ChannelInfo> channelInfoMap_; 93 94 std::atomic_uint32_t fileChannelId_; 95 96 // for all socket res 97 std::shared_mutex socketMutex_; 98 // socketId2channelId 99 std::map<int32_t, int32_t> socketChannelMap_; 100 std::map<int32_t, ChannelStatus> socketStatusMap_; 101 102 std::shared_mutex listenerMutex_; 103 std::map<int32_t, std::vector<std::weak_ptr<IChannelListener>>> listenersMap_; 104 105 std::map<ChannelDataType, std::mutex> typeMutex_; 106 std::map<ChannelDataType, std::int32_t> nextIds_ = { 107 { ChannelDataType::MESSAGE, MESSAGE_START_ID }, 108 { ChannelDataType::BYTES, BYTES_START_ID }, 109 { ChannelDataType::VIDEO_STREAM, STREAM_START_ID }, 110 { ChannelDataType::FILE, FILE_START_ID } 111 }; 112 113 std::mutex eventMutex_; 114 std::thread eventThread_; 115 std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventHandler_; 116 std::condition_variable eventCon_; 117 118 std::mutex callbackEventMutex_; 119 std::thread callbackEventThread_; 120 std::shared_ptr<OHOS::AppExecFwk::EventHandler> callbackEventHandler_; 121 std::condition_variable callbackEventCon_; 122 123 std::thread callbackEventNewThread_; 124 std::condition_variable callbackEventNewCon_; 125 std::mutex callbackEventNewMutex_; 126 std::shared_ptr<OHOS::AppExecFwk::EventHandler> callbackEventHandlerNew_ = nullptr; 127 128 std::mutex msgEventMutex_; 129 std::thread msgEventThread_; 130 std::shared_ptr<OHOS::AppExecFwk::EventHandler> msgEventHandler_; 131 std::condition_variable msgEventCon_; 132 133 std::mutex dmsAdapetrLock_; 134 void *dllHandle_ = nullptr; 135 ISoftbusFileAdpater dmsFileAdapetr_ = { 136 .SetFileSchema = nullptr, 137 }; 138 private: 139 explicit ChannelManager() = default; 140 ~ChannelManager(); 141 142 void Reset(); 143 int32_t PostTask(const AppExecFwk::InnerEvent::Callback& callback, 144 const AppExecFwk::EventQueue::Priority priority, const std::string& name = ""); 145 int32_t PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback, 146 const AppExecFwk::EventQueue::Priority priority); 147 int32_t PostCallbackTaskNew(const AppExecFwk::InnerEvent::Callback& callback, 148 const AppExecFwk::EventQueue::Priority priority); 149 int32_t PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback, 150 const AppExecFwk::EventQueue::Priority priority); 151 void StartEvent(); 152 void StartCallbackEvent(); 153 void StartCallbackEventNew(); 154 void StartMsgEvent(); 155 156 int32_t CreateServerSocket(); 157 int32_t CreateClientSocket(const std::string& channelName, 158 const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType); 159 std::optional<ChannelInfo> CreateBaseChannel(const std::string& channelName, 160 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo); 161 int32_t GenerateNextId(const ChannelDataType dataType); 162 int32_t RegisterSocket(ChannelInfo& info, const ChannelDataType dataType); 163 164 void ClearRegisterChannel(const int32_t channelId); 165 void ClearRegisterListener(const int32_t channelId); 166 void ClearRegisterSocket(const int32_t channelId); 167 168 void CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners); 169 170 int32_t DoBindSockets(const std::vector<int32_t>& socketIds, const ChannelDataType dataType); 171 ChannelStatus GetSocketStatus(const int32_t socketId); 172 int32_t BindSocket(const int32_t socketId, const ChannelDataType dataType); 173 int32_t SetSocketStatus(const int32_t socketId, const ChannelStatus status); 174 int32_t UpdateChannelStatus(const int32_t channelId); 175 int32_t SetChannelStatus(const int32_t channelId, const ChannelStatus status); 176 void DoErrorCallback(const int32_t channelId, const int32_t errorCode); 177 178 std::optional<std::string> GetChannelNameFromSocket(const std::string& socketName); 179 std::optional<ChannelDataType> GetChannelDataTypeFromName(const std::string& channelName); 180 int32_t GetChannelId(const std::string& channelName, const ChannelDataType dataType); 181 int32_t RegisterSocket(const int32_t socketId, const int32_t channelId); 182 int32_t UpdateChannel(const int32_t socketId, const int32_t channelId); 183 int32_t GetChannelId(const int32_t socketId); 184 ChannelStatus GetChannelStatus(const int32_t channelId); 185 186 template <typename Func, typename... Args> 187 int32_t DoSendData(const int32_t channelId, Func doSendFunc, Args&& ...args); 188 template <typename Func, typename... Args> 189 void NotifyListeners(const int32_t channelId, Func listenerFunc, 190 const AppExecFwk::EventQueue::Priority priority, Args&& ...args); 191 template <typename Func, typename... Args> 192 void NotifyListenersNew(const int32_t channelId, Func listenerFunc, 193 const AppExecFwk::EventQueue::Priority priority, Args&& ...args); 194 int32_t GetValidSocket(const int32_t channelId); 195 int32_t DoSendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 196 int32_t DoSendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 197 int32_t DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data); 198 int32_t DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles, 199 const std::vector<std::string>& dFiles); 200 std::shared_ptr<AVTransDataBuffer> ProcessRecvData(const int32_t channelId, 201 const int32_t socketId, const void* data, const uint32_t dataLen); 202 void DoConnectCallback(const int32_t channelId); 203 void DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason); 204 void DoBytesReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer); 205 void DoMessageReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer); 206 void DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data); 207 void DispatchProcessFileEvent(int32_t channelId, FileEvent *event); 208 void DealFileSendEvent(int32_t channelId, FileEvent *event); 209 void DealFileRecvEvent(int32_t channelId, FileEvent *event); 210 void DealFileErrorEvent(int32_t channelId, FileEvent *event); 211 void DealFileUpdatePathEvent(int32_t channelId, FileEvent *event); 212 void DoFileRecvCallback(const int32_t channelId, const FileInfo& info); 213 void DoFileSendCallback(const int32_t channelId, const FileInfo& info); 214 int32_t GetDmsInteractiveAdapterProxy(); 215 }; 216 } 217 } 218 #endif