• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/test/channel_transport/udp_socket_manager_posix.h"
12 
13 #include <stdio.h>
14 #include <strings.h>
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <time.h>
18 #include <unistd.h>
19 
20 #include "webrtc/system_wrappers/include/sleep.h"
21 #include "webrtc/system_wrappers/include/trace.h"
22 #include "webrtc/test/channel_transport/udp_socket_posix.h"
23 
24 namespace webrtc {
25 namespace test {
26 
UdpSocketManagerPosix()27 UdpSocketManagerPosix::UdpSocketManagerPosix()
28     : UdpSocketManager(),
29       _id(-1),
30       _critSect(CriticalSectionWrapper::CreateCriticalSection()),
31       _numberOfSocketMgr(-1),
32       _incSocketMgrNextTime(0),
33       _nextSocketMgrToAssign(0),
34       _socketMgr()
35 {
36 }
37 
Init(int32_t id,uint8_t & numOfWorkThreads)38 bool UdpSocketManagerPosix::Init(int32_t id, uint8_t& numOfWorkThreads) {
39     CriticalSectionScoped cs(_critSect);
40     if ((_id != -1) || (_numOfWorkThreads != 0)) {
41         assert(_id != -1);
42         assert(_numOfWorkThreads != 0);
43         return false;
44     }
45 
46     _id = id;
47     _numberOfSocketMgr = numOfWorkThreads;
48     _numOfWorkThreads = numOfWorkThreads;
49 
50     if(MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX < _numberOfSocketMgr)
51     {
52         _numberOfSocketMgr = MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX;
53     }
54     for(int i = 0;i < _numberOfSocketMgr; i++)
55     {
56         _socketMgr[i] = new UdpSocketManagerPosixImpl();
57     }
58     return true;
59 }
60 
61 
~UdpSocketManagerPosix()62 UdpSocketManagerPosix::~UdpSocketManagerPosix()
63 {
64     Stop();
65     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
66                  "UdpSocketManagerPosix(%d)::UdpSocketManagerPosix()",
67                  _numberOfSocketMgr);
68 
69     for(int i = 0;i < _numberOfSocketMgr; i++)
70     {
71         delete _socketMgr[i];
72     }
73     delete _critSect;
74 }
75 
Start()76 bool UdpSocketManagerPosix::Start()
77 {
78     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
79                  "UdpSocketManagerPosix(%d)::Start()",
80                  _numberOfSocketMgr);
81 
82     _critSect->Enter();
83     bool retVal = true;
84     for(int i = 0;i < _numberOfSocketMgr && retVal; i++)
85     {
86         retVal = _socketMgr[i]->Start();
87     }
88     if(!retVal)
89     {
90         WEBRTC_TRACE(
91             kTraceError,
92             kTraceTransport,
93             _id,
94             "UdpSocketManagerPosix(%d)::Start() error starting socket managers",
95             _numberOfSocketMgr);
96     }
97     _critSect->Leave();
98     return retVal;
99 }
100 
Stop()101 bool UdpSocketManagerPosix::Stop()
102 {
103     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
104                  "UdpSocketManagerPosix(%d)::Stop()",_numberOfSocketMgr);
105 
106     _critSect->Enter();
107     bool retVal = true;
108     for(int i = 0; i < _numberOfSocketMgr && retVal; i++)
109     {
110         retVal = _socketMgr[i]->Stop();
111     }
112     if(!retVal)
113     {
114         WEBRTC_TRACE(
115             kTraceError,
116             kTraceTransport,
117             _id,
118             "UdpSocketManagerPosix(%d)::Stop() there are still active socket "
119             "managers",
120             _numberOfSocketMgr);
121     }
122     _critSect->Leave();
123     return retVal;
124 }
125 
AddSocket(UdpSocketWrapper * s)126 bool UdpSocketManagerPosix::AddSocket(UdpSocketWrapper* s)
127 {
128     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
129                  "UdpSocketManagerPosix(%d)::AddSocket()",_numberOfSocketMgr);
130 
131     _critSect->Enter();
132     bool retVal = _socketMgr[_nextSocketMgrToAssign]->AddSocket(s);
133     if(!retVal)
134     {
135         WEBRTC_TRACE(
136             kTraceError,
137             kTraceTransport,
138             _id,
139             "UdpSocketManagerPosix(%d)::AddSocket() failed to add socket to\
140  manager",
141             _numberOfSocketMgr);
142     }
143 
144     // Distribute sockets on UdpSocketManagerPosixImpls in a round-robin
145     // fashion.
146     if(_incSocketMgrNextTime == 0)
147     {
148         _incSocketMgrNextTime++;
149     } else {
150         _incSocketMgrNextTime = 0;
151         _nextSocketMgrToAssign++;
152         if(_nextSocketMgrToAssign >= _numberOfSocketMgr)
153         {
154             _nextSocketMgrToAssign = 0;
155         }
156     }
157     _critSect->Leave();
158     return retVal;
159 }
160 
RemoveSocket(UdpSocketWrapper * s)161 bool UdpSocketManagerPosix::RemoveSocket(UdpSocketWrapper* s)
162 {
163     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
164                  "UdpSocketManagerPosix(%d)::RemoveSocket()",
165                  _numberOfSocketMgr);
166 
167     _critSect->Enter();
168     bool retVal = false;
169     for(int i = 0;i < _numberOfSocketMgr && (retVal == false); i++)
170     {
171         retVal = _socketMgr[i]->RemoveSocket(s);
172     }
173     if(!retVal)
174     {
175         WEBRTC_TRACE(
176             kTraceError,
177             kTraceTransport,
178             _id,
179             "UdpSocketManagerPosix(%d)::RemoveSocket() failed to remove socket\
180  from manager",
181             _numberOfSocketMgr);
182     }
183     _critSect->Leave();
184     return retVal;
185 }
186 
UdpSocketManagerPosixImpl()187 UdpSocketManagerPosixImpl::UdpSocketManagerPosixImpl()
188     : _thread(UdpSocketManagerPosixImpl::Run,
189               this,
190               "UdpSocketManagerPosixImplThread"),
191       _critSectList(CriticalSectionWrapper::CreateCriticalSection()) {
192     FD_ZERO(&_readFds);
193     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
194                  "UdpSocketManagerPosix created");
195 }
196 
~UdpSocketManagerPosixImpl()197 UdpSocketManagerPosixImpl::~UdpSocketManagerPosixImpl()
198 {
199     if (_critSectList != NULL)
200     {
201         UpdateSocketMap();
202 
203         _critSectList->Enter();
204         for (std::map<SOCKET, UdpSocketPosix*>::iterator it =
205                  _socketMap.begin();
206              it != _socketMap.end();
207              ++it) {
208           delete it->second;
209         }
210         _socketMap.clear();
211         _critSectList->Leave();
212 
213         delete _critSectList;
214     }
215 
216     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
217                  "UdpSocketManagerPosix deleted");
218 }
219 
Start()220 bool UdpSocketManagerPosixImpl::Start()
221 {
222     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
223                  "Start UdpSocketManagerPosix");
224     _thread.Start();
225     _thread.SetPriority(rtc::kRealtimePriority);
226     return true;
227 }
228 
Stop()229 bool UdpSocketManagerPosixImpl::Stop()
230 {
231     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
232                  "Stop UdpSocketManagerPosix");
233     _thread.Stop();
234     return true;
235 }
236 
Process()237 bool UdpSocketManagerPosixImpl::Process()
238 {
239     bool doSelect = false;
240     // Timeout = 1 second.
241     struct timeval timeout;
242     timeout.tv_sec = 0;
243     timeout.tv_usec = 10000;
244 
245     FD_ZERO(&_readFds);
246 
247     UpdateSocketMap();
248 
249     SOCKET maxFd = 0;
250     for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
251          it != _socketMap.end();
252          ++it) {
253       doSelect = true;
254       if (it->first > maxFd)
255         maxFd = it->first;
256       FD_SET(it->first, &_readFds);
257     }
258 
259     int num = 0;
260     if (doSelect)
261     {
262         num = select(maxFd+1, &_readFds, NULL, NULL, &timeout);
263 
264         if (num == SOCKET_ERROR)
265         {
266             // Timeout = 10 ms.
267             SleepMs(10);
268             return true;
269         }
270     }else
271     {
272         // Timeout = 10 ms.
273         SleepMs(10);
274         return true;
275     }
276 
277     for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
278          it != _socketMap.end();
279          ++it) {
280       if (FD_ISSET(it->first, &_readFds)) {
281         it->second->HasIncoming();
282         --num;
283       }
284     }
285 
286     return true;
287 }
288 
Run(void * obj)289 bool UdpSocketManagerPosixImpl::Run(void* obj)
290 {
291     UdpSocketManagerPosixImpl* mgr =
292         static_cast<UdpSocketManagerPosixImpl*>(obj);
293     return mgr->Process();
294 }
295 
AddSocket(UdpSocketWrapper * s)296 bool UdpSocketManagerPosixImpl::AddSocket(UdpSocketWrapper* s)
297 {
298     UdpSocketPosix* sl = static_cast<UdpSocketPosix*>(s);
299     if(sl->GetFd() == INVALID_SOCKET || !(sl->GetFd() < FD_SETSIZE))
300     {
301         return false;
302     }
303     _critSectList->Enter();
304     _addList.push_back(s);
305     _critSectList->Leave();
306     return true;
307 }
308 
RemoveSocket(UdpSocketWrapper * s)309 bool UdpSocketManagerPosixImpl::RemoveSocket(UdpSocketWrapper* s)
310 {
311     // Put in remove list if this is the correct UdpSocketManagerPosixImpl.
312     _critSectList->Enter();
313 
314     // If the socket is in the add list it's safe to remove and delete it.
315     for (SocketList::iterator iter = _addList.begin();
316          iter != _addList.end(); ++iter) {
317         UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
318         unsigned int addFD = addSocket->GetFd();
319         unsigned int removeFD = static_cast<UdpSocketPosix*>(s)->GetFd();
320         if(removeFD == addFD)
321         {
322             _removeList.push_back(removeFD);
323             _critSectList->Leave();
324             return true;
325         }
326     }
327 
328     // Checking the socket map is safe since all Erase and Insert calls to this
329     // map are also protected by _critSectList.
330     if (_socketMap.find(static_cast<UdpSocketPosix*>(s)->GetFd()) !=
331         _socketMap.end()) {
332       _removeList.push_back(static_cast<UdpSocketPosix*>(s)->GetFd());
333       _critSectList->Leave();
334       return true;
335     }
336     _critSectList->Leave();
337     return false;
338 }
339 
UpdateSocketMap()340 void UdpSocketManagerPosixImpl::UpdateSocketMap()
341 {
342     // Remove items in remove list.
343     _critSectList->Enter();
344     for (FdList::iterator iter = _removeList.begin();
345          iter != _removeList.end(); ++iter) {
346         UdpSocketPosix* deleteSocket = NULL;
347         SOCKET removeFD = *iter;
348 
349         // If the socket is in the add list it hasn't been added to the socket
350         // map yet. Just remove the socket from the add list.
351         for (SocketList::iterator iter = _addList.begin();
352              iter != _addList.end(); ++iter) {
353             UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
354             SOCKET addFD = addSocket->GetFd();
355             if(removeFD == addFD)
356             {
357                 deleteSocket = addSocket;
358                 _addList.erase(iter);
359                 break;
360             }
361         }
362 
363         // Find and remove socket from _socketMap.
364         std::map<SOCKET, UdpSocketPosix*>::iterator it =
365             _socketMap.find(removeFD);
366         if(it != _socketMap.end())
367         {
368           deleteSocket = it->second;
369           _socketMap.erase(it);
370         }
371         if(deleteSocket)
372         {
373             deleteSocket->ReadyForDeletion();
374             delete deleteSocket;
375         }
376     }
377     _removeList.clear();
378 
379     // Add sockets from add list.
380     for (SocketList::iterator iter = _addList.begin();
381          iter != _addList.end(); ++iter) {
382         UdpSocketPosix* s = static_cast<UdpSocketPosix*>(*iter);
383         if(s) {
384           _socketMap[s->GetFd()] = s;
385         }
386     }
387     _addList.clear();
388     _critSectList->Leave();
389 }
390 
391 }  // namespace test
392 }  // namespace webrtc
393