• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #ifndef OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H
17 #define OHOS_DSCHED_COLLAB_CHANNEL_MANAGER_H
18 #include "ichannel_listener.h"
19 #include "channel_common_definition.h"
20 #include "data_sender_receiver.h"
21 #include "event_handler.h"
22 #include "single_instance.h"
23 #include "socket.h"
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <optional>
28 #include <shared_mutex>
29 #include <string>
30 #include <vector>
31 #include <thread>
32 #include <atomic>
33 
34 namespace OHOS {
35 namespace DistributedCollab {
36 class ChannelManager final {
37     DECLARE_SINGLE_INSTANCE_BASE(ChannelManager);
38 
39 public:
40     int32_t Init(const std::string& ownerName);
41     void DeInit();
42 
43     int32_t GetVersion();
44     int32_t CreateServerChannel(const std::string& channelName,
45         const ChannelDataType dataType, const ChannelPeerInfo& peerInfo);
46     int32_t CreateClientChannel(const std::string& channelName,
47         const ChannelDataType dataType, const ChannelPeerInfo& peerInfo);
48     bool isValidChannelId(const int32_t channelId);
49     int32_t DeleteChannel(const int32_t channelId);
50     void ClearSendTask(int32_t channelId);
51     int32_t RegisterChannelListener(const int32_t channelId,
52         const std::shared_ptr<IChannelListener> listener);
53     int32_t ConnectChannel(const int32_t channelId);
54 
55     int32_t SendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
56     int32_t SendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data);
57     int32_t SendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
58     int32_t SendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
59         const std::vector<std::string>& dFiles);
60 
61     void OnSocketError(int32_t socketId, const int32_t errorCode);
62     void OnSocketConnected(int32_t socketId, const PeerSocketInfo& info);
63     void OnSocketClosed(int32_t socketId, const ShutdownReason& reason);
64     void OnBytesReceived(int32_t socketId, const void* data, const uint32_t dataLen);
65     void OnMessageReceived(int32_t socketId, const void* data, const uint32_t dataLen);
66     void OnStreamReceived(int32_t socketId, const StreamData* data,
67         const StreamData* ext, const StreamFrameInfo* param);
68     void OnFileEventReceived(int32_t socketId, FileEvent *event);
69     const char* GetRecvPathFromUser();
70 
71 private:
72     static constexpr int32_t VERSION_ = 0;
73     static constexpr int32_t CHANNEL_ID_GAP = 1000;
74     static constexpr int32_t MESSAGE_START_ID = 1001;
75     static constexpr int32_t BYTES_START_ID = MESSAGE_START_ID + CHANNEL_ID_GAP;
76     static constexpr int32_t STREAM_START_ID = BYTES_START_ID + CHANNEL_ID_GAP;
77     static constexpr int32_t FILE_START_ID = STREAM_START_ID + CHANNEL_ID_GAP;
78     static constexpr int32_t MAX_CHANNEL_NAME_LENGTH = 64;
79     static constexpr int32_t CHANNEL_NAME_PREFIX_LENGTH = 64;
80     static constexpr int32_t RETRY_TIME_GAP = 1000;
81     static constexpr int32_t MAX_FILE_COUNT = 500;
82 
83 private:
84     int32_t serverSocketId_ = -1;
85     std::string ownerName_;
86 
87     // for all channel res
88     std::shared_mutex channelMutex_;
89     // name2ids
90     std::map<std::string, std::vector<int32_t>> channelIdMap_;
91     // id2info
92     std::map<int32_t, ChannelInfo> channelInfoMap_;
93 
94     std::atomic_uint32_t fileChannelId_;
95 
96     // for all socket res
97     std::shared_mutex socketMutex_;
98     // socketId2channelId
99     std::map<int32_t, int32_t> socketChannelMap_;
100     std::map<int32_t, ChannelStatus> socketStatusMap_;
101 
102     std::shared_mutex listenerMutex_;
103     std::map<int32_t, std::vector<std::weak_ptr<IChannelListener>>> listenersMap_;
104 
105     std::map<ChannelDataType, std::mutex> typeMutex_;
106     std::map<ChannelDataType, std::int32_t> nextIds_ = {
107         { ChannelDataType::MESSAGE, MESSAGE_START_ID },
108         { ChannelDataType::BYTES, BYTES_START_ID },
109         { ChannelDataType::VIDEO_STREAM, STREAM_START_ID },
110         { ChannelDataType::FILE, FILE_START_ID }
111     };
112 
113     std::mutex eventMutex_;
114     std::thread eventThread_;
115     std::shared_ptr<OHOS::AppExecFwk::EventHandler> eventHandler_;
116     std::condition_variable eventCon_;
117 
118     std::mutex callbackEventMutex_;
119     std::thread callbackEventThread_;
120     std::shared_ptr<OHOS::AppExecFwk::EventHandler> callbackEventHandler_;
121     std::condition_variable callbackEventCon_;
122 
123     std::mutex msgEventMutex_;
124     std::thread msgEventThread_;
125     std::shared_ptr<OHOS::AppExecFwk::EventHandler> msgEventHandler_;
126     std::condition_variable msgEventCon_;
127 private:
128     explicit ChannelManager() = default;
129     ~ChannelManager();
130 
131     void Reset();
132     int32_t PostTask(const AppExecFwk::InnerEvent::Callback& callback,
133         const AppExecFwk::EventQueue::Priority priority, const std::string& name = "");
134     int32_t PostCallbackTask(const AppExecFwk::InnerEvent::Callback& callback,
135             const AppExecFwk::EventQueue::Priority priority);
136     int32_t PostMsgTask(const AppExecFwk::InnerEvent::Callback& callback,
137         const AppExecFwk::EventQueue::Priority priority);
138     void StartEvent();
139     void StartCallbackEvent();
140     void StartMsgEvent();
141 
142     int32_t CreateServerSocket();
143     int32_t CreateClientSocket(const std::string& channelName,
144         const std::string& peerName, const std::string& peerNetworkId, const ChannelDataType dataType);
145     std::optional<ChannelInfo> CreateBaseChannel(const std::string& channelName,
146         const ChannelDataType dataType, const ChannelPeerInfo& peerInfo);
147     int32_t GenerateNextId(const ChannelDataType dataType);
148     int32_t RegisterSocket(ChannelInfo& info, const ChannelDataType dataType);
149 
150     void ClearRegisterChannel(const int32_t channelId);
151     void ClearRegisterListener(const int32_t channelId);
152     void ClearRegisterSocket(const int32_t channelId);
153 
154     void CleanInvalidListener(std::vector<std::weak_ptr<IChannelListener>>& listeners);
155 
156     int32_t DoBindSockets(const std::vector<int32_t>& socketIds, const ChannelDataType dataType);
157     ChannelStatus GetSocketStatus(const int32_t socketId);
158     int32_t BindSocket(const int32_t socketId, const ChannelDataType dataType);
159     int32_t SetSocketStatus(const int32_t socketId, const ChannelStatus status);
160     int32_t UpdateChannelStatus(const int32_t channelId);
161     int32_t SetChannelStatus(const int32_t channelId, const ChannelStatus status);
162     void DoErrorCallback(const int32_t channelId, const int32_t errorCode);
163 
164     std::optional<std::string> GetChannelNameFromSocket(const std::string& socketName);
165     std::optional<ChannelDataType> GetChannelDataTypeFromName(const std::string& channelName);
166     int32_t GetChannelId(const std::string& channelName, const ChannelDataType dataType);
167     int32_t RegisterSocket(const int32_t socketId, const int32_t channelId);
168     int32_t UpdateChannel(const int32_t socketId, const int32_t channelId);
169     int32_t GetChannelId(const int32_t socketId);
170     ChannelStatus GetChannelStatus(const int32_t channelId);
171 
172     template <typename Func, typename... Args>
173     int32_t DoSendData(const int32_t channelId, Func doSendFunc, Args&& ...args);
174     template <typename Func, typename... Args>
175     void NotifyListeners(const int32_t channelId, Func listenerFunc,
176         const AppExecFwk::EventQueue::Priority priority, Args&& ...args);
177     int32_t GetValidSocket(const int32_t channelId);
178     int32_t DoSendBytes(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
179     int32_t DoSendMessage(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& data);
180     int32_t DoSendStream(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data);
181     int32_t DoSendFile(const int32_t channelId, const std::vector<std::string>& sFiles,
182         const std::vector<std::string>& dFiles);
183     std::shared_ptr<AVTransDataBuffer> ProcessRecvData(const int32_t channelId,
184         const int32_t socketId, const void* data, const uint32_t dataLen);
185     void DoConnectCallback(const int32_t channelId);
186     void DoDisConnectCallback(const int32_t channelId, const ShutdownReason& reason);
187     void DoBytesReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer);
188     void DoMessageReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransDataBuffer>& buffer);
189     void DoStreamReceiveCallback(const int32_t channelId, const std::shared_ptr<AVTransStreamData>& data);
190     void DispatchProcessFileEvent(int32_t channelId, FileEvent *event);
191     void DealFileSendEvent(int32_t channelId, FileEvent *event);
192     void DealFileRecvEvent(int32_t channelId, FileEvent *event);
193     void DealFileErrorEvent(int32_t channelId, FileEvent *event);
194     void DealFileUpdatePathEvent(int32_t channelId, FileEvent *event);
195     void DoFileRecvCallback(const int32_t channelId, const FileInfo& info);
196     void DoFileSendCallback(const int32_t channelId, const FileInfo& info);
197 };
198 }
199 }
200 #endif