1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H 17 #define OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H 18 #include "ichannel_listener.h" 19 #include "channel_common_definition.h" 20 #include "data_sender_receiver.h" 21 #include "event_handler.h" 22 #include "single_instance.h" 23 #include "socket.h" 24 #include <map> 25 #include <memory> 26 #include <mutex> 27 #include <optional> 28 #include <shared_mutex> 29 #include <string> 30 #include <vector> 31 #include <thread> 32 #include <atomic> 33 34 namespace OHOS { 35 namespace DistributedCollab { 36 class ChannelManager final { 37 DECLARE_SINGLE_INSTANCE_BASE(ChannelManager); 38 39 public: 40 int32_t Init(const std::string& ownerName); 41 void DeInit(); 42 43 int32_t GetVersion(); 44 int32_t CreateServerChannel(const std::string& channelName, 45 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo); 46 int32_t CreateClientChannel(const std::string& channelName, 47 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo); 48 bool isValidChannelId(const int32_t channelId); 49 int32_t DeleteChannel(const int32_t channelId); 50 void ClearSendTask(int32_t channelId); 51 int32_t RegisterChannelListener(const int32_t channelId, 52 const std::shared_ptr<IChannelListener> listener); 53 int32_t ConnectChannel(const int32_t channelId); 54 55 int32_t SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 56 int32_t SendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data); 57 int32_t SendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 58 int32_t SendFile(const int32_t channelId, const std::vector<std::string>& sFiles, 59 const std::vector<std::string>& dFiles); 60 61 void OnSocketError(int32_t socketId, const int32_t errorCode); 62 void OnSocketConnected(int32_t socketId, const PeerSocketInfo& info); 63 void OnSocketClosed(int32_t socketId, const ShutdownReason& reason); 64 void OnBytesReceived(int32_t socketId, const void* data, const uint32_t dataLen); 65 void OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen); 66 void OnStreamReceived(int32_t socketId, const StreamData* data, 67 const StreamData* ext, const StreamFrameInfo* param); 68 void OnFileEventReceived(int32_t socketId, FileEvent *event); 69 const char* GetRecvPathFromUser(); 70 71 private: 72 static constexpr int32_t VERSION_ = 0; 73 static constexpr int32_t CHANNEL_ID_GAP = 1000; 74 static constexpr int32_t MESSAGE_START_ID = 1001; 75 static constexpr int32_t BYTES_START_ID = MESSAGE_START_ID + CHANNEL_ID_GAP; 76 static constexpr int32_t STREAM_START_ID = BYTES_START_ID + CHANNEL_ID_GAP; 77 static constexpr int32_t FILE_START_ID = STREAM_START_ID + CHANNEL_ID_GAP; 78 static constexpr int32_t MAX_CHANNEL_NAME_LENGTH = 64; 79 static constexpr int32_t CHANNEL_NAME_PREFIX_LENGTH = 64; 80 static constexpr int32_t RETRY_TIME_GAP = 1000; 81 static constexpr int32_t MAX_FILE_COUNT = 500; 82 83 private: 84 int32_t serverSocketId_ = -1; 85 std::string ownerName_; 86 87 // for all channel res 88 std::shared_mutex channelMutex_; 89 // name2ids 90 std::map<std::string, std::vector<int32_t>> channelIdMap_; 91 // id2info 92 std::map<int32_t, ChannelInfo> channelInfoMap_; 93 94 std::atomic_uint32_t fileChannelId_; 95 96 // for all socket res 97 std::shared_mutex socketMutex_; 98 // socketId2channelId 99 std::map<int32_t, int32_t> socketChannelMap_; 100 std::map<int32_t, ChannelStatus> socketStatusMap_; 101 102 std::shared_mutex listenerMutex_; 103 std::map<int32_t, std::vector<std::weak_ptr<IChannelListener>>> listenersMap_; 104 105 std::map<ChannelDataType, std::mutex> typeMutex_; 106 std::map<ChannelDataType, std::int32_t> nextIds_ = { 107 { ChannelDataType::MESSAGE, MESSAGE_START_ID }, 108 { ChannelDataType::BYTES, BYTES_START_ID }, 109 { ChannelDataType::VIDEO_STREAM, STREAM_START_ID }, 110 { ChannelDataType::FILE, FILE_START_ID } 111 }; 112 113 std::mutex eventMutex_; 114 std::thread eventThread_; 115 std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventHandler_; 116 std::condition_variable eventCon_; 117 118 std::mutex callbackEventMutex_; 119 std::thread callbackEventThread_; 120 std::shared_ptr<OHOS::AppExecFwk::EventHandler> callbackEventHandler_; 121 std::condition_variable callbackEventCon_; 122 123 std::mutex msgEventMutex_; 124 std::thread msgEventThread_; 125 std::shared_ptr<OHOS::AppExecFwk::EventHandler> msgEventHandler_; 126 std::condition_variable msgEventCon_; 127 private: 128 explicit ChannelManager() = default; 129 ~ChannelManager(); 130 131 void Reset(); 132 int32_t PostTask(const AppExecFwk::InnerEvent::Callback& callback, 133 const AppExecFwk::EventQueue::Priority priority, const std::string& name = ""); 134 int32_t PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback, 135 const AppExecFwk::EventQueue::Priority priority); 136 int32_t PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback, 137 const AppExecFwk::EventQueue::Priority priority); 138 void StartEvent(); 139 void StartCallbackEvent(); 140 void StartMsgEvent(); 141 142 int32_t CreateServerSocket(); 143 int32_t CreateClientSocket(const std::string& channelName, 144 const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType); 145 std::optional<ChannelInfo> CreateBaseChannel(const std::string& channelName, 146 const ChannelDataType dataType, const ChannelPeerInfo& peerInfo); 147 int32_t GenerateNextId(const ChannelDataType dataType); 148 int32_t RegisterSocket(ChannelInfo& info, const ChannelDataType dataType); 149 150 void ClearRegisterChannel(const int32_t channelId); 151 void ClearRegisterListener(const int32_t channelId); 152 void ClearRegisterSocket(const int32_t channelId); 153 154 void CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners); 155 156 int32_t DoBindSockets(const std::vector<int32_t>& socketIds, const ChannelDataType dataType); 157 ChannelStatus GetSocketStatus(const int32_t socketId); 158 int32_t BindSocket(const int32_t socketId, const ChannelDataType dataType); 159 int32_t SetSocketStatus(const int32_t socketId, const ChannelStatus status); 160 int32_t UpdateChannelStatus(const int32_t channelId); 161 int32_t SetChannelStatus(const int32_t channelId, const ChannelStatus status); 162 void DoErrorCallback(const int32_t channelId, const int32_t errorCode); 163 164 std::optional<std::string> GetChannelNameFromSocket(const std::string& socketName); 165 std::optional<ChannelDataType> GetChannelDataTypeFromName(const std::string& channelName); 166 int32_t GetChannelId(const std::string& channelName, const ChannelDataType dataType); 167 int32_t RegisterSocket(const int32_t socketId, const int32_t channelId); 168 int32_t UpdateChannel(const int32_t socketId, const int32_t channelId); 169 int32_t GetChannelId(const int32_t socketId); 170 ChannelStatus GetChannelStatus(const int32_t channelId); 171 172 template <typename Func, typename... Args> 173 int32_t DoSendData(const int32_t channelId, Func doSendFunc, Args&& ...args); 174 template <typename Func, typename... Args> 175 void NotifyListeners(const int32_t channelId, Func listenerFunc, 176 const AppExecFwk::EventQueue::Priority priority, Args&& ...args); 177 int32_t GetValidSocket(const int32_t channelId); 178 int32_t DoSendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 179 int32_t DoSendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data); 180 int32_t DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data); 181 int32_t DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles, 182 const std::vector<std::string>& dFiles); 183 std::shared_ptr<AVTransDataBuffer> ProcessRecvData(const int32_t channelId, 184 const int32_t socketId, const void* data, const uint32_t dataLen); 185 void DoConnectCallback(const int32_t channelId); 186 void DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason); 187 void DoBytesReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer); 188 void DoMessageReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer); 189 void DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data); 190 void DispatchProcessFileEvent(int32_t channelId, FileEvent *event); 191 void DealFileSendEvent(int32_t channelId, FileEvent *event); 192 void DealFileRecvEvent(int32_t channelId, FileEvent *event); 193 void DealFileErrorEvent(int32_t channelId, FileEvent *event); 194 void DealFileUpdatePathEvent(int32_t channelId, FileEvent *event); 195 void DoFileRecvCallback(const int32_t channelId, const FileInfo& info); 196 void DoFileSendCallback(const int32_t channelId, const FileInfo& info); 197 }; 198 } 199 } 200 #endif