1 /*
2 * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/test/channel_transport/udp_socket_manager_posix.h"
12
13 #include <stdio.h>
14 #include <strings.h>
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <time.h>
18 #include <unistd.h>
19
20 #include "webrtc/system_wrappers/interface/sleep.h"
21 #include "webrtc/system_wrappers/interface/trace.h"
22 #include "webrtc/test/channel_transport/udp_socket_posix.h"
23
24 namespace webrtc {
25 namespace test {
26
UdpSocketManagerPosix()27 UdpSocketManagerPosix::UdpSocketManagerPosix()
28 : UdpSocketManager(),
29 _id(-1),
30 _critSect(CriticalSectionWrapper::CreateCriticalSection()),
31 _numberOfSocketMgr(-1),
32 _incSocketMgrNextTime(0),
33 _nextSocketMgrToAssign(0),
34 _socketMgr()
35 {
36 }
37
Init(int32_t id,uint8_t & numOfWorkThreads)38 bool UdpSocketManagerPosix::Init(int32_t id, uint8_t& numOfWorkThreads) {
39 CriticalSectionScoped cs(_critSect);
40 if ((_id != -1) || (_numOfWorkThreads != 0)) {
41 assert(_id != -1);
42 assert(_numOfWorkThreads != 0);
43 return false;
44 }
45
46 _id = id;
47 _numberOfSocketMgr = numOfWorkThreads;
48 _numOfWorkThreads = numOfWorkThreads;
49
50 if(MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX < _numberOfSocketMgr)
51 {
52 _numberOfSocketMgr = MAX_NUMBER_OF_SOCKET_MANAGERS_LINUX;
53 }
54 for(int i = 0;i < _numberOfSocketMgr; i++)
55 {
56 _socketMgr[i] = new UdpSocketManagerPosixImpl();
57 }
58 return true;
59 }
60
61
~UdpSocketManagerPosix()62 UdpSocketManagerPosix::~UdpSocketManagerPosix()
63 {
64 Stop();
65 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
66 "UdpSocketManagerPosix(%d)::UdpSocketManagerPosix()",
67 _numberOfSocketMgr);
68
69 for(int i = 0;i < _numberOfSocketMgr; i++)
70 {
71 delete _socketMgr[i];
72 }
73 delete _critSect;
74 }
75
ChangeUniqueId(const int32_t id)76 int32_t UdpSocketManagerPosix::ChangeUniqueId(const int32_t id)
77 {
78 _id = id;
79 return 0;
80 }
81
Start()82 bool UdpSocketManagerPosix::Start()
83 {
84 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
85 "UdpSocketManagerPosix(%d)::Start()",
86 _numberOfSocketMgr);
87
88 _critSect->Enter();
89 bool retVal = true;
90 for(int i = 0;i < _numberOfSocketMgr && retVal; i++)
91 {
92 retVal = _socketMgr[i]->Start();
93 }
94 if(!retVal)
95 {
96 WEBRTC_TRACE(
97 kTraceError,
98 kTraceTransport,
99 _id,
100 "UdpSocketManagerPosix(%d)::Start() error starting socket managers",
101 _numberOfSocketMgr);
102 }
103 _critSect->Leave();
104 return retVal;
105 }
106
Stop()107 bool UdpSocketManagerPosix::Stop()
108 {
109 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
110 "UdpSocketManagerPosix(%d)::Stop()",_numberOfSocketMgr);
111
112 _critSect->Enter();
113 bool retVal = true;
114 for(int i = 0; i < _numberOfSocketMgr && retVal; i++)
115 {
116 retVal = _socketMgr[i]->Stop();
117 }
118 if(!retVal)
119 {
120 WEBRTC_TRACE(
121 kTraceError,
122 kTraceTransport,
123 _id,
124 "UdpSocketManagerPosix(%d)::Stop() there are still active socket "
125 "managers",
126 _numberOfSocketMgr);
127 }
128 _critSect->Leave();
129 return retVal;
130 }
131
AddSocket(UdpSocketWrapper * s)132 bool UdpSocketManagerPosix::AddSocket(UdpSocketWrapper* s)
133 {
134 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
135 "UdpSocketManagerPosix(%d)::AddSocket()",_numberOfSocketMgr);
136
137 _critSect->Enter();
138 bool retVal = _socketMgr[_nextSocketMgrToAssign]->AddSocket(s);
139 if(!retVal)
140 {
141 WEBRTC_TRACE(
142 kTraceError,
143 kTraceTransport,
144 _id,
145 "UdpSocketManagerPosix(%d)::AddSocket() failed to add socket to\
146 manager",
147 _numberOfSocketMgr);
148 }
149
150 // Distribute sockets on UdpSocketManagerPosixImpls in a round-robin
151 // fashion.
152 if(_incSocketMgrNextTime == 0)
153 {
154 _incSocketMgrNextTime++;
155 } else {
156 _incSocketMgrNextTime = 0;
157 _nextSocketMgrToAssign++;
158 if(_nextSocketMgrToAssign >= _numberOfSocketMgr)
159 {
160 _nextSocketMgrToAssign = 0;
161 }
162 }
163 _critSect->Leave();
164 return retVal;
165 }
166
RemoveSocket(UdpSocketWrapper * s)167 bool UdpSocketManagerPosix::RemoveSocket(UdpSocketWrapper* s)
168 {
169 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
170 "UdpSocketManagerPosix(%d)::RemoveSocket()",
171 _numberOfSocketMgr);
172
173 _critSect->Enter();
174 bool retVal = false;
175 for(int i = 0;i < _numberOfSocketMgr && (retVal == false); i++)
176 {
177 retVal = _socketMgr[i]->RemoveSocket(s);
178 }
179 if(!retVal)
180 {
181 WEBRTC_TRACE(
182 kTraceError,
183 kTraceTransport,
184 _id,
185 "UdpSocketManagerPosix(%d)::RemoveSocket() failed to remove socket\
186 from manager",
187 _numberOfSocketMgr);
188 }
189 _critSect->Leave();
190 return retVal;
191 }
192
193
UdpSocketManagerPosixImpl()194 UdpSocketManagerPosixImpl::UdpSocketManagerPosixImpl()
195 {
196 _critSectList = CriticalSectionWrapper::CreateCriticalSection();
197 _thread = ThreadWrapper::CreateThread(UdpSocketManagerPosixImpl::Run, this,
198 kRealtimePriority,
199 "UdpSocketManagerPosixImplThread");
200 FD_ZERO(&_readFds);
201 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
202 "UdpSocketManagerPosix created");
203 }
204
~UdpSocketManagerPosixImpl()205 UdpSocketManagerPosixImpl::~UdpSocketManagerPosixImpl()
206 {
207 if(_thread != NULL)
208 {
209 delete _thread;
210 }
211
212 if (_critSectList != NULL)
213 {
214 UpdateSocketMap();
215
216 _critSectList->Enter();
217 for (std::map<SOCKET, UdpSocketPosix*>::iterator it =
218 _socketMap.begin();
219 it != _socketMap.end();
220 ++it) {
221 delete it->second;
222 }
223 _socketMap.clear();
224 _critSectList->Leave();
225
226 delete _critSectList;
227 }
228
229 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
230 "UdpSocketManagerPosix deleted");
231 }
232
Start()233 bool UdpSocketManagerPosixImpl::Start()
234 {
235 unsigned int id = 0;
236 if (_thread == NULL)
237 {
238 return false;
239 }
240
241 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
242 "Start UdpSocketManagerPosix");
243 return _thread->Start(id);
244 }
245
Stop()246 bool UdpSocketManagerPosixImpl::Stop()
247 {
248 if (_thread == NULL)
249 {
250 return true;
251 }
252
253 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
254 "Stop UdpSocketManagerPosix");
255 return _thread->Stop();
256 }
257
Process()258 bool UdpSocketManagerPosixImpl::Process()
259 {
260 bool doSelect = false;
261 // Timeout = 1 second.
262 struct timeval timeout;
263 timeout.tv_sec = 0;
264 timeout.tv_usec = 10000;
265
266 FD_ZERO(&_readFds);
267
268 UpdateSocketMap();
269
270 SOCKET maxFd = 0;
271 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
272 it != _socketMap.end();
273 ++it) {
274 doSelect = true;
275 if (it->first > maxFd)
276 maxFd = it->first;
277 FD_SET(it->first, &_readFds);
278 }
279
280 int num = 0;
281 if (doSelect)
282 {
283 num = select(maxFd+1, &_readFds, NULL, NULL, &timeout);
284
285 if (num == SOCKET_ERROR)
286 {
287 // Timeout = 10 ms.
288 SleepMs(10);
289 return true;
290 }
291 }else
292 {
293 // Timeout = 10 ms.
294 SleepMs(10);
295 return true;
296 }
297
298 for (std::map<SOCKET, UdpSocketPosix*>::iterator it = _socketMap.begin();
299 it != _socketMap.end();
300 ++it) {
301 if (FD_ISSET(it->first, &_readFds)) {
302 it->second->HasIncoming();
303 --num;
304 }
305 }
306
307 return true;
308 }
309
Run(ThreadObj obj)310 bool UdpSocketManagerPosixImpl::Run(ThreadObj obj)
311 {
312 UdpSocketManagerPosixImpl* mgr =
313 static_cast<UdpSocketManagerPosixImpl*>(obj);
314 return mgr->Process();
315 }
316
AddSocket(UdpSocketWrapper * s)317 bool UdpSocketManagerPosixImpl::AddSocket(UdpSocketWrapper* s)
318 {
319 UdpSocketPosix* sl = static_cast<UdpSocketPosix*>(s);
320 if(sl->GetFd() == INVALID_SOCKET || !(sl->GetFd() < FD_SETSIZE))
321 {
322 return false;
323 }
324 _critSectList->Enter();
325 _addList.push_back(s);
326 _critSectList->Leave();
327 return true;
328 }
329
RemoveSocket(UdpSocketWrapper * s)330 bool UdpSocketManagerPosixImpl::RemoveSocket(UdpSocketWrapper* s)
331 {
332 // Put in remove list if this is the correct UdpSocketManagerPosixImpl.
333 _critSectList->Enter();
334
335 // If the socket is in the add list it's safe to remove and delete it.
336 for (SocketList::iterator iter = _addList.begin();
337 iter != _addList.end(); ++iter) {
338 UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
339 unsigned int addFD = addSocket->GetFd();
340 unsigned int removeFD = static_cast<UdpSocketPosix*>(s)->GetFd();
341 if(removeFD == addFD)
342 {
343 _removeList.push_back(removeFD);
344 _critSectList->Leave();
345 return true;
346 }
347 }
348
349 // Checking the socket map is safe since all Erase and Insert calls to this
350 // map are also protected by _critSectList.
351 if (_socketMap.find(static_cast<UdpSocketPosix*>(s)->GetFd()) !=
352 _socketMap.end()) {
353 _removeList.push_back(static_cast<UdpSocketPosix*>(s)->GetFd());
354 _critSectList->Leave();
355 return true;
356 }
357 _critSectList->Leave();
358 return false;
359 }
360
UpdateSocketMap()361 void UdpSocketManagerPosixImpl::UpdateSocketMap()
362 {
363 // Remove items in remove list.
364 _critSectList->Enter();
365 for (FdList::iterator iter = _removeList.begin();
366 iter != _removeList.end(); ++iter) {
367 UdpSocketPosix* deleteSocket = NULL;
368 SOCKET removeFD = *iter;
369
370 // If the socket is in the add list it hasn't been added to the socket
371 // map yet. Just remove the socket from the add list.
372 for (SocketList::iterator iter = _addList.begin();
373 iter != _addList.end(); ++iter) {
374 UdpSocketPosix* addSocket = static_cast<UdpSocketPosix*>(*iter);
375 SOCKET addFD = addSocket->GetFd();
376 if(removeFD == addFD)
377 {
378 deleteSocket = addSocket;
379 _addList.erase(iter);
380 break;
381 }
382 }
383
384 // Find and remove socket from _socketMap.
385 std::map<SOCKET, UdpSocketPosix*>::iterator it =
386 _socketMap.find(removeFD);
387 if(it != _socketMap.end())
388 {
389 deleteSocket = it->second;
390 _socketMap.erase(it);
391 }
392 if(deleteSocket)
393 {
394 deleteSocket->ReadyForDeletion();
395 delete deleteSocket;
396 }
397 }
398 _removeList.clear();
399
400 // Add sockets from add list.
401 for (SocketList::iterator iter = _addList.begin();
402 iter != _addList.end(); ++iter) {
403 UdpSocketPosix* s = static_cast<UdpSocketPosix*>(*iter);
404 if(s) {
405 _socketMap[s->GetFd()] = s;
406 }
407 }
408 _addList.clear();
409 _critSectList->Leave();
410 }
411
412 } // namespace test
413 } // namespace webrtc
414