/* ------------------------------------------------------------------
 * Copyright (C) 1998-2009 PacketVideo
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 * -------------------------------------------------------------------
 */
#include "pvmf_socket_node.h"
#include "oscl_string_utils.h"
#include "pvmf_basic_errorinfomessage.h"
#include "pvmf_errorinfomessage_extension.h"
#include "pvmf_socket_node_events.h"
#include "oscl_byte_order.h"
#include "pvmf_protocol_engine_command_format_ids.h"
#include "pvmf_media_cmd.h"
#include "pvmf_media_msg_format_ids.h"
#include "oscl_tickcount.h"
#include "oscl_rand.h"
#include "oscl_time.h"
#include "pvmf_socket_node_tunables.h"
#include "oscl_bin_stream.h"

// Use default DLL entry point for Symbian
#include "oscl_dll.h"

//////////////////////////////////////////////////
// PVMFSocketNodeMemPool
//////////////////////////////////////////////////

//Construct the per-port memory pool.
//Only the fixed-chunk media data pool is created here; every other allocator
//starts out NULL and is created later through the Create*Allocator calls.
PVMFSocketNodeMemPool::PVMFSocketNodeMemPool(uint32 aMemPoolNumBufs)
        : iMediaDataMemPool(NULL)
{
    iMediaDataMemPool = OSCL_NEW(OsclMemPoolFixedChunkAllocator, (aMemPoolNumBufs, MEDIA_DATA_CLASS_SIZE));
    iInternalAlloc = NULL;
    iSharedBufferAlloc = NULL;
    iMediaMsgAllocator = NULL;
    iMediaFragGroupAlloc = NULL;
    iMediaFragGroupAllocMempool = NULL;
}

//Create the default (internal) data buffer allocator for this port, unless a
//shared buffer allocator is already in place, and (when enabled at build time)
//the UDP multiple-receive allocator.
//
// iMime    - port mime string, used as the prefix of the allocator name
// aSize, aExpectedNumberOfBlocksPerBuffer, aResizeSize, aMaxNumResizes
//          - stored in iSharedDataBufferInfo and used to size the allocator
//
// NOTE(review): the OSCL_HeapString parameter appears without template
// arguments in this copy of the file - confirm against the class declaration.
void PVMFSocketNodeMemPool::CreateAllocators(const OSCL_HeapString& iMime, uint32 aSize, uint32 aExpectedNumberOfBlocksPerBuffer, uint32 aResizeSize, uint32 aMaxNumResizes)
{
    if (!iSharedBufferAlloc)
    {
        iSharedDataBufferInfo.Init(aSize, aExpectedNumberOfBlocksPerBuffer, aResizeSize, aMaxNumResizes);

        const char* const allocatorNameSuffix = "InternalSocketBuffer";
        const uint32 maxAllocatorNameLen = 255;
        char allocatorName[maxAllocatorNameLen];

        //Leave room for the suffix AND its terminating NUL. Without the extra
        //"- 1" a mime string of 235 or more characters made oscl_strcat write
        //the terminator one byte past the end of allocatorName.
        const uint32 strLenForMimeName = maxAllocatorNameLen - oscl_strlen(allocatorNameSuffix) - 1;

        oscl_strncpy(allocatorName, iMime.get_cstr(), strLenForMimeName);
        //oscl_strncpy does not terminate when the source is truncated.
        allocatorName[strLenForMimeName] = '\0';
        oscl_strcat(allocatorName, allocatorNameSuffix);

        CreateDefaultDataBufferAllocator(allocatorName);
    }
#if SNODE_ENABLE_UDP_MULTI_PACKET
    CreateUDPMultipleRecvAllocator();
#endif
}

//Create the shared (resizable) data buffer allocator from the parameters held
//in iSharedDataBufferInfo and bind a new media message allocator to it.
//Returns the new allocator, or NULL if a shared allocator already exists.
OsclMemPoolResizableAllocator* PVMFSocketNodeMemPool::CreateResizableDataBufferAllocator(const char* allocatorName)
{
    OSCL_UNUSED_ARG(allocatorName);

    //Check for an existing shared allocator BEFORE touching iMediaMsgAllocator.
    //Otherwise a rejected call would leave the pool without any media message
    //allocator even though its data buffer allocator is still alive.
    OSCL_ASSERT(iSharedBufferAlloc == NULL);
    if (iSharedBufferAlloc)
        return NULL;

    if (iMediaMsgAllocator)
    {
        /*
         * It may be possible that the allocator was bound to iInternalAlloc/iSharedBufferAlloc
         * So destruct it.
         * Deallocation of iInternalAlloc/iSharedBufferAlloc will be done as part of Reset/Destruction of node
         */
        OSCL_DELETE(iMediaMsgAllocator);
        iMediaMsgAllocator = NULL;
    }

    iSharedBufferAlloc = OSCL_NEW(OsclMemPoolResizableAllocator, (iSharedDataBufferInfo.iSize, (1 + iSharedDataBufferInfo.iMaxNumResizes), iSharedDataBufferInfo.iExpectedNumberOfBlocksPerBuffer));
    iSharedBufferAlloc->setMaxSzForNewMemPoolBuffer(iSharedDataBufferInfo.iResizeSize);
    iMediaMsgAllocator = OSCL_NEW(PVMFResizableSimpleMediaMsgAlloc, (iSharedBufferAlloc));
    return iSharedBufferAlloc;
}

//Create the internal (resizable) data buffer allocator from the parameters
//held in iSharedDataBufferInfo and bind a new media message allocator to it.
//Does nothing if the internal allocator already exists.
void PVMFSocketNodeMemPool::CreateDefaultDataBufferAllocator(const char* allocatorName)
{
    OSCL_UNUSED_ARG(allocatorName);

    //Check for an existing internal allocator BEFORE touching
    //iMediaMsgAllocator, so that a repeated call is a true no-op instead of
    //destroying the message allocator that is still bound to iInternalAlloc.
    OSCL_ASSERT(iInternalAlloc == NULL);
    if (iInternalAlloc)
        return;

    if (iMediaMsgAllocator)
    {
        /*
         * It may be possible that the allocator was bound to iInternalAlloc/iSharedBufferAlloc
         * So destruct it.
         * Deallocation of iInternalAlloc/iSharedBufferAlloc will be done as part of Reset/Destruction of node
         */
        OSCL_DELETE(iMediaMsgAllocator);
        iMediaMsgAllocator = NULL;
    }

    iInternalAlloc = OSCL_NEW(OsclMemPoolResizableAllocator, (iSharedDataBufferInfo.iSize, (1 + iSharedDataBufferInfo.iMaxNumResizes), iSharedDataBufferInfo.iExpectedNumberOfBlocksPerBuffer));
    iInternalAlloc->setMaxSzForNewMemPoolBuffer(iSharedDataBufferInfo.iResizeSize);
    iMediaMsgAllocator = OSCL_NEW(PVMFResizableSimpleMediaMsgAlloc, (iInternalAlloc));
}

//Create the allocators used for UDP multiple-packet receive.
//Compiled to a no-op unless SNODE_ENABLE_UDP_MULTI_PACKET is set.
void PVMFSocketNodeMemPool::CreateUDPMultipleRecvAllocator()
{
#if SNODE_ENABLE_UDP_MULTI_PACKET
    //CreateAllocators() calls this unconditionally; do not overwrite (and so
    //leak) allocators that a previous call already created.
    if (iMediaFragGroupAlloc || iMediaFragGroupAllocMempool)
        return;

    //Create the multiple-receive allocator
    iMediaFragGroupAllocMempool = OSCL_NEW(OsclMemPoolFixedChunkAllocator
                                           , (SNODE_UDP_MULTI_FRAG_ALLOCATOR_MAX_MSGS)
                                          );
    iMediaFragGroupAlloc = OSCL_NEW(PVMFMediaFragGroupCombinedAlloc
                                    , (SNODE_UDP_MULTI_FRAG_ALLOCATOR_MAX_MSGS
                                       , SNODE_UDP_MULTI_FRAG_ALLOCATOR_MAX_FRAGS_PER_MSG
                                       , iMediaFragGroupAllocMempool)
                                   );
    iMediaFragGroupAlloc->create();
#endif
}

//Return the largest media message payload that can currently be allocated:
//the largest contiguous free block of the active data buffer allocator
//(the shared one if present, otherwise the internal one) minus the media
//message wrapping overhead. Returns 0 when no allocator is available or when
//the free block is not larger than the overhead.
uint32 PVMFSocketNodeMemPool::GetMaxSizeMediaMsgLen()
{
    //Nothing can be allocated without a media message allocator; this also
    //avoids dereferencing a NULL pointer before the allocators are created.
    if (!iMediaMsgAllocator)
        return 0;

    OsclMemPoolResizableAllocator* resizableAllocator = NULL;
    if (iSharedBufferAlloc)
    {
        resizableAllocator = OSCL_STATIC_CAST(OsclMemPoolResizableAllocator*, iSharedBufferAlloc);
    }
    else if (iInternalAlloc)
    {
        resizableAllocator = OSCL_STATIC_CAST(OsclMemPoolResizableAllocator*, iInternalAlloc);
    }
    if (!resizableAllocator)
        return 0;

    const uint32 wrappingOverhead = iMediaMsgAllocator->GetMediaMsgAllocationOverheadBytes();
    const uint32 largestFreeBlock = resizableAllocator->getLargestContiguousFreeBlockSize();

    //Clamp instead of letting the unsigned subtraction wrap around.
    return (largestFreeBlock > wrappingOverhead) ? (largestFreeBlock - wrappingOverhead) : 0;
}

//Release the data buffer allocators, the media message allocator and (when
//enabled at build time) the UDP multiple-receive allocators.
void PVMFSocketNodeMemPool::DestroyAllocators()
{
    if (iInternalAlloc != NULL)
    {
        iInternalAlloc->CancelFreeChunkAvailableCallback();
        iInternalAlloc->removeRef();
        //Our reference is gone; forget the pointer so that it cannot be
        //reused and so that the allocator can be created again later.
        iInternalAlloc = NULL;
    }
    if (iSharedBufferAlloc != NULL)
    {
        iSharedBufferAlloc->CancelFreeChunkAvailableCallback();
        iSharedBufferAlloc->removeRef();
        iSharedBufferAlloc = NULL;
    }
    if (iMediaMsgAllocator)
    {
        OSCL_DELETE(iMediaMsgAllocator);
        iMediaMsgAllocator = NULL;
    }
#if SNODE_ENABLE_UDP_MULTI_PACKET
    DestroyUDPMultipleRecvAllocator();
#endif
}

//Release the allocators used for UDP multiple-packet receive.
//Compiled to a no-op unless SNODE_ENABLE_UDP_MULTI_PACKET is set.
void PVMFSocketNodeMemPool::DestroyUDPMultipleRecvAllocator()
{
#if SNODE_ENABLE_UDP_MULTI_PACKET
    if (iMediaFragGroupAlloc)
    {
        iMediaFragGroupAlloc->removeRef();
        iMediaFragGroupAlloc = NULL;
    }
    if (iMediaFragGroupAllocMempool)
    {
        iMediaFragGroupAllocMempool->removeRef();
        iMediaFragGroupAllocMempool = NULL;
    }
#endif
}

//////////////////////////////////////////////////
// SocketNodeStats
//////////////////////////////////////////////////
#if(ENABLE_SOCKET_NODE_STATS)
//Write the node-level counters, followed by the per-port statistics of every
//port in aPortVec that has a config, to the logger. Logs at most once.
void SocketNodeStats::Log(PVMFPortVector& aPortVec)
{
    if (iLogged)
        return;//prevent multiple logging.
    if (!iLogger)
        return;//prevent logging after thread logoff.

    iLogged = true;

    PVLOGGER_LOGMSG(PVLOGMSG_INST_PROF, iLogger, PVLOGMSG_ERR, (0, "SocketNodeStats:@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"));
    PVLOGGER_LOGMSG(PVLOGMSG_INST_PROF, iLogger, PVLOGMSG_ERR, (0, "SocketNodeStats: %8d Num Run", iNumRun));
    PVLOGGER_LOGMSG(PVLOGMSG_INST_PROF, iLogger, PVLOGMSG_ERR, (0, "SocketNodeStats: %8d Num QueueNodeCmd", iNumQueueNodeCmd));
    PVLOGGER_LOGMSG(PVLOGMSG_INST_PROF, iLogger, PVLOGMSG_ERR, (0, "SocketNodeStats: %8d Num Bind", iNumBind));

    for (uint32 i = 0; i < aPortVec.size(); i++)
    {
        if (aPortVec[i]->iConfig)
            aPortVec[i]->iConfig->iPortStats.Log(iLogger, aPortVec[i]->iConfig->iMime);
    }

    PVLOGGER_LOGMSG(PVLOGMSG_INST_PROF, iLogger, PVLOGMSG_ERR, (0, "SocketNodeStats:@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@"));
}
#endif //ENABLE_SOCKET_NODE_STATS

//////////////////////////////////////////////////
// SocketPortConfig
//////////////////////////////////////////////////

//Forward allocator creation to this port's memory pool, using the port's mime
//string. Does nothing if the port has no memory pool.
void SocketPortConfig::CreateAllocators(uint32 aSize, uint32 aExpectedNumberOfBlocksPerBuffer, uint32 aResizeSize, uint32 aMaxNumResizes)
{
    //CleanupMemPools() resets iMemPool to NULL, so guard against use after cleanup.
    if (!iMemPool)
        return;
    iMemPool->CreateAllocators(iMime, aSize, aExpectedNumberOfBlocksPerBuffer, aResizeSize, aMaxNumResizes);
}

//Destroy this port's memory pool: release its allocators, run its destructor
//explicitly and return its storage to aAlloc.
void SocketPortConfig::CleanupMemPools(Oscl_DefAlloc& aAlloc)
{
    if (iMemPool)
    {
        iMemPool->DestroyAllocators();
        iMemPool->~PVMFSocketNodeMemPool();
        aAlloc.deallocate((OsclAny*)(iMemPool));
        iMemPool = NULL;
    }
}

//////////////////////////////////////////////////
// End SocketPortConfig
//////////////////////////////////////////////////

//////////////////////////////////////////////////
// Standard Node APIs
//////////////////////////////////////////////////

//Construct the node: initialize the tunables and members, then build the
//command queues, port vector, capability data and DNS cache. If any of that
//leaves, the partially built state is cleared and the leave is re-thrown.
OSCL_EXPORT_REF PVMFSocketNode::PVMFSocketNode(int32 aPriority)
        : OsclActiveObject(aPriority, "PVMFSocketNode")
        , TIMEOUT_CONNECT(30000)
        , TIMEOUT_SEND(3000)
        , TIMEOUT_SENDTO(3000)
        , TIMEOUT_RECV(-1)
        , TIMEOUT_RECVFROM(-1)
        , TIMEOUT_SHUTDOWN(10000)
        , UDP_PORT_RANGE(2000)
        , MAX_UDP_PACKET_SIZE(MAX_SOCKET_BUFFER_SIZE)
        , MIN_UDP_PACKET_SIZE(MIN_SOCKET_BUFFER_SIZE)
{
    iLogger = NULL;
    iDataPathLogger = NULL;
    iDataPathLoggerRTP = NULL;
    iDataPathLoggerRTCP = NULL;
    iOsclErrorTrapImp = NULL;
    iSockServ = NULL;
    iMaxTcpRecvBufferSize = SNODE_DEFAULT_SOCKET_TCP_BUFFER_SIZE;
    iMaxTcpRecvBufferCount = SNODE_DEFAULT_SOCKET_TCP_BUFFER_COUNT;
    iSocketID = 0;
    iCommandErrorCode = PVMFSocketNodeErrorEventStart;
    iErrorEventErrorCode = PVMFSocketNodeErrorEventStart;
    iExtensionInterface = NULL;
    iInSocketCallback = false;
    iNumStopPortActivityPending = (-1);//inactive.

    int32 err;
    OSCL_TRY(err,

             //Create the input command queue.  Use a reserve to avoid lots of
             //dynamic memory allocation.
             iPendingCmdQueue.Construct(PVMF_SOCKET_NODE_COMMAND_ID_START,
                                        PVMF_SOCKET_NODE_COMMAND_VECTOR_RESERVE);

             //Create the "current command" queue.  It will only contain one
             //command at a time, so use a reserve of 1.
             iCurrentCmdQueue.Construct(0, 1);
             iCancelCmdQueue.Construct(0, 1);

             //Create the port vector.
             iPortVector.Construct(PVMF_SOCKET_NODE_PORT_VECTOR_RESERVE);

             //Set the node capability data.
             //This node can support an unlimited number of ports.
             iCapability.iCanSupportMultipleInputPorts = true;
             iCapability.iCanSupportMultipleOutputPorts = true;
             iCapability.iHasMaxNumberOfPorts = false;
             iCapability.iMaxNumberOfPorts = 0;//no maximum
             iCapability.iInputFormatCapability.push_back(PVMF_MIME_INET_UDP);
             iCapability.iInputFormatCapability.push_back(PVMF_MIME_INET_TCP);
             iCapability.iOutputFormatCapability.push_back(PVMF_MIME_INET_TCP);
             iCapability.iOutputFormatCapability.push_back(PVMF_MIME_INET_UDP);

             iDnsCache.NewL();
            );

    if (err != OsclErrNone)
    {
        //if a leave happened, cleanup and re-throw the error
        iPendingCmdQueue.clear();
        iCurrentCmdQueue.clear();
        iCancelCmdQueue.clear();
        iPortVector.clear();
        iCapability.iInputFormatCapability.clear();
        iCapability.iOutputFormatCapability.clear();
        OSCL_CLEANUP_BASE_CLASS(PVMFNodeInterface);
        OSCL_CLEANUP_BASE_CLASS(OsclActiveObject);
        OSCL_LEAVE(err);
    }
}

//Destroy the node: log statistics (if enabled), detach from the scheduler,
//release the extension interface, ports, closed sockets and DNS objects,
//fail every command still queued, and finally close the socket server.
OSCL_EXPORT_REF PVMFSocketNode::~PVMFSocketNode()
{
#if(ENABLE_SOCKET_NODE_STATS)
    iSocketNodeStats.Log(iPortVector);
#endif

    Cancel();

    //thread logoff
    if (IsAdded())
        RemoveFromScheduler();

    if (iExtensionInterface)
    {
        iExtensionInterface->removeRef();
        //do not keep a pointer to an interface we no longer reference.
        iExtensionInterface = NULL;
    }

    /* Cleanup allocated ports */
    CleanupPorts();
    iPortVector.clear();
    CleanupClosedTCPSockets();
    CleanupClosedUDPSockets();
    CleanupClosedDNS();

    //Cleanup commands
    //The command queues are self-deleting, but we want to
    //notify the observer of unprocessed commands.
    while (!iCurrentCmdQueue.empty())
    {
        CommandComplete(iCurrentCmdQueue, iCurrentCmdQueue.front(), PVMFFailure);
    }
    while (!iPendingCmdQueue.empty())
    {
        CommandComplete(iPendingCmdQueue, iPendingCmdQueue.front(), PVMFFailure);
    }
    while (!iCancelCmdQueue.empty())
    {
        CommandComplete(iCancelCmdQueue, iCancelCmdQueue.front(), PVMFFailure);
    }

    PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "Goin to delete Sock Serv"));
    if (iSockServ)
    {
        iSockServ->Close();
        iSockServ->~OsclSocketServ();
        iAlloc.deallocate(iSockServ);
        iSockServ = NULL;
    }
    PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:~PVMFSocketNode out"));
}

