1 /* ------------------------------------------------------------------
2 * Copyright (C) 1998-2009 PacketVideo
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13 * express or implied.
14 * See the License for the specific language governing permissions
15 * and limitations under the License.
16 * -------------------------------------------------------------------
17 */
18
19 #include "threadsafe_queue.h"
20
//USE_SEM_WAIT selects how AddToQueue hands off to the active object.
//A semaphore wait is only used on pre-emptive thread OSes, because
//sem wait is not always available on a non-pre-emptive OS
//(for example under a Brew applet thread).
#if OSCL_HAS_NON_PREEMPTIVE_THREAD_SUPPORT
#define USE_SEM_WAIT 0
#else
#define USE_SEM_WAIT 1
#endif
29
ThreadSafeQueue()30 OSCL_EXPORT_REF ThreadSafeQueue::ThreadSafeQueue()
31 : OsclActiveObject(OsclActiveObject::EPriorityNominal, "ThreadSafeQueue")
32 {
33 iObserver = NULL;
34 iCounter = 1;
35 if (OsclThread::GetId(iThreadId) != OsclProcStatus::SUCCESS_ERROR)
36 OsclError::Leave(OsclErrSystemCallFailed);
37 #if USE_SEM_WAIT
38 iQueueReadySem.Create();
39 #endif
40 iQueueMut.Create();
41 AddToScheduler();
42 PendForExec();
43 iQueueReadySem.Signal();
44 }
45
~ThreadSafeQueue()46 OSCL_EXPORT_REF ThreadSafeQueue::~ThreadSafeQueue()
47 {
48 RemoveFromScheduler();
49 #if USE_SEM_WAIT
50 iQueueReadySem.Close();
51 #endif
52 iQueueMut.Close();
53 }
54
IsInThread()55 OSCL_EXPORT_REF bool ThreadSafeQueue::IsInThread()
56 {
57 TOsclThreadId id;
58 if (OsclThread::GetId(id) == OsclProcStatus::SUCCESS_ERROR)
59 {
60 return OsclThread::CompareId(id, iThreadId);
61 }
62 return false;
63 }
64
Configure(ThreadSafeQueueObserver * aObs,uint32 aReserve,uint32 aId)65 OSCL_EXPORT_REF void ThreadSafeQueue::Configure(ThreadSafeQueueObserver* aObs, uint32 aReserve, uint32 aId)
66 {
67 iQueueMut.Lock();
68 iObserver = aObs;
69 iQueue.reserve(aReserve);
70 iCounter = aId;
71 iQueueMut.Unlock();
72 }
73
AddToQueue(OsclAny * EventData,ThreadSafeQueueId * aId)74 OSCL_EXPORT_REF ThreadSafeQueueId ThreadSafeQueue::AddToQueue(OsclAny *EventData, ThreadSafeQueueId* aId)
75 {
76 iQueueMut.Lock();
77 uint32 count = (aId) ? *aId : ++iCounter;
78 ThreadSafeQueueElement elem(count, EventData);
79 iQueue.push_back(elem);
80 uint32 size = iQueue.size();
81 iQueueMut.Unlock();
82
83 //Signal the AO. Only signal when the queue was previously empty in order
84 // to minimize the amount of blocking in this call.
85 if (size == 1)
86 {
87 #if USE_SEM_WAIT
88 //Wait on the AO to be ready to be signaled.
89 iQueueReadySem.Wait();
90 PendComplete(OSCL_REQUEST_ERR_NONE);
91 #else
92 //To avoid problems under brew applet, don't do a sem wait here.
93 //instead just check AO status and signal if needed. It should
94 //not be possible to lose data, since the only time AO is *not* ready
95 //to be signaled is when a notification is already pending.
96 //The reason to not do this in all platforms is that Status() call
97 //is not thread-safe, but on non-preemptive OS it's ok here.
98 if (Status() == OSCL_REQUEST_PENDING)
99 PendComplete(OSCL_REQUEST_ERR_NONE);
100 #endif
101 }
102
103 return count;
104 }
105
DeQueue(ThreadSafeQueueId & aId,OsclAny * & aData)106 OSCL_EXPORT_REF uint32 ThreadSafeQueue::DeQueue(ThreadSafeQueueId& aId, OsclAny*& aData)
107 {
108 uint32 num = 0;
109 iQueueMut.Lock();
110 if (iQueue.size())
111 {
112 aId = iQueue[0].iId;
113 aData = iQueue[0].iData;
114 iQueue.erase(&iQueue[0]);
115 num++;
116 }
117 iQueueMut.Unlock();
118 return num;
119 }
120
Run()121 void ThreadSafeQueue::Run()
122 {
123 iQueueMut.Lock();
124 PendForExec();
125 #if USE_SEM_WAIT
126 iQueueReadySem.Signal();
127 #endif
128 uint32 count = iQueue.size();
129 ThreadSafeQueueObserver* obs = iObserver;
130 iQueueMut.Unlock();
131
132 //note: don't do the callback under the lock, in order to allow
133 //de-queueing the data in the callback. this creates the possibility
134 //that queue size may not equal "count" in the callback.
135 if (count && obs)
136 obs->ThreadSafeQueueDataAvailable(this);
137 }
138
139
140
141
142