1 /* ------------------------------------------------------------------
2 * Copyright (C) 1998-2009 PacketVideo
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13 * express or implied.
14 * See the License for the specific language governing permissions
15 * and limitations under the License.
16 * -------------------------------------------------------------------
17 */
18 /**
19 * @file pvmf_port_base_impl.cpp
20 * @brief Base implementation of pvmf port
21 */
22
23 #ifndef PVMF_PORT_BASE_IMPL_H_INCLUDED
24 #include "pvmf_port_base_impl.h"
25 #endif
26 #ifndef PVMF_NODE_INTERFACE_H_INCLUDED
27 #include "pvmf_node_interface.h"
28 #endif
29 #ifndef OSCL_MEM_BASIC_FUNCTIONS_H
30 #include "oscl_mem_basic_functions.h"
31 #endif
32 #include "pvmf_media_msg_format_ids.h"
33 #include "pvmf_media_data.h"
34
35 #define LOGDATAPATH(x) PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iDatapathLogger, PVLOGMSG_INFO, x);
36
Construct(uint32 aCap,uint32 aReserve,uint32 aThresh)37 void PvmfPortBaseImplQueue::Construct(uint32 aCap, uint32 aReserve, uint32 aThresh)
38 {
39 iBusy = false;
40
41 iCapacity = aCap;
42
43 if (aReserve > 0)
44 iQ.reserve(aReserve);
45
46 if (aThresh > 100)
47 aThresh = 100;
48 iThresholdPercent = aThresh;
49 iThreshold = iCapacity * iThresholdPercent / 100;
50 }
51
SetCapacity(uint32 aCap)52 PVMFStatus PvmfPortBaseImplQueue::SetCapacity(uint32 aCap)
53 {
54 iCapacity = aCap;
55 iThreshold = iCapacity * iThresholdPercent / 100;
56
57 return PVMFSuccess;
58 }
59
SetReserve(uint32 aReserve)60 PVMFStatus PvmfPortBaseImplQueue::SetReserve(uint32 aReserve)
61 {
62 if (aReserve > 0)
63 {
64 iQ.reserve(aReserve);
65 return PVMFSuccess;
66 }
67 return PVMFFailure;
68 }
69
SetThreshold(uint32 aThresh)70 PVMFStatus PvmfPortBaseImplQueue::SetThreshold(uint32 aThresh)
71 {
72 if (aThresh > 100)
73 return PVMFFailure;
74
75 iThresholdPercent = aThresh;
76 iThreshold = iCapacity * iThresholdPercent / 100;
77 return PVMFSuccess;
78 }
79
80 ////////////////////////////////////////////////////////////////////////////
81
GetPortTag() const82 OSCL_EXPORT_REF int32 PvmfPortBaseImpl::GetPortTag() const
83 {
84 return iTag;
85 }
86
PortActivity(PVMFPortActivityType aActivity)87 OSCL_EXPORT_REF void PvmfPortBaseImpl::PortActivity(PVMFPortActivityType aActivity)
88 {
89 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
90 if (iDatapathLogger)
91 {
92 switch (aActivity)
93 {
94 case PVMF_PORT_ACTIVITY_CREATED:
95 LOGDATAPATH((0, "PORT %s Created", iPortName.get_cstr()));
96 break;
97 case PVMF_PORT_ACTIVITY_DELETED:
98 LOGDATAPATH((0, "PORT %s Deleted", iPortName.get_cstr()));
99 break;
100 case PVMF_PORT_ACTIVITY_CONNECT:
101 LOGDATAPATH((0, "PORT %s Connected", iPortName.get_cstr()));
102 break;
103 case PVMF_PORT_ACTIVITY_DISCONNECT:
104 LOGDATAPATH((0, "PORT %s Disconnected", iPortName.get_cstr()));
105 break;
106 case PVMF_PORT_ACTIVITY_OUTGOING_MSG:
107 //msg was just added to end of in Q.
108 LogMediaMsgInfo(iOutgoingQueue.iQ.back(), "Out Msg Q'd", iOutgoingQueue);
109 break;
110 case PVMF_PORT_ACTIVITY_INCOMING_MSG:
111 //msg was just added to end of in Q.
112 LogMediaMsgInfo(iIncomingQueue.iQ.back(), "In Msg Received ", iIncomingQueue);
113 break;
114 case PVMF_PORT_ACTIVITY_OUTGOING_QUEUE_BUSY:
115 LOGDATAPATH(
116 (0, "PORT %s Outgoing Q Busy, Q-depth %d/%d", iPortName.get_cstr()
117 , iOutgoingQueue.iQ.size()
118 , iOutgoingQueue.iCapacity));
119 break;
120 case PVMF_PORT_ACTIVITY_OUTGOING_QUEUE_READY:
121 LOGDATAPATH((0, "PORT %s Outgoing Q Ready, Q-depth %d/%d", iPortName.get_cstr()
122 , iOutgoingQueue.iQ.size()
123 , iOutgoingQueue.iCapacity));
124 break;
125 case PVMF_PORT_ACTIVITY_CONNECTED_PORT_BUSY:
126 LOGDATAPATH((0, "PORT %s Connected Port Busy, Q-depth %d/%d", iPortName.get_cstr()
127 , iOutgoingQueue.iQ.size()
128 , iOutgoingQueue.iCapacity));
129 break;
130 case PVMF_PORT_ACTIVITY_CONNECTED_PORT_READY:
131 LOGDATAPATH((0, "PORT %s Connected Port Ready, Q-depth %d/%d", iPortName.get_cstr()
132 , iOutgoingQueue.iQ.size()
133 , iOutgoingQueue.iCapacity));
134 break;
135 case PVMF_PORT_ACTIVITY_ERROR:
136 default:
137 break;
138 }
139 }
140 #endif
141
142 //report port activity to the activity handler
143 if (iPortActivityHandler)
144 {
145 PVMFPortActivity activity(this, aActivity);
146 iPortActivityHandler->HandlePortActivity(activity);
147 }
148 }
149
SetName(const char * name)150 OSCL_EXPORT_REF void PvmfPortBaseImpl::SetName(const char*name)
151 {
152 if (name
153 && name[0] != '\0')
154 {
155 iPortName = name;
156 iDatapathLogger = PVLogger::GetLoggerObject("datapath");
157 }
158 else
159 {
160 iDatapathLogger = NULL;
161 }
162 }
163
164 ////////////////////////////////////////////////////////////////////////////
PvmfPortBaseImpl(int32 aTag,PVMFPortActivityHandler * aNode,const char * name)165 OSCL_EXPORT_REF PvmfPortBaseImpl::PvmfPortBaseImpl(int32 aTag, PVMFPortActivityHandler* aNode, const char*name)
166 : PVMFPortInterface(aNode),
167 iConnectedPortBusy(false),
168 iInputSuspended(false),
169 iTag(aTag)
170 {
171 iLogger = PVLogger::GetLoggerObject("PvmfPortBaseImpl");
172 iIncomingQueue.Construct(DEFAULT_DATA_QUEUE_CAPACITY, DEFAULT_DATA_QUEUE_CAPACITY, DEFAULT_READY_TO_RECEIVE_THRESHOLD_PERCENT);
173 iOutgoingQueue.Construct(DEFAULT_DATA_QUEUE_CAPACITY, DEFAULT_DATA_QUEUE_CAPACITY, DEFAULT_READY_TO_RECEIVE_THRESHOLD_PERCENT);
174 SetName(name);
175 }
176
177 ////////////////////////////////////////////////////////////////////////////
PvmfPortBaseImpl(int32 aTag,PVMFPortActivityHandler * aNode,uint32 aInCapacity,uint32 aInReserve,uint32 aInThreshold,uint32 aOutCapacity,uint32 aOutReserve,uint32 aOutThreshold,const char * name)178 OSCL_EXPORT_REF PvmfPortBaseImpl::PvmfPortBaseImpl(int32 aTag
179 , PVMFPortActivityHandler* aNode
180 , uint32 aInCapacity
181 , uint32 aInReserve
182 , uint32 aInThreshold
183 , uint32 aOutCapacity
184 , uint32 aOutReserve
185 , uint32 aOutThreshold
186 , const char*name)
187 : PVMFPortInterface(aNode),
188 iConnectedPortBusy(false),
189 iInputSuspended(false),
190 iTag(aTag)
191 {
192 iLogger = PVLogger::GetLoggerObject("PvmfPortBaseImpl");
193 iIncomingQueue.Construct(aInCapacity, aInReserve, aInThreshold);
194 iOutgoingQueue.Construct(aOutCapacity, aOutReserve, aOutThreshold);
195 SetName(name);
196 }
197
198 ////////////////////////////////////////////////////////////////////////////
Construct()199 OSCL_EXPORT_REF void PvmfPortBaseImpl::Construct()
200 {
201 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
202 (0, "0x%x PvmfPortBaseImpl::Construct: iTag=%d", this, iTag));
203
204 #if PVMF_PORT_BASE_IMPL_STATS
205 oscl_memset(&iStats, 0, sizeof(PvmfPortBaseImplStats));
206 #endif
207 PortActivity(PVMF_PORT_ACTIVITY_CREATED);
208 }
209
210 ////////////////////////////////////////////////////////////////////////////
~PvmfPortBaseImpl()211 OSCL_EXPORT_REF PvmfPortBaseImpl::~PvmfPortBaseImpl()
212 {
213 if (iConnectedPort)
214 {
215 Disconnect();
216 }
217 while (!iIncomingQueue.iQ.empty())
218 {
219 PVMFSharedMediaMsgPtr msg = iIncomingQueue.iQ.front();
220 iIncomingQueue.iQ.pop();
221 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
222 if (iDatapathLogger)
223 LogMediaMsgInfo(msg, "In Msg Cleared", iIncomingQueue);
224 #endif
225 }
226 while (!iOutgoingQueue.iQ.empty())
227 {
228 PVMFSharedMediaMsgPtr msg = iOutgoingQueue.iQ.front();
229 iOutgoingQueue.iQ.pop();
230 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
231 if (iDatapathLogger)
232 LogMediaMsgInfo(msg, "Out Msg Cleared", iOutgoingQueue);
233 #endif
234 }
235 PortActivity(PVMF_PORT_ACTIVITY_DELETED);
236 }
237
238 ////////////////////////////////////////////////////////////////////////////
Connect(PVMFPortInterface * aPort)239 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::Connect(PVMFPortInterface* aPort)
240 {
241 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
242 (0, "0x%x PvmfPortBaseImpl::Connect: aPort=0x%x", this, aPort));
243
244 if (!aPort)
245 {
246 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
247 (0, "0x%x PvmfPortBaseImpl::Connect: Error - Connecting to invalid port", this));
248 return PVMFErrArgument;
249 }
250
251 if (iConnectedPort)
252 {
253 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
254 (0, "0x%x PvmfPortBaseImpl::Connect: Error - Already connected", this));
255 return PVMFFailure;
256 }
257
258 //Automatically connect the peer.
259 if (aPort->PeerConnect(this) != PVMFSuccess)
260 {
261 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
262 (0, "0x%x PvmfPortBaseImpl::Connect: Error - Peer Connect failed", this));
263 return PVMFFailure;
264 }
265
266 iConnectedPort = aPort;
267
268 #if PVMF_PORT_BASE_IMPL_STATS
269 // Reset statistics
270 oscl_memset(&iStats, 0, sizeof(PvmfPortBaseImplStats));
271 #endif
272
273 PortActivity(PVMF_PORT_ACTIVITY_CONNECT);
274 return PVMFSuccess;
275 }
276
277 ////////////////////////////////////////////////////////////////////////////
PeerConnect(PVMFPortInterface * aPort)278 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::PeerConnect(PVMFPortInterface* aPort)
279 {
280 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
281 (0, "0x%x PvmfPortBaseImpl::PeerConnect: aPort=0x%x", this, aPort));
282
283 if (!aPort)
284 {
285 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
286 (0, "0x%x PvmfPortBaseImpl::PeerConnect: Error - Connecting to invalid port", this));
287 return PVMFErrArgument;
288 }
289
290 if (iConnectedPort)
291 {
292 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
293 (0, "0x%x PvmfPortBaseImpl::PeerConnect: Error - Already connected", this));
294 return PVMFFailure;
295 }
296
297 iConnectedPort = aPort;
298
299 #if PVMF_PORT_BASE_IMPL_STATS
300 // Reset statistics
301 oscl_memset(&iStats, 0, sizeof(PvmfPortBaseImplStats));
302 #endif
303
304 PortActivity(PVMF_PORT_ACTIVITY_CONNECT);
305 return PVMFSuccess;
306 }
307
308 ////////////////////////////////////////////////////////////////////////////
Disconnect()309 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::Disconnect()
310 {
311 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
312 (0, "0x%x PvmfPortBaseImpl::Disconnect", this));
313
314 if (!iConnectedPort)
315 {
316 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_INFO,
317 (0, "0x%x PvmfPortBaseImpl::Disconnect: Port already disconnected", this));
318 return PVMFFailure;
319 }
320
321 //reset busy flags - this would prevent any queue / port ready
322 //events from being generated as we clear the message queues
323 iIncomingQueue.iBusy = false;
324 iOutgoingQueue.iBusy = false;
325
326 //Automatically disconnect the peer.
327 iConnectedPort->PeerDisconnect();
328
329 iConnectedPort = NULL;
330
331 ClearMsgQueues();
332
333 #if PVMF_PORT_BASE_IMPL_STATS
334 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_DEBUG,
335 (0, "0x%x PvmfPortBaseImpl::Disonnect: Stats: OutMsgQd=%d, OutMsgSent=%d, OutBusy=%d, ConnectedPortBusy=%d",
336 this, iStats.iOutgoingMsgQueued, iStats.iOutgoingMsgSent, iStats.iOutgoingQueueBusy,
337 iStats.iConnectedPortBusy));
338 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_DEBUG,
339 (0, "0x%x PvmfPortBaseImpl::Disonnect: Stats: InMsgRecv=%d, InMsgConsumed=%d, InBusy=%d",
340 this, iStats.iIncomingMsgRecv, iStats.iIncomingMsgConsumed, iStats.iIncomingQueueBusy));
341 #endif
342
343 PortActivity(PVMF_PORT_ACTIVITY_DISCONNECT);
344 return PVMFSuccess;
345 }
346
347 ////////////////////////////////////////////////////////////////////////////
PeerDisconnect()348 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::PeerDisconnect()
349 {
350 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
351 (0, "0x%x PvmfPortBaseImpl::PeerDisconnect", this));
352
353 if (!iConnectedPort)
354 {
355 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
356 (0, "0x%x PvmfPortBaseImpl::PeerDisconnect: Error - Port not connected", this));
357 return PVMFFailure;
358 }
359
360 //reset busy flags - this would prevent any queue / port ready
361 //events from being generated as we clear the message queues
362 iIncomingQueue.iBusy = false;
363 iOutgoingQueue.iBusy = false;
364
365 ClearMsgQueues();
366
367 iConnectedPort = NULL;
368
369 #if PVMF_PORT_BASE_IMPL_STATS
370 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_DEBUG,
371 (0, "0x%x PvmfPortBaseImpl::PeerDisconnect: Stats: OutMsgQd=%d, OutMsgSent=%d, OutBusy=%d, ConnectedPortBusy=%d",
372 this, iStats.iOutgoingMsgQueued, iStats.iOutgoingMsgSent, iStats.iOutgoingQueueBusy,
373 iStats.iConnectedPortBusy));
374 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_DEBUG,
375 (0, "0x%x PvmfPortBaseImpl::PeerDisconnect: Stats: InMsgRecv=%d, InMsgConsumed=%d, InBusy=%d",
376 this, iStats.iIncomingMsgRecv, iStats.iIncomingMsgConsumed, iStats.iIncomingQueueBusy));
377 #endif
378
379 PortActivity(PVMF_PORT_ACTIVITY_DISCONNECT);
380 return PVMFSuccess;
381 }
382
383 ////////////////////////////////////////////////////////////////////////////
QueueOutgoingMsg(PVMFSharedMediaMsgPtr aMsg)384 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::QueueOutgoingMsg(PVMFSharedMediaMsgPtr aMsg)
385 {
386 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
387 (0, "0x%x PvmfPortBaseImpl::QueueOutgoingMsg", this));
388
389 //If port is not connected, don't accept data on the
390 //outgoing queue.
391 if (!iConnectedPort)
392 {
393 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
394 (0, "0x%x PvmfPortBaseImpl::QueueOutgoingMsg: Error - Port not connected", this));
395 return PVMFFailure;
396 }
397
398 // Queue is in busy / flushing state. Do not accept more outgoing messages until the queue
399 // is not busy, i.e. queue size drops below specified threshold or FlushComplete is called.
400 if (iOutgoingQueue.iBusy)
401 {
402 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
403 (0, "0x%x PvmfPortBaseImpl::QueueOutgoingMsg: Outgoing queue in busy / flushing state", this));
404 return PVMFErrBusy;
405 }
406
407 // Add message to outgoing queue and notify the node of the activity
408 // There is no need to trap the push_back, since it cannot leave in this usage
409 // Reason being that we do a reserve in the constructor and we do not let the
410 // port queues grow indefinitely (we either a connected port busy or outgoing Q busy
411 // before we reach the reserved limit
412 iOutgoingQueue.iQ.push(aMsg);
413
414 #if PVMF_PORT_BASE_IMPL_STATS
415 ++iStats.iOutgoingMsgQueued;
416 #endif
417
418 //Attempt to queue the message directly in connected port's incoming msg queue
419 //first. If we cannot queue then we leave the msg in iOutgoingQueue
420 //Doing the push in iOutgoingQueue first followed by Receive ensure that msgs
421 //flow in FIFO order. If we did a Receive first then we would need
422 PVMFStatus status = iConnectedPort->Receive(iOutgoingQueue.iQ.front());
423 if (status == PVMFSuccess)
424 {
425 // Dequeue the message
426 PVMFSharedMediaMsgPtr msg = iOutgoingQueue.iQ.front();
427 iOutgoingQueue.iQ.pop();
428 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
429 //log to datapath
430 if (iDatapathLogger)
431 {
432 LogMediaMsgInfo(msg, "Msg Sent Directly", iOutgoingQueue);
433 }
434 #endif
435 #if PVMF_PORT_BASE_IMPL_STATS
436 // Count this message as either sent successfully,
437 ++iStats.iOutgoingMsgSent;
438 #endif
439 return status;
440 //there is no need to queue port activity PVMF_PORT_ACTIVITY_OUTGOING_MSG
441 //here since we have successfully q'd the msg on connected port's incoming
442 //msg queue
443 }
444 else
445 {
446 PortActivity(PVMF_PORT_ACTIVITY_OUTGOING_MSG);
447 // Outgoing queue size is at capacity and goes into busy state. The owner node is
448 // notified of this transition into busy state.
449 if (isOutgoingFull())
450 {
451 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
452 (0, "0x%x PvmfPortBaseImpl::QueueOutgoingMsg: Outgoing queue is full. Goes into busy state.", this));
453 iOutgoingQueue.iBusy = true;
454 PortActivity(PVMF_PORT_ACTIVITY_OUTGOING_QUEUE_BUSY);
455 #if PVMF_PORT_BASE_IMPL_STATS
456 ++iStats.iOutgoingQueueBusy;
457 #endif
458 }
459 }
460 return PVMFSuccess;
461 }
462
463 ////////////////////////////////////////////////////////////////////////////
Send()464 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::Send()
465 {
466 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
467 (0, "0x%x PvmfPortBaseImpl::Send", this));
468
469 if (!iConnectedPort)
470 {
471 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
472 (0, "0x%x PvmfPortBaseImpl::Send: Error - Port not connected", this));
473 return PVMFFailure;
474 }
475
476 if (iOutgoingQueue.iQ.empty())
477 {
478 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
479 (0, "0x%x PvmfPortBaseImpl::Send: Error - No queued message on outgoing queue", this));
480 return PVMFFailure;
481 }
482
483 if (iConnectedPortBusy)
484 {
485 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
486 (0, "0x%x PvmfPortBaseImpl::Send: Connected port is in busy state", this));
487 return PVMFErrBusy;
488 }
489
490 PVMFStatus status = iConnectedPort->Receive(iOutgoingQueue.iQ.front());
491
492 //Detect connected port busy...
493 if (status == PVMFErrBusy)
494 {
495 //don't dequeue the message-- save it for
496 //later.
497 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
498 (0, "0x%x PvmfPortBaseImpl::Send: Receive failed. Connected port is in busy state", this));
499 iConnectedPortBusy = true;
500 PortActivity(PVMF_PORT_ACTIVITY_CONNECTED_PORT_BUSY);
501 #if PVMF_PORT_BASE_IMPL_STATS
502 ++iStats.iConnectedPortBusy;
503 #endif
504 return status;
505 }
506
507 // Dequeue the message
508 {
509 PVMFSharedMediaMsgPtr msg = iOutgoingQueue.iQ.front();
510 iOutgoingQueue.iQ.pop();
511 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
512 //log to datapath
513 if (iDatapathLogger)
514 {
515 switch (status)
516 {
517 case PVMFSuccess:
518 LogMediaMsgInfo(msg, "Msg Sent", iOutgoingQueue);
519 break;
520 default:
521 LogMediaMsgInfo(msg, "Send Failed-- Msg Dropped!", iOutgoingQueue);
522 break;
523 }
524 }
525 #endif
526 }
527
528 #if PVMF_PORT_BASE_IMPL_STATS
529 // Count this message as either sent successfully,
530 // or discarded.
531 if (status == PVMFSuccess)
532 ++iStats.iOutgoingMsgSent;
533 else
534 ++iStats.iOutgoingMsgDiscarded;
535 #endif
536
537 // Notify the node if the queue was busy and the queue size has just dropped
538 // below the threshold. The node can resume calling QueueOutgoingMsg.
539 if (iOutgoingQueue.iBusy)
540 EvaluateOutgoingBusy();
541
542 return status;
543 }
544
EvaluateOutgoingBusy()545 OSCL_EXPORT_REF void PvmfPortBaseImpl::EvaluateOutgoingBusy()
546 {
547 // Notify the node if the queue was busy and the queue size has just dropped
548 // below the threshold. The node can resume calling QueueOutgoingMsg.
549 if (iOutgoingQueue.iBusy && !isOutgoingFull())
550 {
551 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
552 (0, "0x%x PvmfPortBaseImpl::Send: Outgoing queue ready", this));
553 iOutgoingQueue.iBusy = false;
554 PortActivity(PVMF_PORT_ACTIVITY_OUTGOING_QUEUE_READY);
555 }
556 }
557
558 ////////////////////////////////////////////////////////////////////////////
isOutgoingFull()559 OSCL_EXPORT_REF bool PvmfPortBaseImpl::isOutgoingFull()
560 //derived class can override this to redefine "queue full" condition.
561 {
562 if (iOutgoingQueue.iThreshold == 0)
563 {
564 //should never happen
565 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
566 (0, "0x%x PvmfPortBaseImpl::isOutgoingFull: Zero Threshold", this));
567 OSCL_ASSERT(false);
568 return true;
569 }
570 if (iOutgoingQueue.iQ.size() == iOutgoingQueue.iCapacity)
571 {
572 return true;
573 }
574 if (iOutgoingQueue.iBusy == true)
575 {
576 //this implies that we were at capacity previously
577 //wait for occupancy to fall below threshold before
578 //treating the queue as "not-full"
579 if (iOutgoingQueue.iQ.size() >= iOutgoingQueue.iThreshold)
580 {
581 return true;
582 }
583 }
584 //this means that we are below capacity and are approaching
585 //capacity, or that we have fallen below threshold. in either
586 //case treat queue as "not-full"
587 return false;
588 }
589
590
591
LogMediaMsgInfo(PVMFSharedMediaMsgPtr aMediaMsg,const char * msg,PvmfPortBaseImplQueue & q)592 void PvmfPortBaseImpl::LogMediaMsgInfo(PVMFSharedMediaMsgPtr aMediaMsg, const char* msg, PvmfPortBaseImplQueue&q)
593 //log media msg info, description, and associated q-depth.
594 {
595 // to avoid compiler warnings when logger is not available
596 OSCL_UNUSED_ARG(msg);
597 OSCL_UNUSED_ARG(q);
598
599 switch (aMediaMsg->getFormatID())
600 {
601 case PVMF_MEDIA_CMD_BOS_FORMAT_ID:
602 {
603 LOGDATAPATH(
604 (0, "PORT %s %s MediaCmd FmtId %s, SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
605 , msg
606 , "BOS"
607 , aMediaMsg->getSeqNum()
608 , aMediaMsg->getStreamID()
609 , aMediaMsg->getTimestamp()
610 , q.iQ.size()
611 , q.iCapacity
612 ));
613 }
614 break;
615 case PVMF_MEDIA_CMD_EOS_FORMAT_ID:
616 {
617 LOGDATAPATH(
618 (0, "PORT %s %s MediaCmd FmtId %s, SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
619 , msg
620 , "EOS"
621 , aMediaMsg->getSeqNum()
622 , aMediaMsg->getStreamID()
623 , aMediaMsg->getTimestamp()
624 , q.iQ.size()
625 , q.iCapacity
626 ));
627 }
628 break;
629 case PVMF_MEDIA_MSG_DATA_FORMAT_ID:
630 {
631 PVMFSharedMediaDataPtr mediaData;
632 convertToPVMFMediaData(mediaData, aMediaMsg);
633 LOGDATAPATH(
634 (0, "PORT %s %s MediaData SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
635 , msg
636 , mediaData->getSeqNum()
637 , mediaData->getStreamID()
638 , mediaData->getTimestamp()
639 , q.iQ.size()
640 , q.iCapacity
641 ));
642 }
643 break;
644 default:
645 {
646 LOGDATAPATH(
647 (0, "PORT %s %s MediaCmd FmtId %d, SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
648 , msg
649 , aMediaMsg->getFormatID()
650 , aMediaMsg->getSeqNum()
651 , aMediaMsg->getStreamID()
652 , aMediaMsg->getTimestamp()
653 , q.iQ.size()
654 , q.iCapacity
655 ));
656 }
657 break;
658 }
659 }
660
LogMediaMsgInfo(PVMFSharedMediaMsgPtr aMediaMsg,const char * msg,int32 qsize)661 OSCL_EXPORT_REF void PvmfPortBaseImpl::LogMediaMsgInfo(PVMFSharedMediaMsgPtr aMediaMsg, const char* msg, int32 qsize)
662 //log media msg info, description, and associated q-depth.
663 {
664 OSCL_UNUSED_ARG(msg);
665 OSCL_UNUSED_ARG(qsize);
666
667 if (!iDatapathLogger)
668 return;
669
670 switch (aMediaMsg->getFormatID())
671 {
672 case PVMF_MEDIA_CMD_EOS_FORMAT_ID:
673 {
674 LOGDATAPATH(
675 (0, "PORT %s %s MediaCmd FmtId %s, SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
676 , msg
677 , "EOS"
678 , aMediaMsg->getSeqNum()
679 , aMediaMsg->getStreamID()
680 , aMediaMsg->getTimestamp()
681 , qsize
682 ));
683 }
684 break;
685 case PVMF_MEDIA_MSG_DATA_FORMAT_ID:
686 {
687 PVMFSharedMediaDataPtr mediaData;
688 convertToPVMFMediaData(mediaData, aMediaMsg);
689 LOGDATAPATH(
690 (0, "PORT %s %s MediaData SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
691 , msg
692 , mediaData->getSeqNum()
693 , mediaData->getStreamID()
694 , mediaData->getTimestamp()
695 , qsize
696 ));
697 }
698 break;
699 default:
700 {
701 LOGDATAPATH(
702 (0, "PORT %s %s MediaCmd FmtId %d, SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
703 , msg
704 , aMediaMsg->getFormatID()
705 , aMediaMsg->getSeqNum()
706 , aMediaMsg->getStreamID()
707 , aMediaMsg->getTimestamp()
708 , qsize
709 ));
710 }
711 break;
712 }
713 }
714
LogMediaDataInfo(PVMFSharedMediaDataPtr aMediaData,const char * msg,int32 qsize)715 OSCL_EXPORT_REF void PvmfPortBaseImpl::LogMediaDataInfo(PVMFSharedMediaDataPtr aMediaData, const char* msg, int32 qsize)
716 //log media data info, description, and associated q-depth.
717 {
718 OSCL_UNUSED_ARG(aMediaData);
719 OSCL_UNUSED_ARG(msg);
720 OSCL_UNUSED_ARG(qsize);
721
722 if (!iDatapathLogger)
723 return;
724
725 LOGDATAPATH(
726 (0, "PORT %s %s MediaData SeqNum %d, SId %d, TS %d, Q-depth %d/%d", iPortName.get_cstr()
727 , msg
728 , aMediaData->getSeqNum()
729 , aMediaData->getStreamID()
730 , aMediaData->getTimestamp()
731 , qsize
732 ));
733
734 }
735
736 ////////////////////////////////////////////////////////////////////////////
Receive(PVMFSharedMediaMsgPtr aMsg)737 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::Receive(PVMFSharedMediaMsgPtr aMsg)
738 {
739 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
740 (0, "0x%x PvmfPortBaseImpl::Receive", this));
741
742 // Incoming queue is in flush state. Do not receive more messages.
743 if (iInputSuspended)
744 {
745 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
746 (0, "0x%x PvmfPortBaseImpl::Receive: Incoming queue is flushing", this));
747 return PVMFErrInvalidState;
748 }
749
750 // Incoming queue is in busy state. Do not receive more messages until the queue
751 // is not busy, i.e. queue size drops below specified threshold.
752 if (iIncomingQueue.iBusy)
753 {
754 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
755 (0, "0x%x PvmfPortBaseImpl::Receive: Incoming queue is busy", this));
756 return PVMFErrBusy;
757 }
758
759 // Add message to queue and notify the node of the activity
760 // There is no need to trap the push_back, since it cannot leave in this usage
761 // Reason being that we do a reserve in the constructor and we do not let the
762 // port queues grow indefinitely (we either a connected port busy or outgoing Q busy
763 // before we reach the reserved limit
764 iIncomingQueue.iQ.push(aMsg);
765
766 PortActivity(PVMF_PORT_ACTIVITY_INCOMING_MSG);
767 #if PVMF_PORT_BASE_IMPL_STATS
768 ++iStats.iIncomingMsgRecv;
769 #endif
770
771 // Incoming queue size is at capacity and goes into busy state.
772 if (iIncomingQueue.iThreshold != 0 &&
773 iIncomingQueue.iQ.size() >= iIncomingQueue.iCapacity)
774 {
775 iIncomingQueue.iBusy = true;
776 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
777 //log transitions to Full state
778 if (iDatapathLogger)
779 {
780 LOGDATAPATH(
781 (0, "PORT %s Incoming Q Busy, Q-depth %d/%d", iPortName.get_cstr()
782 , iIncomingQueue.iQ.size()
783 , iIncomingQueue.iCapacity));
784 }
785 #endif
786
787 #if PVMF_PORT_BASE_IMPL_STATS
788 ++iStats.iIncomingQueueBusy;
789 #endif
790 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
791 (0, "0x%x PvmfPortBaseImpl::Receive: Incoming queue is full. Goes into busy state.", this));
792 }
793
794 return PVMFSuccess;
795 }
796
797 ////////////////////////////////////////////////////////////////////////////
DequeueIncomingMsg(PVMFSharedMediaMsgPtr & aMsg)798 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::DequeueIncomingMsg(PVMFSharedMediaMsgPtr& aMsg)
799 {
800 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
801 (0, "0x%x PvmfPortBaseImpl::DequeueIncomingMsg", this));
802
803 if (iIncomingQueue.iQ.empty())
804 {
805 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
806 (0, "0x%x PvmfPortBaseImpl::DequeueIncomingMsg: Error - Incoming queue is empty", this));
807 return PVMFFailure;
808 }
809
810 // Save message to output parameter and remove it from queue
811 aMsg = iIncomingQueue.iQ.front();
812 iIncomingQueue.iQ.pop();
813
814 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
815 if (iDatapathLogger)
816 LogMediaMsgInfo(aMsg, "In Msg De-Q'd", iIncomingQueue);
817 #endif
818
819 #if PVMF_PORT_BASE_IMPL_STATS
820 ++iStats.iIncomingMsgConsumed;
821 #endif
822
823 // Notify the sender port if the queue was busy and the queue size has just dropped
824 // below the threshold. Sender port can resume calling Receive.
825 if (iIncomingQueue.iBusy)
826 EvaluateIncomingBusy();
827
828 return PVMFSuccess;
829 }
830
EvaluateIncomingBusy()831 OSCL_EXPORT_REF void PvmfPortBaseImpl::EvaluateIncomingBusy()
832 {
833 // Notify the sender port if the queue was busy and the queue size has just dropped
834 // below the threshold. Sender port can resume calling Receive.
835 if (iIncomingQueue.iBusy && !isIncomingFull())
836 {
837 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_DEBUG,
838 (0, "0x%x PvmfPortBaseImpl::DequeueIncomingMsg: Ready to receive more messages", this));
839 iIncomingQueue.iBusy = false;
840 if (iConnectedPort)
841 iConnectedPort->ReadyToReceive();
842 }
843 }
844
845 ////////////////////////////////////////////////////////////////////////////
isIncomingFull()846 OSCL_EXPORT_REF bool PvmfPortBaseImpl::isIncomingFull()
847 //derived class can override this to redefine "queue full" condition.
848 {
849 if (iIncomingQueue.iThreshold == 0)
850 {
851 //should never happen
852 PVLOGGER_LOGMSG(PVLOGMSG_INST_REL, iLogger, PVLOGMSG_ERR,
853 (0, "0x%x PvmfPortBaseImpl::isIncomingFull: Zero Threshold", this));
854 OSCL_ASSERT(false);
855 return true;
856 }
857 if (iIncomingQueue.iQ.size() == iIncomingQueue.iCapacity)
858 {
859 return true;
860 }
861 if (iIncomingQueue.iBusy == true)
862 {
863 //this implies that we were at capacity previously
864 //wait for occupancy to fall below threshold before
865 //treating the queue as "not-full"
866 if (iIncomingQueue.iQ.size() >= iIncomingQueue.iThreshold)
867 {
868 return true;
869 }
870 }
871 //this means that we are below capacity and are approaching
872 //capacity, or that we have fallen below threshold. in either
873 //case treat queue as "not-full"
874 return false;
875 }
876
877 ////////////////////////////////////////////////////////////////////////////
ReadyToReceive()878 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::ReadyToReceive()
879 {
880 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
881 (0, "0x%x PvmfPortBaseImpl::ReadyToReceive", this));
882
883 // Notify the node to wake up and resume processing the outgoing queue of this port
884 iConnectedPortBusy = false;
885 PortActivity(PVMF_PORT_ACTIVITY_CONNECTED_PORT_READY);
886 return PVMFSuccess;
887 }
888
889 ////////////////////////////////////////////////////////////////////////////
ClearMsgQueues()890 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::ClearMsgQueues()
891 {
892 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
893 (0, "0x%x PvmfPortBaseImpl::ClearMsgQueues", this));
894
895 while (!iIncomingQueue.iQ.empty())
896 {
897 PVMFSharedMediaMsgPtr msg = iIncomingQueue.iQ.front();
898 iIncomingQueue.iQ.pop();
899 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
900 if (iDatapathLogger)
901 LogMediaMsgInfo(msg, "In Msg Cleared", iIncomingQueue);
902 #endif
903 }
904 if (iIncomingQueue.iBusy)
905 EvaluateIncomingBusy();
906
907 while (!iOutgoingQueue.iQ.empty())
908 {
909 PVMFSharedMediaMsgPtr msg = iOutgoingQueue.iQ.front();
910 iOutgoingQueue.iQ.pop();
911 #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG)
912 if (iDatapathLogger)
913 LogMediaMsgInfo(msg, "Out Msg Cleared", iOutgoingQueue);
914 #endif
915 }
916 if (iOutgoingQueue.iBusy)
917 EvaluateOutgoingBusy();
918 return PVMFSuccess;
919 }
920
921 ////////////////////////////////////////////////////////////////////////////
IncomingMsgQueueSize()922 OSCL_EXPORT_REF uint32 PvmfPortBaseImpl::IncomingMsgQueueSize()
923 {
924 return iIncomingQueue.iQ.size();
925 }
926
927 ////////////////////////////////////////////////////////////////////////////
OutgoingMsgQueueSize()928 OSCL_EXPORT_REF uint32 PvmfPortBaseImpl::OutgoingMsgQueueSize()
929 {
930 return iOutgoingQueue.iQ.size();
931 }
932
933 ////////////////////////////////////////////////////////////////////////////
IsOutgoingQueueBusy()934 OSCL_EXPORT_REF bool PvmfPortBaseImpl::IsOutgoingQueueBusy()
935 {
936 return iOutgoingQueue.iBusy;
937 }
938
939 ////////////////////////////////////////////////////////////////////////////
IsConnectedPortBusy()940 OSCL_EXPORT_REF bool PvmfPortBaseImpl::IsConnectedPortBusy()
941 {
942 return iConnectedPortBusy;
943 }
944
945 ////////////////////////////////////////////////////////////////////////////
SuspendInput()946 OSCL_EXPORT_REF void PvmfPortBaseImpl::SuspendInput()
947 {
948 iInputSuspended = true;
949 }
950
951 ////////////////////////////////////////////////////////////////////////////
ResumeInput()952 OSCL_EXPORT_REF void PvmfPortBaseImpl::ResumeInput()
953 {
954 iInputSuspended = false;
955 }
956
957 ////////////////////////////////////////////////////////////////////////////
GetCapacity(TPvmfPortBaseImplQueueType aType)958 OSCL_EXPORT_REF uint32 PvmfPortBaseImpl::GetCapacity(TPvmfPortBaseImplQueueType aType)
959 {
960 if (aType == EPVIncomingDataQueue)
961 return iIncomingQueue.iCapacity;
962 return iOutgoingQueue.iCapacity;
963 }
964
965 ////////////////////////////////////////////////////////////////////////////
GetThreshold(TPvmfPortBaseImplQueueType aType)966 OSCL_EXPORT_REF uint32 PvmfPortBaseImpl::GetThreshold(TPvmfPortBaseImplQueueType aType)
967 {
968 if (aType == EPVIncomingDataQueue)
969 return iIncomingQueue.iThreshold;
970 return iOutgoingQueue.iThreshold;
971 }
972
973 ////////////////////////////////////////////////////////////////////////////
SetCapacity(TPvmfPortBaseImplQueueType aType,uint32 aCapacity)974 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::SetCapacity(TPvmfPortBaseImplQueueType aType, uint32 aCapacity)
975 {
976 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
977 (0, "0x%x PvmfPortBaseImpl::SetCapacity: aCapacity=%d", this, aCapacity));
978
979 if (aType == EPVIncomingDataQueue)
980 {
981 PVMFStatus status = iIncomingQueue.SetCapacity(aCapacity);
982 if (status == PVMFSuccess)
983 EvaluateIncomingBusy();
984 return status;
985 }
986 else
987 {
988 PVMFStatus status = iOutgoingQueue.SetCapacity(aCapacity);
989 if (status == PVMFSuccess)
990 EvaluateOutgoingBusy();
991 return status;
992 }
993 }
994
995 ////////////////////////////////////////////////////////////////////////////
SetReserve(TPvmfPortBaseImplQueueType aType,uint32 aCapacity)996 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::SetReserve(TPvmfPortBaseImplQueueType aType, uint32 aCapacity)
997 {
998 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
999 (0, "0x%x PvmfPortBaseImpl::SetCapacity: aCapacity=%d", this, aCapacity));
1000
1001 if (aType == EPVIncomingDataQueue)
1002 {
1003 PVMFStatus status = iIncomingQueue.SetReserve(aCapacity);
1004 if (status == PVMFSuccess)
1005 EvaluateIncomingBusy();
1006 return status;
1007 }
1008 else
1009 {
1010 PVMFStatus status = iOutgoingQueue.SetReserve(aCapacity);
1011 if (status == PVMFSuccess)
1012 EvaluateOutgoingBusy();
1013 return status;
1014 }
1015 }
1016
1017 ////////////////////////////////////////////////////////////////////////////
SetThreshold(TPvmfPortBaseImplQueueType aType,uint32 aThreshold)1018 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::SetThreshold(TPvmfPortBaseImplQueueType aType, uint32 aThreshold)
1019 {
1020 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
1021 (0, "0x%x PvmfPortBaseImpl::SetThreshold: aThreshold=%d", this, aThreshold));
1022 if (aType == EPVIncomingDataQueue)
1023 {
1024 PVMFStatus status = iIncomingQueue.SetThreshold(aThreshold);
1025 if (status == PVMFSuccess)
1026 EvaluateIncomingBusy();
1027 return status;
1028 }
1029 else
1030 {
1031 PVMFStatus status = iOutgoingQueue.SetThreshold(aThreshold);
1032 if (status == PVMFSuccess)
1033 EvaluateOutgoingBusy();
1034 return status;
1035 }
1036 }
1037
1038 ////////////////////////////////////////////////////////////////////////////
GetStats(PvmfPortBaseImplStats & aStats)1039 OSCL_EXPORT_REF PVMFStatus PvmfPortBaseImpl::GetStats(PvmfPortBaseImplStats& aStats)
1040 {
1041 #if PVMF_PORT_BASE_IMPL_STATS
1042 oscl_memcpy(&aStats, &iStats, sizeof(PvmfPortBaseImplStats));
1043 return PVMFSuccess;
1044 #else
1045 OSCL_UNUSED_ARG(aStats);
1046 return PVMFErrNotSupported;
1047 #endif
1048 }
1049
1050
1051
1052
1053
1054
1055
1056