//Do thread-specific node creation and go to "Idle" state.
//Only valid in the "Created" state; any other state yields PVMFErrInvalidState.
OSCL_EXPORT_REF PVMFStatus PVMFSocketNode::ThreadLogon()
{
    PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:ThreadLogon"));

    switch (iInterfaceState)
    {
        case EPVMFNodeCreated:
            if (!IsAdded())
                AddToScheduler();
            iLogger = PVLogger::GetLoggerObject("PVMFSocketNode");
            iDataPathLogger = PVLogger::GetLoggerObject("datapath.socketnode");
            iDataPathLoggerRTP = PVLogger::GetLoggerObject("datapath.socketnode.rtp");
            iDataPathLoggerRTCP = PVLogger::GetLoggerObject("datapath.socketnode.rtcp");
            iOsclErrorTrapImp = OsclErrorTrap::GetErrorTrapImp();
            SetState(EPVMFNodeIdle);
            return PVMFSuccess;
        default:
            return PVMFErrInvalidState;
    }
}

//Do thread-specific node cleanup and go to "Created" state.
OSCL_EXPORT_REF PVMFStatus PVMFSocketNode::ThreadLogoff() { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:ThreadLogoff")); switch (iInterfaceState) { case EPVMFNodeIdle: { if (IsAdded()) RemoveFromScheduler(); iLogger = NULL; iDataPathLogger = NULL; iDataPathLoggerRTP = NULL; iDataPathLoggerRTCP = NULL; iOsclErrorTrapImp = NULL; SetState(EPVMFNodeCreated); #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.Logoff(); #endif return PVMFSuccess; } // break; This break statement was removed to avoid compiler warning for Unreachable Code default: return PVMFErrInvalidState; // break; This break statement was removed to avoid compiler warning for Unreachable Code } } //retrieve node capabilities. OSCL_EXPORT_REF PVMFStatus PVMFSocketNode::GetCapability(PVMFNodeCapability& aNodeCapability) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:GetCapability")); aNodeCapability = iCapability; return PVMFSuccess; } //retrieve a port iterator. OSCL_EXPORT_REF PVMFPortIter* PVMFSocketNode::GetPorts(const PVMFPortFilter* aFilter) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:GetPorts")); OSCL_UNUSED_ARG(aFilter);//port filter is not implemented. 
iPortVector.Reset(); return &iPortVector; } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::QueryUUID(PVMFSessionId s, const PvmfMimeString& aMimeType, Oscl_Vector& aUuids, bool aExactUuidsOnly, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:QueryUUID")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_QUERYUUID, aMimeType, aUuids, aExactUuidsOnly, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::QueryInterface(PVMFSessionId s, const PVUuid& aUuid, PVInterface*& aInterfacePtr, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:QueryInterface")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_QUERYINTERFACE, aUuid, aInterfacePtr, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::RequestPort(PVMFSessionId s, int32 aPortTag, const PvmfMimeString* aPortConfig, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:RequestPort")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_REQUESTPORT, aPortTag, aPortConfig, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::ReleasePort(PVMFSessionId s, PVMFPortInterface& aPort, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:ReleasePort")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_RELEASEPORT, aPort, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Init(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Init")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, 
PVMF_GENERIC_NODE_INIT, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Prepare(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Init")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_PREPARE, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Start(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Start")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_START, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Stop(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Stop")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_STOP, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Flush(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Flush")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_FLUSH, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Pause(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Pause")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_PAUSE, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::Reset(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:Reset")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, 
PVMF_GENERIC_NODE_RESET, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::CancelAllCommands(PVMFSessionId s, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:CancelAllCommands")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_CANCELALLCOMMANDS, aContext); return QueueCommandL(cmd); } OSCL_EXPORT_REF PVMFCommandId PVMFSocketNode::CancelCommand(PVMFSessionId s, PVMFCommandId aCmdId, const OsclAny* aContext) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:CancelCommand")); PVMFSocketNodeCommand cmd; cmd.PVMFSocketNodeCommandBase::Construct(s, PVMF_GENERIC_NODE_CANCELCOMMAND, aCmdId, aContext); return QueueCommandL(cmd); } //Port activity handler void PVMFSocketNode::HandlePortActivity(const PVMFPortActivity &aActivity) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "0x%x PVMFSocketNode::HandlePortActivity IN: port=0x%x, type=%d", this, aActivity.iPort, aActivity.iType)); switch (aActivity.iType) { case PVMF_PORT_ACTIVITY_CONNECT: { PVMFSocketPort* sockPort = OSCL_STATIC_CAST(PVMFSocketPort*, aActivity.iPort); OSCL_ASSERT(sockPort && sockPort->iConfig); SocketPortConfig& aSockConfig = *sockPort->iConfig; aSockConfig.CreateAllocators(iMaxTcpRecvBufferSize *(iMaxTcpRecvBufferCount - 1), iMaxTcpRecvBufferCount - 1, iMaxTcpRecvBufferSize, 1); #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumPortEventConnect++; #endif //Receives may have been blocked waiting on the port to be connected, so check here. 
//Note: This is a known use case for HTTP streaming, where the port request and connect //can happen after the node start if (CanReceive(aSockConfig)) StartRecvOperation(aSockConfig); } break; case PVMF_PORT_ACTIVITY_INCOMING_MSG: { PVMFSocketPort* sockPort = (PVMFSocketPort*)(aActivity.iPort); OSCL_ASSERT(sockPort && sockPort->iConfig); SocketPortConfig& aSockConfig = *sockPort->iConfig; #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumPortEventIncomingMsg++; #endif //Try to process this message now. if (CanProcessIncomingMsg(aSockConfig)) ProcessIncomingMsg(aSockConfig); //Otherwise, ignore this event now. Other code will check //the input queue as needed. } break; case PVMF_PORT_ACTIVITY_CONNECTED_PORT_READY: { PVMFSocketPort* sockPort = (PVMFSocketPort*)(aActivity.iPort); OSCL_ASSERT(sockPort && sockPort->iConfig); SocketPortConfig& aSockConfig = *sockPort->iConfig; #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumPortEventConnectedPortReady++; #endif //See if the port was waiting on this event, and if so, continue processing. if (aSockConfig.iState.iConnectOperation == EPVSocketPortConnectOperation_WaitOnConnectedPort) { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.EndConnectTime(aSockConfig.iState.iConnectOperation); #endif ConnectOperationComplete(aSockConfig, PVMFSuccess, NULL); } if (aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_WaitOnConnectedPort) { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.EndRecvTime(aSockConfig.iState.iRecvOperation); #endif RecvOperationComplete(aSockConfig, PVMFSuccess, NULL); } //Otherwise ignore this event now. Other code will check connected //port status as needed. } break; default: //all other events can be ignored. 
break; } } ////////////////////////////////////////////////// // End Standard Node APIs ////////////////////////////////////////////////// ////////////////////////////////////////////////// // Additional Public APIs unique to Socket Node ////////////////////////////////////////////////// OSCL_EXPORT_REF bool PVMFSocketNode::GetPortConfig(PVMFPortInterface &aPort, OsclNetworkAddress &aLocalAdd, OsclNetworkAddress &aRemoteAdd) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::GetPortConfig IN")); aLocalAdd.ipAddr.Set("0.0.0.0"); aLocalAdd.port = 0; aRemoteAdd.ipAddr.Set(""); aRemoteAdd.port = 0; for (uint32 i = 0; i < iPortVector.size(); i++) { if (iPortVector[i]->iConfig && iPortVector[i]->iConfig->iPVMFPort == &aPort) { aLocalAdd = iPortVector[i]->iConfig->iAddr.iLocalAdd; aRemoteAdd = iPortVector[i]->iConfig->iAddr.iRemoteAdd; return true; } } return false; } OSCL_EXPORT_REF bool PVMFSocketNode::SetPortConfig(PVMFPortInterface &aPort, OsclNetworkAddress aLocalAdd, OsclNetworkAddress aRemoteAdd) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::SetPortConfig IN")); for (uint32 i = 0; i < iPortVector.size(); i++) { if (iPortVector[i]->iConfig && iPortVector[i]->iConfig->iPVMFPort == &aPort) { iPortVector[i]->iConfig->iAddr.iLocalAdd = aLocalAdd; iPortVector[i]->iConfig->iAddr.iRemoteAdd = aRemoteAdd; return true; } } return false; } ////////////////////////////////////////////////// // End Additional Public APIs unique to Socket Node ////////////////////////////////////////////////// //Create UDP sockets on consecutive ports. //This is needed because 3GPP streaming servers require consecutive ports. //If successfully created, the sockets are saved in the "allocated port" vector. //The created sockets will be associated with PVMF ports later during RequestPort //commands. 
PVMFStatus PVMFSocketNode::AllocateConsecutivePorts(PvmfMimeString* aPortConfig, uint32& aLowerPortNum, uint32& aHigherPortNum, uint32& aStartPortNum) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::AllocateConsecutivePorts IN")); //used in case of 3GPP streaming to allocate consecutive udp ports //for rtp and rtcp. //lower port number = rtp => PVMF_SOCKET_NODE_PORT_TYPE_SOURCE //higher port number = rtcp => PVMF_SOCKET_NODE_PORT_TYPE_SINK aLowerPortNum = 0; aHigherPortNum = 0; uint32 startSockID = iSocketID; PVMFStatus status = PVMFFailure; SocketPortConfig* lower_sock_config = OSCL_NEW(SocketPortConfig, ()); SocketPortConfig* higher_sock_config = OSCL_NEW(SocketPortConfig, ()); for (int maxNumOfBind = UDP_PORT_RANGE; maxNumOfBind >= 0; maxNumOfBind--) { OSCL_HeapString rtpportConfigWithMime; rtpportConfigWithMime += aPortConfig->get_cstr(); rtpportConfigWithMime += _STRLIT_CHAR("/rtp"); ParseTransportConfig(&rtpportConfigWithMime, lower_sock_config->iAddr, lower_sock_config->iMime); //this should only be UDP if (lower_sock_config->iAddr.iProtocol != INET_UDP) { status = PVMFErrArgument; goto Exit; } lower_sock_config->iContainer = this; lower_sock_config->iAddr.iLocalAdd.port = aStartPortNum; lower_sock_config->iSockId = startSockID++; lower_sock_config->iRTP = true; if (NULL == (lower_sock_config->iUDPSocket = (OsclUDPSocket*)CreateOsclSocketAndBind(lower_sock_config->iAddr, lower_sock_config->iSockId))) { status = PVMFFailure; goto Exit; } OSCL_HeapString rtcpportConfigWithMime; rtcpportConfigWithMime += aPortConfig->get_cstr(); rtcpportConfigWithMime += _STRLIT_CHAR("/rtcp"); ParseTransportConfig(&rtcpportConfigWithMime, higher_sock_config->iAddr, higher_sock_config->iMime); //this should only be UDP if (higher_sock_config->iAddr.iProtocol != INET_UDP) { status = PVMFErrArgument; goto Exit; } higher_sock_config->iContainer = this; higher_sock_config->iAddr.iLocalAdd.port = lower_sock_config->iAddr.iLocalAdd.port + 1; higher_sock_config->iSockId = 
startSockID++; higher_sock_config->iRTCP = true; if (NULL == (higher_sock_config->iUDPSocket = (OsclUDPSocket*)CreateOsclSocketAndBind(higher_sock_config->iAddr, higher_sock_config->iSockId))) { status = PVMFFailure; goto Exit; } //We were able to create and bind, now see whether we got consecutive ports or not. if (higher_sock_config->iAddr.iLocalAdd.port == (lower_sock_config->iAddr.iLocalAdd.port + 1)) { aLowerPortNum = lower_sock_config->iAddr.iLocalAdd.port; lower_sock_config->iPVMFPort = NULL; lower_sock_config->iTag = PVMF_SOCKET_NODE_PORT_TYPE_SOURCE; iAllocatedPortVector.push_back(lower_sock_config); aHigherPortNum = higher_sock_config->iAddr.iLocalAdd.port; higher_sock_config->iPVMFPort = NULL; higher_sock_config->iTag = PVMF_SOCKET_NODE_PORT_TYPE_SINK; iAllocatedPortVector.push_back(higher_sock_config); iSocketID = startSockID; aStartPortNum = higher_sock_config->iAddr.iLocalAdd.port + 1; status = PVMFSuccess; goto Exit; } else { //Delete and try again OsclUDPSocket* udpSocket1 = lower_sock_config->iUDPSocket; udpSocket1->~OsclUDPSocket(); iAlloc.deallocate(udpSocket1); lower_sock_config->iUDPSocket = NULL; OsclUDPSocket* udpSocket2 = higher_sock_config->iUDPSocket; udpSocket2->~OsclUDPSocket(); iAlloc.deallocate(udpSocket2); higher_sock_config->iUDPSocket = NULL; aStartPortNum = higher_sock_config->iAddr.iLocalAdd.port + 1; startSockID = iSocketID; } }//for max port bind attempts status = PVMFFailure; Exit: if (status != PVMFSuccess) { OSCL_DELETE(lower_sock_config); OSCL_DELETE(higher_sock_config); } return status; } OSCL_EXPORT_REF PVMFStatus PVMFSocketNode::SetMaxTCPRecvBufferSize(uint32 aBufferSize) { if ((aBufferSize > 0) && (aBufferSize < SNODE_DEFAULT_MAX_TCP_RECV_BUFFER_SIZE)) { iMaxTcpRecvBufferSize = aBufferSize; return PVMFSuccess; } return PVMFErrArgument; } OSCL_EXPORT_REF PVMFStatus PVMFSocketNode::GetMaxTCPRecvBufferSize(uint32& aSize) { aSize = iMaxTcpRecvBufferSize; return PVMFSuccess; } OSCL_EXPORT_REF PVMFStatus 
PVMFSocketNode::SetMaxTCPRecvBufferCount(uint32 aCount) { iMaxTcpRecvBufferCount = aCount; return PVMFSuccess; } OSCL_EXPORT_REF PVMFStatus PVMFSocketNode::GetMaxTCPRecvBufferCount(uint32& aCount) { aCount = iMaxTcpRecvBufferCount; return PVMFSuccess; } OsclMemPoolResizableAllocator* PVMFSocketNode::CreateSharedBuffer(const PVMFPortInterface* aPort , uint32 aBufferSize, uint32 aExpectedNumberOfBlocksPerBuffer, uint32 aResizeSize, uint32 aMaxNumResizes) { //validate input params if required.. OSCL_ASSERT(aPort); if (!aPort) return NULL; //If data buffer for the port is already created and is existing, then destroy the prev buffer and create new one (with requested attributes). //CreateResizableDataBufferAllocator func will take care of this. SocketPortConfig* portConfig = OSCL_STATIC_CAST(PVMFSocketPort*, aPort)->iConfig; PVMFSocketNodeMemPool * portMemPool = portConfig->iMemPool; portMemPool->iSharedDataBufferInfo.Init(aBufferSize, aExpectedNumberOfBlocksPerBuffer, aResizeSize, aMaxNumResizes); return portMemPool->CreateResizableDataBufferAllocator(portConfig->iMime.get_cstr()); } /////////////////////////// // End Socket Extension Interface /////////////////////////// ////////////////////////////////////// // Incoming Message Handlers ////////////////////////////////////// //Tell whether it's possible to process an incoming message on a port. bool PVMFSocketNode::CanProcessIncomingMsg(SocketPortConfig& aSockConfig) { return //node is started iInterfaceState == EPVMFNodeStarted //port has input messages && aSockConfig.iPVMFPort && aSockConfig.iPVMFPort->IncomingMsgQueueSize() > 0 //port is not busy with any sequence && aSockConfig.iState.iSequence == EPVSocketPortSequence_None; } //Process an incoming message on this port. //Assume caller already verified "can process incoming msg". 
void PVMFSocketNode::ProcessIncomingMsg(SocketPortConfig& aSockConfig) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::ProcessIncomingMsg: aPort=0x%x", aSockConfig.iPVMFPort)); //Dequeue the incoming message PVMFSharedMediaMsgPtr msg; PVMFStatus status = aSockConfig.iPVMFPort->DequeueIncomingMsg(msg); if (status != PVMFSuccess) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::ProcessIncomingDataMsg: Error - DequeueIncomingMsg failed")); ReportErrorEvent(PVMFErrPortProcessing); return ; } //calling logic should prevent processing a message when some other sequence //is active on the port OSCL_ASSERT(aSockConfig.iState.iSequence == EPVSocketPortSequence_None); //Handle 3 types of messages if (msg->getFormatID() == PVMF_MEDIA_CMD_SOCKET_CONNECT_FORMAT_ID) { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumDequeueIncomingConnectMsg++; #endif //Connect message PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ProcessIncomingMsg - CONNECT SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_cstr())); //Parse the address OsclRefCounterMemFrag newSockAddr; msg->getFormatSpecificInfo(newSockAddr); if (newSockAddr.getMemFragPtr()) { if (!ParseTransportConfig((char*)newSockAddr.getMemFragPtr(), newSockAddr.getMemFragSize(), aSockConfig.iAddr, aSockConfig.iMime)) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::ProcessIncomingMsg: ERROR - Invalid port config")); ReportSocketNodeError(PVMFErrPortProcessing, PVMFSocketNodeErrorInvalidPortTag); return ; } } //Start the sequence StartSequence(aSockConfig, EPVSocketPortSequence_InputConnectMsg); } else if (msg->getFormatID() == PVMF_MEDIA_CMD_SOCKET_DISCONNECT_FORMAT_ID) { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumDequeueIncomingDisconnectMsg++; #endif //Disconnect message PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ProcessIncomingMsg - DISCONNECT msg - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_cstr())); //Start the sequence StartSequence(aSockConfig, 
EPVSocketPortSequence_InputDisconnectMsg); } else if (msg->getFormatID() == PVMF_MEDIA_MSG_DATA_FORMAT_ID) { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumDequeueIncomingDataMsg++; #endif //Media Data message PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ProcessIncomingMsg - MEDIA DATA msg - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_cstr())); //calling logic should prevent any simultaneous send/connect OSCL_ASSERT(aSockConfig.iState.iConnectOperation == EPVSocketPortConnectOperation_None); //Start the sequence StartSequence(aSockConfig, EPVSocketPortSequence_InputDataMsg, (OsclAny*)&msg); } else { //unexpected message type ReportErrorEvent(PVMFErrPortProcessing); } } ////////////////////////////////////// // End Incoming Message Handlers ////////////////////////////////////// ////////////////////////////////////// // Connect Operation Handlers ////////////////////////////////////// //Start the given "connect" operation on the port. //If the connect fails, this will call "connect operation complete" and continue to //the next operation. PVMFStatus PVMFSocketNode::StartConnectOperation(SocketPortConfig& aSockConfig, TPVSocketPortConnectOperation aOperation) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::StartConnectOperation() In Operation %d", aOperation)); //connect operation should be idle now. OSCL_ASSERT(aSockConfig.iState.iConnectOperation == EPVSocketPortConnectOperation_None); //Update the state aSockConfig.iState.iConnectOperation = aOperation; PVMFStatus status = PVMFFailure; switch (aOperation) { case EPVSocketPortConnectOperation_GetHostByName: { //This is the beginning of TCP socket create/connect. //this should be TCP. OSCL_ASSERT(aSockConfig.iAddr.iProtocol == INET_TCP); //Any old socket should be cleaned up or else queued for cleanup at this point. 
OSCL_ASSERT(!aSockConfig.iTCPSocket); //Create the socket PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation GetHostByName, Creating socket - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iTCPSocket = (OsclTCPSocket*)CreateOsclSocketAndBind(aSockConfig.iAddr, aSockConfig.iSockId); if (!aSockConfig.iTCPSocket) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation GetHostByName, Creating socket FAILED - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::StartConnectOperation: ERROR - CreateSocket()")); if (aSockConfig.iState.iSequence == EPVSocketPortSequence_RequestPort) { iCommandErrorCode = PVMFSocketNodeErrorSocketFailure; } else { ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorSocketFailure); } status = PVMFErrResource; break; } //see if DNS is needed if (OsclValidInetAddr(aSockConfig.iAddr.iRemoteHost.get_cstr())) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation GetHostByName, DNS is not needed - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iAddr.iRemoteAdd.ipAddr.Set(aSockConfig.iAddr.iRemoteHost.get_cstr()); status = PVMFSuccess;//no lookup needed-- continue to next operation. break; } //DNS lookup needed. //see if we can use the DNS cache instead of an actual lookup aSockConfig.iAddr.iRemoteAdd.ipAddr.Set(""); if (iDnsCache.QueryGetHostByName(aSockConfig.iAddr.iRemoteHost.get_str(), (char*)aSockConfig.iAddr.iRemoteAdd.ipAddr.Str(), aSockConfig.iAddr.iRemoteAdd.ipAddr.MaxLen())) { //DNS cache lookup success PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation GetHostByName, DNS cache lookup success - SockId=%d, Mime=%s, Host=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iAddr.iRemoteHost.get_str())); status = PVMFSuccess;//no lookup needed-- continue to next operation. 
break; } //Create DNS object if (!aSockConfig.iDNS) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation GetHostByName, Creating DNS object - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iDNS = OsclDNS::NewL(iAlloc, *iSockServ, *this, aSockConfig.iSockId); if (!aSockConfig.iDNS) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation GetHostByName, Creating DNS FAILED - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::StartConnectOperation: ERROR - Create DNS()")); if (aSockConfig.iState.iSequence == EPVSocketPortSequence_RequestPort) { iCommandErrorCode = PVMFSocketNodeError_DNSLookup; } else { ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeError_DNSLookup); } status = PVMFErrResource; break; } } //start the asynchronous DNS operation. #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumGetHostByName++; aSockConfig.iPortStats.StartConnectTime(aSockConfig.iState.iConnectOperation); #endif TPVDNSEvent aEvent = aSockConfig.iDNS->GetHostByName(aSockConfig.iAddr.iRemoteHost.get_str(), aSockConfig.iAddr.iRemoteAdd, -1); if (EPVDNSPending != aEvent) { aSockConfig.iState.iConnectOperation = EPVSocketPortConnectOperation_None; PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartConnectOperation: GetHostByName Failed - SockId=%d, Mime=%s, Host=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iAddr.iRemoteHost.get_str())); if (aSockConfig.iState.iSequence == EPVSocketPortSequence_RequestPort) { iCommandErrorCode = PVMFSocketNodeError_DNSLookup; } else { ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeError_DNSLookup); } status = PVMFErrResource; break; } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation: Waiting On DNS - SockId=%d, Mime=%s, Host=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iAddr.iRemoteHost.get_str())); #if 
(ENABLE_SOCKET_NODE_DEBUG_INFO_EVENT) ReportInfoEvent(PVMFSocketNodeInfoEventRequestedDNSResolution); #endif status = PVMFPending; //wait on the DNS lookup to complete in HandleDNSEvent callback } break; case EPVSocketPortConnectOperation_Connect: //Initiate a socket connect, after DNS lookup was successful or address is already IP. //Will return "pending" or an error. { if (!aSockConfig.iTCPSocket) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartConnectOperation: ERROR socket already exists - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFFailure;//unexpected break; } //start the asynchronous connect operation. #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumConnect++; aSockConfig.iPortStats.StartConnectTime(aSockConfig.iState.iConnectOperation); #endif TPVSocketEvent ret = aSockConfig.iTCPSocket->Connect(aSockConfig.iAddr.iRemoteAdd, TIMEOUT_CONNECT); if (ret != EPVSocketPending) { //failure PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartConnectOperation: Starting Connect FAILED - SockId=%d, Mime=%s, RemoteIP=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str(), (char*)(aSockConfig.iAddr.iRemoteAdd.ipAddr.Str()))); if (aSockConfig.iState.iSequence == EPVSocketPortSequence_RequestPort) { iCommandErrorCode = PVMFSocketNodeError_TCPSocketConnect; } else { ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeError_TCPSocketConnect); } status = PVMFErrResource; break; } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartConnectOperation - Socket Connect Pending...SockId=%d, Mime=%s, RemoteIP=%s" , aSockConfig.iSockId, aSockConfig.iMime.get_str(), (char*)(aSockConfig.iAddr.iRemoteAdd.ipAddr.Str()))); #if (ENABLE_SOCKET_NODE_DEBUG_INFO_EVENT) ReportInfoEvent(PVMFSocketNodeInfoEventConnectRequestPending); #endif status = PVMFPending; //wait on the socket connect to complete in HandleSocketEvent callback } break; case EPVSocketPortConnectOperation_Shutdown: { if (!aSockConfig.iTCPSocket) { 
PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartConnectOperation: ERROR socket already exists - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFFailure;//unexpected break; } //Initiate a socket shutdown. #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumShutdown++; aSockConfig.iPortStats.StartConnectTime(aSockConfig.iState.iConnectOperation); #endif TPVSocketEvent ret = aSockConfig.iTCPSocket->Shutdown(EPVSocketBothShutdown, TIMEOUT_SHUTDOWN); if (ret == EPVSocketPending) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartConnectOperation - Shutdown Pending...SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFPending; //wait on the socket shutdown to complete in HandleSocketEvent callback } else { status = PVMFFailure; PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartConnectOperation - Shutdown FAILED - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } } break; case EPVSocketPortConnectOperation_WaitOnConnectedPort: if (aSockConfig.iPVMFPort && aSockConfig.iPVMFPort->IsOutgoingQueueBusy()) { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumWaitOnConnectedPort++; aSockConfig.iPortStats.StartConnectTime(aSockConfig.iState.iConnectOperation); #endif status = PVMFPending; //wait for CONNECTED_PORT_READY event in HandlePortActivity } else { status = PVMFSuccess;//nothing needed. } break; case EPVSocketPortConnectOperation_WaitOnSendRecvIdle: if (aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_None && aSockConfig.iState.iSendOperation == EPVSocketPortSendOperation_None) { //send/recv are already idle. 
status = PVMFSuccess; } else { #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumWaitOnSendRecvIdle++; aSockConfig.iPortStats.StartConnectTime(aSockConfig.iState.iConnectOperation); #endif status = PVMFPending; //wait for send/recv completion (in RecvOperationComplete or SendOperationComplete) } break; default: //add code to handle this case status = PVMFFailure; OSCL_ASSERT(0); break; } //Handle synchronous completion or failure. if (status != PVMFPending) status = ConnectOperationComplete(aSockConfig, status, NULL); return status; } //The current connect operation is complete. //Continue with the next operation, or finish the sequence. //The status is in "aStatus". If there was a socket callback, the result is in "aSocketActivity". PVMFStatus PVMFSocketNode::ConnectOperationComplete(SocketPortConfig& aSockConfig, PVMFStatus aStatus, PVMFSocketActivity* aSocketActivity) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::ConnectOperationComplete() in")); //status should never be "pending" here OSCL_ASSERT(aStatus != PVMFPending); PVMFStatus status = aStatus; TPVSocketPortConnectOperation curOp = aSockConfig.iState.iConnectOperation; aSockConfig.iState.iConnectOperation = EPVSocketPortConnectOperation_None; aSockConfig.iState.iConnectOperationStatus = aStatus; aSockConfig.iState.iConnectOperationCanceled = false; switch (curOp) { case EPVSocketPortConnectOperation_None: //unexpected. break; case EPVSocketPortConnectOperation_GetHostByName: //TCP Get Host By Name is complete //catch completion of connect cancel during the cleanup sequence if (aSockConfig.iState.iSequence == EPVSocketPortSequence_SocketCleanup) { //Continue the shutdown sequence status = StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_Shutdown); break; } //else normal connect sequence. 
if (aStatus == PVMFSuccess) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ConnectOperationComplete GetHostByName Success - SockId=%d, Mime=%s Host= %s", aSockConfig.iSockId, aSockConfig.iMime.get_str() , aSockConfig.iAddr.iRemoteHost.get_str())); //Save lookup results in the DNS cache if (aSocketActivity) { iDnsCache.UpdateCacheRecord(aSockConfig.iAddr.iRemoteHost.get_str(), (char*)aSockConfig.iAddr.iRemoteAdd.ipAddr.Str()); } //DNS Lookup is always followed by a Connect status = StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_Connect); } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ConnectOperationComplete GetHostByName FAILED - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); //If DNS lookup failed, then fail the current sequence with //an error. switch (aSockConfig.iState.iSequence) { case EPVSocketPortSequence_RequestPort: //report the error in the command complete iCommandErrorCode = PVMFSocketNodeError_DNSLookup; break; default: //report the error in an error message ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeError_DNSLookup); break; } //Fail the sequence. SequenceComplete(aSockConfig, aStatus); } break; case EPVSocketPortConnectOperation_Connect: //TCP Connect is complete //catch completion of connect cancel during the cleanup sequence if (aSockConfig.iState.iSequence == EPVSocketPortSequence_SocketCleanup) { //Continue the shutdown sequence status = StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_Shutdown); break; } //else normal connect sequence. if (aStatus == PVMFSuccess) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ConnectOperationComplete Connect Success - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ConnectOperationComplete Connect FAILED - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); //Report connect errors. 
switch (aSockConfig.iState.iSequence) { case EPVSocketPortSequence_RequestPort: //for a node command, we report the error in the node command completion message iCommandErrorCode = PVMFSocketNodeError_TCPSocketConnect; break; case EPVSocketPortSequence_InputConnectMsg: default: //for other sequences, we report the error in an error message. ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeError_TCPSocketConnect); break; } } //Connect complete is always the end of a sequence. SequenceComplete(aSockConfig, aStatus); break; case EPVSocketPortConnectOperation_Shutdown: //TCP Shutdown is complete if (aStatus == PVMFSuccess) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ConnectOperationComplete Shutdown Success - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::ConnectOperationComplete Shutdown FAILED - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); if (aSocketActivity) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::ConnectOperationComplete: ERROR aEvent=%d, Ln %d", aSocketActivity->iEvent, __LINE__)); } } //Note: ignore shutdown failures and just continue onto the //next operation. //Shutdown is followed by waiting for send/recv to go idle. //On most OS the shutdown command is sufficient to cause send/recv to complete, //but on some it doesn't, so cancel any outstanding send/recv op here. CancelSendOperation(aSockConfig); CancelRecvOperation(aSockConfig); status = StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_WaitOnSendRecvIdle); break; case EPVSocketPortConnectOperation_WaitOnConnectedPort: //TCP Wait on connected port is complete. //Proceed to send the EOS and close the socket PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::ConnectOperationComplete WaitOnConnectedPort Calling CleanupTCP - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); CleanupTCP(aSockConfig); //Check the sequence... 
switch (aSockConfig.iState.iSequence) { case EPVSocketPortSequence_InputConnectMsg: //a disconnect is followed by a host lookup + connect //for an input connect message status = StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_GetHostByName); break; default: //for all other sequences, this is the end SequenceComplete(aSockConfig, PVMFSuccess); status = PVMFSuccess; break; } break; case EPVSocketPortConnectOperation_WaitOnSendRecvIdle: //UDP or TCP "Wait on Send/Recv Idle" is complete. //If TCP, wait on connected port to send EOS if (aSockConfig.iTCPSocket) { status = StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_WaitOnConnectedPort); } //If UDP, proceed to close & delete else if (aSockConfig.iUDPSocket) { CleanupUDP(aSockConfig); //This is the end of the sequence. SequenceComplete(aSockConfig, PVMFSuccess); status = PVMFSuccess; } else { //unexpected, but end the sequence anyway. SequenceComplete(aSockConfig, PVMFSuccess); status = PVMFSuccess; } break; default: OSCL_ASSERT(0);//unexpected, add code for this case. 
break; } return status; } PVMFStatus PVMFSocketNode::CancelConnectOperation(SocketPortConfig& aSockConfig) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::CancelConnectOperation() In")); PVMFStatus status = PVMFSuccess; switch (aSockConfig.iState.iConnectOperation) { case EPVSocketPortConnectOperation_None: break; case EPVSocketPortConnectOperation_GetHostByName: if (aSockConfig.iDNS) { if (!aSockConfig.iState.iConnectOperationCanceled) { aSockConfig.iState.iConnectOperationCanceled = true; PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelConnectOperation Calling CancelGetHostByName - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iDNS->CancelGetHostByName(); } status = PVMFPending; //wait for host lookup to complete in HandleDNSEvent } break; case EPVSocketPortConnectOperation_Connect: if (aSockConfig.iTCPSocket) { if (!aSockConfig.iState.iConnectOperationCanceled) { aSockConfig.iState.iConnectOperationCanceled = true; PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelConnectOperation Calling CancelConnect - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iTCPSocket->CancelConnect(); } status = PVMFPending; //wait for connect to complete in HandleSocketEvent } break; case EPVSocketPortConnectOperation_Shutdown: if (aSockConfig.iTCPSocket) { if (!aSockConfig.iState.iConnectOperationCanceled) { aSockConfig.iState.iConnectOperationCanceled = true; PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelConnectOperation Calling CancelShutdown - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iTCPSocket->CancelShutdown(); } status = PVMFPending; //wait for shutdown to complete in HandleSocketEvent } break; case EPVSocketPortConnectOperation_WaitOnConnectedPort: //just clear the state PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelConnectOperation Cancelling WaitOnConnectedPort - SockId=%d, Mime=%s ", 
aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iState.iConnectOperation = EPVSocketPortConnectOperation_None; break; case EPVSocketPortConnectOperation_WaitOnSendRecvIdle: //just clear the state PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelConnectOperation Cancelling WaitOnSendRecvIdle - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iState.iConnectOperation = EPVSocketPortConnectOperation_None; break; default: OSCL_ASSERT(0);//add code for this case status = PVMFFailure; break; } return status; } ////////////////////////////////////// // End Connect Operation Handlers ////////////////////////////////////// ////////////////////////////////// // Receive Operation Handlers ////////////////////////////////// //Tell if it's possible to initiate a recv or recvfrom operation //on this port. bool PVMFSocketNode::CanReceive(SocketPortConfig& aSockConfig) { return //node is started iInterfaceState == EPVMFNodeStarted //port is connected && aSockConfig.iPVMFPort && aSockConfig.iPVMFPort->IsConnected() //socket exists (gets created during request port or connect sequence) && (aSockConfig.iUDPSocket || aSockConfig.iTCPSocket) //port is not busy with any sequence other than sending data && (aSockConfig.iState.iSequence == EPVSocketPortSequence_None || aSockConfig.iState.iSequence == EPVSocketPortSequence_InputDataMsg) //port is not busy with any receive operation. && aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_None //there's no node stop going on. 
&& iNumStopPortActivityPending < 0; } //Enter the "wait on memory" state void PVMFSocketNode::StartRecvWaitOnMemory(SocketPortConfig& aSockConfig, int32 aSize) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::StartRecvWaitOnMemory() In")); aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_WaitOnMemory; #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumWaitOnMemory++; aSockConfig.iPortStats.StartRecvTime(aSockConfig.iState.iRecvOperation); #endif if (aSize) {//wait on data buffer PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartRecvWaitOnMemory, wait on data buffer - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); if (aSockConfig.iMemPool->iSharedBufferAlloc) { aSockConfig.iMemPool->iSharedBufferAlloc->notifyfreeblockavailable(aSockConfig, aSize, NULL); } else { if (aSockConfig.iMemPool->iInternalAlloc) aSockConfig.iMemPool->iInternalAlloc->notifyfreeblockavailable(aSockConfig, aSize, NULL); } } else {//wait on media data wrapper PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartRecvWaitOnMemory, wait on data wrapper - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iMemPool->iMediaDataMemPool->notifyfreechunkavailable(aSockConfig, NULL); } } //Enter the "wait on connected port" state void PVMFSocketNode::StartRecvWaitOnConnectedPort(SocketPortConfig& aSockConfig, PVMFSocketActivity& aSocketActivity) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::StartRecvWaitOnConnectedPort() In")); //outgoing queue is busy-- must queue this event for later processing PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartRecvWaitOnConnectedPort, wait for connected port - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); switch (aSocketActivity.iFxn) { case EPVSocketRecv: case EPVSocketRecvFrom: #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumQueueSocketRecv++; #endif aSockConfig.iSocketRecvActivity.Set(aSocketActivity.iStatus , 
aSocketActivity.iId , aSocketActivity.iFxn , aSocketActivity.iEvent , aSocketActivity.iError); break; default: OSCL_ASSERT(false);//invalid input arg. break; } //current state shoudl be idle OSCL_ASSERT(aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_None); aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_WaitOnConnectedPort; #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumWaitOnConnectedPort++; aSockConfig.iPortStats.StartRecvTime(aSockConfig.iState.iRecvOperation); #endif } //Start a receive operation on the port's socket. //Assume the caller already verified that the port can receive now. //If there is a failure, this routine will call the "recv operation complete" routine to //do error handling or continue to the next state. PVMFStatus PVMFSocketNode::StartRecvOperation(SocketPortConfig& aSockConfig) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::StartRecvOperation() In")); //caller should have verified we can receive data right now. OSCL_ASSERT(aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_None); //there should be either a UDP or TCP socket on this port and a memory pool. 
if (!aSockConfig.iTCPSocket && !aSockConfig.iUDPSocket) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartRecvOperation ERROR no socket - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); return PVMFFailure; } if (!aSockConfig.iMemPool) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartRecvOperation ERROR no mem pool - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); return PVMFFailure; } PVMFStatus status = PVMFFailure; if (aSockConfig.iUDPSocket) { aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_RecvFrom; //Create the memory buffer int32 memSize; int32 multiRecvLimitBytes = 0; switch (aSockConfig.iMemPool->iPortTag) { case PVMF_SOCKET_NODE_PORT_TYPE_SOURCE: //source tag is used for RTP #if SNODE_ENABLE_UDP_MULTI_PACKET //must allocate enough space for the max bytes per receive plus //one more max size packet, since we can't peek ahead to see //how much data is on the socket. memSize = SNODE_UDP_MULTI_MAX_BYTES_PER_RECV + MAX_UDP_PACKET_SIZE; //this is the amt of "free space" required to continue receiving //additional packets. Use the max packet size to avoid truncated //packets. multiRecvLimitBytes = MAX_UDP_PACKET_SIZE; #else //for single packet recvs. memSize = MAX_UDP_PACKET_SIZE; #endif break; case PVMF_SOCKET_NODE_PORT_TYPE_SINK: //sink tag is used for RTCP default: memSize = MAX_FEEDBACK_SOCKET_BUFFER_SIZE; break; } //Socket node provides the buffer to receive the data on the socket //Socket node gets this from the mem pool. //It may be possible that socket node may not be able to get the //buffer of required size from the mempool //If we are not able to get the packets from the OS, then the packets //received from server will eventually be lost. //In general, streaming server will be sending the UDP packets of small size //(say 2048 bytes.. 
= MIN_UDP_PACKET_SIZE) //So its better to attempt to allocate the memory of size //SNODE_UDP_MULTI_MIN_BYTES_PER_RECV + MIN_UDP_PACKET_SIZE/MIN_UDP_PACKET_SIZE //in such cases. const int32 largestMediaMsgSz = aSockConfig.iMemPool->GetMaxSizeMediaMsgLen(); PVMF_SOCKETNODE_LOGINFO((0, "PVMFSocketNode::StartRecvOperation - MaxSizeMediMsg %d memSize %d", largestMediaMsgSz, memSize)); if (memSize > largestMediaMsgSz) { #if SNODE_ENABLE_UDP_MULTI_PACKET multiRecvLimitBytes = MIN_UDP_PACKET_SIZE; const int32 minBufferToContinueRecv = (SNODE_UDP_MULTI_MIN_BYTES_PER_RECV + MIN_UDP_PACKET_SIZE); if (largestMediaMsgSz >= minBufferToContinueRecv) { memSize = largestMediaMsgSz; } else { memSize = minBufferToContinueRecv; } #else if (largestMediaMsgSz >= MIN_UDP_PACKET_SIZE) { memSize = largestMediaMsgSz; } else { memSize = MIN_UDP_PACKET_SIZE; } #endif PVMF_SOCKETNODE_LOGINFO((0, "PVMFSocketNode::StartRecvOperation - Resizing Mem Request Attributes## NewSz %d recvLimitBytes %d", memSize, multiRecvLimitBytes)); } //Create the media data impl. int32 err; OsclSharedPtr mediaDataImpl; err = GetMediaDataImpl(aSockConfig, mediaDataImpl, memSize); if (err != OsclErrNone) { StartRecvWaitOnMemory(aSockConfig, memSize); status = PVMFPending; //wait on memory pool callback "notifyfreechunkavailable" goto Exit; } //Create the media data buffer. 
err = CreateMediaData(aSockConfig, mediaDataImpl); if (err != OsclErrNone) { StartRecvWaitOnMemory(aSockConfig); status = PVMFPending; //wait on memory pool callback "notifyfreechunkavailable" goto Exit; } // Retrieve memory fragment to write to OsclRefCounterMemFrag refCtrMemFragOut; aSockConfig.iPendingRecvMediaData->getMediaFragment(0, refCtrMemFragOut); //Issue the Oscl socket request #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumRecvFrom++; aSockConfig.iPortStats.StartRecvTime(EPVSocketPortRecvOperation_RecvFrom); #endif TPVSocketEvent retVal = aSockConfig.iUDPSocket->RecvFrom((uint8*)refCtrMemFragOut.getMemFragPtr() , refCtrMemFragOut.getCapacity() , aSockConfig.iSourceAddr , TIMEOUT_RECVFROM , multiRecvLimitBytes , &aSockConfig.iRecvFromPacketLen); if (EPVSocketPending != retVal) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::RecvFrom() failed retVal=%d Ln %d", retVal, __LINE__)); PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartRecvOperation RecvFrom FAILED - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFFailure; goto Exit; } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartRecvOperation RecvFrom Pending - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFPending; //wait on the socket callback HandleSocketEvent } else if (aSockConfig.iTCPSocket) { aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_Recv; //Allocate memory int32 err; OsclSharedPtr mediaDataImpl; mediaDataImpl = aSockConfig.iMemPool->getMediaDataImpl(iMaxTcpRecvBufferSize); if (NULL == mediaDataImpl.GetRep()) { StartRecvWaitOnMemory(aSockConfig, iMaxTcpRecvBufferSize); status = PVMFPending; //wait on memory pool callback "notifyfreechunkavailable" goto Exit; } OSCL_TRY(err, aSockConfig.iPendingRecvMediaData = PVMFMediaData::createMediaData(mediaDataImpl, aSockConfig.iMemPool->iMediaDataMemPool);); if (err != OsclErrNone) { StartRecvWaitOnMemory(aSockConfig); status = 
PVMFPending; //wait on memory pool callback "notifyfreechunkavailable" goto Exit; } // Retrieve memory fragment to write to OsclRefCounterMemFrag refCtrMemFragOut; aSockConfig.iPendingRecvMediaData->getMediaFragment(0, refCtrMemFragOut); //Issue the Oscl socket request. #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumRecv++; aSockConfig.iPortStats.StartRecvTime(EPVSocketPortRecvOperation_Recv); #endif TPVSocketEvent retVal = aSockConfig.iTCPSocket->Recv((uint8*)refCtrMemFragOut.getMemFragPtr() , refCtrMemFragOut.getCapacity() , TIMEOUT_RECV); if (EPVSocketPending != retVal) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::Recv() failed retVal=%d Ln %d", retVal, __LINE__)); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartRecvOperation Recv FAILED - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFFailure; goto Exit; } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartRecvOperation Recv Pending - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); status = PVMFPending; //wait on HandleSocketEvent callback. } else { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::StartRecvOperation() Unexpected error, no socket")); } Exit: //Handle synchronous completion or failure. if (status != PVMFPending && status != PVMFFailure) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::StartRecvOperation: Error. status=%d", status)); } if (status != PVMFPending) status = RecvOperationComplete(aSockConfig, status, NULL); return status; } //The current recv operation is complete. Process the result and continue with the next operation. //The status is in "aStatus". //If there was a socket callback, the result is in "aSocketActivity". 
PVMFStatus PVMFSocketNode::RecvOperationComplete(SocketPortConfig& aSockConfig, PVMFStatus aStatus, PVMFSocketActivity* aSocketActivity)
{
    //Finishes the receive operation currently recorded on the port: resets the
    //receive state to idle, dispatches the result to the matching handler,
    //restarts receiving if allowed, and completes a pending connect-side
    //"wait on send/recv idle" once both directions are idle.
    //Returns aStatus, or the status of the newly started receive operation.
    PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::RecvOperationComplete() In"));

    //a completed operation must not report "pending"
    OSCL_ASSERT(aStatus != PVMFPending);

    //Snapshot what just finished before the state is reset below.
    bool wasCanceled = aSockConfig.iState.iRecvOperationCanceled;
    const TPVSocketPortRecvOperation finishedOp = aSockConfig.iState.iRecvOperation;

    //Return the receive state to idle and record the outcome.
    aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_None;
    aSockConfig.iState.iRecvOperationStatus = aStatus;
    aSockConfig.iState.iRecvOperationCanceled = false;

    PVMFStatus result = aStatus;

    switch (finishedOp)
    {
        case EPVSocketPortRecvOperation_Recv:
        {
            //TCP receive finished
            PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::RecvOperationComplete Recv - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
            HandleRecvComplete(aSockConfig, aStatus, aSocketActivity, wasCanceled);
        }
        break;

        case EPVSocketPortRecvOperation_RecvFrom:
        {
            //UDP receive-from finished
            PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::RecvOperationComplete RecvFrom - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
            HandleRecvFromComplete(aSockConfig, aStatus, aSocketActivity, wasCanceled);
        }
        break;

        case EPVSocketPortRecvOperation_WaitOnConnectedPort:
        {
            //the wait for the connected port is over
            PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::RecvOperationComplete WaitOnConnectedPort - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str()));

            //replay a recv/recvfrom result that was parked while waiting, if any
            if (aSockConfig.iSocketRecvActivity.iValid)
            {
                aSockConfig.iSocketRecvActivity.iValid = false;

                if (aSockConfig.iSocketRecvActivity.iFxn == EPVSocketRecv)
                {
                    HandleRecvComplete(aSockConfig
                                       , aSockConfig.iSocketRecvActivity.iStatus
                                       , &aSockConfig.iSocketRecvActivity, wasCanceled);
                }
                else if (aSockConfig.iSocketRecvActivity.iFxn == EPVSocketRecvFrom)
                {
                    HandleRecvFromComplete(aSockConfig
                                           , aSockConfig.iSocketRecvActivity.iStatus
                                           , &aSockConfig.iSocketRecvActivity, wasCanceled);
                }
                else
                {
                    OSCL_ASSERT(0);//invalid arg
                }
            }
        }
        break;

        case EPVSocketPortRecvOperation_WaitOnMemory:
        {
            //the wait for memory is over; nothing to dispatch
            PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::RecvOperationComplete WaitOnMemory - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
        }
        break;

        default:
            OSCL_ASSERT(0);//add code for this case
            break;
    }

    //Receiving may have been held up by the operation that just finished,
    //so kick off (or resume) the next receive when the port allows it.
    if (CanReceive(aSockConfig))
    {
        result = StartRecvOperation(aSockConfig);
    }

    //A connect-side "wait on send/recv idle" completes once both directions are idle.
    const bool connectWaitsForIdle =
        (aSockConfig.iState.iConnectOperation == EPVSocketPortConnectOperation_WaitOnSendRecvIdle);
    if (connectWaitsForIdle
            && aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_None
            && aSockConfig.iState.iSendOperation == EPVSocketPortSendOperation_None)
    {
        ConnectOperationComplete(aSockConfig, PVMFSuccess, NULL);
    }

    return result;
}

