1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h"
12
13 #include <assert.h>
14 #include <stdio.h>
15
16 #include "webrtc/system_wrappers/interface/aligned_malloc.h"
17 #include "webrtc/test/channel_transport/udp_socket2_win.h"
18
19 namespace webrtc {
20 namespace test {
21
22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0;
23 bool UdpSocket2ManagerWindows::_wsaInit = false;
24
UdpSocket2ManagerWindows()25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows()
26 : UdpSocketManager(),
27 _id(-1),
28 _stopped(false),
29 _init(false),
30 _pCrit(CriticalSectionWrapper::CreateCriticalSection()),
31 _ioCompletionHandle(NULL),
32 _numActiveSockets(0),
33 _event(EventWrapper::Create())
34 {
35 _managerNumber = _numOfActiveManagers++;
36
37 if(_numOfActiveManagers == 1)
38 {
39 WORD wVersionRequested = MAKEWORD(2, 2);
40 WSADATA wsaData;
41 _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0;
42 // TODO (hellner): seems safer to use RAII for this. E.g. what happens
43 // if a UdpSocket2ManagerWindows() created and destroyed
44 // without being initialized.
45 }
46 }
47
~UdpSocket2ManagerWindows()48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows()
49 {
50 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
51 "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()",
52 _managerNumber);
53
54 if(_init)
55 {
56 _pCrit->Enter();
57 if(_numActiveSockets)
58 {
59 _pCrit->Leave();
60 _event->Wait(INFINITE);
61 }
62 else
63 {
64 _pCrit->Leave();
65 }
66 StopWorkerThreads();
67
68 for (WorkerList::iterator iter = _workerThreadsList.begin();
69 iter != _workerThreadsList.end(); ++iter) {
70 delete *iter;
71 }
72 _workerThreadsList.clear();
73 _ioContextPool.Free();
74
75 _numOfActiveManagers--;
76 if(_ioCompletionHandle)
77 {
78 CloseHandle(_ioCompletionHandle);
79 }
80 if (_numOfActiveManagers == 0)
81 {
82 if(_wsaInit)
83 {
84 WSACleanup();
85 }
86 }
87 }
88 if(_pCrit)
89 {
90 delete _pCrit;
91 }
92 if(_event)
93 {
94 delete _event;
95 }
96 }
97
Init(int32_t id,uint8_t & numOfWorkThreads)98 bool UdpSocket2ManagerWindows::Init(int32_t id,
99 uint8_t& numOfWorkThreads) {
100 CriticalSectionScoped cs(_pCrit);
101 if ((_id != -1) || (_numOfWorkThreads != 0)) {
102 assert(_id != -1);
103 assert(_numOfWorkThreads != 0);
104 return false;
105 }
106 _id = id;
107 _numOfWorkThreads = numOfWorkThreads;
108 return true;
109 }
110
ChangeUniqueId(const int32_t id)111 int32_t UdpSocket2ManagerWindows::ChangeUniqueId(const int32_t id)
112 {
113 _id = id;
114 return 0;
115 }
116
Start()117 bool UdpSocket2ManagerWindows::Start()
118 {
119 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
120 "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber);
121 if(!_init)
122 {
123 StartWorkerThreads();
124 }
125
126 if(!_init)
127 {
128 return false;
129 }
130 _pCrit->Enter();
131 // Start worker threads.
132 _stopped = false;
133 int32_t error = 0;
134 for (WorkerList::iterator iter = _workerThreadsList.begin();
135 iter != _workerThreadsList.end() && !error; ++iter) {
136 if(!(*iter)->Start())
137 error = 1;
138 }
139 if(error)
140 {
141 WEBRTC_TRACE(
142 kTraceError,
143 kTraceTransport,
144 _id,
145 "UdpSocket2ManagerWindows(%d)::Start() error starting worker\
146 threads",
147 _managerNumber);
148 _pCrit->Leave();
149 return false;
150 }
151 _pCrit->Leave();
152 return true;
153 }
154
StartWorkerThreads()155 bool UdpSocket2ManagerWindows::StartWorkerThreads()
156 {
157 if(!_init)
158 {
159 _pCrit->Enter();
160
161 _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
162 0, 0);
163 if(_ioCompletionHandle == NULL)
164 {
165 int32_t error = GetLastError();
166 WEBRTC_TRACE(
167 kTraceError,
168 kTraceTransport,
169 _id,
170 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()"
171 "_ioCompletioHandle == NULL: error:%d",
172 _managerNumber,error);
173 _pCrit->Leave();
174 return false;
175 }
176
177 // Create worker threads.
178 uint32_t i = 0;
179 bool error = false;
180 while(i < _numOfWorkThreads && !error)
181 {
182 UdpSocket2WorkerWindows* pWorker =
183 new UdpSocket2WorkerWindows(_ioCompletionHandle);
184 if(pWorker->Init() != 0)
185 {
186 error = true;
187 delete pWorker;
188 break;
189 }
190 _workerThreadsList.push_front(pWorker);
191 i++;
192 }
193 if(error)
194 {
195 WEBRTC_TRACE(
196 kTraceError,
197 kTraceTransport,
198 _id,
199 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
200 "creating work threads",
201 _managerNumber);
202 // Delete worker threads.
203 for (WorkerList::iterator iter = _workerThreadsList.begin();
204 iter != _workerThreadsList.end(); ++iter) {
205 delete *iter;
206 }
207 _workerThreadsList.clear();
208 _pCrit->Leave();
209 return false;
210 }
211 if(_ioContextPool.Init())
212 {
213 WEBRTC_TRACE(
214 kTraceError,
215 kTraceTransport,
216 _id,
217 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
218 "initiating _ioContextPool",
219 _managerNumber);
220 _pCrit->Leave();
221 return false;
222 }
223 _init = true;
224 WEBRTC_TRACE(
225 kTraceDebug,
226 kTraceTransport,
227 _id,
228 "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work "
229 "threads created and initialized",
230 _numOfWorkThreads);
231 _pCrit->Leave();
232 }
233 return true;
234 }
235
Stop()236 bool UdpSocket2ManagerWindows::Stop()
237 {
238 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
239 "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber);
240
241 if(!_init)
242 {
243 return false;
244 }
245 _pCrit->Enter();
246 _stopped = true;
247 if(_numActiveSockets)
248 {
249 WEBRTC_TRACE(
250 kTraceError,
251 kTraceTransport,
252 _id,
253 "UdpSocket2ManagerWindows(%d)::Stop() there is still active\
254 sockets",
255 _managerNumber);
256 _pCrit->Leave();
257 return false;
258 }
259 // No active sockets. Stop all worker threads.
260 bool result = StopWorkerThreads();
261 _pCrit->Leave();
262 return result;
263 }
264
StopWorkerThreads()265 bool UdpSocket2ManagerWindows::StopWorkerThreads()
266 {
267 int32_t error = 0;
268 WEBRTC_TRACE(
269 kTraceDebug,
270 kTraceTransport,
271 _id,
272 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\
273 threadsStoped, numActicve Sockets=%d",
274 _managerNumber,
275 _numActiveSockets);
276
277 // Set worker threads to not alive so that they will stop calling
278 // UdpSocket2WorkerWindows::Run().
279 for (WorkerList::iterator iter = _workerThreadsList.begin();
280 iter != _workerThreadsList.end(); ++iter) {
281 (*iter)->SetNotAlive();
282 }
283 // Release all threads waiting for GetQueuedCompletionStatus(..).
284 if(_ioCompletionHandle)
285 {
286 uint32_t i = 0;
287 for(i = 0; i < _workerThreadsList.size(); i++)
288 {
289 PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL);
290 }
291 }
292 for (WorkerList::iterator iter = _workerThreadsList.begin();
293 iter != _workerThreadsList.end(); ++iter) {
294 if((*iter)->Stop() == false)
295 {
296 error = -1;
297 WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1,
298 "failed to stop worker thread");
299 }
300 }
301
302 if(error)
303 {
304 WEBRTC_TRACE(
305 kTraceError,
306 kTraceTransport,
307 _id,
308 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\
309 worker threads",
310 _managerNumber);
311 return false;
312 }
313 return true;
314 }
315
AddSocketPrv(UdpSocket2Windows * s)316 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s)
317 {
318 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
319 "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber);
320 if(!_init)
321 {
322 WEBRTC_TRACE(
323 kTraceError,
324 kTraceTransport,
325 _id,
326 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\
327 initialized",
328 _managerNumber);
329 return false;
330 }
331 _pCrit->Enter();
332 if(s == NULL)
333 {
334 WEBRTC_TRACE(
335 kTraceError,
336 kTraceTransport,
337 _id,
338 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL",
339 _managerNumber);
340 _pCrit->Leave();
341 return false;
342 }
343 if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET)
344 {
345 WEBRTC_TRACE(
346 kTraceError,
347 kTraceTransport,
348 _id,
349 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\
350 %d",
351 _managerNumber,
352 (int32_t)s->GetFd());
353 _pCrit->Leave();
354 return false;
355
356 }
357 _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(),
358 _ioCompletionHandle,
359 (ULONG_PTR)(s), 0);
360 if(_ioCompletionHandle == NULL)
361 {
362 int32_t error = GetLastError();
363 WEBRTC_TRACE(
364 kTraceError,
365 kTraceTransport,
366 _id,
367 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\
368 completion: %d",
369 _managerNumber,
370 error);
371 _pCrit->Leave();
372 return false;
373 }
374 _numActiveSockets++;
375 _pCrit->Leave();
376 return true;
377 }
RemoveSocketPrv(UdpSocket2Windows * s)378 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s)
379 {
380 if(!_init)
381 {
382 return false;
383 }
384 _pCrit->Enter();
385 _numActiveSockets--;
386 if(_numActiveSockets == 0)
387 {
388 _event->Set();
389 }
390 _pCrit->Leave();
391 return true;
392 }
393
PopIoContext()394 PerIoContext* UdpSocket2ManagerWindows::PopIoContext()
395 {
396 if(!_init)
397 {
398 return NULL;
399 }
400
401 PerIoContext* pIoC = NULL;
402 if(!_stopped)
403 {
404 pIoC = _ioContextPool.PopIoContext();
405 }else
406 {
407 WEBRTC_TRACE(
408 kTraceError,
409 kTraceTransport,
410 _id,
411 "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started",
412 _managerNumber);
413 }
414 return pIoC;
415 }
416
PushIoContext(PerIoContext * pIoContext)417 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext)
418 {
419 return _ioContextPool.PushIoContext(pIoContext);
420 }
421
IoContextPool()422 IoContextPool::IoContextPool()
423 : _pListHead(NULL),
424 _init(false),
425 _size(0),
426 _inUse(0)
427 {
428 }
429
~IoContextPool()430 IoContextPool::~IoContextPool()
431 {
432 Free();
433 assert(_size.Value() == 0);
434 AlignedFree(_pListHead);
435 }
436
Init(uint32_t)437 int32_t IoContextPool::Init(uint32_t /*increaseSize*/)
438 {
439 if(_init)
440 {
441 return 0;
442 }
443
444 _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER),
445 MEMORY_ALLOCATION_ALIGNMENT);
446 if(_pListHead == NULL)
447 {
448 return -1;
449 }
450 InitializeSListHead(_pListHead);
451 _init = true;
452 return 0;
453 }
454
PopIoContext()455 PerIoContext* IoContextPool::PopIoContext()
456 {
457 if(!_init)
458 {
459 return NULL;
460 }
461
462 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
463 if(pListEntry == NULL)
464 {
465 IoContextPoolItem* item = (IoContextPoolItem*)
466 AlignedMalloc(
467 sizeof(IoContextPoolItem),
468 MEMORY_ALLOCATION_ALIGNMENT);
469 if(item == NULL)
470 {
471 return NULL;
472 }
473 memset(&item->payload.ioContext,0,sizeof(PerIoContext));
474 item->payload.base = item;
475 pListEntry = &(item->itemEntry);
476 ++_size;
477 }
478 ++_inUse;
479 return &((IoContextPoolItem*)pListEntry)->payload.ioContext;
480 }
481
PushIoContext(PerIoContext * pIoContext)482 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext)
483 {
484 // TODO (hellner): Overlapped IO should be completed at this point. Perhaps
485 // add an assert?
486 const bool overlappedIOCompleted = HasOverlappedIoCompleted(
487 (LPOVERLAPPED)pIoContext);
488
489 IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base;
490
491 const int32_t usedItems = --_inUse;
492 const int32_t totalItems = _size.Value();
493 const int32_t freeItems = totalItems - usedItems;
494 if(freeItems < 0)
495 {
496 assert(false);
497 AlignedFree(item);
498 return -1;
499 }
500 if((freeItems >= totalItems>>1) &&
501 overlappedIOCompleted)
502 {
503 AlignedFree(item);
504 --_size;
505 return 0;
506 }
507 InterlockedPushEntrySList(_pListHead, &(item->itemEntry));
508 return 0;
509 }
510
Free()511 int32_t IoContextPool::Free()
512 {
513 if(!_init)
514 {
515 return 0;
516 }
517
518 int32_t itemsFreed = 0;
519 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
520 while(pListEntry != NULL)
521 {
522 IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry);
523 AlignedFree(item);
524 --_size;
525 itemsFreed++;
526 pListEntry = InterlockedPopEntrySList(_pListHead);
527 }
528 return itemsFreed;
529 }
530
531 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0;
532
UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)533 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)
534 : _ioCompletionHandle(ioCompletionHandle),
535 _pThread(NULL),
536 _init(false)
537 {
538 _workerNumber = _numOfWorkers++;
539 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
540 "UdpSocket2WorkerWindows created");
541 }
542
~UdpSocket2WorkerWindows()543 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows()
544 {
545 if(_pThread)
546 {
547 delete _pThread;
548 }
549 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
550 "UdpSocket2WorkerWindows deleted");
551 }
552
Start()553 bool UdpSocket2WorkerWindows::Start()
554 {
555 unsigned int id = 0;
556 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
557 "Start UdpSocket2WorkerWindows");
558 return _pThread->Start(id);
559 }
560
Stop()561 bool UdpSocket2WorkerWindows::Stop()
562 {
563 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
564 "Stop UdpSocket2WorkerWindows");
565 return _pThread->Stop();
566 }
567
SetNotAlive()568 void UdpSocket2WorkerWindows::SetNotAlive()
569 {
570 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
571 "SetNotAlive UdpSocket2WorkerWindows");
572 _pThread->SetNotAlive();
573 }
574
Init()575 int32_t UdpSocket2WorkerWindows::Init()
576 {
577 if(!_init)
578 {
579 const char* threadName = "UdpSocket2ManagerWindows_thread";
580 _pThread = ThreadWrapper::CreateThread(Run, this, kRealtimePriority,
581 threadName);
582 if(_pThread == NULL)
583 {
584 WEBRTC_TRACE(
585 kTraceError,
586 kTraceTransport,
587 -1,
588 "UdpSocket2WorkerWindows(%d)::Init(), error creating thread!",
589 _workerNumber);
590 return -1;
591 }
592 _init = true;
593 }
594 return 0;
595 }
596
Run(ThreadObj obj)597 bool UdpSocket2WorkerWindows::Run(ThreadObj obj)
598 {
599 UdpSocket2WorkerWindows* pWorker =
600 static_cast<UdpSocket2WorkerWindows*>(obj);
601 return pWorker->Process();
602 }
603
604 // Process should always return true. Stopping the worker threads is done in
605 // the UdpSocket2ManagerWindows::StopWorkerThreads() function.
Process()606 bool UdpSocket2WorkerWindows::Process()
607 {
608 int32_t success = 0;
609 DWORD ioSize = 0;
610 UdpSocket2Windows* pSocket = NULL;
611 PerIoContext* pIOContext = 0;
612 OVERLAPPED* pOverlapped = 0;
613 success = GetQueuedCompletionStatus(_ioCompletionHandle,
614 &ioSize,
615 (ULONG_PTR*)&pSocket, &pOverlapped, 200);
616
617 uint32_t error = 0;
618 if(!success)
619 {
620 error = GetLastError();
621 if(error == WAIT_TIMEOUT)
622 {
623 return true;
624 }
625 // This may happen if e.g. PostQueuedCompletionStatus() has been called.
626 // The IO context still needs to be reclaimed or re-used which is done
627 // in UdpSocket2Windows::IOCompleted(..).
628 }
629 if(pSocket == NULL)
630 {
631 WEBRTC_TRACE(
632 kTraceDebug,
633 kTraceTransport,
634 -1,
635 "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread",
636 _workerNumber);
637 return true;
638 }
639 pIOContext = (PerIoContext*)pOverlapped;
640 pSocket->IOCompleted(pIOContext,ioSize,error);
641 return true;
642 }
643
644 } // namespace test
645 } // namespace webrtc
646