• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h"
12 
13 #include <assert.h>
14 #include <stdio.h>
15 
16 #include "webrtc/system_wrappers/interface/aligned_malloc.h"
17 #include "webrtc/test/channel_transport/udp_socket2_win.h"
18 
19 namespace webrtc {
20 namespace test {
21 
// Process-wide manager counter. It is incremented in the constructor and
// decremented in the destructor, and gates the WSAStartup()/WSACleanup() calls.
22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0;
// Set in the constructor to whether WSAStartup() returned 0; the destructor
// checks it before calling WSACleanup().
23 bool UdpSocket2ManagerWindows::_wsaInit = false;
24 
UdpSocket2ManagerWindows()25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows()
26     : UdpSocketManager(),
27       _id(-1),
28       _stopped(false),
29       _init(false),
30       _pCrit(CriticalSectionWrapper::CreateCriticalSection()),
31       _ioCompletionHandle(NULL),
32       _numActiveSockets(0),
33       _event(EventWrapper::Create())
34 {
35     _managerNumber = _numOfActiveManagers++;
36 
37     if(_numOfActiveManagers == 1)
38     {
39         WORD wVersionRequested = MAKEWORD(2, 2);
40         WSADATA wsaData;
41         _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0;
42         // TODO (hellner): seems safer to use RAII for this. E.g. what happens
43         //                 if a UdpSocket2ManagerWindows() created and destroyed
44         //                 without being initialized.
45     }
46 }
47 
~UdpSocket2ManagerWindows()48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows()
49 {
50     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
51                  "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()",
52                  _managerNumber);
53 
54     if(_init)
55     {
56         _pCrit->Enter();
57         if(_numActiveSockets)
58         {
59             _pCrit->Leave();
60             _event->Wait(INFINITE);
61         }
62         else
63         {
64             _pCrit->Leave();
65         }
66         StopWorkerThreads();
67 
68         for (WorkerList::iterator iter = _workerThreadsList.begin();
69              iter != _workerThreadsList.end(); ++iter) {
70           delete *iter;
71         }
72         _workerThreadsList.clear();
73         _ioContextPool.Free();
74 
75         _numOfActiveManagers--;
76         if(_ioCompletionHandle)
77         {
78             CloseHandle(_ioCompletionHandle);
79         }
80         if (_numOfActiveManagers == 0)
81         {
82             if(_wsaInit)
83             {
84                 WSACleanup();
85             }
86         }
87     }
88     if(_pCrit)
89     {
90         delete _pCrit;
91     }
92     if(_event)
93     {
94         delete _event;
95     }
96 }
97 
Init(int32_t id,uint8_t & numOfWorkThreads)98 bool UdpSocket2ManagerWindows::Init(int32_t id,
99                                     uint8_t& numOfWorkThreads) {
100   CriticalSectionScoped cs(_pCrit);
101   if ((_id != -1) || (_numOfWorkThreads != 0)) {
102       assert(_id != -1);
103       assert(_numOfWorkThreads != 0);
104       return false;
105   }
106   _id = id;
107   _numOfWorkThreads = numOfWorkThreads;
108   return true;
109 }
110 
ChangeUniqueId(const int32_t id)111 int32_t UdpSocket2ManagerWindows::ChangeUniqueId(const int32_t id)
112 {
113     _id = id;
114     return 0;
115 }
116 
Start()117 bool UdpSocket2ManagerWindows::Start()
118 {
119     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
120                  "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber);
121     if(!_init)
122     {
123         StartWorkerThreads();
124     }
125 
126     if(!_init)
127     {
128         return false;
129     }
130     _pCrit->Enter();
131     // Start worker threads.
132     _stopped = false;
133     int32_t error = 0;
134     for (WorkerList::iterator iter = _workerThreadsList.begin();
135          iter != _workerThreadsList.end() && !error; ++iter) {
136       if(!(*iter)->Start())
137         error = 1;
138     }
139     if(error)
140     {
141         WEBRTC_TRACE(
142             kTraceError,
143             kTraceTransport,
144             _id,
145             "UdpSocket2ManagerWindows(%d)::Start() error starting worker\
146  threads",
147             _managerNumber);
148         _pCrit->Leave();
149         return false;
150     }
151     _pCrit->Leave();
152     return true;
153 }
154 
StartWorkerThreads()155 bool UdpSocket2ManagerWindows::StartWorkerThreads()
156 {
157     if(!_init)
158     {
159         _pCrit->Enter();
160 
161         _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
162                                                      0, 0);
163         if(_ioCompletionHandle == NULL)
164         {
165             int32_t error = GetLastError();
166             WEBRTC_TRACE(
167                 kTraceError,
168                 kTraceTransport,
169                 _id,
170                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()"
171                 "_ioCompletioHandle == NULL: error:%d",
172                 _managerNumber,error);
173             _pCrit->Leave();
174             return false;
175         }
176 
177         // Create worker threads.
178         uint32_t i = 0;
179         bool error = false;
180         while(i < _numOfWorkThreads && !error)
181         {
182             UdpSocket2WorkerWindows* pWorker =
183                 new UdpSocket2WorkerWindows(_ioCompletionHandle);
184             if(pWorker->Init() != 0)
185             {
186                 error = true;
187                 delete pWorker;
188                 break;
189             }
190             _workerThreadsList.push_front(pWorker);
191             i++;
192         }
193         if(error)
194         {
195             WEBRTC_TRACE(
196                 kTraceError,
197                 kTraceTransport,
198                 _id,
199                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
200                 "creating work threads",
201                 _managerNumber);
202             // Delete worker threads.
203             for (WorkerList::iterator iter = _workerThreadsList.begin();
204                  iter != _workerThreadsList.end(); ++iter) {
205               delete *iter;
206             }
207             _workerThreadsList.clear();
208             _pCrit->Leave();
209             return false;
210         }
211         if(_ioContextPool.Init())
212         {
213             WEBRTC_TRACE(
214                 kTraceError,
215                 kTraceTransport,
216                 _id,
217                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
218                 "initiating _ioContextPool",
219                 _managerNumber);
220             _pCrit->Leave();
221             return false;
222         }
223         _init = true;
224         WEBRTC_TRACE(
225             kTraceDebug,
226             kTraceTransport,
227             _id,
228             "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work "
229             "threads created and initialized",
230             _numOfWorkThreads);
231         _pCrit->Leave();
232     }
233     return true;
234 }
235 
Stop()236 bool UdpSocket2ManagerWindows::Stop()
237 {
238     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
239                  "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber);
240 
241     if(!_init)
242     {
243         return false;
244     }
245     _pCrit->Enter();
246     _stopped = true;
247     if(_numActiveSockets)
248     {
249         WEBRTC_TRACE(
250             kTraceError,
251             kTraceTransport,
252             _id,
253             "UdpSocket2ManagerWindows(%d)::Stop() there is still active\
254  sockets",
255             _managerNumber);
256         _pCrit->Leave();
257         return false;
258     }
259     // No active sockets. Stop all worker threads.
260     bool result = StopWorkerThreads();
261     _pCrit->Leave();
262     return result;
263 }
264 
StopWorkerThreads()265 bool UdpSocket2ManagerWindows::StopWorkerThreads()
266 {
267     int32_t error = 0;
268     WEBRTC_TRACE(
269         kTraceDebug,
270         kTraceTransport,
271         _id,
272         "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\
273  threadsStoped, numActicve Sockets=%d",
274         _managerNumber,
275         _numActiveSockets);
276 
277     // Set worker threads to not alive so that they will stop calling
278     // UdpSocket2WorkerWindows::Run().
279     for (WorkerList::iterator iter = _workerThreadsList.begin();
280          iter != _workerThreadsList.end(); ++iter) {
281         (*iter)->SetNotAlive();
282     }
283     // Release all threads waiting for GetQueuedCompletionStatus(..).
284     if(_ioCompletionHandle)
285     {
286         uint32_t i = 0;
287         for(i = 0; i < _workerThreadsList.size(); i++)
288         {
289             PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL);
290         }
291     }
292     for (WorkerList::iterator iter = _workerThreadsList.begin();
293          iter != _workerThreadsList.end(); ++iter) {
294         if((*iter)->Stop() == false)
295         {
296             error = -1;
297             WEBRTC_TRACE(kTraceWarning,  kTraceTransport, -1,
298                          "failed to stop worker thread");
299         }
300     }
301 
302     if(error)
303     {
304         WEBRTC_TRACE(
305             kTraceError,
306             kTraceTransport,
307             _id,
308             "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\
309  worker threads",
310             _managerNumber);
311         return false;
312     }
313     return true;
314 }
315 
AddSocketPrv(UdpSocket2Windows * s)316 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s)
317 {
318     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
319                  "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber);
320     if(!_init)
321     {
322         WEBRTC_TRACE(
323             kTraceError,
324             kTraceTransport,
325             _id,
326             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\
327  initialized",
328             _managerNumber);
329         return false;
330     }
331     _pCrit->Enter();
332     if(s == NULL)
333     {
334         WEBRTC_TRACE(
335             kTraceError,
336             kTraceTransport,
337             _id,
338             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL",
339             _managerNumber);
340         _pCrit->Leave();
341         return false;
342     }
343     if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET)
344     {
345         WEBRTC_TRACE(
346             kTraceError,
347             kTraceTransport,
348             _id,
349             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\
350  %d",
351             _managerNumber,
352             (int32_t)s->GetFd());
353         _pCrit->Leave();
354         return false;
355 
356     }
357     _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(),
358                                                  _ioCompletionHandle,
359                                                  (ULONG_PTR)(s), 0);
360     if(_ioCompletionHandle == NULL)
361     {
362         int32_t error = GetLastError();
363         WEBRTC_TRACE(
364             kTraceError,
365             kTraceTransport,
366             _id,
367             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\
368  completion: %d",
369             _managerNumber,
370             error);
371         _pCrit->Leave();
372         return false;
373     }
374     _numActiveSockets++;
375     _pCrit->Leave();
376     return true;
377 }
RemoveSocketPrv(UdpSocket2Windows * s)378 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s)
379 {
380     if(!_init)
381     {
382         return false;
383     }
384     _pCrit->Enter();
385     _numActiveSockets--;
386     if(_numActiveSockets == 0)
387     {
388         _event->Set();
389     }
390     _pCrit->Leave();
391     return true;
392 }
393 
PopIoContext()394 PerIoContext* UdpSocket2ManagerWindows::PopIoContext()
395 {
396     if(!_init)
397     {
398         return NULL;
399     }
400 
401     PerIoContext* pIoC = NULL;
402     if(!_stopped)
403     {
404         pIoC = _ioContextPool.PopIoContext();
405     }else
406     {
407         WEBRTC_TRACE(
408             kTraceError,
409             kTraceTransport,
410             _id,
411             "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started",
412             _managerNumber);
413     }
414     return pIoC;
415 }
416 
PushIoContext(PerIoContext * pIoContext)417 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext)
418 {
419     return _ioContextPool.PushIoContext(pIoContext);
420 }
421 
IoContextPool()422 IoContextPool::IoContextPool()
423     : _pListHead(NULL),
424       _init(false),
425       _size(0),
426       _inUse(0)
427 {
428 }
429 
~IoContextPool()430 IoContextPool::~IoContextPool()
431 {
432     Free();
433     assert(_size.Value() == 0);
434     AlignedFree(_pListHead);
435 }
436 
Init(uint32_t)437 int32_t IoContextPool::Init(uint32_t /*increaseSize*/)
438 {
439     if(_init)
440     {
441         return 0;
442     }
443 
444     _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER),
445                                               MEMORY_ALLOCATION_ALIGNMENT);
446     if(_pListHead == NULL)
447     {
448         return -1;
449     }
450     InitializeSListHead(_pListHead);
451     _init = true;
452     return 0;
453 }
454 
PopIoContext()455 PerIoContext* IoContextPool::PopIoContext()
456 {
457     if(!_init)
458     {
459         return NULL;
460     }
461 
462     PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
463     if(pListEntry == NULL)
464     {
465         IoContextPoolItem* item = (IoContextPoolItem*)
466             AlignedMalloc(
467                 sizeof(IoContextPoolItem),
468                 MEMORY_ALLOCATION_ALIGNMENT);
469         if(item == NULL)
470         {
471             return NULL;
472         }
473         memset(&item->payload.ioContext,0,sizeof(PerIoContext));
474         item->payload.base = item;
475         pListEntry = &(item->itemEntry);
476         ++_size;
477     }
478     ++_inUse;
479     return &((IoContextPoolItem*)pListEntry)->payload.ioContext;
480 }
481 
PushIoContext(PerIoContext * pIoContext)482 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext)
483 {
484     // TODO (hellner): Overlapped IO should be completed at this point. Perhaps
485     //                 add an assert?
486     const bool overlappedIOCompleted = HasOverlappedIoCompleted(
487         (LPOVERLAPPED)pIoContext);
488 
489     IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base;
490 
491     const int32_t usedItems = --_inUse;
492     const int32_t totalItems = _size.Value();
493     const int32_t freeItems = totalItems - usedItems;
494     if(freeItems < 0)
495     {
496         assert(false);
497         AlignedFree(item);
498         return -1;
499     }
500     if((freeItems >= totalItems>>1) &&
501         overlappedIOCompleted)
502     {
503         AlignedFree(item);
504         --_size;
505         return 0;
506     }
507     InterlockedPushEntrySList(_pListHead, &(item->itemEntry));
508     return 0;
509 }
510 
Free()511 int32_t IoContextPool::Free()
512 {
513     if(!_init)
514     {
515         return 0;
516     }
517 
518     int32_t itemsFreed = 0;
519     PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
520     while(pListEntry != NULL)
521     {
522         IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry);
523         AlignedFree(item);
524         --_size;
525         itemsFreed++;
526         pListEntry = InterlockedPopEntrySList(_pListHead);
527     }
528     return itemsFreed;
529 }
530 
// Shared counter of constructed workers. The UdpSocket2WorkerWindows
// constructor uses the pre-increment value as the worker's _workerNumber; it
// is not decremented in the code visible here.
531 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0;
532 
UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)533 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)
534     : _ioCompletionHandle(ioCompletionHandle),
535       _pThread(NULL),
536       _init(false)
537 {
538     _workerNumber = _numOfWorkers++;
539     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
540                  "UdpSocket2WorkerWindows created");
541 }
542 
~UdpSocket2WorkerWindows()543 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows()
544 {
545     if(_pThread)
546     {
547         delete _pThread;
548     }
549     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
550                  "UdpSocket2WorkerWindows deleted");
551 }
552 
Start()553 bool UdpSocket2WorkerWindows::Start()
554 {
555     unsigned int id = 0;
556     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
557                  "Start UdpSocket2WorkerWindows");
558     return _pThread->Start(id);
559 }
560 
Stop()561 bool UdpSocket2WorkerWindows::Stop()
562 {
563     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
564                  "Stop UdpSocket2WorkerWindows");
565     return _pThread->Stop();
566 }
567 
SetNotAlive()568 void UdpSocket2WorkerWindows::SetNotAlive()
569 {
570     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
571                  "SetNotAlive UdpSocket2WorkerWindows");
572     _pThread->SetNotAlive();
573 }
574 
Init()575 int32_t UdpSocket2WorkerWindows::Init()
576 {
577     if(!_init)
578     {
579         const char* threadName = "UdpSocket2ManagerWindows_thread";
580         _pThread = ThreadWrapper::CreateThread(Run, this, kRealtimePriority,
581                                                threadName);
582         if(_pThread == NULL)
583         {
584             WEBRTC_TRACE(
585                 kTraceError,
586                 kTraceTransport,
587                 -1,
588                 "UdpSocket2WorkerWindows(%d)::Init(), error creating thread!",
589                 _workerNumber);
590             return -1;
591         }
592         _init = true;
593     }
594     return 0;
595 }
596 
Run(ThreadObj obj)597 bool UdpSocket2WorkerWindows::Run(ThreadObj obj)
598 {
599     UdpSocket2WorkerWindows* pWorker =
600         static_cast<UdpSocket2WorkerWindows*>(obj);
601     return pWorker->Process();
602 }
603 
604 // Process should always return true. Stopping the worker threads is done in
605 // the UdpSocket2ManagerWindows::StopWorkerThreads() function.
Process()606 bool UdpSocket2WorkerWindows::Process()
607 {
608     int32_t success = 0;
609     DWORD ioSize = 0;
610     UdpSocket2Windows* pSocket = NULL;
611     PerIoContext* pIOContext = 0;
612     OVERLAPPED* pOverlapped = 0;
613     success = GetQueuedCompletionStatus(_ioCompletionHandle,
614                                         &ioSize,
615                                        (ULONG_PTR*)&pSocket, &pOverlapped, 200);
616 
617     uint32_t error = 0;
618     if(!success)
619     {
620         error = GetLastError();
621         if(error == WAIT_TIMEOUT)
622         {
623             return true;
624         }
625         // This may happen if e.g. PostQueuedCompletionStatus() has been called.
626         // The IO context still needs to be reclaimed or re-used which is done
627         // in UdpSocket2Windows::IOCompleted(..).
628     }
629     if(pSocket == NULL)
630     {
631         WEBRTC_TRACE(
632             kTraceDebug,
633             kTraceTransport,
634             -1,
635             "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread",
636             _workerNumber);
637         return true;
638     }
639     pIOContext = (PerIoContext*)pOverlapped;
640     pSocket->IOCompleted(pIOContext,ioSize,error);
641     return true;
642 }
643 
644 }  // namespace test
645 }  // namespace webrtc
646