//Cancel any current receive operation.
//Return Success if the operation is canceled or Pending if we need to wait.
PVMFStatus PVMFSocketNode::CancelRecvOperation(SocketPortConfig& aSockConfig) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::CancelRecvOperation() In")); PVMFStatus status = PVMFSuccess; switch (aSockConfig.iState.iRecvOperation) { case EPVSocketPortRecvOperation_None: break; case EPVSocketPortRecvOperation_Recv: if (aSockConfig.iTCPSocket) { if (!aSockConfig.iState.iRecvOperationCanceled) { aSockConfig.iState.iRecvOperationCanceled = true; PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelRecvOperation Calling CancelRecv - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iTCPSocket->CancelRecv(); } status = PVMFPending; //wait on recv to complete in HandleSocketEvent } break; case EPVSocketPortRecvOperation_RecvFrom: if (aSockConfig.iUDPSocket) { if (!aSockConfig.iState.iRecvOperationCanceled) { aSockConfig.iState.iRecvOperationCanceled = true; PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelRecvOperation Calling CancelRecvFrom - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iUDPSocket->CancelRecvFrom(); } status = PVMFPending; //wait on recvfrom to complete in HandleSocketEvent } break; case EPVSocketPortRecvOperation_WaitOnConnectedPort: //just clear the state PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelRecvOperation Cancelling WaitOnConnectedPort - SockId=%d, Mime=%s ", aSockConfig.iSockId, aSockConfig.iMime.get_str())); aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_None; //also discard the received data and the associated recv activity if (aSockConfig.iSocketRecvActivity.iValid) aSockConfig.iSocketRecvActivity.iValid = false; if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); break; case EPVSocketPortRecvOperation_WaitOnMemory: PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelRecvOperation Cancelling WaitOnMemory - SockId=%d, Mime=%s ", aSockConfig.iSockId, 
aSockConfig.iMime.get_str())); if (aSockConfig.iMemPool != NULL) { if (aSockConfig.iMemPool->iInternalAlloc) aSockConfig.iMemPool->iInternalAlloc->CancelFreeChunkAvailableCallback(); if (aSockConfig.iMemPool->iSharedBufferAlloc) aSockConfig.iMemPool->iSharedBufferAlloc->CancelFreeChunkAvailableCallback(); } //clear the state aSockConfig.iState.iRecvOperation = EPVSocketPortRecvOperation_None; break; default: OSCL_ASSERT(0);//add code for this case status = PVMFFailure; break; } return status; } //a memory pool callback void SocketPortConfig::freechunkavailable(OsclAny* aContextData) { #if(ENABLE_SOCKET_NODE_STATS) iPortStats.iNumFreeChunkCallback++; #endif OSCL_UNUSED_ARG(aContextData); //complete the "wait on memory" state if (iState.iRecvOperation == EPVSocketPortRecvOperation_WaitOnMemory) { #if(ENABLE_SOCKET_NODE_STATS) iPortStats.EndRecvTime(iState.iRecvOperation); #endif iContainer->RecvOperationComplete(*this, PVMFSuccess, NULL); } } void SocketPortConfig::freeblockavailable(OsclAny* aContextData) { freechunkavailable(aContextData); } //Handler for a "recv" complete. This handles two input cases: // - Operation failed to initiate. Status will be in aStatus and aSocketActivity will be NULL. // - Operation completed asynchronously via HandleSocketEvent callback. Status will be in aStatus // and socket callback result will be in aSocketActivity. // //The end result of processing will be one of the following: // - Received data sent to connected port // - Result queued on socket activity queue for later processing. // - Error or cancellation processed. void PVMFSocketNode::HandleRecvComplete(SocketPortConfig& aSockConfig, PVMFStatus aStatus, PVMFSocketActivity* aSocketActivity, bool aRecvOperationCanceled) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::HandleRecvComplete() In")); //operation should be complete when this is called. 
OSCL_ASSERT(aStatus != PVMFPending); int32 sockActivityEvent = aSocketActivity->iEvent; PVMFStatus status = aStatus; int32 tmplen; aSockConfig.iTCPSocket->GetRecvData(&tmplen); // If data length is <= zero and aSocketActivity->iEvent is EPVSocketSuccess, it should be treated as failure. // On some platforms, its possible to get data length as zero and aSocketActivity->iEvent as EPVSocketSuccess. if (EPVSocketSuccess == sockActivityEvent && tmplen <= 0) { PVMF_SOCKETNODE_LOGINFO((0, "PVMFSocketNode::HandleRecvFromComplete() Sucessful Recv With Zero Length. Treating as failure.")); sockActivityEvent = EPVSocketFailure; status = PVMFFailure; } //If there's no socket activity input, then this must be a failure in initiating //a Recv operation. if (!aSocketActivity) { OSCL_ASSERT(status != PVMFSuccess); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvComplete: Request Failed - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorSocketFailure); //release media data if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); return; } //Otherwise this is a result from Oscl Sockets of a Recv call. //See whether we can send this data to the connected port if (sockActivityEvent == EPVSocketSuccess) { if (!aSockConfig.iPVMFPort || !aSockConfig.iPVMFPort->IsConnected()) { //port already disconnected-- just discard the data if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); return; } if (aSockConfig.iPVMFPort->IsOutgoingQueueBusy()) { //wait on port so we can send recv data. StartRecvWaitOnConnectedPort(aSockConfig, *aSocketActivity); return; } } //If we get here then it's time to process the recv result. 
//Release media data on failure if (status != PVMFSuccess) { if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); } switch (sockActivityEvent) { case EPVSocketSuccess: { //Get data length and set media buffer size int32 len; aSockConfig.iTCPSocket->GetRecvData(&len); aSockConfig.iPendingRecvMediaData->setMediaFragFilledLen(0, len); // Resize the buffer if (aSockConfig.iMemPool) { OsclSharedPtr mediaMsgImpl; aSockConfig.iPendingRecvMediaData->getMediaDataImpl(mediaMsgImpl); aSockConfig.iMemPool->resizeSocketDataBuffer(mediaMsgImpl); } else { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleRecvComplete() ERROR:mempool not found")); OSCL_ASSERT(0); return ; } // set system time as timestamp TimeValue currentTime; currentTime.set_to_current_time(); int32 currentMilliSec = currentTime.to_msec(); aSockConfig.iPendingRecvMediaData->setTimestamp((PVMFTimestamp)currentMilliSec); //queue to next port PVMFSharedMediaMsgPtr aMediaMsgPtr; convertToPVMFMediaMsg(aMediaMsgPtr, aSockConfig.iPendingRecvMediaData); #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumQueueOutgoingTCPMsg++; #endif PVMFStatus status = aSockConfig.iPVMFPort->QueueOutgoingMsg(aMediaMsgPtr); if (status != PVMFSuccess) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvComplete: QueueOutgoingMsg Failed - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleRecvComplete() ERROR:%d, Outgoing queue size=%d. 
Data discarded!", status, aSockConfig.iPVMFPort->OutgoingMsgQueueSize())); ReportErrorEvent(PVMFErrPortProcessing); return ; } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::HandleRecvComplete: Request Succcess - SockId=%d, Mime=%s, Len=%d", aSockConfig.iSockId, aSockConfig.iMime.get_str(), len)); } break; case EPVSocketTimeout: { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvComplete: Request TimedOut - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleRecvComplete() ERROR EPVSocketTimeout Ln %d", __LINE__)); ReportSocketNodeError(PVMFErrTimeout, PVMFSocketNodeErrorSocketTimeOut); } break; case EPVSocketCancel: //On some OS we may get EPVSocketCancel for recv failures, so check whether this //was response to a cancel command or not. if (aRecvOperationCanceled) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvComplete: Request Cancelled - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); PVMF_SOCKETNODE_LOGINFO((0, "PVMFSocketNode::HandleRecvComplete() EPVSocketCancel")); break; } //else fallthrough to the failure processing... case EPVSocketFailure: { //After a receive failure, we may need to do a TCP shutdown. //Check what else is currently happening on the port. switch (aSockConfig.iState.iSequence) { case EPVSocketPortSequence_RequestPort: case EPVSocketPortSequence_InputConnectMsg: case EPVSocketPortSequence_InputDisconnectMsg: case EPVSocketPortSequence_SocketCleanup: //some of these conditions may be impossible, but in any case, //it doesn't make sense to start a shutdown sequence during //any of these. Ignore the error. 
PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvComplete: Request Failed, ignoring - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); break; case EPVSocketPortSequence_None: case EPVSocketPortSequence_InputDataMsg: //for these cases, start a shutdown sequence PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvComplete: Request Failed, starting shutdown sequence - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); //start the sequence StartSequence(aSockConfig, EPVSocketPortSequence_SocketCleanup); break; default: //need code to handle this case. OSCL_ASSERT(0); break; } } break; default: OSCL_ASSERT(0); break; } } //Handler for a "recv from" complete. This handles two input cases: // - Operation failed to initiate. Status will be in aStatus and aSocketActivity will be NULL. // - Operation completed asynchronously via HandleSocketEvent callback. Status will be in aStatus // and socket callback result will be in aSocketActivity. // //The end result of processing will be one of the following: // - Received data sent to connected port // - Result queued on socket activity queue for later processing. // - Error or cancellation processed. void PVMFSocketNode::HandleRecvFromComplete(SocketPortConfig& aSockConfig, PVMFStatus aStatus, PVMFSocketActivity* aSocketActivity, bool aRecvOperationCanceled) { OSCL_UNUSED_ARG(aStatus); PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::HandleRecvFromComplete() In")); //operation should be complete when this is called. OSCL_ASSERT(aStatus != PVMFPending); //If there's no socket activity input, then this must be a failure in initiating //a RecvFrom operation. 
if (!aSocketActivity) { OSCL_ASSERT(aStatus != PVMFSuccess); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: Request Failed - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorSocketFailure); //release media data if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); return; } //Otherwise this is a result from Oscl Sockets of a RecvFrom call. //Handle "truncated packet" error, in which case we still have data even though //the result is EPVSocketFailure. int32 dataLen = 0; uint8* dataPtr = NULL; if (aSocketActivity->iEvent == EPVSocketFailure || aSocketActivity->iEvent == EPVSocketSuccess) { if (aSockConfig.iUDPSocket) dataPtr = aSockConfig.iUDPSocket->GetRecvData(&dataLen); #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumRecvFromPackets += aSockConfig.iRecvFromPacketLen.size(); if (aSockConfig.iRecvFromPacketLen.size() > aSockConfig.iPortStats.iMaxRecvFromPackets) aSockConfig.iPortStats.iMaxRecvFromPackets = aSockConfig.iRecvFromPacketLen.size(); #endif } //Handle cases where we got some data if (dataLen > 0) { //See if we can pass this data along to the connected port. If not, then queue the //result for later processing. if (aSockConfig.iPVMFPort->IsOutgoingQueueBusy()) { //outgoing queue is busy-- must queue this for later. StartRecvWaitOnConnectedPort(aSockConfig, *aSocketActivity); } else { //report truncated packet. if (aSocketActivity->iEvent == EPVSocketFailure) { PVUuid eventuuid = PVMFSocketNodeEventTypeUUID; int32 infocode = PVMFSocketNodeInfoEventPacketTruncated; ReportInfoEvent(PVMFErrCorrupt, NULL, &eventuuid, &infocode); } //set the media buffer size. aSockConfig.iPendingRecvMediaData->setMediaFragFilledLen(0, dataLen); //resize the buffer. 
if (aSockConfig.iMemPool) { OsclSharedPtr mediaMsgImpl; aSockConfig.iPendingRecvMediaData->getMediaDataImpl(mediaMsgImpl); aSockConfig.iMemPool->resizeSocketDataBuffer(mediaMsgImpl); } else { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode::HandleRecvFromComplete() ERROR:mempool not found")); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: ERROR mempool not found - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); return;//unexpected, cleanup } #if SNODE_ENABLE_UDP_MULTI_PACKET //The media buffer may contain multiple UDP packets. We need to //re-package these into a media frag group. //Allocate a new media frag group. int32 err; OsclSharedPtr< PVMFMediaDataImpl > mediaFragGroup; err = Allocate(aSockConfig, mediaFragGroup); if (err != OsclErrNone) { //Unexpected error. With current usage by JB node, this pool should //never run out. PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: Unexpected mempool error - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorSocketFailure); //release media data if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); return; } //get the single frag data buffer pointer. OsclRefCounterMemFrag singleFrag; aSockConfig.iPendingRecvMediaData->getMediaFragment(0, singleFrag); for (uint32 i = 0; i < aSockConfig.iRecvFromPacketLen.size(); dataPtr += aSockConfig.iRecvFromPacketLen[i++]) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::HandleRecvFromComplete: Fragment %d Len %d - SockId=%d, Mime=%s" , i, aSockConfig.iRecvFromPacketLen[i], aSockConfig.iSockId, aSockConfig.iMime.get_str())); //Create a new fragment with the existing refcount //and the proper packet boundaries. OsclRefCounterMemFrag multiFrag(singleFrag); //now adjust the single frag to the actual packet boundary. 
multiFrag.getMemFrag().ptr = dataPtr; multiFrag.getMemFrag().len = aSockConfig.iRecvFromPacketLen[i]; mediaFragGroup->appendMediaFragment(multiFrag); #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG) if (aSockConfig.iRTP) { LogRTPHeaderFields(aSockConfig, multiFrag); } else if (aSockConfig.iRTCP) { LogRTCPHeaderFields(aSockConfig, multiFrag); } #endif } //Replace the original media data message with the new one. //since we've linked the new frags to the original refcounter, we //can just unbind the current message then re-allocate it with the //new data. aSockConfig.iPendingRecvMediaData.Unbind(); err = CreateMediaData(aSockConfig, mediaFragGroup); if (err != OsclErrNone) { //unexpected since we just freed one message PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: Unexpected mempool error - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorSocketFailure); return; } #else //SNODE_ENABLE_UDP_MULTI_PACKET #if (PVLOGGER_INST_LEVEL > PVLOGMSG_INST_LLDBG) if (aSockConfig.iRTP) { OsclRefCounterMemFrag memfrag; aSockConfig.iPendingRecvMediaData->getMediaFragment(0, memfrag); LogRTPHeaderFields(aSockConfig, memfrag); } else if (aSockConfig.iRTCP) { OsclRefCounterMemFrag memfrag; aSockConfig.iPendingRecvMediaData->getMediaFragment(0, memfrag); LogRTCPHeaderFields(aSockConfig, memfrag); } #endif #endif //SNODE_ENABLE_UDP_MULTI_PACKET //push the received data to the connected port PVMFSharedMediaMsgPtr mediaMsgPtr; convertToPVMFMediaMsg(mediaMsgPtr, aSockConfig.iPendingRecvMediaData); #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumQueueOutgoingUDPMsg++; #endif PVMFStatus status = aSockConfig.iPVMFPort->QueueOutgoingMsg(mediaMsgPtr); if (status != PVMFSuccess) { //should never get here because we already checked outgoign queue earlier. PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleRecvFromComplete() ERROR:%d, Outgoing queue size=%d. 
Data discarded!", status, aSockConfig.iPVMFPort->OutgoingMsgQueueSize())); ReportErrorEvent(PVMFInfoOverflow); } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::HandleRecvFromComplete: Request Succcess - SockId=%d, Mime=%s, Len=%d", aSockConfig.iSockId, aSockConfig.iMime.get_str(), dataLen)); } } else { //handle error, cancel, and timeout cases. //release media data. if (aSockConfig.iPendingRecvMediaData.GetRep()) aSockConfig.iPendingRecvMediaData.Unbind(); //report error events. switch (aSocketActivity->iEvent) { case EPVSocketCancel: //On some OS we may get EPVSocketCancel for recv failures, so check whether this //was response to a cancel command or not. if (aRecvOperationCanceled) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: Request Cancelled - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); break; } //else fallthrough to failure processing... case EPVSocketFailure: //report RTP failures. //Note: do not report RTCP errors, because some servers send zero-byte packets //and we want to be tolerant of that case. 
if (aSockConfig.iTag == PVMF_SOCKET_NODE_PORT_TYPE_SOURCE) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: Request Failed - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorSocketFailure); } break; case EPVSocketTimeout: { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::HandleRecvFromComplete: Request TimedOut - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleRecvFromComplete() ERROR EPVSocketTimeout Ln %d", __LINE__)); ReportSocketNodeError(PVMFErrTimeout, PVMFSocketNodeErrorSocketTimeOut); } break; default: break; } } } ////////////////////////////////// // End Receive Operation Handlers ////////////////////////////////// ////////////////////////////////// // Send Operation Handlers ////////////////////////////////// //Start a send operation on the port's socket. //Assume the caller already verified that the port can send now. //If there is a failure, this routine will call the "send operation complete" routine to //do error handling or continue to the next state. PVMFStatus PVMFSocketNode::StartSendOperation(SocketPortConfig& aSockConfig, PVMFSharedMediaMsgPtr& aMsg) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::StartSendOperation() In")); //caller should have verified we can send now. 
OSCL_ASSERT(aSockConfig.iState.iSendOperation == EPVSocketPortSendOperation_None); //caller should provide a media data message as input OSCL_ASSERT(aMsg->getFormatID() == PVMF_MEDIA_MSG_DATA_FORMAT_ID); //there should be either a UDP or TCP socket on this port if (!aSockConfig.iTCPSocket && !aSockConfig.iUDPSocket) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartSendOperation ERROR no socket - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); return PVMFFailure; } aSockConfig.iPVMFPort->iNumFramesConsumed++; // Retrieve memory fragment to write to if (aSockConfig.iPendingSendMediaData.GetRep()) aSockConfig.iPendingSendMediaData.Unbind(); convertToPVMFMediaData(aSockConfig.iPendingSendMediaData, aMsg); OsclRefCounterMemFrag refCtrMemFragOut; aSockConfig.iPendingSendMediaData->getMediaFragment(0, refCtrMemFragOut); PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartSendOperation - SockId=%d, Mime=%s, Len=%d", aSockConfig.iSockId, aSockConfig.iMime.get_str(), refCtrMemFragOut.getMemFragSize())); PVMFStatus status = PVMFFailure; if (aSockConfig.iUDPSocket) { aSockConfig.iState.iSendOperation = EPVSocketPortSendOperation_SendTo; #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumSendTo++; aSockConfig.iPortStats.StartSendTime(aSockConfig.iState.iSendOperation); #endif TPVSocketEvent retVal = aSockConfig.iUDPSocket->SendTo((uint8*)refCtrMemFragOut.getMemFragPtr() , refCtrMemFragOut.getMemFragSize() , aSockConfig.iAddr.iRemoteAdd , TIMEOUT_SENDTO); if (retVal == EPVSocketPending) status = PVMFPending;//wait on HandleSocketEvent callback else status = PVMFFailure; if (PVMFPending != status) { //error PVMF_SOCKETNODE_LOGERROR((0, "0x%x PVMFSocketNode::SendTo() failed retVal=%d Ln %d", this, retVal, __LINE__)); PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::StartSendOperation: UDP - Error. 
status=%d", status)); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartSendOperation SendTo FAILED - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartSendOperation SendTo Pending - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } } else if (aSockConfig.iTCPSocket) { aSockConfig.iState.iSendOperation = EPVSocketPortSendOperation_Send; #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumSend++; aSockConfig.iPortStats.StartSendTime(aSockConfig.iState.iSendOperation); #endif TPVSocketEvent retVal = aSockConfig.iTCPSocket->Send((uint8*)refCtrMemFragOut.getMemFragPtr(), refCtrMemFragOut.getMemFragSize(), TIMEOUT_SEND); if (retVal == EPVSocketPending) status = PVMFPending;//wait on HandleSocketEvent callback else status = PVMFFailure; if (PVMFPending != status) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::StartSendOperation: TCP - Error. status=%d", status)); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::StartSendOperation Calling Send FAILED - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartSendOperation Send Pending - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); } } //Handle synchronous completion or failures if (status != PVMFPending) status = SendOperationComplete(aSockConfig, status, NULL); return status; } //The current send operation is complete and status is in "aStatus". //If there was a socket callback, the result is in "aSocketActivity". //Process the result and continue with the next operation if needed. 
PVMFStatus PVMFSocketNode::SendOperationComplete(SocketPortConfig& aSockConfig, PVMFStatus aStatus, PVMFSocketActivity* aSocketActivity)
{
    //Steps performed here:
    // 1. Reset the port's send state to "None" and record aStatus.
    // 2. Release the pending send media buffer.
    // 3. Log the socket callback result, if there was one (aSocketActivity may be NULL).
    // 4. Report an error for a failed TCP send (SendTo failures are not reported here).
    // 5. Complete an "input data message" sequence if one is active.
    // 6. Complete a connect operation that was waiting for send/recv to go idle.
    //Returns aStatus unchanged.
    PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::SendOperationComplete()"));

    OSCL_ASSERT(aStatus != PVMFPending);

    PVMFStatus status = aStatus;

    //Update the send state
    //(remember the operation that just finished before it is cleared)
    TPVSocketPortSendOperation curOp = aSockConfig.iState.iSendOperation;
    aSockConfig.iState.iSendOperation = EPVSocketPortSendOperation_None;
    aSockConfig.iState.iSendOperationStatus = aStatus;
    aSockConfig.iState.iSendOperationCanceled = false;

    //Release the media buffer after each send operation, regardless of success/fail.
    if (aSockConfig.iPendingSendMediaData.GetRep() != NULL)
        aSockConfig.iPendingSendMediaData.Unbind();

    if (aSocketActivity)
    {
        //Datapath logging
        switch (aSocketActivity->iEvent)
        {
            case EPVSocketSuccess:
                PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::SendOperationComplete - Success - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
                break;
            case EPVSocketTimeout:
                PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::SendOperationComplete - TimeOut - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
                break;
            case EPVSocketFailure:
                PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::SendOperationComplete - Failed - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
                break;
            case EPVSocketCancel:
                PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::SendOperationComplete - Cancelled - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
                break;
            default:
                OSCL_ASSERT(0);
                break;
        }
    }

    //report TCP errors.
    if (aStatus != PVMFSuccess
            && curOp == EPVSocketPortSendOperation_Send)
    {
        //Log the operation that failed (curOp); the state field has already
        //been reset to "None" above and would always print as 0.
        PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::SendOperationComplete - Send Failed - "
                                          "SockId=%d, Mime=%s, Operaton=%d",
                                          aSockConfig.iSockId, aSockConfig.iMime.get_str(), curOp));
        PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::SendOperationComplete: Send/SendTo() ERROR status=%d, Ln %d", status, __LINE__));
        ReportSocketNodeError(PVMFErrResource, PVMFSocketNodeErrorTCPSocketSendError);
    }

    //This completes an input data message sequence
    if (aSockConfig.iState.iSequence == EPVSocketPortSequence_InputDataMsg)
        SequenceComplete(aSockConfig, aStatus);

    //Detect connect "Wait on send/recv idle" complete
    if (aSockConfig.iState.iConnectOperation == EPVSocketPortConnectOperation_WaitOnSendRecvIdle
            && aSockConfig.iState.iRecvOperation == EPVSocketPortRecvOperation_None
            && aSockConfig.iState.iSendOperation == EPVSocketPortSendOperation_None)
    {
        ConnectOperationComplete(aSockConfig, PVMFSuccess, NULL);
    }

    return status;
}

