• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/test/channel_transport/udp_socket_manager_posix.h"
12 
13 #include <stdio.h>
14 #include <strings.h>
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <time.h>
18 #include <unistd.h>
19 
20 #include "webrtc/system_wrappers/interface/sleep.h"
21 #include "webrtc/system_wrappers/interface/trace.h"
22 #include "webrtc/test/channel_transport/udp_socket_posix.h"
23 
24 namespace webrtc {
25 namespace test {
26 
UdpSocketManagerPosix()27 UdpSocketManagerPosix::UdpSocketManagerPosix()
28     : UdpSocketManager(),
29       _id(-1),
30       _critSect(CriticalSectionWrapper::CreateCriticalSection()),
31       _numberOfSocketMgr(-1),
32       _incSocketMgrNextTime(0),
33       _nextSocketMgrToAssign(0),
34       _socketMgr()
35 {
36 }
37 
Init(int32_t id,uint8_t & numOfWorkThreads)38 bool UdpSocketManagerPosix::Init(int32_t id, uint8_t& numOfWorkThreads) {
39     CriticalSectionScoped cs(_critSect);
40     if ((_id != -1) || (_numOfWorkThreads != 0)) {
41         assert(_id != -1);
42         assert(_numOfWorkThreads != 0);
43         return false;
44     }
45 
46     _id = id;
47     _numberOfSocketMgr = numOfWorkThreads;
48     _numOfWorkThreads = numOfWorkThreads;
49 
50     if(MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX < _numberOfSocketMgr)
51     {
52         _numberOfSocketMgr = MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX;
53     }
54     for(int i = 0;i < _numberOfSocketMgr; i++)
55     {
56         _socketMgr[i] = new UdpSocketManagerPosixImpl();
57     }
58     return true;
59 }
60 
61 
~UdpSocketManagerPosix()62 UdpSocketManagerPosix::~UdpSocketManagerPosix()
63 {
64     Stop();
65     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
66                  "UdpSocketManagerPosix(%d)::UdpSocketManagerPosix()",
67                  _numberOfSocketMgr);
68 
69     for(int i = 0;i < _numberOfSocketMgr; i++)
70     {
71         delete _socketMgr[i];
72     }
73     delete _critSect;
74 }
75 
ChangeUniqueId(const int32_t id)76 int32_t UdpSocketManagerPosix::ChangeUniqueId(const int32_t id)
77 {
78     _id = id;
79     return 0;
80 }
81 
Start()82 bool UdpSocketManagerPosix::Start()
83 {
84     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
85                  "UdpSocketManagerPosix(%d)::Start()",
86                  _numberOfSocketMgr);
87 
88     _critSect->Enter();
89     bool retVal = true;
90     for(int i = 0;i < _numberOfSocketMgr && retVal; i++)
91     {
92         retVal = _socketMgr[i]->Start();
93     }
94     if(!retVal)
95     {
96         WEBRTC_TRACE(
97             kTraceError,
98             kTraceTransport,
99             _id,
100             "UdpSocketManagerPosix(%d)::Start() error starting socket managers",
101             _numberOfSocketMgr);
102     }
103     _critSect->Leave();
104     return retVal;
105 }
106 
Stop()107 bool UdpSocketManagerPosix::Stop()
108 {
109     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
110                  "UdpSocketManagerPosix(%d)::Stop()",_numberOfSocketMgr);
111 
112     _critSect->Enter();
113     bool retVal = true;
114     for(int i = 0; i < _numberOfSocketMgr && retVal; i++)
115     {
116         retVal = _socketMgr[i]->Stop();
117     }
118     if(!retVal)
119     {
120         WEBRTC_TRACE(
121             kTraceError,
122             kTraceTransport,
123             _id,
124             "UdpSocketManagerPosix(%d)::Stop() there are still active socket "
125             "managers",
126             _numberOfSocketMgr);
127     }
128     _critSect->Leave();
129     return retVal;
130 }
131 
AddSocket(UdpSocketWrapper * s)132 bool UdpSocketManagerPosix::AddSocket(UdpSocketWrapper* s)
133 {
134     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
135                  "UdpSocketManagerPosix(%d)::AddSocket()",_numberOfSocketMgr);
136 
137     _critSect->Enter();
138     bool retVal = _socketMgr[_nextSocketMgrToAssign]->AddSocket(s);
139     if(!retVal)
140     {
141         WEBRTC_TRACE(
142             kTraceError,
143             kTraceTransport,
144             _id,
145             "UdpSocketManagerPosix(%d)::AddSocket() failed to add socket to\
146  manager",
147             _numberOfSocketMgr);
148     }
149 
150     // Distribute sockets on UdpSocketManagerPosixImpls in a round-robin
151     // fashion.
152     if(_incSocketMgrNextTime == 0)
153     {
154         _incSocketMgrNextTime++;
155     } else {
156         _incSocketMgrNextTime = 0;
157         _nextSocketMgrToAssign++;
158         if(_nextSocketMgrToAssign >= _numberOfSocketMgr)
159         {
160             _nextSocketMgrToAssign = 0;
161         }
162     }
163     _critSect->Leave();
164     return retVal;
165 }
166 
RemoveSocket(UdpSocketWrapper * s)167 bool UdpSocketManagerPosix::RemoveSocket(UdpSocketWrapper* s)
168 {
169     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
170                  "UdpSocketManagerPosix(%d)::RemoveSocket()",
171                  _numberOfSocketMgr);
172 
173     _critSect->Enter();
174     bool retVal = false;
175     for(int i = 0;i < _numberOfSocketMgr && (retVal == false); i++)
176     {
177         retVal = _socketMgr[i]->RemoveSocket(s);
178     }
179     if(!retVal)
180     {
181         WEBRTC_TRACE(
182             kTraceError,
183             kTraceTransport,
184             _id,
185             "UdpSocketManagerPosix(%d)::RemoveSocket() failed to remove socket\
186  from manager",
187             _numberOfSocketMgr);
188     }
189     _critSect->Leave();
190     return retVal;
191 }
192 
193 
UdpSocketManagerPosixImpl()194 UdpSocketManagerPosixImpl::UdpSocketManagerPosixImpl()
195 {
196     _critSectList = CriticalSectionWrapper::CreateCriticalSection();
197     _thread = ThreadWrapper::CreateThread(UdpSocketManagerPosixImpl::Run, this,
198                                           kRealtimePriority,
199                                           "UdpSocketManagerPosixImplThread");
200     FD_ZERO(&_readFds);
201     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
202                  "UdpSocketManagerPosix created");
203 }
204 
~UdpSocketManagerPosixImpl()205 UdpSocketManagerPosixImpl::~UdpSocketManagerPosixImpl()
206 {
207     if(_thread != NULL)
208     {
209         delete _thread;
210     }
211 
212     if (_critSectList != NULL)
213     {
214         UpdateSocketMap();
215 
216         _critSectList->Enter();
217         for (std::map<SOCKET, UdpSocketPosix*>::iterator it =
218                  _socketMap.begin();
219              it != _socketMap.end();
220              ++it) {
221           delete it->second;
222         }
223         _socketMap.clear();
224         _critSectList->Leave();
225 
226         delete _critSectList;
227     }
228 
229     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
230                  "UdpSocketManagerPosix deleted");
231 }
232 
Start()233 bool UdpSocketManagerPosixImpl::Start()
234 {
235     unsigned int id = 0;
236     if (_thread == NULL)
237     {
238         return false;
239     }
240 
241     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
242                  "Start UdpSocketManagerPosix");
243     return _thread->Start(id);
244 }
245 
Stop()246 bool UdpSocketManagerPosixImpl::Stop()
247 {
248     if (_thread == NULL)
249     {
250         return true;
251     }
252 
253     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
254                  "Stop UdpSocketManagerPosix");
255     return _thread->Stop();
256 }
257 
Process()258 bool UdpSocketManagerPosixImpl::Process()
259 {
260     bool doSelect = false;
261     // Timeout = 1 second.
262     struct timeval timeout;
263     timeout.tv_sec = 0;
264     timeout.tv_usec = 10000;
265 
266     FD_ZERO(&_readFds);
267 
268     UpdateSocketMap();
269 
270     SOCKET maxFd = 0;
271     for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
272          it != _socketMap.end();
273          ++it) {
274       doSelect = true;
275       if (it->first > maxFd)
276         maxFd = it->first;
277       FD_SET(it->first, &_readFds);
278     }
279 
280     int num = 0;
281     if (doSelect)
282     {
283         num = select(maxFd+1, &_readFds, NULL, NULL, &timeout);
284 
285         if (num == SOCKET_ERROR)
286         {
287             // Timeout = 10 ms.
288             SleepMs(10);
289             return true;
290         }
291     }else
292     {
293         // Timeout = 10 ms.
294         SleepMs(10);
295         return true;
296     }
297 
298     for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
299          it != _socketMap.end();
300          ++it) {
301       if (FD_ISSET(it->first, &_readFds)) {
302         it->second->HasIncoming();
303         --num;
304       }
305     }
306 
307     return true;
308 }
309 
Run(ThreadObj obj)310 bool UdpSocketManagerPosixImpl::Run(ThreadObj obj)
311 {
312     UdpSocketManagerPosixImpl* mgr =
313         static_cast<UdpSocketManagerPosixImpl*>(obj);
314     return mgr->Process();
315 }
316 
AddSocket(UdpSocketWrapper * s)317 bool UdpSocketManagerPosixImpl::AddSocket(UdpSocketWrapper* s)
318 {
319     UdpSocketPosix* sl = static_cast<UdpSocketPosix*>(s);
320     if(sl->GetFd() == INVALID_SOCKET || !(sl->GetFd() < FD_SETSIZE))
321     {
322         return false;
323     }
324     _critSectList->Enter();
325     _addList.push_back(s);
326     _critSectList->Leave();
327     return true;
328 }
329 
RemoveSocket(UdpSocketWrapper * s)330 bool UdpSocketManagerPosixImpl::RemoveSocket(UdpSocketWrapper* s)
331 {
332     // Put in remove list if this is the correct UdpSocketManagerPosixImpl.
333     _critSectList->Enter();
334 
335     // If the socket is in the add list it's safe to remove and delete it.
336     for (SocketList::iterator iter = _addList.begin();
337          iter != _addList.end(); ++iter) {
338         UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
339         unsigned int addFD = addSocket->GetFd();
340         unsigned int removeFD = static_cast<UdpSocketPosix*>(s)->GetFd();
341         if(removeFD == addFD)
342         {
343             _removeList.push_back(removeFD);
344             _critSectList->Leave();
345             return true;
346         }
347     }
348 
349     // Checking the socket map is safe since all Erase and Insert calls to this
350     // map are also protected by _critSectList.
351     if (_socketMap.find(static_cast<UdpSocketPosix*>(s)->GetFd()) !=
352         _socketMap.end()) {
353       _removeList.push_back(static_cast<UdpSocketPosix*>(s)->GetFd());
354       _critSectList->Leave();
355       return true;
356     }
357     _critSectList->Leave();
358     return false;
359 }
360 
UpdateSocketMap()361 void UdpSocketManagerPosixImpl::UpdateSocketMap()
362 {
363     // Remove items in remove list.
364     _critSectList->Enter();
365     for (FdList::iterator iter = _removeList.begin();
366          iter != _removeList.end(); ++iter) {
367         UdpSocketPosix* deleteSocket = NULL;
368         SOCKET removeFD = *iter;
369 
370         // If the socket is in the add list it hasn't been added to the socket
371         // map yet. Just remove the socket from the add list.
372         for (SocketList::iterator iter = _addList.begin();
373              iter != _addList.end(); ++iter) {
374             UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
375             SOCKET addFD = addSocket->GetFd();
376             if(removeFD == addFD)
377             {
378                 deleteSocket = addSocket;
379                 _addList.erase(iter);
380                 break;
381             }
382         }
383 
384         // Find and remove socket from _socketMap.
385         std::map<SOCKET, UdpSocketPosix*>::iterator it =
386             _socketMap.find(removeFD);
387         if(it != _socketMap.end())
388         {
389           deleteSocket = it->second;
390           _socketMap.erase(it);
391         }
392         if(deleteSocket)
393         {
394             deleteSocket->ReadyForDeletion();
395             delete deleteSocket;
396         }
397     }
398     _removeList.clear();
399 
400     // Add sockets from add list.
401     for (SocketList::iterator iter = _addList.begin();
402          iter != _addList.end(); ++iter) {
403         UdpSocketPosix* s = static_cast<UdpSocketPosix*>(*iter);
404         if(s) {
405           _socketMap[s->GetFd()] = s;
406         }
407     }
408     _addList.clear();
409     _critSectList->Leave();
410 }
411 
412 }  // namespace test
413 }  // namespace webrtc
414