• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef STREAM_SOCKET_H
17 #define STREAM_SOCKET_H
18 
19 #include <condition_variable>
20 #include <map>
21 #include <mutex>
22 #include <queue>
23 #include <utility>
24 
25 #include "i_stream.h"
26 #include "session.h"
27 #include "stream_common.h"
28 
29 namespace Communication {
30 namespace SoftBus {
31 class IStreamSocketListener {
32 public:
33     IStreamSocketListener() = default;
34     virtual ~IStreamSocketListener() = default;
35 
36     virtual void OnStreamReceived(std::unique_ptr<IStream> stream) = 0;
37     virtual void OnStreamStatus(int status) = 0;
38     virtual int OnStreamHdrReceived(std::unique_ptr<char[]> header, int size) = 0;
39     virtual void OnQosEvent(int32_t eventId, int32_t tvCount, const QosTv *tvList) const = 0;
40 };
41 
42 class IStreamSocket {
43 public:
IStreamSocket()44     IStreamSocket()
45     {
46         listenFd_ = -1;
47         streamFd_ = -1;
48         epollFd_ = -1;
49         isStreamRecv_ = false;
50         streamType_ = INVALID;
51         isBlocked_ = false;
52     }
53     virtual ~IStreamSocket() = default;
54 
55     virtual bool CreateClient(IpAndPort &local, int streamType, const std::string &sessionKey) = 0; // socket + bind
56     virtual bool CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType,
57         const std::string &sessionKey) = 0;
58     virtual bool CreateServer(IpAndPort &local, int streamType, const std::string &sessionKey) = 0;
59 
60     virtual void DestroyStreamSocket() = 0;
61 
62     virtual bool Connect(const IpAndPort &remote) = 0;
63     virtual bool Send(std::unique_ptr<IStream> stream) = 0;
64 
65     virtual bool SetOption(int type, const StreamAttr &value) = 0;
66     virtual StreamAttr GetOption(int type) const = 0;
67 
68     virtual bool SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver) = 0;
69 
70 protected:
71     static constexpr int MAX_EPOLL_NUM = 100;
72     static constexpr int MAX_CONNECTION_VALUE = 100;
73     static constexpr int FRAME_HEADER_LEN = 4;
74     static constexpr int BYTE_TO_BIT = 8;
75     static constexpr int INT_TO_BYTE = 0xff;
76     static constexpr int IPTOS_LOWDELAY = 0XBC;
77     static constexpr int DEFAULT_UDP_BUFFER_SIZE = 512 * 1024;
78     static constexpr int STREAM_BUFFER_THRESHOLD = 5;
79 
80     virtual int CreateAndBindSocket(IpAndPort &local) = 0;
81     virtual bool Accept() = 0;
82 
83     virtual int EpollTimeout(int fd, int timeout) = 0;
84     virtual int SetSocketEpollMode(int fd) = 0;
85     virtual std::unique_ptr<char[]> RecvStream(int dataLength) = 0;
TakeStream()86     virtual std::unique_ptr<IStream> TakeStream()
87     {
88         std::unique_lock<std::mutex> lock(streamReceiveLock_);
89         while (isStreamRecv_) {
90             if (!streamReceiveBuffer_.empty()) {
91                 auto item = std::move(streamReceiveBuffer_.front());
92                 streamReceiveBuffer_.pop();
93                 return item;
94             }
95             streamReceiveCv_.wait(lock);
96         }
97         return nullptr;
98     }
99 
PutStream(std::unique_ptr<IStream> stream)100     virtual void PutStream(std::unique_ptr<IStream> stream)
101     {
102         std::lock_guard<std::mutex> lock(streamReceiveLock_);
103         if (isStreamRecv_) {
104             streamReceiveBuffer_.push(std::move(stream));
105             streamReceiveCv_.notify_all();
106         }
107     }
108 
GetStreamNum()109     virtual int GetStreamNum()
110     {
111         std::lock_guard<std::mutex> lock(streamReceiveLock_);
112         return streamReceiveBuffer_.size();
113     }
114 
QuitStreamBuffer()115     virtual void QuitStreamBuffer()
116     {
117         std::lock_guard<std::mutex> lock(streamReceiveLock_);
118         isStreamRecv_ = false;
119         streamReceiveCv_.notify_all();
120     }
121 
122     int listenFd_;
123     int streamFd_;
124     int epollFd_;
125     IpAndPort localIpPort_ {};
126     IpAndPort remoteIpPort_ {};
127     bool isStreamRecv_;
128     std::shared_ptr<IStreamSocketListener> streamReceiver_ = nullptr;
129     std::queue<std::unique_ptr<IStream>> streamReceiveBuffer_;
130     std::mutex streamReceiveLock_;
131     std::condition_variable streamReceiveCv_;
132     int streamType_ = INVALID;
133     bool isBlocked_;
134     std::string sessionKey_;
135 };
136 } // namespace SoftBus
137 } // namespace Communication
138 
139 #endif