1 /* 2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef STREAM_SOCKET_H 17 #define STREAM_SOCKET_H 18 19 #include <condition_variable> 20 #include <map> 21 #include <mutex> 22 #include <queue> 23 #include <securec.h> 24 #include <utility> 25 26 #include "client_trans_stream.h" 27 #include "i_stream.h" 28 #include "session.h" 29 #include "stream_common.h" 30 31 namespace Communication { 32 namespace SoftBus { 33 class IStreamSocketListener { 34 public: 35 IStreamSocketListener() = default; 36 virtual ~IStreamSocketListener() = default; 37 38 virtual void OnStreamReceived(std::unique_ptr<IStream> stream) = 0; 39 virtual void OnStreamStatus(int status) = 0; 40 virtual int OnStreamHdrReceived(std::unique_ptr<char[]> header, int size) = 0; 41 virtual void OnQosEvent(int32_t eventId, int32_t tvCount, const QosTv *tvList) const = 0; 42 virtual void OnFrameStats(const StreamSendStats *data) = 0; 43 virtual void OnRippleStats(const TrafficStats *data) = 0; 44 }; 45 46 class IStreamSocket { 47 public: IStreamSocket()48 IStreamSocket() 49 { 50 listenFd_ = -1; 51 streamFd_ = -1; 52 epollFd_ = -1; 53 isStreamRecv_ = false; 54 streamType_ = INVALID; 55 isBlocked_ = false; 56 } ~IStreamSocket()57 virtual ~IStreamSocket() 58 { 59 if (sessionKey_.first != nullptr) { 60 (void)memset_s(sessionKey_.first, sessionKey_.second, 0, sessionKey_.second); 61 delete [] sessionKey_.first; 62 } 63 sessionKey_.first = nullptr; 64 } 65 66 virtual bool CreateClient(IpAndPort &local, int streamType, 67 std::pair<uint8_t*, uint32_t> sessionKey) = 0; // socket + bind 68 virtual bool CreateClient(IpAndPort &local, const IpAndPort &remote, int streamType, 69 std::pair<uint8_t*, uint32_t> sessionKey) = 0; 70 virtual bool CreateServer(IpAndPort &local, int streamType, std::pair<uint8_t*, uint32_t> sessionKey) = 0; 71 72 virtual void DestroyStreamSocket() = 0; 73 74 virtual bool Connect(const IpAndPort &remote) = 0; 75 virtual bool Send(std::unique_ptr<IStream> stream) = 0; 76 77 virtual bool SetOption(int type, const StreamAttr &value) = 0; 78 virtual StreamAttr GetOption(int type) const = 0; 79 80 virtual bool SetStreamListener(std::shared_ptr<IStreamSocketListener> receiver) = 0; 81 82 protected: 83 static constexpr int MAX_EPOLL_NUM = 100; 84 static constexpr int MAX_CONNECTION_VALUE = 100; 85 static constexpr int FRAME_HEADER_LEN = 4; 86 static constexpr int BYTE_TO_BIT = 8; 87 static constexpr int INT_TO_BYTE = 0xff; 88 static constexpr int IPTOS_LOWDELAY = 0XBC; 89 static constexpr int DEFAULT_UDP_BUFFER_SIZE = 512 * 1024; 90 static constexpr int DEFAULT_UDP_BUFFER_RCV_SIZE = 1024 * 1024; 91 static constexpr int STREAM_BUFFER_THRESHOLD = 5; 92 93 virtual int CreateAndBindSocket(IpAndPort &local) = 0; 94 virtual bool Accept() = 0; 95 96 virtual int EpollTimeout(int fd, int timeout) = 0; 97 virtual int SetSocketEpollMode(int fd) = 0; 98 virtual std::unique_ptr<char[]> RecvStream(int dataLength) = 0; TakeStream()99 virtual std::unique_ptr<IStream> TakeStream() 100 { 101 std::unique_lock<std::mutex> lock(streamReceiveLock_); 102 while (isStreamRecv_) { 103 if (!streamReceiveBuffer_.empty()) { 104 auto item = std::move(streamReceiveBuffer_.front()); 105 streamReceiveBuffer_.pop(); 106 return item; 107 } 108 streamReceiveCv_.wait(lock); 109 } 110 return nullptr; 111 } 112 PutStream(std::unique_ptr<IStream> stream)113 virtual void PutStream(std::unique_ptr<IStream> stream) 114 { 115 std::lock_guard<std::mutex> lock(streamReceiveLock_); 116 if (isStreamRecv_) { 117 streamReceiveBuffer_.push(std::move(stream)); 118 streamReceiveCv_.notify_all(); 119 } 120 } 121 GetStreamNum()122 virtual int GetStreamNum() 123 { 124 std::lock_guard<std::mutex> lock(streamReceiveLock_); 125 return streamReceiveBuffer_.size(); 126 } 127 QuitStreamBuffer()128 virtual void QuitStreamBuffer() 129 { 130 std::lock_guard<std::mutex> lock(streamReceiveLock_); 131 isStreamRecv_ = false; 132 streamReceiveCv_.notify_all(); 133 } 134 135 int listenFd_; 136 int streamFd_; 137 int epollFd_; 138 IpAndPort localIpPort_ {}; 139 IpAndPort remoteIpPort_ {}; 140 bool isStreamRecv_; 141 std::shared_ptr<IStreamSocketListener> streamReceiver_ = nullptr; 142 std::queue<std::unique_ptr<IStream>> streamReceiveBuffer_; 143 std::mutex streamReceiveLock_; 144 std::condition_variable streamReceiveCv_; 145 int streamType_ = INVALID; 146 bool isBlocked_; 147 std::pair<uint8_t*, uint32_t> sessionKey_ = std::make_pair(nullptr, 0); 148 }; 149 } // namespace SoftBus 150 } // namespace Communication 151 152 #endif