1 /* ------------------------------------------------------------------
2 * Copyright (C) 1998-2009 PacketVideo
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
13 * express or implied.
14 * See the License for the specific language governing permissions
15 * and limitations under the License.
16 * -------------------------------------------------------------------
17 */
18 /**
19 * @file pv_comms_io_node_port.cpp
20 * @brief Port for Comms I/O interfacing; can be unidirectional or bidirectional
21 */
22
23 #ifndef PV_COMMS_IO_NODE_OUTPORT_H_INCLUDED
24 #include "pv_comms_io_node_port.h"
25 #endif
26 #ifndef PV_COMMS_IO_NODE_H_INCLUDED
27 #include "pv_comms_io_node.h"
28 #endif
29 #ifndef PVMF_MEDIA_MSG_FORMAT_IDS_H_INCLUDED
30 #include "pvmf_media_msg_format_ids.h"
31 #endif
32
33 #ifndef OSCL_BASE_MACROS_H_INCLUDED
34 #include "oscl_base_macros.h"
35 #endif
36
// Size argument passed to iMediaDataMemPool in the port constructor.
#define PVMIO_MEDIADATA_POOLNUM 9
// Default for iQueueLimit: cap on incoming-queue plus cleanup-queue entries
// (see isIncomingFull()).
#define MIO_PORT_QUEUE_LIMIT 10

// Capability key strings for the data transfer model.
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed by SetDataTransferModels() -- confirm.
// The output key previously read "valtype=uin32", which does not match the
// "uint32" value type used by the input key.
#define INPUT_TRANSFER_MODEL_VAL ".../input/transfer_model;valtype=uint32"
#define OUTPUT_TRANSFER_MODEL_VAL ".../output/transfer_model;valtype=uint32"

// Logging macros
// Each expansion already ends with ';', so call sites may add their own.
#define LOGDATAPATH(x) PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iDatapathLogger, PVLOGMSG_INFO, x);
#define LOG_STACK_TRACE(m) PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, m);
#define LOG_DEBUG(m) PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_DEBUG, m);
#define LOG_ERR(m) PVLOGGER_LOGMSG(PVLOGMSG_INST_REL,iLogger,PVLOGMSG_ERR,m);

// Non-zero when the port tag has the input bit set.
#define IS_INPUT_PORT (iTag&PVMF_COMMSIO_NODE_INPUT_PORT_TAG)

