• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <SocketReaderNode.h>
18 #include <ImsMediaTrace.h>
19 #include <ImsMediaTimer.h>
20 #include <thread>
21 
22 #define MAX_BUFFER_QUEUE 250  // 5 sec in audio case.
23 
SocketReaderNode(BaseSessionCallback * callback)24 SocketReaderNode::SocketReaderNode(BaseSessionCallback* callback) :
25         BaseNode(callback),
26         mLocalFd(0)
27 {
28     mSocket = nullptr;
29     mReceiveTtl = false;
30     mSocketOpened = false;
31 }
32 
~SocketReaderNode()33 SocketReaderNode::~SocketReaderNode()
34 {
35     IMLOGD1("[~SocketReaderNode] queue size[%d]", GetDataCount());
36     CloseSocket();
37 }
38 
GetNodeId()39 kBaseNodeId SocketReaderNode::GetNodeId()
40 {
41     return kNodeIdSocketReader;
42 }
43 
Prepare()44 bool SocketReaderNode::Prepare()
45 {
46     if (!mSocketOpened)
47     {
48         return OpenSocket();
49     }
50 
51     return true;
52 }
53 
Start()54 ImsMediaResult SocketReaderNode::Start()
55 {
56     ClearDataQueue();  // clear the old data stacked
57 
58     if (mSocketOpened)
59     {
60         IMLOGD0("[Start] opened already");
61     }
62     else
63     {
64         if (!OpenSocket())
65         {
66             return RESULT_PORT_UNAVAILABLE;
67         }
68     }
69 
70     mNodeState = kNodeStateRunning;
71     return RESULT_SUCCESS;
72 }
73 
Stop()74 void SocketReaderNode::Stop()
75 {
76     IMLOGD2("[Stop] media[%d], protocolType[%d]", mMediaType, mProtocolType);
77     mNodeState = kNodeStateStopped;
78 }
79 
ProcessData()80 void SocketReaderNode::ProcessData()
81 {
82     uint8_t* data = nullptr;
83     uint32_t dataSize = 0;
84     uint32_t timeStamp = 0;
85     bool bMark = false;
86     uint32_t seqNum = 0;
87     ImsMediaSubType subtype;
88     ImsMediaSubType dataType;
89     uint32_t arrivalTime;
90 
91     while (GetData(
92             &subtype, &data, &dataSize, &timeStamp, &bMark, &seqNum, &dataType, &arrivalTime))
93     {
94         IMLOGD_PACKET3(IM_PACKET_LOG_SOCKET, "[ProcessData] media[%d], size[%d], arrivalTime[%u]",
95                 mMediaType, dataSize, arrivalTime);
96         SendDataToRearNode(MEDIASUBTYPE_UNDEFINED, reinterpret_cast<uint8_t*>(data), dataSize,
97                 timeStamp, bMark, seqNum, dataType, arrivalTime);
98         DeleteData();
99     }
100 }
101 
IsRunTime()102 bool SocketReaderNode::IsRunTime()
103 {
104     return false;
105 }
106 
IsSourceNode()107 bool SocketReaderNode::IsSourceNode()
108 {
109     return true;
110 }
111 
SetConfig(void * config)112 void SocketReaderNode::SetConfig(void* config)
113 {
114     if (config == nullptr)
115     {
116         return;
117     }
118 
119     RtpConfig* pConfig = reinterpret_cast<RtpConfig*>(config);
120 
121     if (mProtocolType == kProtocolRtp)
122     {
123         mPeerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort());
124     }
125     else if (mProtocolType == kProtocolRtcp)
126     {
127         mPeerAddress =
128                 RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort() + 1);
129     }
130 }
131 
IsSameConfig(void * config)132 bool SocketReaderNode::IsSameConfig(void* config)
133 {
134     if (config == nullptr)
135     {
136         return true;
137     }
138 
139     RtpConfig* pConfig = reinterpret_cast<RtpConfig*>(config);
140     RtpAddress peerAddress;
141 
142     if (mProtocolType == kProtocolRtp)
143     {
144         peerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort());
145     }
146     else if (mProtocolType == kProtocolRtcp)
147     {
148         peerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort() + 1);
149     }
150 
151     return (mPeerAddress == peerAddress);
152 }
153 
UpdateConfig(void * config)154 ImsMediaResult SocketReaderNode::UpdateConfig(void* config)
155 {
156     // check config items updates
157     bool isUpdateNode = false;
158 
159     if (IsSameConfig(config))
160     {
161         IMLOGD0("[UpdateConfig] no update");
162         return RESULT_SUCCESS;
163     }
164     else
165     {
166         isUpdateNode = true;
167     }
168 
169     kBaseNodeState prevState = mNodeState;
170 
171     if (isUpdateNode && mNodeState == kNodeStateRunning)
172     {
173         Stop();
174 
175         if (mSocketOpened)
176         {
177             CloseSocket();
178         }
179     }
180 
181     // reset the parameters
182     SetConfig(config);
183 
184     if (isUpdateNode && prevState == kNodeStateRunning)
185     {
186         if (Prepare())
187         {
188             return Start();
189         }
190         else
191         {
192             return RESULT_INVALID_PARAM;
193         }
194     }
195 
196     return RESULT_SUCCESS;
197 }
198 
OnReadDataFromSocket()199 void SocketReaderNode::OnReadDataFromSocket()
200 {
201     IMLOGD_PACKET1(IM_PACKET_LOG_SOCKET, "[OnReadDataFromSocket] media[%d]", mMediaType);
202     std::lock_guard<std::mutex> guard(mMutex);
203 
204     // prevent infinite frame stacked in the queue
205     if (mDataQueue.GetCount() > MAX_BUFFER_QUEUE)
206     {
207         mDataQueue.Delete();
208     }
209 
210     if (mSocketOpened && mSocket != nullptr)
211     {
212         int nLen = mSocket->ReceiveFrom(mBuffer, DEFAULT_MTU);
213 
214         if (nLen > 0)
215         {
216             IMLOGD_PACKET3(IM_PACKET_LOG_SOCKET,
217                     "[OnReadDataFromSocket] media[%d], data size[%d], queue size[%d]", mMediaType,
218                     nLen, GetDataCount());
219 
220             OnDataFromFrontNode(MEDIASUBTYPE_UNDEFINED, mBuffer, nLen, 0, 0, 0,
221                     MEDIASUBTYPE_UNDEFINED, ImsMediaTimer::GetTimeInMilliSeconds());
222         }
223     }
224 }
225 
SetLocalFd(int fd)226 void SocketReaderNode::SetLocalFd(int fd)
227 {
228     mLocalFd = fd;
229 }
230 
SetLocalAddress(const RtpAddress & address)231 void SocketReaderNode::SetLocalAddress(const RtpAddress& address)
232 {
233     mLocalAddress = address;
234 }
235 
SetPeerAddress(const RtpAddress & address)236 void SocketReaderNode::SetPeerAddress(const RtpAddress& address)
237 {
238     mPeerAddress = address;
239 }
240 
OpenSocket()241 bool SocketReaderNode::OpenSocket()
242 {
243     IMLOGD2("[OpenSocket] media[%d], protocolType[%d]", mMediaType, mProtocolType);
244     mSocket = ISocket::GetInstance(mLocalAddress.port, mPeerAddress.ipAddress, mPeerAddress.port);
245 
246     if (mSocket == nullptr)
247     {
248         IMLOGE0("[OpenSocket] can't create socket instance");
249         return false;
250     }
251 
252     // set socket local/peer address here
253     mSocket->SetLocalEndpoint(mLocalAddress.ipAddress, mLocalAddress.port);
254     mSocket->SetPeerEndpoint(mPeerAddress.ipAddress, mPeerAddress.port);
255 
256     if (!mSocketOpened && !mSocket->Open(mLocalFd))
257     {
258         IMLOGE0("[OpenSocket] can't open socket");
259         mSocketOpened = false;
260         return false;
261     }
262 
263     mReceiveTtl = false;
264 
265     if (mSocket->SetSocketOpt(kSocketOptionIpTtl, 1))
266     {
267         mReceiveTtl = true;
268     }
269 
270     mSocket->Listen(this);
271     mSocketOpened = true;
272     return true;
273 }
274 
CloseSocket()275 void SocketReaderNode::CloseSocket()
276 {
277     if (mSocket != nullptr)
278     {
279         IMLOGD2("[CloseSocket] media[%d], protocolType[%d]", mMediaType, mProtocolType);
280 
281         if (mSocketOpened)
282         {
283             mSocket->Listen(nullptr);
284             mSocket->Close();
285             mSocketOpened = false;
286         }
287 
288         mMutex.lock();
289         ISocket::ReleaseInstance(mSocket);
290         mSocket = nullptr;
291         mMutex.unlock();
292     }
293 }