• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* ------------------------------------------------------------------
2  * Copyright (C) 1998-2009 PacketVideo
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13  * express or implied.
14  * See the License for the specific language governing permissions
15  * and limitations under the License.
16  * -------------------------------------------------------------------
17  */
18 
19 #include "threadsafe_callback_ao.h"
20 #include "pvlogger.h"
21 #include "oscl_procstatus.h"
22 
23 #include "oscl_dll.h"
24 
OSCL_DLL_ENTRY_POINT_DEFAULT()25 OSCL_DLL_ENTRY_POINT_DEFAULT()
26 
27 
28 /////////////////////////////////////////////////////////////////////////////
29 // Add AO to the scheduler
30 /////////////////////////////////////////////////////////////////////////////
31 OSCL_EXPORT_REF void ThreadSafeCallbackAO::ThreadLogon()
32 {
33     if (!IsAdded())
34     {
35         AddToScheduler();
36     }
37 
38     iLogger = PVLogger::GetLoggerObject(iLoggerString);
39 }
40 ////////////////////////////////////////////////////////////////////////////
41 // Remove AO from the scheduler
42 ////////////////////////////////////////////////////////////////////////////
ThreadLogoff()43 OSCL_EXPORT_REF void ThreadSafeCallbackAO::ThreadLogoff()
44 {
45     //thread logoff
46     if (IsAdded())
47     {
48         RemoveFromScheduler();
49     }
50 
51     iLogger = NULL;
52 }
53 ///////////////////////////////////////////////////////////////////////////
54 // CONSTRUCTOR
55 ///////////////////////////////////////////////////////////////////////////
ThreadSafeCallbackAO(void * aObserver,uint32 aDepth,const char * aAOname,int32 aPriority)56 OSCL_EXPORT_REF ThreadSafeCallbackAO::ThreadSafeCallbackAO(void *aObserver, uint32 aDepth, const char *aAOname, int32 aPriority)
57         : OsclActiveObject(aPriority, aAOname),
58         iLogger(NULL)
59 {
60 
61 
62     OsclReturnCode queue_init_status = OsclSuccess;
63     OsclProcStatus::eOsclProcError mutex_init_status = OsclProcStatus::SUCCESS_ERROR;
64     OsclProcStatus::eOsclProcError sema_init_status = OsclProcStatus::SUCCESS_ERROR;
65     int32 err = 0;
66 
67     iLoggerString = aAOname;
68     iObserver = aObserver;
69     Q = NULL;
70 
71     OSCL_TRY(err,
72              queue_init_status = QueueInit(aDepth); //create the Q
73              mutex_init_status = Mutex.Create(); // Create Mutex
74              sema_init_status  = RemoteThreadCtrlSema.Create(aDepth);
75 
76              ThreadLogon(); // add to scheduler
77             );
78 
79     if ((err != 0) ||
80             (queue_init_status != OsclSuccess) ||
81             (mutex_init_status != OsclProcStatus::SUCCESS_ERROR) ||
82             (sema_init_status  != OsclProcStatus::SUCCESS_ERROR)
83        )
84     {
85         OSCL_LEAVE(OsclFailure);
86     }
87 
88 
89     PendForExec(); // make sure to "prime" the callback AO for the first event that arrives
90 
91 }
92 
93 /////////////////////////////////////////////////////////////////////////////
94 // Destructor
95 /////////////////////////////////////////////////////////////////////////////
~ThreadSafeCallbackAO()96 OSCL_EXPORT_REF ThreadSafeCallbackAO::~ThreadSafeCallbackAO()
97 {
98     OsclReturnCode queue_deinit_status = OsclSuccess;
99     OsclProcStatus::eOsclProcError mutex_close_status = OsclProcStatus::SUCCESS_ERROR;
100     OsclProcStatus::eOsclProcError sema_close_status = OsclProcStatus::SUCCESS_ERROR;
101 
102     int32 err = 0;
103 
104     OSCL_TRY(err,
105              queue_deinit_status = QueueDeInit(); // Destroy the queue
106              mutex_close_status =  Mutex.Close(); // close the mutex
107              sema_close_status  =  RemoteThreadCtrlSema.Close(); // close the semaphore
108              ThreadLogoff();
109             );
110 
111     if ((err != 0) ||
112             (queue_deinit_status != OsclSuccess) ||
113             (mutex_close_status != OsclProcStatus::SUCCESS_ERROR) ||
114             (sema_close_status != OsclProcStatus::SUCCESS_ERROR))
115     {
116         OSCL_LEAVE(OsclFailure);
117     }
118 
119     iObserver = NULL;
120 }
121 
122 /////////////////////////////////////////////////////////////////////////////
123 // Run Routine
124 /////////////////////////////////////////////////////////////////////////////
Run()125 OSCL_EXPORT_REF void ThreadSafeCallbackAO::Run()
126 {
127     PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() In"));
128 
129 
130     OsclAny *P; // parameter to dequeue
131     OsclReturnCode status;
132 
133 
134     // get the event parameters and status of de-queuing
135     P = DeQueue(status);
136 
137 
138     if (status == OsclSuccess)
139     {
140         PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - Calling Process Event"));
141         ProcessEvent(P);
142     }
143     else
144     {
145         PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - could not dequeue event data"));
146     }
147 
148 
149 
150     PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() Out"));
151 }
152 
153 
154 /////////////////////////////////////////////////////////////////////////////////////
155 //////// ReceiveEvent
156 /////////////////////////////////////////////////////////////////////////////////////
157 /////// NOTE: THIS METHOD IS EXECUTED IN THE REMOTE THREAD CONTEXT
158 //////////////AS PART OF THE CALLBACK. IT QUEUES THE EVENT PARAMETERS !!!!!!!!!!!!
159 /////////////////////////////////////////////////////////////////////////////////////
160 ///////////// GENERIC API requires only one parameter (osclany pointer to "event data")
161 ///////////////
162 ////////////// BEFORE CALLING THE GENERIC API, YOU MUST SETUP (COPY IF NECESSARY)
163 ///////////////// CALLBACK EVENT PARAMETERS AND PROVIDE An OSCLANY POINTER TO THEM
164 /////////////////////////////////////////////////////////////////////////////////////
165 ///////////////YOU MUST NOT USE LOGGER IN THE CALLBACK (IN THE REMOTE THREAD CONTEXT)
166 ///////////////
167 //////////////// EXAMPLE CODE (For CALLING THE GENERIC API):
168 
169 // typedef struct event_data{
170 //      int d1;
171 //      char *d2;
172 //      uint32 d3;
173 //      } event_data;
174 
175 //int ActualCallbackAPI(int p1, char * p2, uint32 p3)
176 //{
177 //      // pack all parameters
178 //      event_data *pED = malloc(sizeof(event_data));
179 //      pED->d1 = p1;
180 //      pED->d2 = p2;
181 //      pED->d3 = p3;
182 //      OsclAny *P = (OsclAny *) pED:
183 //          // call the generic callback API:
184 //      PVMFStatus stat = ReceiveEvent(P);
185 //      // check the status
186 //      if(stat == PVMFSuccess)
187 //          return OK;
188 //      else
189 //          return ERROR;
190 //}
191 //
192 
193 
194 
ReceiveEvent(OsclAny * EventData)195 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::ReceiveEvent(OsclAny *EventData)
196 {
197 
198     OsclReturnCode status;
199 
200     status = Queue(EventData);
201 
202     return status;
203 }
204 
ProcessEvent(OsclAny * EventData)205 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::ProcessEvent(OsclAny *EventData)
206 {
207     OSCL_UNUSED_ARG(EventData);
208 // DO NOTHING. OVERLOAD THIS METHOD IN THE DERIVED CLASS TO DO SOMETHING MEANINGFUL
209     PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::ProcessEvent() In and Out"));
210     return OsclSuccess;
211 }
212 ///////////////////////////////////////////////////////////////////////////////
213 //// QUEUE OPERATIONS
214 ///////////////////////////////////////////////////////////////////////////////
215 
216 
217 /////CREATE QUEUE AND INIT ///////////////////
218 
QueueInit(uint32 aMaxQueueDepth)219 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::QueueInit(uint32 aMaxQueueDepth)
220 {
221 
222     if (aMaxQueueDepth < 1)
223     {
224         Q = NULL;
225         return OsclErrArgument;
226     }
227 
228     // create Q as simple array
229     Q = (QueueT *) oscl_malloc(sizeof(QueueT));
230     if (Q == NULL)
231     {
232         return OsclErrNoMemory;
233     }
234 
235     Q->MaxNumElements = aMaxQueueDepth;
236     Q->NumElem = 0;
237     Q->index_in = 0;
238     Q->index_out = 0;
239     Q->pFirst = NULL;
240     // create the Q elements
241     Q->pFirst = (QElement *) oscl_malloc((Q->MaxNumElements) * oscl_mem_aligned_size(sizeof(QElement)));
242     if (Q->pFirst == NULL)
243     {
244         Q = NULL;
245         return OsclErrNoMemory;
246     }
247 
248     // zero out the Q
249     oscl_memset(Q->pFirst, 0, (Q->MaxNumElements) * oscl_mem_aligned_size(sizeof(QElement)));
250 
251 
252     return OsclSuccess;
253 }
254 
255 ///////////// DESTROY QUEUE ///////////////////////
QueueDeInit()256 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::QueueDeInit()
257 {
258     OsclReturnCode status = OsclSuccess;
259 
260     // free the Q elements
261     if (Q->pFirst != NULL)
262         oscl_free(Q->pFirst);
263 
264     Q->pFirst = NULL;
265     // free the Q structure
266     if (Q != NULL)
267         oscl_free(Q);
268 
269     Q = NULL;
270 
271     return status;
272 }
273 
274 /////////////QUEUE ONE ELEMENT //////////////////////
Queue(OsclAny * pData)275 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::Queue(OsclAny *pData)
276 {
277     OsclProcStatus::eOsclProcError sema_status;
278 
279     // Wait on the remote thread control semaphore. If the queue is full, must block and wait
280     // for the AO to dequeue some previous event. If the queue is not full, proceed
281     sema_status = RemoteThreadCtrlSema.Wait();
282     if (sema_status != OsclProcStatus::SUCCESS_ERROR)
283         return OsclFailure;
284 
285 
286     // protect queue access
287     Mutex.Lock();
288 
289     if (Q->NumElem >= Q->MaxNumElements)
290     {
291         Mutex.Unlock();
292         RemoteThreadCtrlSema.Signal(); // signal the sema to maintain sema count consistency in case
293         // of error (inability to queue)
294         return OsclFailure;
295     }
296 
297     (Q->pFirst[Q->index_in]).pData = pData;
298 
299     Q->index_in++;
300     // roll-over the index to 0 if it reaches the end.
301     if (Q->index_in == Q->MaxNumElements)
302         Q->index_in = 0;
303 
304     Q->NumElem++;
305 
306     // check if AO needs to be scheduled (i.e. check if this is the first event the queue after the queue was empty)
307     if (GetQueueNumElem() == 1)
308     {
309         PendComplete(OSCL_REQUEST_ERR_NONE);
310     }
311 
312     Mutex.Unlock();
313 
314     return OsclSuccess;
315 }
316 
317 ///////////DE-QUEUE ONE ELEMENT /////////////////////
DeQueue(OsclReturnCode & stat)318 OSCL_EXPORT_REF OsclAny* ThreadSafeCallbackAO::DeQueue(OsclReturnCode &stat)
319 {
320     OsclAny *pData;
321     OsclProcStatus::eOsclProcError sema_status;
322 
323     stat = OsclSuccess;
324 
325     // Protect the queue while accessing it:
326     Mutex.Lock();
327 
328     if (Q->NumElem == 0)
329     {
330         // nothing to de-queue
331         stat = OsclFailure;
332         Mutex.Unlock();
333 
334         return NULL;
335     }
336 
337     pData = (Q->pFirst[Q->index_out]).pData;
338 
339     Q->index_out++;
340     // roll-over the index
341     if (Q->index_out == Q->MaxNumElements)
342         Q->index_out = 0;
343 
344     Q->NumElem--;
345 
346     // check if there is need to reschedule
347     if ((Q->NumElem) > 0)
348     {
349         PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - More events, call RunIfNotReady()"));
350         RunIfNotReady();
351     }
352     else
353     {
354         PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - No more events, call PendForExec()"));
355         PendForExec();
356     }
357 
358     //release queue access
359     Mutex.Unlock();
360 
361     // Signal the semaphore that controls the remote thread.
362     // The remote thread might be blocked and waiting for an event to be processed in case the event queue is full
363     sema_status = RemoteThreadCtrlSema.Signal();
364     if (sema_status != OsclProcStatus::SUCCESS_ERROR)
365     {
366         stat = OsclFailure;
367         return NULL;
368     }
369 
370     return pData;
371 }
372 
373 ////////// GET CURRENT QUEUE SIZE//////////////
GetQueueNumElem()374 OSCL_EXPORT_REF uint32 ThreadSafeCallbackAO::GetQueueNumElem()
375 {
376     return Q->NumElem;
377 }
378 
379