1 /*
2 * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/test/channel_transport/udp_socket_manager_posix.h"
12
13 #include <stdio.h>
14 #include <strings.h>
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <time.h>
18 #include <unistd.h>
19
20 #include "webrtc/system_wrappers/include/sleep.h"
21 #include "webrtc/system_wrappers/include/trace.h"
22 #include "webrtc/test/channel_transport/udp_socket_posix.h"
23
24 namespace webrtc {
25 namespace test {
26
UdpSocketManagerPosix()27 UdpSocketManagerPosix::UdpSocketManagerPosix()
28 : UdpSocketManager(),
29 _id(-1),
30 _critSect(CriticalSectionWrapper::CreateCriticalSection()),
31 _numberOfSocketMgr(-1),
32 _incSocketMgrNextTime(0),
33 _nextSocketMgrToAssign(0),
34 _socketMgr()
35 {
36 }
37
Init(int32_t id,uint8_t & numOfWorkThreads)38 bool UdpSocketManagerPosix::Init(int32_t id, uint8_t& numOfWorkThreads) {
39 CriticalSectionScoped cs(_critSect);
40 if ((_id != -1) || (_numOfWorkThreads != 0)) {
41 assert(_id != -1);
42 assert(_numOfWorkThreads != 0);
43 return false;
44 }
45
46 _id = id;
47 _numberOfSocketMgr = numOfWorkThreads;
48 _numOfWorkThreads = numOfWorkThreads;
49
50 if(MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX < _numberOfSocketMgr)
51 {
52 _numberOfSocketMgr = MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX;
53 }
54 for(int i = 0;i < _numberOfSocketMgr; i++)
55 {
56 _socketMgr[i] = new UdpSocketManagerPosixImpl();
57 }
58 return true;
59 }
60
61
~UdpSocketManagerPosix()62 UdpSocketManagerPosix::~UdpSocketManagerPosix()
63 {
64 Stop();
65 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
66 "UdpSocketManagerPosix(%d)::UdpSocketManagerPosix()",
67 _numberOfSocketMgr);
68
69 for(int i = 0;i < _numberOfSocketMgr; i++)
70 {
71 delete _socketMgr[i];
72 }
73 delete _critSect;
74 }
75
Start()76 bool UdpSocketManagerPosix::Start()
77 {
78 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
79 "UdpSocketManagerPosix(%d)::Start()",
80 _numberOfSocketMgr);
81
82 _critSect->Enter();
83 bool retVal = true;
84 for(int i = 0;i < _numberOfSocketMgr && retVal; i++)
85 {
86 retVal = _socketMgr[i]->Start();
87 }
88 if(!retVal)
89 {
90 WEBRTC_TRACE(
91 kTraceError,
92 kTraceTransport,
93 _id,
94 "UdpSocketManagerPosix(%d)::Start() error starting socket managers",
95 _numberOfSocketMgr);
96 }
97 _critSect->Leave();
98 return retVal;
99 }
100
Stop()101 bool UdpSocketManagerPosix::Stop()
102 {
103 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
104 "UdpSocketManagerPosix(%d)::Stop()",_numberOfSocketMgr);
105
106 _critSect->Enter();
107 bool retVal = true;
108 for(int i = 0; i < _numberOfSocketMgr && retVal; i++)
109 {
110 retVal = _socketMgr[i]->Stop();
111 }
112 if(!retVal)
113 {
114 WEBRTC_TRACE(
115 kTraceError,
116 kTraceTransport,
117 _id,
118 "UdpSocketManagerPosix(%d)::Stop() there are still active socket "
119 "managers",
120 _numberOfSocketMgr);
121 }
122 _critSect->Leave();
123 return retVal;
124 }
125
AddSocket(UdpSocketWrapper * s)126 bool UdpSocketManagerPosix::AddSocket(UdpSocketWrapper* s)
127 {
128 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
129 "UdpSocketManagerPosix(%d)::AddSocket()",_numberOfSocketMgr);
130
131 _critSect->Enter();
132 bool retVal = _socketMgr[_nextSocketMgrToAssign]->AddSocket(s);
133 if(!retVal)
134 {
135 WEBRTC_TRACE(
136 kTraceError,
137 kTraceTransport,
138 _id,
139 "UdpSocketManagerPosix(%d)::AddSocket() failed to add socket to\
140 manager",
141 _numberOfSocketMgr);
142 }
143
144 // Distribute sockets on UdpSocketManagerPosixImpls in a round-robin
145 // fashion.
146 if(_incSocketMgrNextTime == 0)
147 {
148 _incSocketMgrNextTime++;
149 } else {
150 _incSocketMgrNextTime = 0;
151 _nextSocketMgrToAssign++;
152 if(_nextSocketMgrToAssign >= _numberOfSocketMgr)
153 {
154 _nextSocketMgrToAssign = 0;
155 }
156 }
157 _critSect->Leave();
158 return retVal;
159 }
160
RemoveSocket(UdpSocketWrapper * s)161 bool UdpSocketManagerPosix::RemoveSocket(UdpSocketWrapper* s)
162 {
163 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
164 "UdpSocketManagerPosix(%d)::RemoveSocket()",
165 _numberOfSocketMgr);
166
167 _critSect->Enter();
168 bool retVal = false;
169 for(int i = 0;i < _numberOfSocketMgr && (retVal == false); i++)
170 {
171 retVal = _socketMgr[i]->RemoveSocket(s);
172 }
173 if(!retVal)
174 {
175 WEBRTC_TRACE(
176 kTraceError,
177 kTraceTransport,
178 _id,
179 "UdpSocketManagerPosix(%d)::RemoveSocket() failed to remove socket\
180 from manager",
181 _numberOfSocketMgr);
182 }
183 _critSect->Leave();
184 return retVal;
185 }
186
UdpSocketManagerPosixImpl()187 UdpSocketManagerPosixImpl::UdpSocketManagerPosixImpl()
188 : _thread(UdpSocketManagerPosixImpl::Run,
189 this,
190 "UdpSocketManagerPosixImplThread"),
191 _critSectList(CriticalSectionWrapper::CreateCriticalSection()) {
192 FD_ZERO(&_readFds);
193 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
194 "UdpSocketManagerPosix created");
195 }
196
~UdpSocketManagerPosixImpl()197 UdpSocketManagerPosixImpl::~UdpSocketManagerPosixImpl()
198 {
199 if (_critSectList != NULL)
200 {
201 UpdateSocketMap();
202
203 _critSectList->Enter();
204 for (std::map<SOCKET, UdpSocketPosix*>::iterator it =
205 _socketMap.begin();
206 it != _socketMap.end();
207 ++it) {
208 delete it->second;
209 }
210 _socketMap.clear();
211 _critSectList->Leave();
212
213 delete _critSectList;
214 }
215
216 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
217 "UdpSocketManagerPosix deleted");
218 }
219
Start()220 bool UdpSocketManagerPosixImpl::Start()
221 {
222 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
223 "Start UdpSocketManagerPosix");
224 _thread.Start();
225 _thread.SetPriority(rtc::kRealtimePriority);
226 return true;
227 }
228
Stop()229 bool UdpSocketManagerPosixImpl::Stop()
230 {
231 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
232 "Stop UdpSocketManagerPosix");
233 _thread.Stop();
234 return true;
235 }
236
Process()237 bool UdpSocketManagerPosixImpl::Process()
238 {
239 bool doSelect = false;
240 // Timeout = 1 second.
241 struct timeval timeout;
242 timeout.tv_sec = 0;
243 timeout.tv_usec = 10000;
244
245 FD_ZERO(&_readFds);
246
247 UpdateSocketMap();
248
249 SOCKET maxFd = 0;
250 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
251 it != _socketMap.end();
252 ++it) {
253 doSelect = true;
254 if (it->first > maxFd)
255 maxFd = it->first;
256 FD_SET(it->first, &_readFds);
257 }
258
259 int num = 0;
260 if (doSelect)
261 {
262 num = select(maxFd+1, &_readFds, NULL, NULL, &timeout);
263
264 if (num == SOCKET_ERROR)
265 {
266 // Timeout = 10 ms.
267 SleepMs(10);
268 return true;
269 }
270 }else
271 {
272 // Timeout = 10 ms.
273 SleepMs(10);
274 return true;
275 }
276
277 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
278 it != _socketMap.end();
279 ++it) {
280 if (FD_ISSET(it->first, &_readFds)) {
281 it->second->HasIncoming();
282 --num;
283 }
284 }
285
286 return true;
287 }
288
Run(void * obj)289 bool UdpSocketManagerPosixImpl::Run(void* obj)
290 {
291 UdpSocketManagerPosixImpl* mgr =
292 static_cast<UdpSocketManagerPosixImpl*>(obj);
293 return mgr->Process();
294 }
295
AddSocket(UdpSocketWrapper * s)296 bool UdpSocketManagerPosixImpl::AddSocket(UdpSocketWrapper* s)
297 {
298 UdpSocketPosix* sl = static_cast<UdpSocketPosix*>(s);
299 if(sl->GetFd() == INVALID_SOCKET || !(sl->GetFd() < FD_SETSIZE))
300 {
301 return false;
302 }
303 _critSectList->Enter();
304 _addList.push_back(s);
305 _critSectList->Leave();
306 return true;
307 }
308
RemoveSocket(UdpSocketWrapper * s)309 bool UdpSocketManagerPosixImpl::RemoveSocket(UdpSocketWrapper* s)
310 {
311 // Put in remove list if this is the correct UdpSocketManagerPosixImpl.
312 _critSectList->Enter();
313
314 // If the socket is in the add list it's safe to remove and delete it.
315 for (SocketList::iterator iter = _addList.begin();
316 iter != _addList.end(); ++iter) {
317 UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
318 unsigned int addFD = addSocket->GetFd();
319 unsigned int removeFD = static_cast<UdpSocketPosix*>(s)->GetFd();
320 if(removeFD == addFD)
321 {
322 _removeList.push_back(removeFD);
323 _critSectList->Leave();
324 return true;
325 }
326 }
327
328 // Checking the socket map is safe since all Erase and Insert calls to this
329 // map are also protected by _critSectList.
330 if (_socketMap.find(static_cast<UdpSocketPosix*>(s)->GetFd()) !=
331 _socketMap.end()) {
332 _removeList.push_back(static_cast<UdpSocketPosix*>(s)->GetFd());
333 _critSectList->Leave();
334 return true;
335 }
336 _critSectList->Leave();
337 return false;
338 }
339
UpdateSocketMap()340 void UdpSocketManagerPosixImpl::UpdateSocketMap()
341 {
342 // Remove items in remove list.
343 _critSectList->Enter();
344 for (FdList::iterator iter = _removeList.begin();
345 iter != _removeList.end(); ++iter) {
346 UdpSocketPosix* deleteSocket = NULL;
347 SOCKET removeFD = *iter;
348
349 // If the socket is in the add list it hasn't been added to the socket
350 // map yet. Just remove the socket from the add list.
351 for (SocketList::iterator iter = _addList.begin();
352 iter != _addList.end(); ++iter) {
353 UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
354 SOCKET addFD = addSocket->GetFd();
355 if(removeFD == addFD)
356 {
357 deleteSocket = addSocket;
358 _addList.erase(iter);
359 break;
360 }
361 }
362
363 // Find and remove socket from _socketMap.
364 std::map<SOCKET, UdpSocketPosix*>::iterator it =
365 _socketMap.find(removeFD);
366 if(it != _socketMap.end())
367 {
368 deleteSocket = it->second;
369 _socketMap.erase(it);
370 }
371 if(deleteSocket)
372 {
373 deleteSocket->ReadyForDeletion();
374 delete deleteSocket;
375 }
376 }
377 _removeList.clear();
378
379 // Add sockets from add list.
380 for (SocketList::iterator iter = _addList.begin();
381 iter != _addList.end(); ++iter) {
382 UdpSocketPosix* s = static_cast<UdpSocketPosix*>(*iter);
383 if(s) {
384 _socketMap[s->GetFd()] = s;
385 }
386 }
387 _addList.clear();
388 _critSectList->Leave();
389 }
390
391 } // namespace test
392 } // namespace webrtc
393