1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef STREAM_SOCKET_H 17 #define STREAM_SOCKET_H 18 19 #include <condition_variable> 20 #include <map> 21 #include <mutex> 22 #include <queue> 23 #include <utility> 24 25 #include "i_stream.h" 26 #include "session.h" 27 #include "stream_common.h" 28 29 namespace Communication { 30 namespace SoftBus { 31 class IStreamSocketListener { 32 public: 33 IStreamSocketListener() = default; 34 virtual ~IStreamSocketListener() = default; 35 36 virtual void OnStreamReceived(std::unique_ptr<IStream> stream) = 0; 37 virtual void OnStreamStatus(int status) = 0; 38 virtual int OnStreamHdrReceived(std::unique_ptr<char[]> header, int size) = 0; 39 virtual void OnQosEvent(int32_t eventId, int32_t tvCount, const QosTv *tvList) const = 0; 40 }; 41 42 class IStreamSocket { 43 public: IStreamSocket()44 IStreamSocket() 45 { 46 listenFd_ = -1; 47 streamFd_ = -1; 48 epollFd_ = -1; 49 isStreamRecv_ = false; 50 streamType_ = INVALID; 51 isBlocked_ = false; 52 } 53 virtual ~IStreamSocket() = default; 54 55 virtual bool CreateClient(IpAndPort &local, int streamType, const std::string &sessionKey) = 0; // socket + bind 56 virtual bool CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType, 57 const std::string &sessionKey) = 0; 58 virtual bool CreateServer(IpAndPort &local, int streamType, const std::string &sessionKey) = 0; 59 60 virtual void DestroyStreamSocket() = 0; 61 62 virtual bool Connect(const IpAndPort &remote) = 0; 63 virtual bool Send(std::unique_ptr<IStream> stream) = 0; 64 65 virtual bool SetOption(int type, const StreamAttr &value) = 0; 66 virtual StreamAttr GetOption(int type) const = 0; 67 68 virtual bool SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver) = 0; 69 70 protected: 71 static constexpr int MAX_EPOLL_NUM = 100; 72 static constexpr int MAX_CONNECTION_VALUE = 100; 73 static constexpr int FRAME_HEADER_LEN = 4; 74 static constexpr int BYTE_TO_BIT = 8; 75 static constexpr int INT_TO_BYTE = 0xff; 76 static constexpr int IPTOS_LOWDELAY = 0XBC; 77 static constexpr int DEFAULT_UDP_BUFFER_SIZE = 512 * 1024; 78 static constexpr int STREAM_BUFFER_THRESHOLD = 5; 79 80 virtual int CreateAndBindSocket(IpAndPort &local) = 0; 81 virtual bool Accept() = 0; 82 83 virtual int EpollTimeout(int fd, int timeout) = 0; 84 virtual int SetSocketEpollMode(int fd) = 0; 85 virtual std::unique_ptr<char[]> RecvStream(int dataLength) = 0; TakeStream()86 virtual std::unique_ptr<IStream> TakeStream() 87 { 88 std::unique_lock<std::mutex> lock(streamReceiveLock_); 89 while (isStreamRecv_) { 90 if (!streamReceiveBuffer_.empty()) { 91 auto item = std::move(streamReceiveBuffer_.front()); 92 streamReceiveBuffer_.pop(); 93 return item; 94 } 95 streamReceiveCv_.wait(lock); 96 } 97 return nullptr; 98 } 99 PutStream(std::unique_ptr<IStream> stream)100 virtual void PutStream(std::unique_ptr<IStream> stream) 101 { 102 std::lock_guard<std::mutex> lock(streamReceiveLock_); 103 if (isStreamRecv_) { 104 streamReceiveBuffer_.push(std::move(stream)); 105 streamReceiveCv_.notify_all(); 106 } 107 } 108 GetStreamNum()109 virtual int GetStreamNum() 110 { 111 std::lock_guard<std::mutex> lock(streamReceiveLock_); 112 return streamReceiveBuffer_.size(); 113 } 114 QuitStreamBuffer()115 virtual void QuitStreamBuffer() 116 { 117 std::lock_guard<std::mutex> lock(streamReceiveLock_); 118 isStreamRecv_ = false; 119 streamReceiveCv_.notify_all(); 120 } 121 122 int listenFd_; 123 int streamFd_; 124 int epollFd_; 125 IpAndPort localIpPort_ {}; 126 IpAndPort remoteIpPort_ {}; 127 bool isStreamRecv_; 128 std::shared_ptr<IStreamSocketListener> streamReceiver_ = nullptr; 129 std::queue<std::unique_ptr<IStream>> streamReceiveBuffer_; 130 std::mutex streamReceiveLock_; 131 std::condition_variable streamReceiveCv_; 132 int streamType_ = INVALID; 133 bool isBlocked_; 134 std::string sessionKey_; 135 }; 136 } // namespace SoftBus 137 } // namespace Communication 138 139 #endif