//Cancel the current send operation.
//Return Success if the operation is canceled, or Pending if we need to wait.
PVMFStatus PVMFSocketNode::CancelSendOperation(SocketPortConfig& aSockConfig)
{
    //Outcome by current send operation:
    // - None:          PVMFSuccess, nothing to cancel.
    // - Send / SendTo: the socket cancel is issued once and PVMFPending is
    //                  returned; with no matching socket the result is PVMFSuccess.
    // - anything else: asserts and returns PVMFFailure.
    PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::CancelSendOperation()"));

    const TPVSocketPortSendOperation activeOp = aSockConfig.iState.iSendOperation;

    if (activeOp == EPVSocketPortSendOperation_None)
    {
        return PVMFSuccess;
    }

    if (activeOp == EPVSocketPortSendOperation_Send)
    {
        if (!aSockConfig.iTCPSocket)
        {
            return PVMFSuccess;
        }
        //request the cancel only the first time through
        if (!aSockConfig.iState.iSendOperationCanceled)
        {
            aSockConfig.iState.iSendOperationCanceled = true;
            PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelSendOperation Calling CancelSend - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
            aSockConfig.iTCPSocket->CancelSend();
        }
        //the send finishes later in HandleSocketEvent
        return PVMFPending;
    }

    if (activeOp == EPVSocketPortSendOperation_SendTo)
    {
        if (!aSockConfig.iUDPSocket)
        {
            return PVMFSuccess;
        }
        //request the cancel only the first time through
        if (!aSockConfig.iState.iSendOperationCanceled)
        {
            aSockConfig.iState.iSendOperationCanceled = true;
            PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CancelSendOperation Calling CancelSendTo - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str()));
            aSockConfig.iUDPSocket->CancelSendTo();
        }
        //the sendto finishes later in HandleSocketEvent
        return PVMFPending;
    }

    //unknown send operation
    OSCL_ASSERT(0);
    return PVMFFailure;
}

