• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #ifndef OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H
17 #define OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H
18 #include "ichannel_listener.h"
19 #include "channel_common_definition.h"
20 #include "data_sender_receiver.h"
21 #include "dms_interface_structure.h"
22 #include "single_instance.h"
23 #include "socket.h"
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <optional>
28 #include <shared_mutex>
29 #include <string>
30 #include <vector>
31 #include <thread>
32 #include <atomic>
33 
34 namespace OHOS {
35 namespace DistributedCollab {
36 class ChannelManager final {
37     DECLARE_SINGLE_INSTANCE_BASE(ChannelManager);
38 
39 public:
40     int32_t Init(const std::string& ownerName);
41     void DeInit();
42 
43     int32_t GetVersion();
44     int32_t CreateServerChannel(const std::string& channelName,
45         const ChannelDataType dataType, const ChannelPeerInfo& peerInfo);
46     int32_t CreateClientChannel(const std::string& channelName,
47         const ChannelDataType dataType, const ChannelPeerInfo& peerInfo);
48     bool isValidChannelId(const int32_t channelId);
49     int32_t DeleteChannel(const int32_t channelId);
50     void ClearSendTask(int32_t channelId);
51     int32_t RegisterChannelListener(const int32_t channelId,
52         const std::shared_ptr<IChannelListener> listener);
53     int32_t ConnectChannel(const int32_t channelId);
54 
55     int32_t SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
56     int32_t SendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data);
57     int32_t SendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
58     int32_t SendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
59         const std::vector<std::string>& dFiles);
60 
61     void OnSocketError(int32_t socketId, const int32_t errorCode);
62     void OnSocketConnected(int32_t socketId, const PeerSocketInfo& info);
63     void OnSocketClosed(int32_t socketId, const ShutdownReason& reason);
64     void OnBytesReceived(int32_t socketId, const void* data, const uint32_t dataLen);
65     void OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen);
66     void OnStreamReceived(int32_t socketId, const StreamData* data,
67         const StreamData* ext, const StreamFrameInfo* param);
68     void OnFileEventReceived(int32_t socketId, FileEvent *event);
69     const char* GetRecvPathFromUser();
70 
71 private:
72     static constexpr int32_t VERSION_ = 0;
73     static constexpr int32_t CHANNEL_ID_GAP = 1000;
74     static constexpr int32_t MESSAGE_START_ID = 1001;
75     static constexpr int32_t BYTES_START_ID = MESSAGE_START_ID + CHANNEL_ID_GAP;
76     static constexpr int32_t STREAM_START_ID = BYTES_START_ID + CHANNEL_ID_GAP;
77     static constexpr int32_t FILE_START_ID = STREAM_START_ID + CHANNEL_ID_GAP;
78     static constexpr int32_t MAX_CHANNEL_NAME_LENGTH = 64;
79     static constexpr int32_t CHANNEL_NAME_PREFIX_LENGTH = 64;
80     static constexpr int32_t RETRY_TIME_GAP = 1000;
81     static constexpr int32_t MAX_FILE_COUNT = 500;
82 
83 private:
84     int32_t serverSocketId_ = -1;
85     std::string ownerName_;
86 
87     // for all channel res
88     std::shared_mutex channelMutex_;
89     // name2ids
90     std::map<std::string, std::vector<int32_t>> channelIdMap_;
91     // id2info
92     std::map<int32_t, ChannelInfo> channelInfoMap_;
93 
94     std::atomic_uint32_t fileChannelId_;
95 
96     // for all socket res
97     std::shared_mutex socketMutex_;
98     // socketId2channelId
99     std::map<int32_t, int32_t> socketChannelMap_;
100     std::map<int32_t, ChannelStatus> socketStatusMap_;
101 
102     std::shared_mutex listenerMutex_;
103     std::map<int32_t, std::vector<std::weak_ptr<IChannelListener>>> listenersMap_;
104 
105     std::map<ChannelDataType, std::mutex> typeMutex_;
106     std::map<ChannelDataType, std::int32_t> nextIds_ = {
107         { ChannelDataType::MESSAGE, MESSAGE_START_ID },
108         { ChannelDataType::BYTES, BYTES_START_ID },
109         { ChannelDataType::VIDEO_STREAM, STREAM_START_ID },
110         { ChannelDataType::FILE, FILE_START_ID }
111     };
112 
113     std::mutex eventMutex_;
114     std::thread eventThread_;
115     std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventHandler_;
116     std::condition_variable eventCon_;
117 
118     std::mutex callbackEventMutex_;
119     std::thread callbackEventThread_;
120     std::shared_ptr<OHOS::AppExecFwk::EventHandler> callbackEventHandler_;
121     std::condition_variable callbackEventCon_;
122 
123     std::thread callbackEventNewThread_;
124     std::condition_variable callbackEventNewCon_;
125     std::mutex callbackEventNewMutex_;
126     std::shared_ptr<OHOS::AppExecFwk::EventHandler> callbackEventHandlerNew_ = nullptr;
127 
128     std::mutex msgEventMutex_;
129     std::thread msgEventThread_;
130     std::shared_ptr<OHOS::AppExecFwk::EventHandler> msgEventHandler_;
131     std::condition_variable msgEventCon_;
132 
133     std::mutex dmsAdapetrLock_;
134     void *dllHandle_ = nullptr;
135     ISoftbusFileAdpater dmsFileAdapetr_ = {
136         .SetFileSchema = nullptr,
137     };
138 private:
139     explicit ChannelManager() = default;
140     ~ChannelManager();
141 
142     void Reset();
143     int32_t PostTask(const AppExecFwk::InnerEvent::Callback& callback,
144         const AppExecFwk::EventQueue::Priority priority, const std::string& name = "");
145     int32_t PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback,
146             const AppExecFwk::EventQueue::Priority priority);
147     int32_t PostCallbackTaskNew(const AppExecFwk::InnerEvent::Callback& callback,
148             const AppExecFwk::EventQueue::Priority priority);
149     int32_t PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback,
150         const AppExecFwk::EventQueue::Priority priority);
151     void StartEvent();
152     void StartCallbackEvent();
153     void StartCallbackEventNew();
154     void StartMsgEvent();
155 
156     int32_t CreateServerSocket();
157     int32_t CreateClientSocket(const std::string& channelName,
158         const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType);
159     std::optional<ChannelInfo> CreateBaseChannel(const std::string& channelName,
160         const ChannelDataType dataType, const ChannelPeerInfo& peerInfo);
161     int32_t GenerateNextId(const ChannelDataType dataType);
162     int32_t RegisterSocket(ChannelInfo& info, const ChannelDataType dataType);
163 
164     void ClearRegisterChannel(const int32_t channelId);
165     void ClearRegisterListener(const int32_t channelId);
166     void ClearRegisterSocket(const int32_t channelId);
167 
168     void CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners);
169 
170     int32_t DoBindSockets(const std::vector<int32_t>& socketIds, const ChannelDataType dataType);
171     ChannelStatus GetSocketStatus(const int32_t socketId);
172     int32_t BindSocket(const int32_t socketId, const ChannelDataType dataType);
173     int32_t SetSocketStatus(const int32_t socketId, const ChannelStatus status);
174     int32_t UpdateChannelStatus(const int32_t channelId);
175     int32_t SetChannelStatus(const int32_t channelId, const ChannelStatus status);
176     void DoErrorCallback(const int32_t channelId, const int32_t errorCode);
177 
178     std::optional<std::string> GetChannelNameFromSocket(const std::string& socketName);
179     std::optional<ChannelDataType> GetChannelDataTypeFromName(const std::string& channelName);
180     int32_t GetChannelId(const std::string& channelName, const ChannelDataType dataType);
181     int32_t RegisterSocket(const int32_t socketId, const int32_t channelId);
182     int32_t UpdateChannel(const int32_t socketId, const int32_t channelId);
183     int32_t GetChannelId(const int32_t socketId);
184     ChannelStatus GetChannelStatus(const int32_t channelId);
185 
186     template <typename Func, typename... Args>
187     int32_t DoSendData(const int32_t channelId, Func doSendFunc, Args&& ...args);
188     template <typename Func, typename... Args>
189     void NotifyListeners(const int32_t channelId, Func listenerFunc,
190         const AppExecFwk::EventQueue::Priority priority, Args&& ...args);
191     template <typename Func, typename... Args>
192     void NotifyListenersNew(const int32_t channelId, Func listenerFunc,
193         const AppExecFwk::EventQueue::Priority priority, Args&& ...args);
194     int32_t GetValidSocket(const int32_t channelId);
195     int32_t DoSendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
196     int32_t DoSendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
197     int32_t DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data);
198     int32_t DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
199         const std::vector<std::string>& dFiles);
200     std::shared_ptr<AVTransDataBuffer> ProcessRecvData(const int32_t channelId,
201         const int32_t socketId, const void* data, const uint32_t dataLen);
202     void DoConnectCallback(const int32_t channelId);
203     void DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason);
204     void DoBytesReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer);
205     void DoMessageReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer);
206     void DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data);
207     void DispatchProcessFileEvent(int32_t channelId, FileEvent *event);
208     void DealFileSendEvent(int32_t channelId, FileEvent *event);
209     void DealFileRecvEvent(int32_t channelId, FileEvent *event);
210     void DealFileErrorEvent(int32_t channelId, FileEvent *event);
211     void DealFileUpdatePathEvent(int32_t channelId, FileEvent *event);
212     void DoFileRecvCallback(const int32_t channelId, const FileInfo& info);
213     void DoFileSendCallback(const int32_t channelId, const FileInfo& info);
214     int32_t GetDmsInteractiveAdapterProxy();
215 };
216 }
217 }
218 #endif