1 /* ------------------------------------------------------------------
2 * Copyright (C) 1998-2009 PacketVideo
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13 * express or implied.
14 * See the License for the specific language governing permissions
15 * and limitations under the License.
16 * -------------------------------------------------------------------
17 */
18
19 #include "threadsafe_callback_ao.h"
20 #include "pvlogger.h"
21 #include "oscl_procstatus.h"
22
23 #include "oscl_dll.h"
24
// Module-level DLL entry point for this library.
// NOTE(review): the macro is presumably supplied by "oscl_dll.h" (included just above) - confirm.
OSCL_DLL_ENTRY_POINT_DEFAULT()25 OSCL_DLL_ENTRY_POINT_DEFAULT()
26
27
28 /////////////////////////////////////////////////////////////////////////////
29 // Add AO to the scheduler
30 /////////////////////////////////////////////////////////////////////////////
31 OSCL_EXPORT_REF void ThreadSafeCallbackAO::ThreadLogon()
32 {
33 if (!IsAdded())
34 {
35 AddToScheduler();
36 }
37
38 iLogger = PVLogger::GetLoggerObject(iLoggerString);
39 }
40 ////////////////////////////////////////////////////////////////////////////
41 // Remove AO from the scheduler
42 ////////////////////////////////////////////////////////////////////////////
ThreadLogoff()43 OSCL_EXPORT_REF void ThreadSafeCallbackAO::ThreadLogoff()
44 {
45 //thread logoff
46 if (IsAdded())
47 {
48 RemoveFromScheduler();
49 }
50
51 iLogger = NULL;
52 }
53 ///////////////////////////////////////////////////////////////////////////
54 // CONSTRUCTOR
55 ///////////////////////////////////////////////////////////////////////////
ThreadSafeCallbackAO(void * aObserver,uint32 aDepth,const char * aAOname,int32 aPriority)56 OSCL_EXPORT_REF ThreadSafeCallbackAO::ThreadSafeCallbackAO(void *aObserver, uint32 aDepth, const char *aAOname, int32 aPriority)
57 : OsclActiveObject(aPriority, aAOname),
58 iLogger(NULL)
59 {
60
61
62 OsclReturnCode queue_init_status = OsclSuccess;
63 OsclProcStatus::eOsclProcError mutex_init_status = OsclProcStatus::SUCCESS_ERROR;
64 OsclProcStatus::eOsclProcError sema_init_status = OsclProcStatus::SUCCESS_ERROR;
65 int32 err = 0;
66
67 iLoggerString = aAOname;
68 iObserver = aObserver;
69 Q = NULL;
70
71 OSCL_TRY(err,
72 queue_init_status = QueueInit(aDepth); //create the Q
73 mutex_init_status = Mutex.Create(); // Create Mutex
74 sema_init_status = RemoteThreadCtrlSema.Create(aDepth);
75
76 ThreadLogon(); // add to scheduler
77 );
78
79 if ((err != 0) ||
80 (queue_init_status != OsclSuccess) ||
81 (mutex_init_status != OsclProcStatus::SUCCESS_ERROR) ||
82 (sema_init_status != OsclProcStatus::SUCCESS_ERROR)
83 )
84 {
85 OSCL_LEAVE(OsclFailure);
86 }
87
88
89 PendForExec(); // make sure to "prime" the callback AO for the first event that arrives
90
91 }
92
93 /////////////////////////////////////////////////////////////////////////////
94 // Destructor
95 /////////////////////////////////////////////////////////////////////////////
~ThreadSafeCallbackAO()96 OSCL_EXPORT_REF ThreadSafeCallbackAO::~ThreadSafeCallbackAO()
97 {
98 OsclReturnCode queue_deinit_status = OsclSuccess;
99 OsclProcStatus::eOsclProcError mutex_close_status = OsclProcStatus::SUCCESS_ERROR;
100 OsclProcStatus::eOsclProcError sema_close_status = OsclProcStatus::SUCCESS_ERROR;
101
102 int32 err = 0;
103
104 OSCL_TRY(err,
105 queue_deinit_status = QueueDeInit(); // Destroy the queue
106 mutex_close_status = Mutex.Close(); // close the mutex
107 sema_close_status = RemoteThreadCtrlSema.Close(); // close the semaphore
108 ThreadLogoff();
109 );
110
111 if ((err != 0) ||
112 (queue_deinit_status != OsclSuccess) ||
113 (mutex_close_status != OsclProcStatus::SUCCESS_ERROR) ||
114 (sema_close_status != OsclProcStatus::SUCCESS_ERROR))
115 {
116 OSCL_LEAVE(OsclFailure);
117 }
118
119 iObserver = NULL;
120 }
121
122 /////////////////////////////////////////////////////////////////////////////
123 // Run Routine
124 /////////////////////////////////////////////////////////////////////////////
Run()125 OSCL_EXPORT_REF void ThreadSafeCallbackAO::Run()
126 {
127 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() In"));
128
129
130 OsclAny *P; // parameter to dequeue
131 OsclReturnCode status;
132
133
134 // get the event parameters and status of de-queuing
135 P = DeQueue(status);
136
137
138 if (status == OsclSuccess)
139 {
140 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - Calling Process Event"));
141 ProcessEvent(P);
142 }
143 else
144 {
145 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - could not dequeue event data"));
146 }
147
148
149
150 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() Out"));
151 }
152
153
154 /////////////////////////////////////////////////////////////////////////////////////
155 //////// ReceiveEvent
156 /////////////////////////////////////////////////////////////////////////////////////
157 /////// NOTE: THIS METHOD IS EXECUTED IN THE REMOTE THREAD CONTEXT
158 //////////////AS PART OF THE CALLBACK. IT QUEUES THE EVENT PARAMETERS !!!!!!!!!!!!
159 /////////////////////////////////////////////////////////////////////////////////////
160 ///////////// GENERIC API requires only one parameter (osclany pointer to "event data")
161 ///////////////
162 ////////////// BEFORE CALLING THE GENERIC API, YOU MUST SETUP (COPY IF NECESSARY)
163 ///////////////// CALLBACK EVENT PARAMETERS AND PROVIDE An OSCLANY POINTER TO THEM
164 /////////////////////////////////////////////////////////////////////////////////////
165 ///////////////YOU MUST NOT USE LOGGER IN THE CALLBACK (IN THE REMOTE THREAD CONTEXT)
166 ///////////////
167 //////////////// EXAMPLE CODE (For CALLING THE GENERIC API):
168
169 // typedef struct event_data{
170 // int d1;
171 // char *d2;
172 // uint32 d3;
173 // } event_data;
174
175 //int ActualCallbackAPI(int p1, char * p2, uint32 p3)
176 //{
177 // // pack all parameters
178 // event_data *pED = malloc(sizeof(event_data));
179 // pED->d1 = p1;
180 // pED->d2 = p2;
181 // pED->d3 = p3;
//      OsclAny *P = (OsclAny *) pED;
//      // call the generic callback API:
//      OsclReturnCode stat = ReceiveEvent(P);
//      // check the status
//      if(stat == OsclSuccess)
187 // return OK;
188 // else
189 // return ERROR;
190 //}
191 //
192
193
194
ReceiveEvent(OsclAny * EventData)195 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::ReceiveEvent(OsclAny *EventData)
196 {
197
198 OsclReturnCode status;
199
200 status = Queue(EventData);
201
202 return status;
203 }
204
ProcessEvent(OsclAny * EventData)205 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::ProcessEvent(OsclAny *EventData)
206 {
207 OSCL_UNUSED_ARG(EventData);
208 // DO NOTHING. OVERLOAD THIS METHOD IN THE DERIVED CLASS TO DO SOMETHING MEANINGFUL
209 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::ProcessEvent() In and Out"));
210 return OsclSuccess;
211 }
212 ///////////////////////////////////////////////////////////////////////////////
213 //// QUEUE OPERATIONS
214 ///////////////////////////////////////////////////////////////////////////////
215
216
217 /////CREATE QUEUE AND INIT ///////////////////
218
QueueInit(uint32 aMaxQueueDepth)219 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::QueueInit(uint32 aMaxQueueDepth)
220 {
221
222 if (aMaxQueueDepth < 1)
223 {
224 Q = NULL;
225 return OsclErrArgument;
226 }
227
228 // create Q as simple array
229 Q = (QueueT *) oscl_malloc(sizeof(QueueT));
230 if (Q == NULL)
231 {
232 return OsclErrNoMemory;
233 }
234
235 Q->MaxNumElements = aMaxQueueDepth;
236 Q->NumElem = 0;
237 Q->index_in = 0;
238 Q->index_out = 0;
239 Q->pFirst = NULL;
240 // create the Q elements
241 Q->pFirst = (QElement *) oscl_malloc((Q->MaxNumElements) * oscl_mem_aligned_size(sizeof(QElement)));
242 if (Q->pFirst == NULL)
243 {
244 Q = NULL;
245 return OsclErrNoMemory;
246 }
247
248 // zero out the Q
249 oscl_memset(Q->pFirst, 0, (Q->MaxNumElements) * oscl_mem_aligned_size(sizeof(QElement)));
250
251
252 return OsclSuccess;
253 }
254
255 ///////////// DESTROY QUEUE ///////////////////////
QueueDeInit()256 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::QueueDeInit()
257 {
258 OsclReturnCode status = OsclSuccess;
259
260 // free the Q elements
261 if (Q->pFirst != NULL)
262 oscl_free(Q->pFirst);
263
264 Q->pFirst = NULL;
265 // free the Q structure
266 if (Q != NULL)
267 oscl_free(Q);
268
269 Q = NULL;
270
271 return status;
272 }
273
274 /////////////QUEUE ONE ELEMENT //////////////////////
Queue(OsclAny * pData)275 OSCL_EXPORT_REF OsclReturnCode ThreadSafeCallbackAO::Queue(OsclAny *pData)
276 {
277 OsclProcStatus::eOsclProcError sema_status;
278
279 // Wait on the remote thread control semaphore. If the queue is full, must block and wait
280 // for the AO to dequeue some previous event. If the queue is not full, proceed
281 sema_status = RemoteThreadCtrlSema.Wait();
282 if (sema_status != OsclProcStatus::SUCCESS_ERROR)
283 return OsclFailure;
284
285
286 // protect queue access
287 Mutex.Lock();
288
289 if (Q->NumElem >= Q->MaxNumElements)
290 {
291 Mutex.Unlock();
292 RemoteThreadCtrlSema.Signal(); // signal the sema to maintain sema count consistency in case
293 // of error (inability to queue)
294 return OsclFailure;
295 }
296
297 (Q->pFirst[Q->index_in]).pData = pData;
298
299 Q->index_in++;
300 // roll-over the index to 0 if it reaches the end.
301 if (Q->index_in == Q->MaxNumElements)
302 Q->index_in = 0;
303
304 Q->NumElem++;
305
306 // check if AO needs to be scheduled (i.e. check if this is the first event the queue after the queue was empty)
307 if (GetQueueNumElem() == 1)
308 {
309 PendComplete(OSCL_REQUEST_ERR_NONE);
310 }
311
312 Mutex.Unlock();
313
314 return OsclSuccess;
315 }
316
317 ///////////DE-QUEUE ONE ELEMENT /////////////////////
DeQueue(OsclReturnCode & stat)318 OSCL_EXPORT_REF OsclAny* ThreadSafeCallbackAO::DeQueue(OsclReturnCode &stat)
319 {
320 OsclAny *pData;
321 OsclProcStatus::eOsclProcError sema_status;
322
323 stat = OsclSuccess;
324
325 // Protect the queue while accessing it:
326 Mutex.Lock();
327
328 if (Q->NumElem == 0)
329 {
330 // nothing to de-queue
331 stat = OsclFailure;
332 Mutex.Unlock();
333
334 return NULL;
335 }
336
337 pData = (Q->pFirst[Q->index_out]).pData;
338
339 Q->index_out++;
340 // roll-over the index
341 if (Q->index_out == Q->MaxNumElements)
342 Q->index_out = 0;
343
344 Q->NumElem--;
345
346 // check if there is need to reschedule
347 if ((Q->NumElem) > 0)
348 {
349 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - More events, call RunIfNotReady()"));
350 RunIfNotReady();
351 }
352 else
353 {
354 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "ThreadSafeCallbackAO::Run() - No more events, call PendForExec()"));
355 PendForExec();
356 }
357
358 //release queue access
359 Mutex.Unlock();
360
361 // Signal the semaphore that controls the remote thread.
362 // The remote thread might be blocked and waiting for an event to be processed in case the event queue is full
363 sema_status = RemoteThreadCtrlSema.Signal();
364 if (sema_status != OsclProcStatus::SUCCESS_ERROR)
365 {
366 stat = OsclFailure;
367 return NULL;
368 }
369
370 return pData;
371 }
372
373 ////////// GET CURRENT QUEUE SIZE//////////////
GetQueueNumElem()374 OSCL_EXPORT_REF uint32 ThreadSafeCallbackAO::GetQueueNumElem()
375 {
376 return Q->NumElem;
377 }
378
379