//////////////////////////////////
// End Send Operation Handlers
//////////////////////////////////

//////////////////////////////////////
// Sequence Handler
//////////////////////////////////////

//Printable names for the port sequence values, used for logging.
//Entries are indexed by the sequence value passed to StartSequence.
static const char* const TPVSocketPortSequenceStr[] =
{
    "EPVSocketPortSequence_None"
    , "EPVSocketPortSequence_RequestPort"
    , "EPVSocketPortSequence_InputConnectMsg"
    , "EPVSocketPortSequence_InputDataMsg"
    , "EPVSocketPortSequence_InputDisconnectMsg"
    , "EPVSocketPortSequence_SocketCleanup"
    , "EPVSocketPortSequence_Last"
} ;

//Start a sequence on a port.
//This routine will complete all steps possible unless some operation has
//asynchronous completion.
//This call may actually complete the sequence. //This routine documents the series of steps for each sequence. PVMFStatus PVMFSocketNode::StartSequence(SocketPortConfig& aSockConfig, TPVSocketPortSequence aSequence, OsclAny* aParam) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartSequence, '%s' - SockId=%d, Mime=%s" , TPVSocketPortSequenceStr[aSequence] , aSockConfig.iSockId, aSockConfig.iMime.get_cstr())); //Set the sequence. aSockConfig.iState.iSequence = aSequence; switch (aSequence) { case EPVSocketPortSequence_RequestPort: //Sequence: // 1. GetHostByName, // 2. Connect // 3. Command complete. return StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_GetHostByName); case EPVSocketPortSequence_InputConnectMsg: //Sequence: // If already connected, // 1. Shutdown // 2. Cancel send/recv operation if needed. // 3. Wait for send/recv idle if needed. // 4. Wait for connected port if needed. // 5. Close socket + send EOS to connected port + delete socket // or add to cleanup queue & trigger AO. // 6. Create // 7. GetHostByName // 8. Connect // Else if not already connected, // 1. GetHostByName // 2. Connect if (aSockConfig.iTCPSocket) { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::StartSequence - Disconnecting before Connect SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_cstr())); return StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_Shutdown); } else { return StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_GetHostByName); } break; case EPVSocketPortSequence_InputDataMsg: //Sequence: // 1. Send OSCL_ASSERT(aParam); return StartSendOperation(aSockConfig, *((PVMFSharedMediaMsgPtr*)aParam)); case EPVSocketPortSequence_InputDisconnectMsg: //Sequence: // 1. Shutdown // 2. Cancel send/recv operation if needed. // 3. Wait for send/recv idle if needed. // 4. Wait for connected port if needed. // 5. 
Close socket + send EOS to connected port + delete socket // or add to cleanup queue & trigger AO. return StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_Shutdown); case EPVSocketPortSequence_SocketCleanup: if (aSockConfig.iTCPSocket) { //TCP Shutdown Sequence: // 1. Cancel current connect operation if needed. // 2. Cancel send/recv operation if needed. // 3. Shutdown // 4. Wait for send/recv idle if needed. // 5. Wait for connected port if needed. // 6. Close socket + send EOS to connected port + delete socket // or add to cleanup queue & trigger AO. if (aSockConfig.iState.iConnectOperation == EPVSocketPortConnectOperation_Shutdown) { return PVMFPending;//wait on completion in HandleSocketEvent } if (CancelConnectOperation(aSockConfig) == PVMFPending) { return PVMFPending;//wait on connect operation to cancel //(in HandleSocketEvent or HandleDNSEvent) } else { //Go ahead and start any send/recv cancel since they can happen simultaneously //with the Shutdown operation. CancelRecvOperation(aSockConfig); CancelSendOperation(aSockConfig); return StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_Shutdown); } } else if (aSockConfig.iUDPSocket) { //UDP Shutdown Sequence: // 1. Cancel all current operations // 2. Wait for send/recv idle if needed. // 3. Close socket + delete socket or add to cleanup queue & trigger AO. switch (aSockConfig.iState.iConnectOperation) { case EPVSocketPortConnectOperation_WaitOnSendRecvIdle: //keep waiting for completion (in RecvOperationComplete or SendOperationComplete) return PVMFPending; case EPVSocketPortConnectOperation_None: CancelRecvOperation(aSockConfig); CancelSendOperation(aSockConfig); return StartConnectOperation(aSockConfig, EPVSocketPortConnectOperation_WaitOnSendRecvIdle); default: //unexpected OSCL_ASSERT(0); return PVMFFailure; } } //else { //nothing needed. 
SequenceComplete(aSockConfig, PVMFSuccess); return PVMFSuccess; } break; default: //add code to handle this case OSCL_ASSERT(0); return PVMFFailure; } } //The current multi-operation sequence is complete. //Process the result and continue with the next operation if needed. void PVMFSocketNode::SequenceComplete(SocketPortConfig& aSockConfig, PVMFStatus aStatus) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::SequenceComplete() Sequence %d Status %d ", aSockConfig.iState.iSequence, aStatus)); PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::SequenceComplete '%s' - SockId=%d, Mime=%s" , TPVSocketPortSequenceStr[aSockConfig.iState.iSequence] , aSockConfig.iSockId, aSockConfig.iMime.get_str())); //Update the state TPVSocketPortSequence curSequence = aSockConfig.iState.iSequence; aSockConfig.iState.iSequence = EPVSocketPortSequence_None; aSockConfig.iState.iSequenceStatus = aStatus; switch (curSequence) { case EPVSocketPortSequence_RequestPort: //may need to complete the node command if (iCurrentCmdQueue.size() && iCurrentCmdQueue.front().iCmd == PVMF_GENERIC_NODE_REQUESTPORT) { //@TODO would be nice to cleanup the allocated port in case of a failure. CommandComplete(iCurrentCmdQueue, iCurrentCmdQueue.front(), aStatus, iRequestedPort); } break; case EPVSocketPortSequence_InputConnectMsg: case EPVSocketPortSequence_InputDisconnectMsg: case EPVSocketPortSequence_InputDataMsg: //If we've just completed an input port message, this may complete a node Flush command if (!iCurrentCmdQueue.empty() && iCurrentCmdQueue.front().iCmd == PVMF_GENERIC_NODE_FLUSH) { //Flush is done when all input ports are empty. for (uint32 i = 0; i < iPortVector.size(); i++) { if (iPortVector[i]->IncomingMsgQueueSize() > 0) return;//keep waiting } //All ports empty-- complete the Flush. 
CommandComplete(iCurrentCmdQueue, iCurrentCmdQueue.front(), PVMFSuccess); } break; case EPVSocketPortSequence_SocketCleanup: //When socket cleanup is complete, decrement the "stop node activity" //counter and look for completion of all ports. //This port is done-- decrement the counter iNumStopPortActivityPending--; //When counter reaches zero, all ports are done and the sequence is complete. if (iNumStopPortActivityPending == 0) { //Reset the counter to the "idle" value. iNumStopPortActivityPending = (-1); //There may be a Reset or CancelAll command waiting on //this to complete. if (!iCurrentCmdQueue.empty() && iCurrentCmdQueue.front().iCmd == PVMF_GENERIC_NODE_RESET) { CommandComplete(iCurrentCmdQueue, iCurrentCmdQueue.front(), PVMFSuccess); } else if (!iCancelCmdQueue.empty() && iCancelCmdQueue.front().iCmd == PVMF_GENERIC_NODE_CANCELALLCOMMANDS) { CommandComplete(iCancelCmdQueue, iCancelCmdQueue.front(), PVMFSuccess); } //ports may be cleaned up now, so exit this routine now to //avoid a crash from further port processing. return; } break; default: break; } //All sequences other than input data message will //block receive operations, so may need to start or resume receives now. if (curSequence != EPVSocketPortSequence_InputDataMsg && (aStatus == PVMFSuccess) && CanReceive(aSockConfig)) { StartRecvOperation(aSockConfig); } //Input message processing may have been blocked waiting on //this sequence to complete, so resume now. if (CanProcessIncomingMsg(aSockConfig) && (aStatus == PVMFSuccess)) { ProcessIncomingMsg(aSockConfig); } } ////////////////////////////////////// // End Sequence Handler ////////////////////////////////////// ////////////////////////////////////// // Command Handlers ////////////////////////////////////// //See if we can process a new input command right now. 
bool PVMFSocketNode::CanProcessCommand() { //We may need to wait on //1) a new command to arrive //2) some prior command to complete //Note: this allows a Cancel to interrupt the current command, but it does //not allow a Cancel to interrupt another cancel. return (!iPendingCmdQueue.empty() && (iCurrentCmdQueue.empty() || (iPendingCmdQueue.front().hipri() && iCancelCmdQueue.empty()))); } //Process an input command. void PVMFSocketNode::ProcessCommand(PVMFSocketNodeCmdQ& aCmdQ, PVMFSocketNodeCommand& aCmd) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNodeCommand::ProcessCommand() in")); PVMFStatus status = PVMFFailure; iCommandErrorCode = PVMFSocketNodeErrorEventStart; //no error if (aCmd.hipri()) { //calling logic should prevent multiple cancels in progress. OSCL_ASSERT(iCancelCmdQueue.empty()); switch (aCmd.iCmd) { case PVMF_GENERIC_NODE_CANCELALLCOMMANDS: status = DoCancelAllCommands(aCmd); break; case PVMF_GENERIC_NODE_CANCELCOMMAND: status = DoCancelCommand(aCmd); break; default://unknown command type status = PVMFFailure; break; } //If the command was not finished in this call, then move it to the //"cancel cmd" queue where it will remain until it is completed. if (status == PVMFPending) { //move the new cmd to the "cancel cmd" queue where it will //remain until complete. int32 err; OSCL_TRY(err, iCancelCmdQueue.StoreL(aCmd);); //we reserved space in the CancelCmdQueue already, so //it should not be possible to fail here. OSCL_ASSERT(err == OsclErrNone); //erase the original command aCmdQ.Erase(&aCmd); } else { CommandComplete(aCmdQ, aCmd, status, NULL); } } else { //calling logic should prevent multiple commands in progress. 
OSCL_ASSERT(iCurrentCmdQueue.empty()); OsclAny* eventData = NULL; switch (aCmd.iCmd) { case PVMF_GENERIC_NODE_QUERYUUID: status = DoQueryUuid(aCmd); break; case PVMF_GENERIC_NODE_QUERYINTERFACE: status = DoQueryInterface(aCmd); break; case PVMF_GENERIC_NODE_REQUESTPORT: { iRequestedPort = NULL; status = DoRequestPort(aCmd, iRequestedPort); eventData = iRequestedPort; break; } case PVMF_GENERIC_NODE_RELEASEPORT: status = DoReleasePort(aCmd); break; case PVMF_GENERIC_NODE_INIT: status = DoInit(aCmd); break; case PVMF_GENERIC_NODE_PREPARE: status = DoPrepare(aCmd); if (status == PVMFSuccess) { ChangeExternalState(EPVMFNodePrepared); } break; case PVMF_GENERIC_NODE_START: status = DoStart(aCmd); break; case PVMF_GENERIC_NODE_STOP: status = DoStop(aCmd); break; case PVMF_GENERIC_NODE_FLUSH: status = DoFlush(aCmd); break; case PVMF_GENERIC_NODE_PAUSE: status = DoPause(aCmd); break; case PVMF_GENERIC_NODE_RESET: status = DoReset(aCmd); break; default://unknown command type status = PVMFFailure; break; } //If the command was not finished in this call, then move it to the //"current cmd" queue where it will remain until it is completed. if (status == PVMFPending) { //move the new cmd to the "current cmd" queue where it will //remain until complete. int32 err; OSCL_TRY(err, iCurrentCmdQueue.StoreL(aCmd);); //we reserved space in the CurrentCmdQueue already, so //it should not be possible to fail here. OSCL_ASSERT(err == OsclErrNone); //erase the original command aCmdQ.Erase(&aCmd); } else { CommandComplete(aCmdQ, aCmd, status, eventData); } } } //Called to complete a node command. 
void PVMFSocketNode::CommandComplete(PVMFSocketNodeCmdQ& aCmdQ, PVMFSocketNodeCommand& aCmd, PVMFStatus aStatus, OsclAny* aEventData, PVUuid* aEventUUID, int32* aEventCode) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:CommandComplete Id %d Cmd %d Status %d Context %d Data %d" , aCmd.iId, aCmd.iCmd, aStatus, aCmd.iContext, aEventData)); //See whether command processing was blocked on entry to this routine, so we can //decide later whether it's necessary to trigger the AO to resume processsing. bool canProcess = CanProcessCommand(); PVMFStatus status = aStatus; //do standard state changes & other cmd completion. if (aStatus == PVMFSuccess) { switch (aCmd.iCmd) { case PVMF_GENERIC_NODE_INIT: ChangeExternalState(EPVMFNodeInitialized); break; case PVMF_GENERIC_NODE_PREPARE: ChangeExternalState(EPVMFNodePrepared); break; case PVMF_GENERIC_NODE_START: ChangeExternalState(EPVMFNodeStarted); break; case PVMF_GENERIC_NODE_STOP: ChangeExternalState(EPVMFNodePrepared); #if(ENABLE_SOCKET_NODE_STATS) //dump the stats. iSocketNodeStats.Log(iPortVector); #endif break; case PVMF_GENERIC_NODE_PAUSE: ChangeExternalState(EPVMFNodePaused); break; case PVMF_GENERIC_NODE_RESET: ChangeExternalState(EPVMFNodeCreated); //Complete the reset command. { #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.Log(iPortVector); #endif //cleanup all ports. CleanupPorts(); //go back to Idle state iPortVector.Reconstruct(); SetState(EPVMFNodeIdle); } break; case PVMF_GENERIC_NODE_CANCELALLCOMMANDS: //Complete the reset command. { #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.Log(iPortVector); #endif //Since "cancel all" is effectively used as a Reset, //go ahead and cleanup all ports. 
CleanupPorts(); } break; default: break; } } //create extended error message if an explicit error was provided in args PVInterface* extif = NULL; PVMFBasicErrorInfoMessage* errormsg = NULL; if (aEventUUID && aEventCode) { errormsg = OSCL_NEW(PVMFBasicErrorInfoMessage, (*aEventCode, *aEventUUID, NULL)); extif = OSCL_STATIC_CAST(PVInterface*, errormsg); } //else if no error input, see if "command error code" was set. else if (iCommandErrorCode != PVMFSocketNodeErrorEventStart) { PVUuid eventuuid = PVMFSocketNodeEventTypeUUID; errormsg = OSCL_NEW(PVMFBasicErrorInfoMessage, (iCommandErrorCode, eventuuid, NULL)); extif = OSCL_STATIC_CAST(PVInterface*, errormsg); } //create response PVMFCmdResp resp(aCmd.iId, aCmd.iContext, status, extif, aEventData); PVMFSessionId session = aCmd.iSession; //Erase the command from the queue. aCmdQ.Erase(&aCmd); //Report completion to the session observer. ReportCmdCompleteEvent(session, resp); if (errormsg) { errormsg->removeRef(); } iCommandErrorCode = PVMFSocketNodeErrorEventStart;//reset. //See if there was a pending cancel waiting on the current command to //complete. if (!iCancelCmdQueue.empty() && iCancelCmdQueue.front().iCmd == PVMF_GENERIC_NODE_CANCELCOMMAND) { CommandComplete(iCancelCmdQueue, iCancelCmdQueue.front(), PVMFSuccess); } //May need to resume command handling if the AO was blocked //waiting on asynchronous command completion, but it's unblocked now. 
if (!canProcess && CanProcessCommand() && IsAdded()) { RunIfNotReady(); } } void PVMFSocketNode::ReportErrorEvent(PVMFEventType aEventType, OsclAny* aEventData, PVUuid* aEventUUID, int32* aEventCode) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode:NodeErrorEvent Type %d Data %d" , aEventType, aEventData)); if (aEventUUID && aEventCode) { PVMFBasicErrorInfoMessage* eventmsg = OSCL_NEW(PVMFBasicErrorInfoMessage, (*aEventCode, *aEventUUID, NULL)); PVMFAsyncEvent asyncevent(PVMFErrorEvent, aEventType, NULL, OSCL_STATIC_CAST(PVInterface*, eventmsg), aEventData, NULL, 0); PVMFNodeInterface::ReportErrorEvent(asyncevent); eventmsg->removeRef(); } else { PVMFNodeInterface::ReportErrorEvent(aEventType, aEventData); } } void PVMFSocketNode::ReportInfoEvent(PVMFEventType aEventType, OsclAny* aEventData, PVUuid* aEventUUID, int32* aEventCode) { PVLOGGER_LOGMSG(PVLOGMSG_INST_MLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode:NodeInfoEvent Type %d Data %d" , aEventType, aEventData)); if (aEventUUID && aEventCode) { PVMFBasicErrorInfoMessage* eventmsg = OSCL_NEW(PVMFBasicErrorInfoMessage, (*aEventCode, *aEventUUID, NULL)); PVMFAsyncEvent asyncevent(PVMFInfoEvent, aEventType, NULL, OSCL_STATIC_CAST(PVInterface*, eventmsg), aEventData, NULL, 0); PVMFNodeInterface::ReportInfoEvent(asyncevent); eventmsg->removeRef(); } else { PVMFNodeInterface::ReportInfoEvent(aEventType, aEventData); } } PVMFCommandId PVMFSocketNode::QueueCommandL(PVMFSocketNodeCommand& aCmd) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::QueueCommandL()")); #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.iNumQueueNodeCmd++; #endif PVMFCommandId id; id = iPendingCmdQueue.AddL(aCmd); //This may be a processing trigger. //Wakeup the AO if needed. 
if (IsAdded() && CanProcessCommand()) { RunIfNotReady(); } return id; } PVMFStatus PVMFSocketNode::DoReset(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoReset()")); /* This node allows a reset from any state */ return DoStopNodeActivity(); } PVMFStatus PVMFSocketNode::DoQueryUuid(PVMFSocketNodeCommand& aCmd) { //This node supports Query UUID from any state OSCL_String* mimetype; Oscl_Vector *uuidvec; bool exactmatch; aCmd.PVMFSocketNodeCommandBase::Parse(mimetype, uuidvec, exactmatch); //Try to match the input mimetype against any of //the custom interfaces for this node //Match against custom interface1... if (*mimetype == PVMF_SOCKET_NODE_EXTENSION_INTERFACE_MIMETYPE //also match against base mimetypes for custom interface1, //unless exactmatch is set. || (!exactmatch && *mimetype == PVMF_SOCKET_NODE_MIMETYPE) || (!exactmatch && *mimetype == PVMF_SOCKET_NODE_BASEMIMETYPE)) { PVUuid uuid(PVMF_SOCKET_NODE_EXTENSION_INTERFACE_UUID); uuidvec->push_back(uuid); } return PVMFSuccess; } bool PVMFSocketNode::queryInterface(const PVUuid& uuid, PVInterface*& iface) { iface = NULL; if (uuid == PVUuid(PVMF_SOCKET_NODE_EXTENSION_INTERFACE_UUID)) { if (!iExtensionInterface) { iExtensionInterface = OSCL_NEW(PVMFSocketNodeExtensionInterfaceImpl, (this)); } if (iExtensionInterface) { return (iExtensionInterface->queryInterface(uuid, iface)); } else { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::queryInterface: Error - Out of memory")); OSCL_LEAVE(OsclErrNoMemory); return false; } } else { return false; } } PVMFStatus PVMFSocketNode::DoQueryInterface(PVMFSocketNodeCommand& aCmd) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoQueryInterface")); PVUuid* uuid; PVInterface** ptr; aCmd.PVMFSocketNodeCommandBase::Parse(uuid, ptr); if (*uuid == PVUuid(PVMF_SOCKET_NODE_EXTENSION_INTERFACE_UUID)) { if (!iExtensionInterface) { iExtensionInterface = 
OSCL_NEW(PVMFSocketNodeExtensionInterfaceImpl, (this)); } if (iExtensionInterface) { if (iExtensionInterface->queryInterface(*uuid, *ptr)) { return PVMFSuccess; } else { return PVMFErrNotSupported; } } else { return PVMFErrNoMemory; } } else {//not supported *ptr = NULL; return PVMFErrNotSupported; } } PVMFStatus PVMFSocketNode::DoRequestPort(PVMFSocketNodeCommand& aCmd, PVMFSocketPort* &aPort) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::DoRequestPort() In")); //retrieve port tag. int32 tag; OSCL_String* portconfig; aCmd.PVMFSocketNodeCommandBase::Parse(tag, portconfig); //validate the tag... switch (tag) { case PVMF_SOCKET_NODE_PORT_TYPE_SOURCE: case PVMF_SOCKET_NODE_PORT_TYPE_SINK: case PVMF_SOCKET_NODE_PORT_TYPE_PASSTHRU: break; default: //bad port tag PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: Error - Invalid port tag")); iCommandErrorCode = PVMFSocketNodeErrorInvalidPortTag; return PVMFErrArgument; } SOCKET_ADDR sock_add; sock_add.iTag = tag; OSCL_HeapString mime; if (!ParseTransportConfig(portconfig, sock_add, mime)) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: ERROR - Invalid port config")); iCommandErrorCode = PVMFSocketNodeErrorInvalidPortTag; return PVMFErrArgument; } PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::DoRequestPort tag %d config %s", tag, portconfig->get_cstr())); switch (sock_add.iProtocol) { case INET_TCP: { //Create a new port with no socket. 
PVMFStatus status = AddPort(tag, aPort); if (status != PVMFSuccess) return status; //create the socket config SocketPortConfig* sock_config = OSCL_NEW(SocketPortConfig, ()); aPort->iConfig = sock_config; sock_config->iPVMFPort = aPort; sock_config->iMime = mime; sock_config->iContainer = this; sock_config->iSockId = iSocketID++; sock_config->iTag = tag; sock_config->iAddr = sock_add; //create the mem pool PVMFSocketNodeMemPool* memPool; status = AllocatePortMemPool(tag, memPool); if (status == PVMFSuccess) sock_config->iMemPool = memPool; if (status != PVMFSuccess) return status; //optionally create the socket and launch the connect sequence. if (oscl_strstr(portconfig->get_cstr(), ";no-connect-on-create") != NULL) { //if "no create on connect" was selected then port creation is complete now. status = PVMFSuccess; } else { //else create the socket and launch the connect sequence. OSCL_ASSERT(sock_config->iState.iSequence == EPVSocketPortSequence_None); status = StartSequence(*sock_config, EPVSocketPortSequence_RequestPort); //may need to wait on connect completion here, or else there could //be an error. } return status; } case INET_UDP: { //We should have already created this UDP socket in a call to AllocateConsecutivePorts. SocketPortConfig* sockConfig = FindSocketPortConfig(sock_add); if (!sockConfig) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: ERROR - Invalid protocol for port request")); return PVMFFailure; } //The udp socket was already created using AllocateConsecutivePorts(), //the but PVMFPort and memory pool were not yet created. 
//Allocate a new PVMF port PVMFStatus status = AddPort(tag, aPort); if (status == PVMFSuccess) { PVMFSocketNodeMemPool* memPool = NULL; status = AllocatePortMemPool(tag, memPool); if (status == PVMFSuccess) sockConfig->iMemPool = memPool; } if (status != PVMFSuccess) { //put the port config back in the allocated port vector iAllocatedPortVector.push_back(sockConfig); return status; } //move the sock config to the PVMF port vector and link the config //back to the port. aPort->iConfig = sockConfig; sockConfig->iPVMFPort = aPort; return status; } break; default: PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: ERROR - Invalid protocol for port request")); return PVMFFailure; } PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: ERROR - Invalid protocol for port request")); return PVMFFailure; } //Release ports is a do-nothing for this node. PVMFStatus PVMFSocketNode::DoReleasePort(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); return PVMFSuccess; } PVMFStatus PVMFSocketNode::DoInit(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoInit() In")); if (iInterfaceState != EPVMFNodeIdle) { return PVMFErrInvalidState; } #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.Init(); #endif PVMFStatus status = PVMFSuccess; //Create socket server session if (NULL == iSockServ) { int32 err; OSCL_TRY(err, iSockServ = OsclSocketServ::NewL(iAlloc);); if (err || (iSockServ == NULL)) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode::DoInit: ERROR. OsclSocketServ::NewL() fail Ln %d", __LINE__)); iCommandErrorCode = PVMFSocketNodeErrorSocketServerCreateError; status = PVMFErrNoResources; } else { if (iSockServ->Connect() == OsclErrNone) { status = PVMFSuccess; } else { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode::DoInit: ERROR. 
OsclSocketServ::Connect() fail Ln %d", __LINE__)); iCommandErrorCode = PVMFSocketNodeErrorSocketServConnectError; status = PVMFErrResource; } } } return status; } //Prepare is a do-nothing for this node. PVMFStatus PVMFSocketNode::DoPrepare(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::DoPrepare() In")); if (iInterfaceState != EPVMFNodeInitialized) { return PVMFErrInvalidState; } return PVMFSuccess; } PVMFStatus PVMFSocketNode::DoStart(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::DoStart() In")); /* * If the node is already started just return success - multiple starts can happen with * flow control (auto-pause / auto-resume) scenarios */ if (iInterfaceState == EPVMFNodeStarted) { return PVMFSuccess;//already started. } if (iInterfaceState != EPVMFNodePrepared && iInterfaceState != EPVMFNodePaused) { return PVMFErrInvalidState; } //We need to update the node state before //we can get things going. TPVMFNodeInterfaceState curState = iInterfaceState; ChangeExternalState(EPVMFNodeStarted); PVMFStatus status = PVMFSuccess; for (uint32 i = 0; i < iPortVector.size(); i++) { SocketPortConfig* sockConfig = iPortVector[i]->iConfig; if (sockConfig) { //Start handling incoming messages if (CanProcessIncomingMsg(*sockConfig)) ProcessIncomingMsg(*sockConfig); //Start the receives. if (CanReceive(*sockConfig)) { status = StartRecvOperation(*sockConfig); //receive may be pending, but we don't block //the start command for it. if (status == PVMFPending) { status = PVMFSuccess; } else if (status != PVMFSuccess) { //fail the command if any individual startups fail. //restore the old node state. 
ChangeExternalState(curState); break;//out of for loop } } } } return status; } PVMFStatus PVMFSocketNode::DoStop(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoStop() In")); if ((iInterfaceState != EPVMFNodeStarted) && (iInterfaceState != EPVMFNodePaused)) { return PVMFErrInvalidState; } //Stop will essentially just stop the receive operations and input port processing, //since those handlers check the node state. Current operations will not be //interrupted. return PVMFSuccess; } //Stop all current socket/DNS activity and clear port messages. //Return PVMFSuccess if everything is stopped, or else PVMFPending if some //ports still have operations pending. // //Also update the global "iNumStopNodeActivityPending" //with the number of ports that still have operations pending PVMFStatus PVMFSocketNode::DoStopNodeActivity() { //note that "iNumStopNodeActivityPending" uses the value (-1) to indicate idle state, //or N...0 to countdown how many ports we're waiting on. //Note: in case there's already a "stop node activity" going on, just //return the status. if (iNumStopPortActivityPending > 0) return PVMFPending; //keep waiting on completion (in SequenceComplete) //Notify all PVMF ports to suspend their input, and discard any current messages. for (uint32 i = 0; i < iPortVector.size(); i++) { iPortVector[i]->SuspendInput(); iPortVector[i]->ClearMsgQueues(); } //Stop socket activity on all ports. { uint32 nPortsPending = 0; for (uint32 i = 0; i < iPortVector.size(); i++) { SocketPortConfig* portConfig = iPortVector[i]->iConfig; if (portConfig) { // discard any saved socket activity events portConfig->iSocketRecvActivity.iValid = false; // start a shutdown sequence on the port // look at what else might be happening currently. //if a request port is going on, be sure to complete the command, //although we will interrupt the current processing. 
if (portConfig->iState.iSequence == EPVSocketPortSequence_RequestPort && iCurrentCmdQueue.size() && iCurrentCmdQueue.front().iCmd == PVMF_GENERIC_NODE_REQUESTPORT) { //@TODO would be nice to cleanup the allocated port in case of a failure. CommandComplete(iCurrentCmdQueue, iCurrentCmdQueue.front(), PVMFErrCancelled, NULL); } //for any other sequence, we will just interrupt it with the shutdown. //if a cleanup is already underway, just keep waiting for it to complete, //else start a new sequence if (portConfig->iState.iSequence == EPVSocketPortSequence_SocketCleanup) { nPortsPending++; } else if (StartSequence(*portConfig, EPVSocketPortSequence_SocketCleanup) == PVMFPending) { nPortsPending++; } } } if (nPortsPending > 0) iNumStopPortActivityPending = nPortsPending; } if (iNumStopPortActivityPending > 0) return PVMFPending; //wait on completion in SequenceComplete. return PVMFSuccess; } //Flush is implemented for this node, but hasn't been tested. PVMFStatus PVMFSocketNode::DoFlush(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoFlush() In")); if ((iInterfaceState != EPVMFNodeStarted) && (iInterfaceState != EPVMFNodePaused)) { return PVMFErrInvalidState; } //Notify all ports to suspend their input, then //wait on completion of processing of current input. PVMFStatus status = PVMFSuccess; { for (uint32 i = 0; i < iPortVector.size(); i++) { iPortVector[i]->SuspendInput(); if (status != PVMFPending && iPortVector[i]->IncomingMsgQueueSize() > 0) { status = PVMFPending;//Wait on this queue to empty. //Completion is detected in SequenceComplete. } } } return status; } //Pause is a do-nothing for this node. 
PVMFStatus PVMFSocketNode::DoPause(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoPause() In")); /* * If the node is already paused just return success - multiple pause can happen with * flow control (auto-pause / auto-resume) scenarios */ if (iInterfaceState == EPVMFNodePaused) { return PVMFSuccess; } if (iInterfaceState != EPVMFNodeStarted) { return PVMFErrInvalidState; } return PVMFSuccess; } PVMFStatus PVMFSocketNode::DoCancelCommand(PVMFSocketNodeCommand& aCmd) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoCancelCommand() Called")); // extract the command ID from the parameters. PVMFCommandId cmdId; aCmd.PVMFSocketNodeCommandBase::Parse(cmdId); // first check "current" command if any // it's safe to assume the "current cmd queue" is only 1-deep. if (!iCurrentCmdQueue.empty() && iCurrentCmdQueue.front().iId == cmdId) { return DoCancelCurrentCommand(iCurrentCmdQueue, iCurrentCmdQueue.front()); //wait on current command to complete. The cancel command //will ultimately be completed in the "CommandComplete" for the current command. } // next see if the command is still queued in the "pending command queue" { // start at element 1 since this cancel command is element 0 PVMFSocketNodeCommand* cmd = iPendingCmdQueue.FindById(cmdId, 1); if (cmd) { // cancel the queued command. Note this will complete out-of-order. CommandComplete(iPendingCmdQueue, *cmd, PVMFErrCancelled); // no further action is required. return PVMFSuccess; } } //this command fails if the given command is not queued or in progress. return PVMFErrArgument; } PVMFStatus PVMFSocketNode::DoCancelAllCommands(PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmd); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoCancelCommand() IN")); //This is not a conventional "cancel all" implementation. 
//Streaming manager currently expects this call to shutdown all activity. //It would really be more correct to use node Reset for this. PVMFStatus status = DoStopNodeActivity(); //Since node activity is suspended, just go ahead and forcibly cancel any other //pending commands now. There could be a Reset, a Flush, or a RequestPort //happening. if (!iCurrentCmdQueue.empty()) CommandComplete(iCurrentCmdQueue, iCurrentCmdQueue.front(), PVMFErrCancelled); //Cancel all other pending commands, except for this one which is //element 0 in the queue for (uint32 i = 1; i < iPendingCmdQueue.size(); i++) CommandComplete(iPendingCmdQueue, iPendingCmdQueue[i], PVMFErrCancelled); //May need to wait on completion of StopNodeActivity. return status; } //This routine must cancel the currently executing node command. //It only needs to handle those commands that may have asynchronous completion. PVMFStatus PVMFSocketNode::DoCancelCurrentCommand(PVMFSocketNodeCmdQ& aCmdQ, PVMFSocketNodeCommand& aCmd) { OSCL_UNUSED_ARG(aCmdQ); PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::DoCancelCurrentCommand()")); switch (aCmd.iCmd) { case PVMF_GENERIC_NODE_REQUESTPORT: //there may be a connect operation pending-- cancel it. { if (iRequestedPort) { SocketPortConfig* sockConfig = iRequestedPort->iConfig; if (sockConfig && sockConfig->iState.iConnectOperation != EPVSocketPortConnectOperation_None) { CancelConnectOperation(*sockConfig); return PVMFPending;//wait on the operation to complete //in HandleSocketEvent or HandleDNSEvent } } //shouldn't get here... return PVMFFailure; } break; case PVMF_GENERIC_NODE_RESET: //it's too complicated to cancel a reset, so just wait on completion return PVMFPending; case PVMF_GENERIC_NODE_FLUSH: //to cancel a flush, just discard all remaining port messages //and keep waiting on completion of current message. 
{ for (uint32 i = 0; i < iPortVector.size(); i++) iPortVector[i]->ClearMsgQueues(); } return PVMFPending;//keep waiting on flush completion in SequenceComplete. case PVMF_GENERIC_NODE_CANCELCOMMAND: case PVMF_GENERIC_NODE_CANCELALLCOMMANDS: //these have asynchronous completion, but the command handling prevents //processing a cancel during a cancel, so we shouldn't get here. OSCL_ASSERT(0); return PVMFFailure; default: //no other node commands have asynchronous completion, so this is unexpected. OSCL_ASSERT(0); return PVMFFailure; } } ////////////////////////////////////// // End Command Handlers ////////////////////////////////////// ////////////////////////////////////// // DNS & Socket Callbacks ////////////////////////////////////// OSCL_EXPORT_REF void PVMFSocketNode::HandleDNSEvent(int32 aId, TPVDNSFxn aFxn, TPVDNSEvent aEvent, int32 aError) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::HandleDNSEvent() In aId=%d, aFxn=%d, aEvent=%d, aError=%d", aId, aFxn, aEvent, aError)); iInSocketCallback = true; if (aEvent != EPVDNSSuccess) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleDNSEvent: Error - Failure")); } //Note: since the DNS object ID is identical to the socket ID, we can lookup the //socket port config using the DNS ID. SocketPortConfig* sockConfigPtr = FindSocketPortConfig((uint32)aId); if (!sockConfigPtr) { //it's possible the DNS object was already delete-- so just fail gracefully. PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleDNSEvent: Error - Invalid Socket ID")); } else { #if(ENABLE_SOCKET_NODE_STATS) sockConfigPtr->iPortStats.iNumDNSCallback++; #endif //create socket activity PVMFSocketActivity activity((aEvent == EPVDNSSuccess) ? 
PVMFSuccess : PVMFFailure, aId, aFxn, aEvent, aError); //Call the appropriate handler switch (aFxn) { case EPVDNSGetHostByName: OSCL_ASSERT(sockConfigPtr->iState.iConnectOperation == EPVSocketPortConnectOperation_GetHostByName); #if(ENABLE_SOCKET_NODE_STATS) if (aEvent == EPVDNSSuccess) sockConfigPtr->iPortStats.EndConnectTime(sockConfigPtr->iState.iConnectOperation); #endif ConnectOperationComplete(*sockConfigPtr, activity.iStatus, &activity); break; default: OSCL_ASSERT(0);//unexpected break; } } iInSocketCallback = false; } //This is the callback from Oscl Sockets for socket operation completion. OSCL_EXPORT_REF void PVMFSocketNode::HandleSocketEvent(int32 aId, TPVSocketFxn aFxn, TPVSocketEvent aEvent, int32 aError) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_STACK_TRACE, (0, "PVMFSocketNode::HandleSocketEvent() In aId=%d, aFxn=%d, aEvent=%d, aError=%d", aId, aFxn, aEvent, aError)); iInSocketCallback = true; SocketPortConfig* sockConfigPtr = FindSocketPortConfig((uint32)aId); if (!sockConfigPtr) { //it's possible the socket was already deleted-- so just fail gracefully. PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleSocketEvent: Error - Invalid Socket ID")); } else { if (aEvent != EPVSocketSuccess) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::HandleSocketEvent: Error - Event=%d, SockId=%d, Mime=%s", aEvent, aId, sockConfigPtr->iMime.get_cstr())); } #if(ENABLE_SOCKET_NODE_STATS) sockConfigPtr->iPortStats.iNumSocketCallback++; #endif //create socket activity PVMFSocketActivity activity((aEvent == EPVSocketSuccess) ? 
PVMFSuccess : PVMFFailure, aId, aFxn, aEvent, aError); //Call the appropriate handler switch (aFxn) { case EPVSocketRecvFrom: case EPVSocketRecv: OSCL_ASSERT(sockConfigPtr->iState.iRecvOperation == EPVSocketPortRecvOperation_Recv || sockConfigPtr->iState.iRecvOperation == EPVSocketPortRecvOperation_RecvFrom); #if(ENABLE_SOCKET_NODE_STATS) if (aEvent == EPVSocketSuccess) sockConfigPtr->iPortStats.EndRecvTime(sockConfigPtr->iState.iRecvOperation); #endif RecvOperationComplete(*sockConfigPtr, activity.iStatus, &activity); break; case EPVSocketSendTo: case EPVSocketSend: OSCL_ASSERT(sockConfigPtr->iState.iSendOperation == EPVSocketPortSendOperation_Send || sockConfigPtr->iState.iSendOperation == EPVSocketPortSendOperation_SendTo); #if(ENABLE_SOCKET_NODE_STATS) if (aEvent == EPVSocketSuccess) sockConfigPtr->iPortStats.EndSendTime(sockConfigPtr->iState.iSendOperation); #endif SendOperationComplete(*sockConfigPtr, activity.iStatus, &activity); break; case EPVSocketConnect: OSCL_ASSERT(sockConfigPtr->iState.iConnectOperation == EPVSocketPortConnectOperation_Connect); #if(ENABLE_SOCKET_NODE_STATS) if (aEvent == EPVSocketSuccess) sockConfigPtr->iPortStats.EndConnectTime(sockConfigPtr->iState.iConnectOperation); #endif ConnectOperationComplete(*sockConfigPtr, activity.iStatus, &activity); break; case EPVSocketShutdown: OSCL_ASSERT(sockConfigPtr->iState.iConnectOperation == EPVSocketPortConnectOperation_Shutdown); #if(ENABLE_SOCKET_NODE_STATS) if (aEvent == EPVSocketSuccess) sockConfigPtr->iPortStats.EndConnectTime(sockConfigPtr->iState.iConnectOperation); #endif ConnectOperationComplete(*sockConfigPtr, activity.iStatus, &activity); break; default: OSCL_ASSERT(0);//unexpected break; } } iInSocketCallback = false; } ////////////////////////////////////// // End DNS & Socket Callbacks ////////////////////////////////////// /////////////////////// // Low-level port processing /////////////////////// //Allocate a node port and add it to the port vector. 
PVMFStatus PVMFSocketNode::AddPort(int32 tag, PVMFSocketPort* &port) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::AddPort() In")); int32 err; OsclAny *ptr = NULL; OSCL_TRY(err, ptr = iPortVector.Allocate();); if (err != OsclErrNone || !ptr) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::AddPort: ERROR - err=%d Ln %d", err, __LINE__)); return PVMFErrNoMemory; } port = new(ptr) PVMFSocketPort((int32)tag, this, DEFAULT_DATA_QUEUE_CAPACITY, DEFAULT_DATA_QUEUE_CAPACITY, DEFAULT_READY_TO_RECEIVE_THRESHOLD_PERCENT, //the output queue is empty because //this node pushes data directly to the //connecte port's input. 0, 0, 0); //Add the port to the port vector. OSCL_TRY(err, iPortVector.AddL(port);); if (err != OsclErrNone) { iPortVector.DestructAndDealloc(port); return PVMFErrNoMemory; } return PVMFSuccess; } //Allocate a memory pool for the input port config. //When finished, the port config is updated with the //mempool and the port tag. PVMFStatus PVMFSocketNode::AllocatePortMemPool(int32 tag, PVMFSocketNodeMemPool* & aMemPool) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::AllocatePortMemPool() In")); aMemPool = NULL; OsclAny *MemPtr = NULL; int32 err; OSCL_TRY(err, MemPtr = iAlloc.ALLOCATE(sizeof(PVMFSocketNodeMemPool));); if ((err != OsclErrNone) || (MemPtr == NULL)) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: PVMFSocketNodeMemPool Allocate Failed Ln %d", err, __LINE__)); return PVMFErrNoMemory; } switch (tag) { case PVMF_SOCKET_NODE_PORT_TYPE_SOURCE: //source tag is used for RTP { int32 errcode = 0; errcode = SocketPlacementNew(aMemPool, MemPtr, DEFAULT_NUM_MEDIA_MSGS_IN_JITTER_BUFFER); if (errcode != OsclErrNone) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: PVMFSocketNodeMemPool Construct Failed Ln %d", err, __LINE__)); iAlloc.deallocate(MemPtr); return PVMFErrNoMemory; } } break; case PVMF_SOCKET_NODE_PORT_TYPE_SINK: case PVMF_SOCKET_NODE_PORT_TYPE_PASSTHRU: //sink tag is used for RTCP { int32 errcode = 0; errcode 
= SocketPlacementNew(aMemPool, MemPtr, SNODE_DEFAULT_NUMBER_MEDIADATA_IN_MEMPOOL); if (errcode != OsclErrNone) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::DoRequestPort: PVMFSocketNodeMemPool Construct Failed Ln %d", err, __LINE__)); iAlloc.deallocate(MemPtr); return PVMFErrNoMemory; } } break; default://unexpected-- caller already validated the tag. iAlloc.deallocate(MemPtr); return PVMFFailure; break; } aMemPool->iPortTag = tag; return PVMFSuccess; } //Lookup the socket port config entry, given a socket address. //If the entry is in the "allocated port" vector, it will be erased, so //it's up to the caller to make sure it gets deleted or moved. SocketPortConfig* PVMFSocketNode::FindSocketPortConfig(SOCKET_ADDR& aSockAddr) { uint32 i; //search the allocated port vector for (i = 0; i < iAllocatedPortVector.size(); i++) { if (iAllocatedPortVector[i] && MatchSocketAddr(aSockAddr, *iAllocatedPortVector[i])) { //erase it from the vector before returning to the caller. SocketPortConfig* elem = iAllocatedPortVector[i]; iAllocatedPortVector.erase(&iAllocatedPortVector[i]); return elem; } } //search the port vector for (i = 0; i < iPortVector.size(); i++) { if (iPortVector[i]->iConfig && MatchSocketAddr(aSockAddr, *iPortVector[i]->iConfig)) { return iPortVector[i]->iConfig; } } return NULL; } //return "true" if input socket address matches port config bool PVMFSocketNode::MatchSocketAddr(SOCKET_ADDR& aSockAddr, SocketPortConfig& aSockConfig) { return (aSockConfig.iAddr.iProtocol == aSockAddr.iProtocol && aSockConfig.iAddr.iRemoteAdd.port == aSockAddr.iRemoteAdd.port && aSockConfig.iAddr.iRemoteHost == aSockAddr.iRemoteHost && aSockConfig.iTag == aSockAddr.iTag //local address may or may not be specified-- ignore if not. && (!aSockAddr.iLocalAdd.port || (aSockConfig.iAddr.iLocalAdd.port == aSockAddr.iLocalAdd.port))); } //Lookup the socket port config entry, given a socket ID. 
SocketPortConfig* PVMFSocketNode::FindSocketPortConfig(uint32 aId) { for (uint32 i = 0; i < iPortVector.size(); i++) { if (iPortVector[i]->iConfig && iPortVector[i]->iConfig->iSockId == aId) { return iPortVector[i]->iConfig; } } return NULL; } //Create a socket using the given socket ID and protocol. //For UDP, this will also attempt to bind, incrementing port numbers until //either success or maximum attempts is reached. This has the side effect //of updating the port number in the local address. //It returns a pointer to the socket cast as OsclAny*. OsclAny* PVMFSocketNode::CreateOsclSocketAndBind(SOCKET_ADDR &aSockAdd, uint32 aSockId) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::CreateOsclSocketAndBind() In")); //create the socket server session if it doesn't exist yet. if (iSockServ == NULL) { int32 err; OSCL_TRY(err, iSockServ = OsclSocketServ::NewL(iAlloc);); if (err || (iSockServ == NULL)) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode::CreateSocket() ERROR- iSockServ=0x%x, err=%d, Ln %d", iSockServ, err, __LINE__)); return NULL; } if (iSockServ->Connect() != OsclErrNone) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode::CreateSocket() ERROR Connect(), Ln %d", __LINE__)); return NULL; } } switch (aSockAdd.iProtocol) { case INET_UDP: { int32 err; OsclUDPSocket *UDPSocket = NULL; OSCL_TRY(err, UDPSocket = OsclUDPSocket::NewL(iAlloc, *iSockServ, this, aSockId);); if (err || (UDPSocket == NULL)) { PVLOGGER_LOGMSG(PVLOGMSG_INST_LLDBG, iLogger, PVLOGMSG_ERR, (0, "PVMFSocketNode::CreateSocket() ERROR- UDPSocket=0x%x, err=%d, Ln %d", UDPSocket, err, __LINE__)); return NULL; } //Bind this socket to the address bool bBindOK = false; int maxNumOfBind = UDP_PORT_RANGE; while (maxNumOfBind-- > 0) { #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.iNumBind++; #endif if (UDPSocket->Bind(aSockAdd.iLocalAdd) == EPVSocketSuccess) { bBindOK = true; break; } aSockAdd.iLocalAdd.port++; }; //If bind failed, 
delete the socket. if (!bBindOK) { UDPSocket->~OsclUDPSocket(); iAlloc.deallocate(UDPSocket); UDPSocket = NULL; } return UDPSocket; } case INET_TCP: { int32 err; OsclTCPSocket *TCPSocket = NULL; OSCL_TRY(err, TCPSocket = OsclTCPSocket::NewL(iAlloc, *iSockServ, this, aSockId);); if (err || (TCPSocket == NULL)) { return NULL; } return TCPSocket; } default: return NULL; } } //This finishes up a TCP socket cleanup, after the socket shutdown //is complete. void PVMFSocketNode::CleanupTCP(SocketPortConfig& aSockConfig) { if (aSockConfig.iTCPSocket) { //Close the socket. This will free the OS-level resources. This //socket object cannot be re-used. aSockConfig.iTCPSocket->Close(); PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CleanupTCP - Socket Closed - SockId=%d, Mime=%s, Port=0x%x", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iPVMFPort)); //Send EOS. To avoid multiple EOS we only do this when the //socket is still open. if (aSockConfig.iPVMFPort && aSockConfig.iPVMFPort->IsConnected()) { PVMFSharedMediaCmdPtr sharedMediaCmdPtr = PVMFMediaCmd::createMediaCmd(); sharedMediaCmdPtr->setFormatID(PVMF_MEDIA_CMD_EOS_FORMAT_ID); PVMFSharedMediaMsgPtr mediaMsgOut; convertToPVMFMediaCmdMsg(mediaMsgOut, sharedMediaCmdPtr); #if(ENABLE_SOCKET_NODE_STATS) aSockConfig.iPortStats.iNumQueueOutgoingEOSMsg++; #endif PVMFStatus status = aSockConfig.iPVMFPort->QueueOutgoingMsg(mediaMsgOut); //There's no recovery from a queue busy condition here. //We assume the caller already waited on the connected port before //calling this routine. if (status != PVMFSuccess) { PVMF_SOCKETNODE_LOGERROR((0, "PVMFSocketNode::SendPVMFEOS() ERROR:%d, Outgoing queue size=%d. 
Data discarded!", status, aSockConfig.iPVMFPort->OutgoingMsgQueueSize())); ReportErrorEvent(PVMFErrPortProcessing); PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode::CleanupTCP - EOS Send FAILED - SockId=%d, Mime=%s, Port=0x%x", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iPVMFPort)); } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CleanupTCP - EOS Sent - SockId=%d, Mime=%s, Port=0x%x", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iPVMFPort)); } } if (iInSocketCallback) { //prevent deleting an object from within its own Run. PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CleanupTCP - queueing old socket for deletion - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); iClosedTCPSocketVector.push_back(aSockConfig.iTCPSocket); aSockConfig.iTCPSocket = NULL; RunIfNotReady();//schedule the AO to delete the socket. } else { PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CleanupTCP - Deleting socket - SockId=%d, Mime=%s, Port=0x%x", aSockConfig.iSockId, aSockConfig.iMime.get_str(), aSockConfig.iPVMFPort)); aSockConfig.iTCPSocket->~OsclTCPSocket(); iAlloc.deallocate(aSockConfig.iTCPSocket); aSockConfig.iTCPSocket = NULL; } } //cleanup media messages //(this must happen *after* the optional EOS) if (aSockConfig.iPendingRecvMediaData.GetRep() != NULL) aSockConfig.iPendingRecvMediaData.Unbind(); if (aSockConfig.iPendingSendMediaData.GetRep() != NULL) aSockConfig.iPendingSendMediaData.Unbind(); } //Cancel and/or delete UDP socket void PVMFSocketNode::CleanupUDP(SocketPortConfig& aSockConfig) { //delete if (aSockConfig.iUDPSocket) { if (iInSocketCallback) { //prevent deleting an object from within its own Run. 
PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CleanupTCP - queueing old socket for deletion - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); iClosedUDPSocketVector.push_back(aSockConfig.iUDPSocket); aSockConfig.iUDPSocket = NULL; RunIfNotReady();//schedule the AO to delete the socket. } else { aSockConfig.iUDPSocket->~OsclUDPSocket(); iAlloc.deallocate(aSockConfig.iUDPSocket); aSockConfig.iUDPSocket = NULL; } } //cleanup media messages when socket is deleted if (aSockConfig.iPendingRecvMediaData.GetRep() != NULL) aSockConfig.iPendingRecvMediaData.Unbind(); if (aSockConfig.iPendingSendMediaData.GetRep() != NULL) aSockConfig.iPendingSendMediaData.Unbind(); } void PVMFSocketNode::CleanupDNS(SocketPortConfig& aSockConfig) { if (aSockConfig.iDNS) { if (iInSocketCallback) { //prevent deleting an object from within its own Run. PVMF_SOCKETNODE_LOGDATATRAFFIC_I((0, "PVMFSocketNode::CleanupDNS - queueing old DNS for deletion - SockId=%d, Mime=%s", aSockConfig.iSockId, aSockConfig.iMime.get_str())); iClosedDNSVector.push_back(aSockConfig.iDNS); aSockConfig.iDNS = NULL; RunIfNotReady();//schedule the AO to delete the object. } else { aSockConfig.iDNS->~OsclDNS(); iAlloc.deallocate(aSockConfig.iDNS); aSockConfig.iDNS = NULL; } } } //A synchronous cleanup routine for all ports. //It is assumed that all sockets are idle when this is called. //This is used in node destructor, and in completion of //Reset and Cancel All commands. void PVMFSocketNode::CleanupPorts() { // Cleanup port vector while (!iPortVector.empty()) { SocketPortConfig* it = iPortVector.front()->iConfig; if (it) { //unlink the PVMFPort so we won't try to send any EOS msg //during the TCP cleanup. 
it->iPVMFPort = NULL; CleanupTCP(*it); CleanupUDP(*it); CleanupDNS(*it); it->CleanupMemPools(iAlloc); OSCL_DELETE(it); iPortVector.front()->iConfig = NULL; } iPortVector.Erase(&iPortVector.front()); } // Cleanup allocated ports (these may have bound UDP sockets // but don't have any PVMF ports). while (!iAllocatedPortVector.empty()) { SocketPortConfig* it = iAllocatedPortVector.front(); CleanupTCP(*it); CleanupUDP(*it); CleanupDNS(*it); it->CleanupMemPools(iAlloc); OSCL_DELETE(it); iAllocatedPortVector.erase(&iAllocatedPortVector.front()); } } void PVMFSocketNode::CleanupClosedTCPSockets() { while (!iClosedTCPSocketVector.empty()) { OsclTCPSocket* obj = iClosedTCPSocketVector.front(); obj->~OsclTCPSocket(); iAlloc.deallocate(obj); iClosedTCPSocketVector.erase(&iClosedTCPSocketVector.front()); } } void PVMFSocketNode::CleanupClosedUDPSockets() { while (!iClosedUDPSocketVector.empty()) { OsclUDPSocket* obj = iClosedUDPSocketVector.front(); obj->~OsclUDPSocket(); iAlloc.deallocate(obj); iClosedUDPSocketVector.erase(&iClosedUDPSocketVector.front()); } } void PVMFSocketNode::CleanupClosedDNS() { while (!iClosedDNSVector.empty()) { OsclDNS* obj = iClosedDNSVector.front(); obj->~OsclDNS(); iAlloc.deallocate(obj); iClosedDNSVector.erase(&iClosedDNSVector.front()); } } bool PVMFSocketNode::ParseTransportConfig(OSCL_String *aPortConfig, SOCKET_ADDR &aSockConfig, OSCL_String& aMime) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::ParseTransportConfig() In")); char* head = aPortConfig->get_str(); int32 len = aPortConfig->get_size(); return ParseTransportConfig(head, len, aSockConfig, aMime); } bool PVMFSocketNode::ParseTransportConfig(char *aPortConfig, int32 aLen, SOCKET_ADDR &aSockConfig, OSCL_String& aMime) { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::ParseTransportConfig() In")); char* head = aPortConfig; int32 len = aLen; char *tail = head + len; if (oscl_strstr(head, "TCP")) aSockConfig.iProtocol = INET_TCP; else if (oscl_strstr(head, "UDP")) 
aSockConfig.iProtocol = INET_UDP; else return false;//invalid protocol. //for future extension aSockConfig.iLocalAdd.ipAddr.Set("0.0.0.0"); //INADDR_ANY aSockConfig.iRemoteAdd.ipAddr.Set(""); OSCL_StackString<128> address("remote_address="); char *tmpHead = OSCL_CONST_CAST(char*, oscl_strstr(head, address.get_cstr())); if (tmpHead) { tmpHead += address.get_size(); char *tmpTail = OSCL_CONST_CAST(char*, oscl_strstr(tmpHead, ";")); if (tmpTail == NULL) { tmpTail = tail; } char tmpChar = *tmpTail; *tmpTail = '\0'; aSockConfig.iRemoteAdd.ipAddr.Set(tmpHead); OSCL_HeapString tmpServerName(tmpHead, (tmpTail - tmpHead)); aSockConfig.iRemoteHost = tmpServerName; *tmpTail = tmpChar; } { TimeValue current_time; current_time.set_to_current_time(); uint32 my_seed = current_time.get_sec(); OsclRand random_num; random_num.Seed(my_seed); int32 first = random_num.Rand(); uint32 myport = (first & 0x1FFF) + 0x2000; //start from 8192 aSockConfig.iLocalAdd.port = (myport >> 1) << 1; //start from even; } OSCL_StackString<128> client_port("client_port="); tmpHead = OSCL_CONST_CAST(char*, oscl_strstr(head, client_port.get_cstr())); if (tmpHead) { tmpHead += client_port.get_size(); char *tmpTail = OSCL_CONST_CAST(char*, oscl_strstr(tmpHead, ";")); if (tmpTail == NULL) { tmpTail = tail; } char tmpChar = *tmpTail; *tmpTail = '\0'; uint32 atoi_tmp; if (PV_atoi(tmpHead, 'd', atoi_tmp)) { aSockConfig.iLocalAdd.port = atoi_tmp; } *tmpTail = tmpChar; } aSockConfig.iRemoteAdd.port = 0; OSCL_StackString<128> server_port("remote_port="); tmpHead = OSCL_CONST_CAST(char*, oscl_strstr(head, server_port.get_cstr())); if (tmpHead) { tmpHead += server_port.get_size(); char *tmpTail = OSCL_CONST_CAST(char*, oscl_strstr(tmpHead, ";")); if (tmpTail == NULL) { tmpTail = tail; } char tmpChar = *tmpTail; *tmpTail = '\0'; uint32 atoi_tmp; if (PV_atoi(tmpHead, 'd', atoi_tmp)) { aSockConfig.iRemoteAdd.port = atoi_tmp; } *tmpTail = tmpChar; } OSCL_StackString<128> mime("mime="); tmpHead = OSCL_CONST_CAST(char*, 
oscl_strstr(head, mime.get_cstr())); if (tmpHead) { tmpHead += mime.get_size(); char *tmpTail = OSCL_CONST_CAST(char*, oscl_strstr(tmpHead, ";")); if (tmpTail == NULL) { tmpTail = tail; } aMime += tmpHead; } return true; } /////////////////////// // End Low-level port processing /////////////////////// ////////////////////////////// // Logging & Diagnostics ////////////////////////////// void PVMFSocketNode::LogRTPHeaderFields(SocketPortConfig& aSockConfig, OsclRefCounterMemFrag& memFragIn) { OSCL_UNUSED_ARG(aSockConfig); uint8* rtpHeader = NULL; uint32 rtpPacketLen = 0; /* Get start of RTP packet */ rtpHeader = (uint8*)(memFragIn.getMemFrag().ptr); rtpPacketLen = memFragIn.getMemFrag().len; if ((rtpHeader != NULL) && (rtpPacketLen > 0)) { /* is this a legal data packet? */ if (rtpPacketLen <= 12) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode - illegal RTP pkt size")); } /* Parse RTP version */ uint8 rtpVersion = (((*rtpHeader) & 0xC0) >> 6); if (rtpVersion != 2) { PVMF_SOCKETNODE_LOGDATATRAFFIC_E((0, "PVMFSocketNode illegal rtp version")); } /* Check for padding */ uint8 pbit = (((*rtpHeader) & 0x20) >> 5); uint8 numPaddingOctets = 0; if (pbit) { numPaddingOctets = *(rtpHeader + (rtpPacketLen - 1)); } // skip over xbit, csrc and mbit rtpHeader++; rtpHeader++; /* Parse sequence number */ uint16 seqNum16 = 0; oscl_memcpy((char *)&seqNum16, rtpHeader, sizeof(seqNum16)); big_endian_to_host((char *)&seqNum16, sizeof(seqNum16)); rtpHeader += 2; /* Parse rtp time stamp */ uint32 ts32 = 0; oscl_memcpy((char *)&ts32, rtpHeader, sizeof(ts32)); big_endian_to_host((char *)&ts32, sizeof(ts32)); rtpHeader += 4; /* Parse SSRC */ uint32 ssrc32 = 0; oscl_memcpy((char *)&ssrc32, rtpHeader, sizeof(ssrc32)); big_endian_to_host((char *)&ssrc32, sizeof(ssrc32)); rtpHeader += 4; PVMF_SOCKETNODE_LOGDATATRAFFIC_RTP((0, "SN-RTP - Mime=%s, ssrc=%d, seq=%d, ts=%d, len=%d", aSockConfig.iMime.get_cstr(), ssrc32, (uint32)seqNum16, (PVMFTimestamp)ts32, rtpPacketLen)); } } void 
PVMFSocketNode::LogRTCPHeaderFields(SocketPortConfig& aSockConfig, OsclRefCounterMemFrag& memFragIn) { OSCL_UNUSED_ARG(aSockConfig); const uint8 SR_PACKET_TYPE = 200; const uint8 RR_PACKET_TYPE = 201; const uint8 SDES_PACKET_TYPE = 202; const uint8 BYE_PACKET_TYPE = 203; const uint8 APP_PACKET_TYPE = 204; uint8* rtcpHeader = NULL; int32 rtcpPacketLen = 0; /* Get start of RTP packet */ rtcpHeader = (uint8*)(memFragIn.getMemFrag().ptr); rtcpPacketLen = (int32)(memFragIn.getMemFrag().len); if ((rtcpHeader != NULL) && (rtcpPacketLen > 0)) { while (rtcpPacketLen > 0) { OsclBinIStreamBigEndian inStream; inStream.Attach(rtcpHeader, rtcpPacketLen); uint8 tempChar; inStream >> tempChar; if (inStream.fail()) { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, Invalid RTCP Packet", aSockConfig.iMime.get_cstr())); return; } // read the type uint8 payloadType; uint16 rtcpLength; inStream >> payloadType; inStream >> rtcpLength; if (inStream.fail()) { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, Invalid RTCP Packet Headers", aSockConfig.iMime.get_cstr())); return; } switch (payloadType) { case SR_PACKET_TYPE: { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, RTCP_SR RECVD", aSockConfig.iMime.get_cstr())); } break; case RR_PACKET_TYPE: { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, RTCP_RR RECVD", aSockConfig.iMime.get_cstr())); } break; case SDES_PACKET_TYPE: { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, RTCP_SDES_PKT RECVD", aSockConfig.iMime.get_cstr())); } break; case BYE_PACKET_TYPE: { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, RTCP_BYE RECVD", aSockConfig.iMime.get_cstr())); } break; case APP_PACKET_TYPE: { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, RTCP_APP_PKT RECVD", aSockConfig.iMime.get_cstr())); } break; default: break; } rtcpPacketLen -= (rtcpLength + 1) * 4; rtcpHeader += (rtcpLength + 1) * 4; if (rtcpLength) { inStream.seekFromCurrentPosition((rtcpLength*4)); if 
(inStream.fail()) { PVMF_SOCKETNODE_LOGDATATRAFFIC_RTCP((0, "SN-RTCP - Mime=%s, RTCP Packet Read Failed", aSockConfig.iMime.get_cstr())); return; } } } //end of while loop } } ////////////////////////////// // End Logging & Diagnostics ////////////////////////////// /////////////////////////////////// // AO /////////////////////////////////// void PVMFSocketNode::Run() { PVMF_SOCKETNODE_LOGSTACKTRACE((0, "PVMFSocketNode::Run() In")); #if(ENABLE_SOCKET_NODE_STATS) iSocketNodeStats.iNumRun++; #endif if (!iClosedTCPSocketVector.empty()) CleanupClosedTCPSockets(); if (!iClosedUDPSocketVector.empty()) CleanupClosedUDPSockets(); if (!iClosedDNSVector.empty()) CleanupClosedDNS(); //Process node commands while (CanProcessCommand()) ProcessCommand(iPendingCmdQueue, iPendingCmdQueue.front()); } int32 PVMFSocketNode::SocketPlacementNew(PVMFSocketNodeMemPool *& aMemPool, OsclAny *aMemPtr, int32 aMemPoolNumBufs) { int32 errcode = 0; OSCL_TRY(errcode, aMemPool = OSCL_PLACEMENT_NEW(aMemPtr, PVMFSocketNodeMemPool(aMemPoolNumBufs);)); return errcode; } int32 PVMFSocketNode::CreateMediaData(SocketPortConfig& aSockConfig, OsclSharedPtr< PVMFMediaDataImpl > &aMediaptr) { int32 err = 0; OSCL_TRY_NO_TLS(iOsclErrorTrapImp, err, aSockConfig.iPendingRecvMediaData = PVMFMediaData::createMediaData(aMediaptr, aSockConfig.iMemPool->iMediaDataMemPool);); return err; } int32 PVMFSocketNode::Allocate(SocketPortConfig &aSockConfig, OsclSharedPtr< PVMFMediaDataImpl > &aMediaFragGroup) { #if SNODE_ENABLE_UDP_MULTI_PACKET int32 err = 0; OSCL_TRY_NO_TLS(iOsclErrorTrapImp, err, aMediaFragGroup = aSockConfig.iMemPool->iMediaFragGroupAlloc->allocate();); return err; #else return OsclErrNotSupported; #endif } int32 PVMFSocketNode::GetMediaDataImpl(SocketPortConfig& aSockConfig, OsclSharedPtr &aMediaDataImpl, int32 aMemSize) { int32 err = 0; aMediaDataImpl = aSockConfig.iMemPool->getMediaDataImpl(aMemSize); if (NULL == aMediaDataImpl.GetRep()) { err = OsclErrNoMemory; } return err; }