// Non-zero when the port tag has the output bit set.
#define IS_OUTPUT_PORT (iTag&PVMF_COMMSIO_NODE_OUTPUT_PORT_TAG)
52
53
54 ////////////////////////////////////////////////////////////////////////////
PVCommsIONodePort(int32 aPortTag,PVCommsIONode * aNode)55 PVCommsIONodePort::PVCommsIONodePort(int32 aPortTag, PVCommsIONode* aNode)
56 : OsclTimerObject(OsclActiveObject::EPriorityNominal, "PVCommsIONodePort")
57 , PvmfPortBaseImpl(aPortTag
58 //this port handles its own port activity
59 , this
60 // incoming data queue config
61 , 0
62 , DEFAULT_DATA_QUEUE_CAPACITY
63 , DEFAULT_READY_TO_RECEIVE_THRESHOLD_PERCENT
64 // outgoing data queue config
65 , 0
66 , DEFAULT_DATA_QUEUE_CAPACITY
67 , DEFAULT_READY_TO_RECEIVE_THRESHOLD_PERCENT,
68 "PVCommsIONodePort")
69 , iNode(aNode)
70 , iMediaOutputTransfer(NULL)
71 , iMediaInputTransfer(NULL)
72 , iMediaDataMemPool(PVMIO_MEDIADATA_POOLNUM)
73 , iState(PVCommsIONodePort::PORT_STATE_BUFFERING)
74 , iWriteState(EWriteOK)
75 , iActiveCommsWrite(true)
76 , iActiveCommsRead(false)
77 , iEndOfInputPortDataReached(false)
78 , iEndOfOutputPortDataReached(false)
79 {
80 cmdId = 0;
81 AddToScheduler();
82 iFormatType = PVMF_MIME_FORMAT_UNKNOWN;
83 iWriteFailed = false;
84 iPeer = NULL;
85 iCleanupQueue.reserve(1);
86 iWriteAsyncContext = 0;
87 iQueueLimit = MIO_PORT_QUEUE_LIMIT;
88 iTxLogger = iRxLogger = NULL;
89
90 switch (iTag)
91 {
92 case PVMF_COMMSIO_NODE_INPUT_PORT_TAG:
93 // configure default incoming queue size,
94 // and we don't need outgoing queue
95 SetCapacity(EPVOutgoingDataQueue, 0);
96 SetReserve(EPVOutgoingDataQueue, 0);
97 SetThreshold(EPVOutgoingDataQueue, 0);
98 if (iNode->iLogBitstream)
99 {
100 iTxLogger = PVLogger::GetLoggerObject("pvcommionode.tx.bin");
101 iTxLogger->DisableAppenderInheritance();
102 }
103 break;
104 case PVMF_COMMSIO_NODE_OUTPUT_PORT_TAG:
105 // configure default outgoing queue size,
106 // and we don't need icomiing queue
107 SetCapacity(EPVIncomingDataQueue, 0);
108 SetReserve(EPVIncomingDataQueue, 0);
109 SetThreshold(EPVIncomingDataQueue, 0);
110 if (iNode->iLogBitstream)
111 {
112 iRxLogger = PVLogger::GetLoggerObject("pvcommionode.rx.bin");
113 iRxLogger->DisableAppenderInheritance();
114 }
115 break;
116 default:
117 if (iNode->iLogBitstream)
118 {
119 iTxLogger = PVLogger::GetLoggerObject("pvcommionode.tx.bin");
120 iRxLogger = PVLogger::GetLoggerObject("pvcommionode.rx.bin");
121 iTxLogger->DisableAppenderInheritance();
122 iRxLogger->DisableAppenderInheritance();
123 }
124 // defaults are already set for I/O
125 break;
126 }
127
128 #ifdef USE_COPY_BUFFER
129 for (int i = 0; i < NUM_COPY_BUFFERS; i++)
130 {
131 oscl_memset(&iCopyBuffer[i][0], 0, COPY_BUFFER_SIZE);
132 iCopyBufferSize[i] = 0;
133 }
134 iCopyBufferIndex = 0;
135 iCopyBufferSent = false;
136 #endif
137 }
138
ClearCleanupQueue()139 void PVCommsIONodePort::ClearCleanupQueue()
140 {//clear the media transfer cleanup queue and log all messages.
141 while (!iCleanupQueue.empty())
142 {
143 PVMFSharedMediaDataPtr mediaData = iCleanupQueue.begin()->iData;
144 PVMFCommandId cmdId = iCleanupQueue.begin()->iCmdId;
145 iCleanupQueue.erase(iCleanupQueue.begin());
146 LogMediaDataInfo("Cleared"
147 , mediaData
148 , cmdId
149 , iCleanupQueue.size()
150 );
151 }
152 }
153
154
155 ////////////////////////////////////////////////////////////////////////////
~PVCommsIONodePort()156 PVCommsIONodePort::~PVCommsIONodePort()
157 {
158 PvmfPortBaseImpl::ClearMsgQueues();
159
160 //cancel any pending write operations
161 if (!iCleanupQueue.empty())
162 {
163 int32 err;
164 OSCL_TRY(err, iMediaOutputTransfer->cancelAllCommands(););
165 ClearCleanupQueue();
166 }
167
168 }
169
170 ////////////////////////////////////////////////////////////////////////////
Start()171 OSCL_EXPORT_REF void PVCommsIONodePort::Start()
172 {
173 iState = PVCommsIONodePort::PORT_STATE_STARTED;
174 if (iNode->iMediaIOState == PVCommsIONode::MIO_STATE_STARTED)
175 {
176 RunIfNotReady();
177 }
178 }
179
180 ////////////////////////////////////////////////////////////////////////////
MediaIOStarted()181 OSCL_EXPORT_REF void PVCommsIONodePort::MediaIOStarted()
182 {
183 if (iState == PVCommsIONodePort::PORT_STATE_STARTED)
184 RunIfNotReady();
185 }
186
187 ////////////////////////////////////////////////////////////////////////////
Pause()188 OSCL_EXPORT_REF void PVCommsIONodePort::Pause()
189 {
190 iState = PVCommsIONodePort::PORT_STATE_BUFFERING;
191 }
192
193 ////////////////////////////////////////////////////////////////////////////
Stop()194 OSCL_EXPORT_REF void PVCommsIONodePort::Stop()
195 {
196 ClearMsgQueues();
197 iState = PVCommsIONodePort::PORT_STATE_BUFFERING;
198 }
199
200 ////////////////////////////////////////////////////////////////////////////
Configure(PVMFFormatType aPortProperty)201 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::Configure(PVMFFormatType aPortProperty)
202 {
203 if (iConnectedPort)
204 {
205 // Must disconnect before changing port properties, so return error
206 return PVMFFailure;
207 }
208
209 iFormatType = aPortProperty;
210 return PVMFSuccess;
211 }
212
213 ////////////////////////////////////////////////////////////////////////////
Connect(PVMFPortInterface * aPort)214 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::Connect(PVMFPortInterface* aPort)
215 {
216 PVMFStatus status = PvmfPortBaseImpl::Connect(aPort);
217
218 if (status != PVMFSuccess)
219 return status;
220
221 if (!iNode->CreateMediaTransfer(iTag, iMediaInputTransfer, iMediaOutputTransfer))
222 return PVMFFailure;
223
224 if (iMediaInputTransfer)
225 iMediaInputTransfer->setPeer(this);
226 if (iMediaOutputTransfer &&
227 iMediaOutputTransfer != iMediaInputTransfer)
228 iMediaOutputTransfer->setPeer(this);
229
230 iEndOfInputPortDataReached = false;
231 iEndOfOutputPortDataReached = false;
232
233 SetDataTransferModels();
234 return PVMFSuccess;
235 }
236
237 ////////////////////////////////////////////////////////////////////////////
PeerConnect(PVMFPortInterface * aPort)238 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::PeerConnect(PVMFPortInterface* aPort)
239 {
240 PVMFStatus status = PvmfPortBaseImpl::PeerConnect(aPort);
241 if (status != PVMFSuccess)
242 return status;
243
244 if (!iNode->CreateMediaTransfer(iTag, iMediaInputTransfer, iMediaOutputTransfer))
245 return PVMFFailure;
246
247 if (iMediaInputTransfer)
248 iMediaInputTransfer->setPeer(this);
249 if (iMediaOutputTransfer &&
250 iMediaOutputTransfer != iMediaInputTransfer)
251 iMediaOutputTransfer->setPeer(this);
252
253 SetDataTransferModels();
254
255 return PVMFSuccess;
256 }
257
258 ////////////////////////////////////////////////////////////////////////////
Disconnect()259 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::Disconnect()
260 {
261 PVMFStatus status = PvmfPortBaseImpl::Disconnect();
262 if (status != PVMFSuccess)
263 return status;
264
265 iNode->DeleteMediaTransfer(iTag, iMediaInputTransfer, iMediaOutputTransfer);
266 return PVMFSuccess;
267 }
268
269 ////////////////////////////////////////////////////////////////////////////
PeerDisconnect()270 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::PeerDisconnect()
271 {
272 PVMFStatus status = PvmfPortBaseImpl::PeerDisconnect();
273 if (status != PVMFSuccess)
274 return status;
275
276 iNode->DeleteMediaTransfer(iTag, iMediaInputTransfer, iMediaOutputTransfer);
277 return PVMFSuccess;
278 }
279
280 ////////////////////////////////////////////////////////////////////////////
setPeer(PvmiMediaTransfer * aPeer)281 void PVCommsIONodePort::setPeer(PvmiMediaTransfer *aPeer)
282 {
283 iPeer = aPeer;
284 }
285
286 ////////////////////////////////////////////////////////////////////////////
useMemoryAllocators(OsclMemAllocator * write_alloc)287 void PVCommsIONodePort::useMemoryAllocators(OsclMemAllocator* write_alloc)
288 {
289 OSCL_UNUSED_ARG(write_alloc);
290 OSCL_LEAVE(OsclErrNotSupported);
291 }
292
293 ////////////////////////////////////////////////////////////////////////////
writeAsync(uint8 format_type,int32 format_index,uint8 * data,uint32 data_len,const PvmiMediaXferHeader & data_header_info,OsclAny * aContext)294 PVMFCommandId PVCommsIONodePort::writeAsync(uint8 format_type, int32 format_index, uint8* data, uint32 data_len,
295 const PvmiMediaXferHeader& data_header_info, OsclAny* aContext)
296 {
297 OSCL_UNUSED_ARG(format_type);
298 OSCL_UNUSED_ARG(format_index);
299
300 // TODO: Handle incoming data here. Create a media data using PvmiMIOSourceDataBufferAlloc::allocate,
301 // save the data there, put the media data to outgoing queue.
302 // If the port is started, schedule to send in Run
303 if (!IS_OUTPUT_PORT || isActiveCommsRead())
304 {
305 // Not configured for input or passive read
306 OsclError::Leave(OsclErrNotSupported);
307 }
308
309 PVLOGGER_LOGBIN(PVLOGMSG_INST_LLDBG, iRxLogger, PVLOGMSG_DEBUG, (0, 1, data_len, data));
310
311 //if the outgoing queue is full, we can't accept data
312 //now.
313 if (IsOutgoingQueueBusy())
314 {
315 iWriteFailed = true;
316 OsclError::Leave(OsclErrGeneral);
317 }
318
319 // Create new media data buffer
320 PVMFSharedMediaDataPtr mediaData;
321 int32 err = 0;
322 if (cmdId == 0x7FFFFFFF)
323 cmdId = 0;
324 OSCL_TRY(err,
325 OsclSharedPtr<PVMFMediaDataImpl> mediaDataImpl = iMediaDataAlloc.allocate(iMediaInputTransfer, data,
326 data_len, cmdId, aContext);
327 mediaData = PVMFMediaData::createMediaData(mediaDataImpl, &iMediaDataMemPool);
328 );
329
330 // Set timestamp
331 mediaData->setTimestamp(data_header_info.timestamp);
332 mediaData->setSeqNum(data_header_info.seq_num);
333 mediaData->setMediaFragFilledLen(0, data_len);
334
335 // Convert media data to MediaMsg
336 PVMFSharedMediaMsgPtr mediaMsg;
337 convertToPVMFMediaMsg(mediaMsg, mediaData);
338
339
340 if (mediaMsg->getFormatID() == PVMF_MEDIA_CMD_EOS_FORMAT_ID)
341 {
342 iEndOfOutputPortDataReached = true;
343 RunIfNotReady();
344 }
345
346 PVMFStatus status = QueueOutgoingMsg(mediaMsg);
347 if (status != PVMFSuccess)
348 iNode->ReportErrorEvent(PVMFErrPortProcessing, (OsclAny*)status);
349
350 if (iState == PVCommsIONodePort::PORT_STATE_STARTED)
351 RunIfNotReady();
352 return cmdId++;
353 }
354
355 ////////////////////////////////////////////////////////////////////////////
356
writeComplete(PVMFStatus status,PVMFCommandId aCmdId,OsclAny * aContext)357 void PVCommsIONodePort::writeComplete(PVMFStatus status, PVMFCommandId aCmdId, OsclAny* aContext)
358 //for PvmiMediaTransfer
359 {
360 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_INFO,
361 (0, "PVCommsIONodePort::writeComplete status %d cmdId %d context 0x%x", status, aCmdId, aContext));
362
363
364
365 #ifdef USE_COPY_BUFFER
366 PVLOGGER_LOGBIN(PVLOGMSG_INST_LLDBG, iTxLogger, PVLOGMSG_DEBUG, (0, 1, iCopyBufferSize[(iCopyBufferIndex - 1) % NUM_COPY_BUFFERS], iCopyBuffer[(iCopyBufferIndex -1) % NUM_COPY_BUFFERS]));
367 iCopyBufferSize[(iCopyBufferIndex - 1) % NUM_COPY_BUFFERS] = 0;
368 iCopyBufferSent = false;
369 RunIfNotReady();
370 return;
371 #endif
372
373 //we don't expect any error status to be returned here
374 //except possibly cancelled
375 if (status != PVMFSuccess
376 && status != PVMFErrCancelled)
377 iNode->ReportErrorEvent(PVMFErrPortProcessing, NULL, PVCommsIONodeErr_WriteComplete);
378
379 //detect cases where the current call is completing synchronously.
380 if (iWriteState == EWriteBusy
381 && (uint32)aContext == iWriteAsyncContext)
382 {
383 //synchronous completion
384 iWriteState = EWriteOK;
385 }
386 else
387 {
388 //asynchronous completion
389 //do any memory cleanup
390 uint32 i;
391 for (i = 0; i < iCleanupQueue.size(); i++)
392 {
393 if (iCleanupQueue[i].iCmdId == aCmdId)
394 {
395 PVMFSharedMediaDataPtr mediaData = iCleanupQueue[i].iData;
396 iCleanupQueue.erase(&iCleanupQueue[i]);
397
398 LogMediaDataInfo("Async Write Complete"
399 , mediaData
400 , aCmdId
401 , iCleanupQueue.size()
402 );
403 //may need to generate port flow control now
404 PvmfPortBaseImpl::EvaluateIncomingBusy();
405 break;
406 }
407 }
408 //we may be waiting on completion of EOS
409 if (EndOfData(PVMF_COMMSIO_NODE_INPUT_PORT_TAG))
410 RunIfNotReady();
411 }
412 }
413
414
415
416 ////////////////////////////////////////////////////////////////////////////
readAsync(uint8 * data,uint32 max_data_len,OsclAny * context,int32 * formats,uint16 num_formats)417 PVMFCommandId PVCommsIONodePort::readAsync(uint8* data, uint32 max_data_len, OsclAny* context,
418 int32* formats, uint16 num_formats)
419 {
420 OSCL_UNUSED_ARG(data);
421 OSCL_UNUSED_ARG(max_data_len);
422 OSCL_UNUSED_ARG(context);
423 OSCL_UNUSED_ARG(formats);
424 OSCL_UNUSED_ARG(num_formats);
425
426 if (!IS_INPUT_PORT || isActiveCommsWrite())
427 {
428 OsclError::Leave(OsclErrNotSupported);
429 }
430
431 return -1;
432 }
433
434 ////////////////////////////////////////////////////////////////////////////
readComplete(PVMFStatus status,PVMFCommandId read_cmd_id,int32 format_index,const PvmiMediaXferHeader & data_header_info,OsclAny * context)435 void PVCommsIONodePort::readComplete(PVMFStatus status, PVMFCommandId read_cmd_id,
436 int32 format_index, const PvmiMediaXferHeader& data_header_info,
437 OsclAny* context)
438 {
439 OSCL_UNUSED_ARG(status);
440 OSCL_UNUSED_ARG(read_cmd_id);
441 OSCL_UNUSED_ARG(format_index);
442 OSCL_UNUSED_ARG(data_header_info);
443 OSCL_UNUSED_ARG(context);
444 OSCL_LEAVE(OsclErrNotSupported);
445 }
446
447 ////////////////////////////////////////////////////////////////////////////
cancelCommand(PVMFCommandId command_id)448 void PVCommsIONodePort::cancelCommand(PVMFCommandId command_id)
449 {
450 OSCL_UNUSED_ARG(command_id);
451 OSCL_LEAVE(OsclErrNotSupported);
452 }
453
454 ////////////////////////////////////////////////////////////////////////////
cancelAllCommands()455 void PVCommsIONodePort::cancelAllCommands()
456 {
457 OSCL_LEAVE(OsclErrNotSupported);
458 }
459
460 ////////////////////////////////////////////////////////////////////////////
461 // PvmiCapabilityAndConfig
462 ////////////////////////////////////////////////////////////////////////////
setObserver(PvmiConfigAndCapabilityCmdObserver * aObserver)463 OSCL_EXPORT_REF void PVCommsIONodePort::setObserver(PvmiConfigAndCapabilityCmdObserver* aObserver)
464 {
465 // Not supported
466 OSCL_UNUSED_ARG(aObserver);
467 }
468
469 ////////////////////////////////////////////////////////////////////////////
getParametersSync(PvmiMIOSession session,PvmiKeyType identifier,PvmiKvp * & parameters,int & num_parameter_elements,PvmiCapabilityContext context)470 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::getParametersSync(PvmiMIOSession session,
471 PvmiKeyType identifier,
472 PvmiKvp*& parameters,
473 int& num_parameter_elements,
474 PvmiCapabilityContext context)
475 {
476 LOG_STACK_TRACE((0, "PVCommsIONodePort::getParametersSync"));
477
478 MIOControlContextSet set;
479
480 if (!iNode)
481 {
482 LOG_ERR((0, "PVCommsIONodePort::getParametersSync: Error - Config object for media IO not available"));
483 return PVMFFailure;
484
485 }
486 set = iNode->ContextSetFromTag(iTag);
487
488 if (set.iMediaInputElement &&
489 set.iMediaInputElement->hasConfig() &&
490 (set.iMediaInputElement->iMediaIOConfig->getParametersSync(session, identifier, parameters, num_parameter_elements, context) == PVMFSuccess))
491 return PVMFSuccess;
492
493 // if this is a proxy port, we don't really have any way of determining
494 // which config we want to query, so if the input query fails, we'll
495 // try on the output config
496 if (set.iMediaOutputElement &&
497 set.iMediaOutputElement->hasConfig() &&
498 (set.iMediaOutputElement->iMediaIOConfig->getParametersSync(session, identifier, parameters, num_parameter_elements, context) == PVMFSuccess))
499 return PVMFSuccess;
500
501 return PVMFFailure;
502 }
503
504 ////////////////////////////////////////////////////////////////////////////
releaseParameters(PvmiMIOSession session,PvmiKvp * parameters,int num_elements)505 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::releaseParameters(PvmiMIOSession session,
506 PvmiKvp* parameters,
507 int num_elements)
508 {
509 LOG_STACK_TRACE((0, "PVCommsIONodePort::releaseParameters"));
510
511 MIOControlContextSet set;
512
513 if (!iNode)
514 {
515 LOG_ERR((0, "PVCommsIONodePort::releaseParameters: Error - Config object for media IO not available"));
516 return PVMFFailure;
517
518 }
519
520 set = iNode->ContextSetFromTag(iTag);
521
522 if (set.iMediaInputElement &&
523 set.iMediaInputElement->hasConfig() &&
524 (set.iMediaInputElement->iMediaIOConfig->releaseParameters(session, parameters, num_elements) == PVMFSuccess))
525 return PVMFSuccess;
526
527 // if this is a proxy port, we don't really have any way of determining
528 // which config we want to query, so if the input query fails, we'll
529 // try on the output config
530 if (set.iMediaOutputElement &&
531 set.iMediaOutputElement->hasConfig() &&
532 (set.iMediaOutputElement->iMediaIOConfig->releaseParameters(session, parameters, num_elements) == PVMFSuccess))
533 return PVMFSuccess;
534
535 return PVMFFailure;
536 }
537
538 ////////////////////////////////////////////////////////////////////////////
createContext(PvmiMIOSession session,PvmiCapabilityContext & context)539 OSCL_EXPORT_REF void PVCommsIONodePort::createContext(PvmiMIOSession session, PvmiCapabilityContext& context)
540 {
541 OSCL_UNUSED_ARG(session);
542 OSCL_UNUSED_ARG(context);
543 }
544
545 ////////////////////////////////////////////////////////////////////////////
setContextParameters(PvmiMIOSession session,PvmiCapabilityContext & context,PvmiKvp * parameters,int num_parameter_elements)546 OSCL_EXPORT_REF void PVCommsIONodePort::setContextParameters(PvmiMIOSession session,
547 PvmiCapabilityContext& context,
548 PvmiKvp* parameters, int num_parameter_elements)
549 {
550 OSCL_UNUSED_ARG(session);
551 OSCL_UNUSED_ARG(context);
552 OSCL_UNUSED_ARG(parameters);
553 OSCL_UNUSED_ARG(num_parameter_elements);
554 }
555
556 ////////////////////////////////////////////////////////////////////////////
DeleteContext(PvmiMIOSession session,PvmiCapabilityContext & context)557 OSCL_EXPORT_REF void PVCommsIONodePort::DeleteContext(PvmiMIOSession session, PvmiCapabilityContext& context)
558 {
559 OSCL_UNUSED_ARG(session);
560 OSCL_UNUSED_ARG(context);
561 }
562
563 ////////////////////////////////////////////////////////////////////////////
setParametersSync(PvmiMIOSession session,PvmiKvp * parameters,int num_elements,PvmiKvp * & ret_kvp)564 OSCL_EXPORT_REF void PVCommsIONodePort::setParametersSync(PvmiMIOSession session, PvmiKvp* parameters,
565 int num_elements, PvmiKvp*& ret_kvp)
566 {
567 LOG_STACK_TRACE((0, "PVCommsIONodePort::setParametersSync"));
568
569 MIOControlContextSet set;
570
571 if (!iNode)
572 {
573 LOG_ERR((0, "PVCommsIONodePort::setParametersSync: Error - Config object for media IO not available"));
574 ret_kvp = parameters;
575 OSCL_LEAVE(OsclErrGeneral);
576 }
577
578 PvmiKvp *ret_kvp1 = NULL;
579 PvmiKvp *ret_kvp2 = NULL;
580
581 set = iNode->ContextSetFromTag(iTag);
582
583 if (set.iMediaInputElement &&
584 set.iMediaInputElement->hasConfig())
585 {
586 set.iMediaInputElement->iMediaIOConfig->setParametersSync(session, parameters, num_elements, ret_kvp1);
587 }
588 // if this is a proxy port, we don't really have any way of determining
589 // which config we want to query, so we'll just try setting both of them
590 if (set.iMediaOutputElement &&
591 set.iMediaOutputElement != set.iMediaInputElement &&
592 set.iMediaOutputElement->hasConfig())
593 {
594 set.iMediaOutputElement->iMediaIOConfig->setParametersSync(session, parameters, num_elements, ret_kvp);
595 }
596 // Since we aren't sure about which IO the request goes to we'll
597 // have to settle for considering 1 successful setParam() a success
598
599 if ((ret_kvp1 == NULL && ret_kvp2 == NULL) ||
600 (set.iMediaInputElement && (ret_kvp1 == NULL)) ||
601 (set.iMediaOutputElement && (ret_kvp2 == NULL)))
602 {
603 ret_kvp = ret_kvp1 ? ret_kvp1 : ret_kvp2;
604 }
605 }
606
607 ////////////////////////////////////////////////////////////////////////////
setParametersAsync(PvmiMIOSession session,PvmiKvp * parameters,int num_elements,PvmiKvp * & ret_kvp,OsclAny * context)608 OSCL_EXPORT_REF PVMFCommandId PVCommsIONodePort::setParametersAsync(PvmiMIOSession session,
609 PvmiKvp* parameters,
610 int num_elements,
611 PvmiKvp*& ret_kvp,
612 OsclAny* context)
613 {
614 OSCL_UNUSED_ARG(session);
615 OSCL_UNUSED_ARG(parameters);
616 OSCL_UNUSED_ARG(num_elements);
617 OSCL_UNUSED_ARG(ret_kvp);
618 OSCL_UNUSED_ARG(context);
619 OsclError::Leave(OsclErrNotSupported);
620 return -1;
621 }
622
623 ////////////////////////////////////////////////////////////////////////////
getCapabilityMetric(PvmiMIOSession session)624 OSCL_EXPORT_REF uint32 PVCommsIONodePort::getCapabilityMetric(PvmiMIOSession session)
625 {
626 OSCL_UNUSED_ARG(session);
627 return 0;
628 }
629
630 ////////////////////////////////////////////////////////////////////////////
verifyParametersSync(PvmiMIOSession session,PvmiKvp * parameters,int num_elements)631 OSCL_EXPORT_REF PVMFStatus PVCommsIONodePort::verifyParametersSync(PvmiMIOSession session,
632 PvmiKvp* parameters, int num_elements)
633 {
634 LOG_STACK_TRACE((0, "PVCommsIONodePort::verifyParametersSync"));
635
636 MIOControlContextSet set;
637
638 if (!iNode)
639 {
640 LOG_ERR((0, "PVCommsIONodePort::verifyParametersSync: Error - Config object for media IO not available"));
641 return PVMFFailure;
642 }
643
644 set = iNode->ContextSetFromTag(iTag);
645
646 if (set.iMediaInputElement &&
647 set.iMediaInputElement->hasConfig() &&
648 (set.iMediaInputElement->iMediaIOConfig->verifyParametersSync(session, parameters, num_elements) == PVMFSuccess))
649 return PVMFSuccess;
650
651 // if this is a proxy port, we don't really have any way of determining
652 // which config we want to query, so if the input query fails, we'll
653 // try on the output config
654 if (set.iMediaOutputElement &&
655 set.iMediaOutputElement->hasConfig() &&
656 (set.iMediaOutputElement->iMediaIOConfig->verifyParametersSync(session, parameters, num_elements) == PVMFSuccess))
657 return PVMFSuccess;
658
659 return PVMFFailure;
660 }
661
662 ////////////////////////////////////////////////////////////////////////////
Run()663 void PVCommsIONodePort::Run()
664 {
665 if (iState == PVCommsIONodePort::PORT_STATE_BUFFERING)
666 return;
667
668 #ifdef USE_COPY_BUFFER
669 SendCopyBuffer();
670 #endif
671
672 if (iReadAsyncCmds.size() > 0)
673 {
674 PvmiMediaXferHeader data_hdr;
675 data_hdr.seq_num = 0;
676 data_hdr.timestamp = 0;
677 data_hdr.flags = 0;
678 data_hdr.duration = 0;
679 data_hdr.stream_id = 0;
680 //writeAsyncID = iPeer->writeAsync(0, 0, data, bytesToRead, data_hdr);
681
682 iMediaOutputTransfer->readComplete(PVMFSuccess,
683 iReadAsyncCmds.begin()->iID,
684 0,
685 data_hdr,
686 iReadAsyncCmds.begin()->iContext);
687 iReadAsyncCmds.erase(iReadAsyncCmds.begin());
688 }
689
690 if (IS_OUTPUT_PORT && OutgoingMsgQueueSize() > 0 && !IsConnectedPortBusy())
691 {
692 //transfer data to connected port.
693 PVMFStatus status = Send();
694 if (status != PVMFSuccess)
695 iNode->ReportErrorEvent(PVMFErrPortProcessing, (OsclAny*)status);
696 }
697
698 if (CanSendCommsData())
699 SendCommsData();
700
701 if (iNode->IsFlushPending())
702 {
703 if (IncomingMsgQueueSize() == 0 && OutgoingMsgQueueSize() == 0
704 && iCleanupQueue.size() == 0)
705 iNode->FlushComplete();
706 }
707
708 // Send EOS event
709 if (EndOfData(PVMF_COMMSIO_NODE_INPUT_PORT_TAG))
710 {
711 iNode->ReportInfoEvent(PVMFInfoEndOfInputPortData, (OsclAny*)NULL);
712 iEndOfInputPortDataReached = false;
713 }
714
715 if (EndOfData(PVMF_COMMSIO_NODE_OUTPUT_PORT_TAG))
716 {
717 iNode->ReportInfoEvent(PVMFInfoEndOfOutputPortData, (OsclAny*)NULL);
718 iEndOfOutputPortDataReached = false;
719 }
720
721 //reschedule if necessary
722 if ((OutgoingMsgQueueSize() > 0 && !IsConnectedPortBusy()) ||
723 CanSendCommsData() ||
724 iReadAsyncCmds.size() > 0)
725 {
726 RunIfNotReady();
727 }
728 }
729
730 ////////////////////////////////////////////////////////////////////////////
HandlePortActivity(const PVMFPortActivity & aActivity)731 void PVCommsIONodePort::HandlePortActivity(const PVMFPortActivity& aActivity)
732 {
733 switch (aActivity.iType)
734 {
735 case PVMF_PORT_ACTIVITY_OUTGOING_MSG:
736 RunIfNotReady();
737 break;
738
739 case PVMF_PORT_ACTIVITY_CONNECTED_PORT_READY:
740 //wakeup the AO when the connected port is
741 //ready to accept data again.
742 RunIfNotReady();
743 break;
744
745 case PVMF_PORT_ACTIVITY_INCOMING_MSG:
746 #ifdef USE_COPY_BUFFER
747 CopyBuffer();
748 #else
749 if (CanSendCommsData())
750 RunIfNotReady();
751 #endif
752 break;
753
754 case PVMF_PORT_ACTIVITY_OUTGOING_QUEUE_READY:
755 if (iWriteFailed)
756 {
757 iWriteFailed = false;
758 //let the peer know they can try to write again.
759 if (iPeer)
760 iPeer->statusUpdate(PVMI_MEDIAXFER_STATUS_WRITE);
761 }
762 break;
763
764 default:
765 break;
766 }
767 }
768
CanSendCommsData()769 bool PVCommsIONodePort::CanSendCommsData()
770 {
771 return !iIncomingQueue.iQ.empty()
772 && iNode->GetState() == EPVMFNodeStarted
773 && iWriteState == EWriteOK
774 && isActiveCommsWrite()
775 && IS_INPUT_PORT;
776 }
777
SendCommsData()778 void PVCommsIONodePort::SendCommsData()
779 {
780 if (!CanSendCommsData()) return;
781
782
783 PVMFSharedMediaMsgPtr aMsg;
784 PVMFSharedMediaDataPtr aMediaData;
785 uint32 fragment = 0;
786 uint32 fragindex;
787
788 // get the input message at the front of the queue.
789 aMsg = iIncomingQueue.iQ.front();
790
791 if (aMsg->getFormatID() == PVMF_MEDIA_CMD_EOS_FORMAT_ID)
792 {
793 iEndOfInputPortDataReached = true;
794 // remove the message from the queue
795 iIncomingQueue.iQ.pop();
796 RunIfNotReady();
797 return;
798 }
799
800 convertToPVMFMediaData(aMediaData, aMsg);
801
802 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
803 (0, "PVCommsIOPort::SendCommsData Seq %d TS %d Rep 0x%x Count %d"
804 , aMediaData->getSeqNum()
805 , aMediaData->getTimestamp()
806 , aMediaData.GetRep()
807 , aMediaData.get_count()));
808
809 OSCL_ASSERT(iWriteState == EWriteOK);
810
811 if (iResend
812 && aMediaData->getSeqNum() == iResendSeqNum)
813 {
814 fragment = iResendFragment;
815 iResend = false;
816 }
817
818 for (fragindex = fragment; fragindex < aMediaData->getNumFragments();)
819 {
820 OsclRefCounterMemFrag frag;
821 aMediaData->getMediaFragment(fragindex, frag);
822
823 ++iWriteAsyncContext;
824 iWriteState = EWriteBusy;
825 int32 err;
826 int32 cmdId = 0;
827
828 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
829 (0, "PVCommsIONodePort::SendCommsData Calling WriteAsync context 0x%x", iWriteAsyncContext));
830
831 PvmiMediaXferHeader data_hdr;
832 data_hdr.seq_num = aMediaData->getSeqNum();
833 data_hdr.timestamp = aMediaData->getTimestamp();
834 data_hdr.flags = 0;
835 data_hdr.duration = 0;
836 data_hdr.stream_id = 0;
837 //writeAsyncID = iPeer->writeAsync(0, 0, data, bytesToRead, data_hdr);
838
839 err = WriteAsync(cmdId, frag, data_hdr);
840
841 if (err != OsclErrNone)
842 {
843 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
844 (0, "PVCommsIONodePort::SendData WriteAsync Leave code %d", err));
845
846 LogDatapath("MIO Component Busy! Waiting on Status Update");
847
848 //if a leave occurs in the writeAsync call, we suspend data
849 //transfer until a statusUpdate call from the MIO component
850 //tells us to resume.
851 //this is not an error-- it's the normal flow control mechanism.
852 iWriteState = EWriteWait;
853
854 //save the data to re-send later.
855 iResend = true;
856 iResendFragment = fragindex;
857 iResendSeqNum = aMediaData->getSeqNum();
858
859 // leave the message on front of the queue and return
860
861 return ;//wait on statusUpdate call from the MIO component.
862 }
863 else
864 {
865 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE,
866 (0, "PVCommsIONodePort::SendData WriteAsync returned cmdId %d", cmdId));
867
868 fragindex++;
869 if (fragindex == aMediaData->getNumFragments())
870 {
871 //all fragments have been sent. see whether completion
872 //is synchronous or asynchronous.
873 if (iWriteState == EWriteBusy)
874 {
875 //asynchronous completion.
876 //push the data onto the cleanup stack so it won't get cleaned
877 //up until the component consumes it.
878 iCleanupQueue.push_back(CleanupQueueElement(aMediaData, cmdId));
879
880 LogMediaDataInfo("Async Write"
881 , aMediaData
882 , cmdId
883 , iCleanupQueue.size()
884 );
885
886 }
887 //else the write already completed synchronously.
888 else
889 {
890 LogMediaDataInfo("Sync Write"
891 , aMediaData
892 );
893 }
894 }
895 iWriteState = EWriteOK;
896 }
897 }
898
899 // remove the message from the fron of the queue
900 iIncomingQueue.iQ.pop();
901
902 }
903
WriteAsync(int32 & cmdId,OsclRefCounterMemFrag frag,PvmiMediaXferHeader data_hdr)904 int32 PVCommsIONodePort::WriteAsync(int32& cmdId,
905 OsclRefCounterMemFrag frag,
906 PvmiMediaXferHeader data_hdr)
907 {
908 int32 err = 0;
909 OSCL_TRY(err,
910 cmdId = iMediaOutputTransfer->writeAsync(PVMI_MEDIAXFER_FMT_TYPE_DATA, /*format_type*/
911 PVMI_MEDIAXFER_FMT_INDEX_DATA, /*format_index*/
912 (uint8*)frag.getMemFragPtr(),
913 frag.getMemFragSize(),
914 data_hdr,
915 (OsclAny*)iWriteAsyncContext);
916 );
917 return err;
918 }
919
920 //for logging media data info
LogMediaDataInfo(const char * msg,PVMFSharedMediaDataPtr mediaData)921 void PVCommsIONodePort::LogMediaDataInfo(const char*msg, PVMFSharedMediaDataPtr mediaData)
922 {
923 OSCL_UNUSED_ARG(msg);
924 OSCL_UNUSED_ARG(mediaData);
925 LOGDATAPATH(
926 (0, "MOUT %s %s MediaData SeqNum %d, SId %d, TS %d"
927 , PortName()
928 , msg
929 , mediaData->getSeqNum()
930 , mediaData->getStreamID()
931 , mediaData->getTimestamp()
932 ));
933 }
934 //for logging media data info plus write ID and cleanup q depth
LogMediaDataInfo(const char * msg,PVMFSharedMediaDataPtr mediaData,int32 cmdid,int32 qdepth)935 void PVCommsIONodePort::LogMediaDataInfo(const char*msg, PVMFSharedMediaDataPtr mediaData, int32 cmdid, int32 qdepth)
936 {
937 OSCL_UNUSED_ARG(msg);
938 OSCL_UNUSED_ARG(mediaData);
939 OSCL_UNUSED_ARG(cmdid);
940 OSCL_UNUSED_ARG(qdepth);
941 LOGDATAPATH(
942 (0, "MOUT %s %s, Write Id %d, MediaData SeqNum %d, SId %d, TS %d, Cleanup Q-depth %d"
943 , PortName()
944 , msg
945 , cmdid
946 , mediaData->getSeqNum()
947 , mediaData->getStreamID()
948 , mediaData->getTimestamp()
949 , qdepth
950 ));
951 }
952 //for logging media xfer info
LogDatapath(const char * msg)953 void PVCommsIONodePort::LogDatapath(const char*msg)
954 {
955 OSCL_UNUSED_ARG(msg);
956 LOGDATAPATH(
957 (0, "MOUT %s %s"
958 , PortName()
959 , msg
960 ));
961 }
962
statusUpdate(uint32 status_flags)963 void PVCommsIONodePort::statusUpdate(uint32 status_flags)
964 //for PvmiMediaTransfer
965 {
966 PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_INFO,
967 (0, "PVCommsIONodePort::statusUpdate flags %d", status_flags));
968
969 if (status_flags & PVMI_MEDIAXFER_STATUS_WRITE)
970 {
971 //recover from a previous async write error.
972 if (iWriteState == EWriteWait)
973 {
974 LogDatapath("Data Transfer re-enabled");
975 iWriteState = EWriteOK;
976 RunIfNotReady();
977 }
978 }
979 else
980 {
981 //disable write
982 iWriteState = EWriteWait;
983 }
984 }
985
isIncomingFull()986 OSCL_EXPORT_REF bool PVCommsIONodePort::isIncomingFull()
987 // override the PvmfPortBaseImpl routine to impose a
988 // limit on the total buffers in this node.
989 // Since we are doing this we also need to call EvaluateIncomingBusy
990 // when any data is consumed.
991 {
992 return (iIncomingQueue.iQ.size()
993 + iCleanupQueue.size()) >= iQueueLimit;
994 }
995
996 ////////////////////////////////////////////////////////////////////////////
EndOfData(int32 aPortTag)997 bool PVCommsIONodePort::EndOfData(int32 aPortTag)
998 {
999 //this is really just necessary for the unit test,
1000 //which uses file I/O.
1001 //end of data is reached when we have received the end-of-data
1002 //command in the media data and all data is played out.
1003 if (aPortTag == PVMF_COMMSIO_NODE_INPUT_PORT_TAG)
1004 {
1005 return (iEndOfInputPortDataReached
1006 && iIncomingQueue.iQ.empty()
1007 && iCleanupQueue.empty());
1008 }
1009 else
1010 {
1011 return (iEndOfOutputPortDataReached
1012 && iOutgoingQueue.iQ.empty());
1013 }
1014 }
1015
SetDataTransferModels()1016 void PVCommsIONodePort::SetDataTransferModels()
1017 {
1018 // This will determine how the port writes
1019 // and writes data to its mediaDataTransfer
1020 // object. Default is active writing to media
1021 // object, and passive reading from media object
1022
1023 PvmiKvp* parameters = NULL;
1024 int num_parameter_elements = 0;
1025 PvmiCapabilityContext context = NULL;
1026 MIOControlContextSet set;
1027
1028 set = iNode->ContextSetFromTag(iTag);
1029
1030 if (set.iMediaInputElement &&
1031 set.iMediaInputElement->hasConfig() &&
1032 set.iMediaInputElement->iMediaIOConfig->getParametersSync(set.iMediaInputElement->iMediaSession,
1033 OSCL_CONST_CAST(char*, OUTPUT_TRANSFER_MODEL_VAL),
1034 parameters, num_parameter_elements, context) == PVMFSuccess)
1035 {
1036 if (num_parameter_elements == 1)
1037 {
1038 // In the future we will be able to handle
1039 // both push and pull for input from the
1040 // MIO object, but for now we only want to
1041 // do push input, meaning the object
1042 // calls our writeAsync() method to send data
1043 // to us.
1044 OSCL_ASSERT(parameters[0].value.uint32_value == 1);
1045 iActiveCommsRead = false;
1046 }
1047 set.iMediaInputElement->iMediaIOConfig->releaseParameters(set.iMediaInputElement->iMediaSession, parameters, num_parameter_elements);
1048 }
1049 num_parameter_elements = 0;
1050 parameters = NULL;
1051 if (set.iMediaOutputElement &&
1052 set.iMediaOutputElement->hasConfig() &&
1053 set.iMediaOutputElement->iMediaIOConfig->getParametersSync(set.iMediaOutputElement->iMediaSession,
1054 OSCL_CONST_CAST(char*, INPUT_TRANSFER_MODEL_VAL),
1055 parameters, num_parameter_elements, context) == PVMFSuccess)
1056 {
1057 if (num_parameter_elements == 1)
1058 {
1059 // In the future we will be able to handle
1060 // both push and pull for output to the
1061 // MIO object, but for now we only want to
1062 // do push output, meaning we will call
1063 // the object's writeAsync() method to send
1064 // data to it.
1065 OSCL_ASSERT(parameters[0].value.uint32_value == 1);
1066 iActiveCommsWrite = true;
1067 }
1068 set.iMediaOutputElement->iMediaIOConfig->releaseParameters(set.iMediaOutputElement->iMediaSession, parameters, num_parameter_elements);
1069 }
1070 }
1071
1072 #ifdef USE_COPY_BUFFER
CopyBuffer()1073 void PVCommsIONodePort::CopyBuffer()
1074 {
1075 PVMFSharedMediaMsgPtr aMsg;
1076 PVMFSharedMediaDataPtr aMediaData;
1077
1078 while (IncomingMsgQueueSize())
1079 {
1080 DequeueIncomingMsg(aMsg);
1081 convertToPVMFMediaData(aMediaData, aMsg);
1082
1083 for (uint32 fragindex = 0; fragindex < aMediaData->getNumFragments(); fragindex++)
1084 {
1085 OsclRefCounterMemFrag frag;
1086 aMediaData->getMediaFragment(fragindex, frag);
1087
1088 //If going to overflow flush data
1089 if ((frag.getMemFragSize() + iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS]) > COPY_BUFFER_SIZE)
1090 {
1091 iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS] = 0;
1092 }
1093
1094 //If frag is too large, drop it
1095 if (frag.getMemFragSize() > COPY_BUFFER_SIZE)
1096 {
1097 continue;
1098 }
1099
1100 oscl_memcpy(&iCopyBuffer[iCopyBufferIndex % NUM_COPY_BUFFERS][iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS]],
1101 frag.getMemFragPtr(),
1102 frag.getMemFragSize());
1103
1104 iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS] += frag.getMemFragSize();
1105 }
1106
1107 SendCopyBuffer();
1108 }
1109 }
1110
SendCopyBuffer()1111 void PVCommsIONodePort::SendCopyBuffer()
1112 {
1113 int32 err;
1114 int32 cmdId;
1115
1116 if (!iCopyBufferSent && (iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS] > 0))
1117 {
1118 PvmiMediaXferHeader data_hdr;
1119 data_hdr.seq_num = iCopyBufferIndex;
1120 data_hdr.timestamp = iCopyBufferIndex;
1121 data_hdr.flags = 0;
1122 data_hdr.duration = 0;
1123 data_hdr.stream_id = 0;
1124 //writeAsyncID = iPeer->writeAsync(0, 0, data, bytesToRead, data_hdr);
1125
1126 OSCL_TRY(err,
1127 cmdId = iMediaOutputTransfer->writeAsync(PVMI_MEDIAXFER_FMT_TYPE_DATA, /*format_type*/
1128 PVMI_MEDIAXFER_FMT_INDEX_DATA, /*format_index*/
1129 (uint8*) & iCopyBuffer[iCopyBufferIndex % NUM_COPY_BUFFERS][0],
1130 iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS],
1131 data_hdr,
1132 (OsclAny*)iWriteAsyncContext);
1133 );
1134
1135 if (err)
1136 {
1137 iCopyBufferSize[iCopyBufferIndex % NUM_COPY_BUFFERS] = 0;
1138 }
1139 else
1140 {
1141 iCopyBufferIndex++;
1142 iCopyBufferSent = true;
1143 }
1144 }
1145 }
1146 #endif
1147
1148
1149
1150