/*
 *
 * @APPLE_LICENSE_HEADER_START@
 *
 * Copyright (c) 1999-2008 Apple Inc.  All Rights Reserved.
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this
 * file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_LICENSE_HEADER_END@
 *
 */
/*
    File:       ReflectorStream.cpp

    Contains:   Implementation of object defined in ReflectorStream.h.

*/

#include "ReflectorStream.h"
#include "QTSSModuleUtils.h"
#include "OSMemory.h"
#include "SocketUtils.h"
#include "atomic.h"
#include "RTCPPacket.h"
#include "ReflectorSession.h"

#if DEBUG
#define REFLECTOR_STREAM_DEBUGGING 0
#else
#define REFLECTOR_STREAM_DEBUGGING 0
#endif

static ReflectorSocketPool  sSocketPool;

// ATTRIBUTES

static QTSS_AttributeID         sCantBindReflectorSocketErr = qtssIllegalAttrID;
static QTSS_AttributeID         sCantJoinMulticastGroupErr  = qtssIllegalAttrID;

// PREFS

static UInt32                   sDefaultOverBufferInSec         = 10;
static UInt32                   sDefaultBucketDelayInMsec       = 73;
static Bool16                   sDefaultUsePacketReceiveTime    = false;
static UInt32                   sDefaultMaxFuturePacketTimeSec  = 60;
static UInt32                   sDefaultFirstPacketOffsetMsec   = 500;

UInt32                          ReflectorStream::sBucketSize            = 16;
UInt32                          ReflectorStream::sOverBufferInMsec      = 10000; // more or less what the client over buffer will be
UInt32                          ReflectorStream::sMaxFuturePacketMSec   = 60000; // max packet future time
UInt32                          ReflectorStream::sMaxPacketAgeMSec      = 10000;

UInt32                          ReflectorStream::sMaxFuturePacketSec    = 60; // max packet future time
UInt32                          ReflectorStream::sOverBufferInSec       = 10;
UInt32                          ReflectorStream::sBucketDelayInMsec     = 73;
Bool16                          ReflectorStream::sUsePacketReceiveTime  = false;
UInt32                          ReflectorStream::sFirstPacketOffsetMsec = 500;

// Registers the two text-message attributes this file reports errors with
// and caches their attribute IDs in the file-level statics above.
void ReflectorStream::Register()
{
    // Add text messages attributes
    static char* sCantBindReflectorSocket= "QTSSReflectorModuleCantBindReflectorSocket";
    static char* sCantJoinMulticastGroup = "QTSSReflectorModuleCantJoinMulticastGroup";

    (void)QTSS_AddStaticAttribute(qtssTextMessagesObjectType, sCantBindReflectorSocket, NULL, qtssAttrDataTypeCharArray);
    (void)QTSS_IDForAttr(qtssTextMessagesObjectType, sCantBindReflectorSocket, &sCantBindReflectorSocketErr);

    (void)QTSS_AddStaticAttribute(qtssTextMessagesObjectType, sCantJoinMulticastGroup, NULL, qtssAttrDataTypeCharArray);
    (void)QTSS_IDForAttr(qtssTextMessagesObjectType, sCantJoinMulticastGroup, &sCantJoinMulticastGroupErr);
}

// Reads the reflector preferences from inPrefs into the class statics
// (each call passes the matching sDefault* value) and then recomputes the
// millisecond values that are derived from the second-based preferences.
void ReflectorStream::Initialize(QTSS_ModulePrefsObject inPrefs)
{
    QTSSModuleUtils::GetAttribute(inPrefs, "reflector_bucket_offset_delay_msec", qtssAttrDataTypeUInt32,
                                &ReflectorStream::sBucketDelayInMsec, &sDefaultBucketDelayInMsec, sizeof(sBucketDelayInMsec));

    QTSSModuleUtils::GetAttribute(inPrefs, "reflector_buffer_size_sec", qtssAttrDataTypeUInt32,
                                &ReflectorStream::sOverBufferInSec, &sDefaultOverBufferInSec, sizeof(sDefaultOverBufferInSec));

    QTSSModuleUtils::GetAttribute(inPrefs, "reflector_use_in_packet_receive_time", qtssAttrDataTypeBool16,
                                &ReflectorStream::sUsePacketReceiveTime, &sDefaultUsePacketReceiveTime, sizeof(sDefaultUsePacketReceiveTime));

    QTSSModuleUtils::GetAttribute(inPrefs, "reflector_in_packet_max_receive_sec", qtssAttrDataTypeUInt32,
                                &ReflectorStream::sMaxFuturePacketSec, &sDefaultMaxFuturePacketTimeSec, sizeof(sDefaultMaxFuturePacketTimeSec));

    QTSSModuleUtils::GetAttribute(inPrefs, "reflector_rtp_info_offset_msec", qtssAttrDataTypeUInt32,
                                &ReflectorStream::sFirstPacketOffsetMsec, &sDefaultFirstPacketOffsetMsec, sizeof(sDefaultFirstPacketOffsetMsec));

    ReflectorStream::sOverBufferInMsec = sOverBufferInSec * 1000;
    ReflectorStream::sMaxFuturePacketMSec = sMaxFuturePacketSec * 1000;
    ReflectorStream::sMaxPacketAgeMSec = (UInt32) (sOverBufferInMsec * 1.5); //allow a little time before deleting.
}

// Writes the source ID for inInfo into ioBuffer: the raw bytes of the source
// IP address immediately followed by the raw bytes of the port. ioBuffer must
// have room for sizeof(fSrcIPAddr) + sizeof(fPort) bytes.
void ReflectorStream::GenerateSourceID(SourceInfo::StreamInfo* inInfo, char* ioBuffer)
{
    ::memcpy(ioBuffer, &inInfo->fSrcIPAddr, sizeof(inInfo->fSrcIPAddr));
    ::memcpy(&ioBuffer[sizeof(inInfo->fSrcIPAddr)], &inInfo->fPort, sizeof(inInfo->fPort));
}

// Copies inInfo, allocates the initial bucket array and pre-builds the part
// of the outgoing RTCP compound packet (RR + SDES + QTSS APP) that does not
// change. The three counters written by SendReceiverReport() go at
// fEyeLocation.
ReflectorStream::ReflectorStream(SourceInfo::StreamInfo* inInfo)
:   fPacketCount(0),
    fSockets(NULL),
    fRTPSender(NULL, qtssWriteFlagsIsRTP),
    fRTCPSender(NULL, qtssWriteFlagsIsRTCP),
    fOutputArray(NULL),
    fNumBuckets(kMinNumBuckets),
    fNumElements(0),
    fBucketMutex(),
    fDestRTCPAddr(0),
    fDestRTCPPort(0),
    fCurrentBitRate(0),
    fLastBitRateSample(OS::Milliseconds()), // don't calculate our first bit rate until kBitRateAvgIntervalInMilSecs has passed!
    fBytesSentInThisInterval(0),
    fRTPChannel(-1),
    fRTCPChannel(-1),
    fHasFirstRTCPPacket(false),
    fHasFirstRTPPacket(false),
    fEnableBuffer(false),
    fEyeCount(0),
    fFirst_RTCP_RTP_Time(0),
    fFirst_RTCP_Arrival_Time(0)
{
    fRTPSender.fStream = this;
    fRTCPSender.fStream = this;

    fStreamInfo.Copy(*inInfo);

    // ALLOCATE BUCKET ARRAY
    this->AllocateBucketArray(fNumBuckets);

    // WRITE RTCP PACKET

    //write as much of the RTCP RR as is possible right now (most of it never changes)
    UInt32 theSsrc = (UInt32)::rand();
    char theTempCName[RTCPSRPacket::kMaxCNameLen];
    UInt32 cNameLen = RTCPSRPacket::GetACName(theTempCName);

    //write the RR (just header + ssrc)
    UInt32* theRRWriter = (UInt32*)&fReceiverReportBuffer[0];
    *theRRWriter = htonl(0x80c90001);   // version 2, packet type 201, length 1
    theRRWriter++;
    *theRRWriter = htonl(theSsrc);
    theRRWriter++;

    //SDES length is the length of the CName, plus 2 32bit words, minus 1
    *theRRWriter = htonl(0x81ca0000 + (cNameLen >> 2) + 1);    // packet type 202, one chunk
    theRRWriter++;
    *theRRWriter = htonl(theSsrc);
    theRRWriter++;
    ::memcpy(theRRWriter, theTempCName, cNameLen);
    theRRWriter += cNameLen >> 2;

    //APP packet format, QTSS specific stuff
    *theRRWriter = htonl(0x80cc0008);   // packet type 204, length 8
    theRRWriter++;
    *theRRWriter = htonl(theSsrc);
    theRRWriter++;
    *theRRWriter = htonl(FOUR_CHARS_TO_INT('Q','T','S','S'));
    theRRWriter++;
    *theRRWriter = htonl(0);
    theRRWriter++;
    *theRRWriter = htonl(0x00000004);
    theRRWriter++;
    *theRRWriter = htonl(0x6579000c);   // 0x6579 is ASCII "ey"; 0x000c == 12 bytes, i.e. the three words at fEyeLocation
    theRRWriter++;

    fEyeLocation = theRRWriter;
    fReceiverReportSize = kReceiverReportSize + kAppSize + cNameLen;

    // If the source is a multicast, we should send our receiver reports
    // to the multicast address
    if (SocketUtils::IsMulticastIPAddr(fStreamInfo.fDestIPAddr))
    {
        fDestRTCPAddr = fStreamInfo.fDestIPAddr;
        fDestRTCPPort = fStreamInfo.fPort + 1;
    }
}

// Detaches the two senders from their sockets, leaves the multicast group if
// one was joined, returns the socket pair to the pool and frees the bucket
// array. All outputs must already have been removed.
ReflectorStream::~ReflectorStream()
{
    Assert(fNumElements == 0);

    if (fSockets != NULL)
    {
        //first things first, let's take this stream off the socket's queue
        //of streams. This will basically ensure that no reflecting activity
        //can happen on this stream.
        ((ReflectorSocket*)fSockets->GetSocketA())->RemoveSender(&fRTPSender);
        ((ReflectorSocket*)fSockets->GetSocketB())->RemoveSender(&fRTCPSender);

        //leave the multicast group. Because this socket is shared amongst several
        //potential multicasts, we don't want to remain a member of a stale multicast
        if (SocketUtils::IsMulticastIPAddr(fStreamInfo.fDestIPAddr))
        {
            fSockets->GetSocketA()->LeaveMulticast(fStreamInfo.fDestIPAddr);
            fSockets->GetSocketB()->LeaveMulticast(fStreamInfo.fDestIPAddr);
        }
        //now release the socket pair
        sSocketPool.ReleaseUDPSocketPair(fSockets);
    }

    //qtss_printf("Deleting stream %x\n", this);

    //delete every client Bucket
    for (UInt32 y = 0; y < fNumBuckets; y++)
        delete [] fOutputArray[y];
    delete [] fOutputArray;
}

// (Re)allocates fOutputArray with inNumBuckets buckets of sBucketSize slots
// each, all cleared to NULL. When an array already exists its fNumBuckets
// buckets are copied into the new one and the old storage is freed, so
// inNumBuckets must be larger than the current fNumBuckets in that case.
void ReflectorStream::AllocateBucketArray(UInt32 inNumBuckets)
{
    Bucket* oldArray = fOutputArray;
    //allocate the 2-dimensional array
    fOutputArray = NEW Bucket[inNumBuckets];
    for (UInt32 x = 0; x < inNumBuckets; x++)
    {
        fOutputArray[x] = NEW ReflectorOutput*[sBucketSize];
        ::memset(fOutputArray[x], 0, sizeof(ReflectorOutput*) * sBucketSize);
    }

    //copy over the old information if there was an old array
    if (oldArray != NULL)
    {
        Assert(inNumBuckets > fNumBuckets);
        for (UInt32 y = 0; y < fNumBuckets; y++)
        {
            ::memcpy(fOutputArray[y],oldArray[y], sBucketSize * sizeof(ReflectorOutput*));
            delete [] oldArray[y];
        }
        delete [] oldArray;
    }
    fNumBuckets = inNumBuckets;
}

// Adds inOutput to bucket putInThisBucket, or to the first bucket with a free
// slot when putInThisBucket is negative. Grows the bucket array if the
// requested bucket index does not exist yet.
// Returns the bucket index used, or -1 if the requested bucket is full.
SInt32 ReflectorStream::AddOutput(ReflectorOutput* inOutput, SInt32 putInThisBucket)
{
    OSMutexLocker locker(&fBucketMutex);

#if DEBUG
    // We should never be adding an output twice to a stream
    for (UInt32 dOne = 0; dOne < fNumBuckets; dOne++)
        for (UInt32 dTwo = 0; dTwo < sBucketSize; dTwo++)
            Assert(fOutputArray[dOne][dTwo] != inOutput);
#endif

    // If caller didn't specify a bucket, find a bucket
    if (putInThisBucket < 0)
        putInThisBucket = this->FindBucket();

    Assert(putInThisBucket >= 0);

    if (fNumBuckets <= (UInt32)putInThisBucket)
        this->AllocateBucketArray(putInThisBucket * 2);

    for(UInt32 y = 0; y < sBucketSize; y++)
    {
        if (fOutputArray[putInThisBucket][y] == NULL)
        {
            fOutputArray[putInThisBucket][y] = inOutput;
#if REFLECTOR_STREAM_DEBUGGING
            qtss_printf("Adding new output (0x%lx) to bucket %"_S32BITARG_", index %"_S32BITARG_",\nnum buckets %li bucketSize: %li \n",(SInt32)inOutput, putInThisBucket, y, (SInt32)fNumBuckets, (SInt32)sBucketSize);
#endif
            fNumElements++;
            return putInThisBucket;
        }
    }
    // There was no empty spot in the specified bucket. Return an error
    return -1;
}

// Returns the index of the first bucket that has a free slot, doubling the
// number of buckets first when every slot is taken. Does not take
// fBucketMutex itself; AddOutput() calls it with the mutex held.
SInt32 ReflectorStream::FindBucket()
{
    // If we need more buckets, allocate them.
    if (fNumElements == (sBucketSize * fNumBuckets))
        this->AllocateBucketArray(fNumBuckets * 2);

    //find the first open spot in the array
    for (SInt32 putInThisBucket = 0; (UInt32)putInThisBucket < fNumBuckets; putInThisBucket++)
    {
        for(UInt32 y = 0; y < sBucketSize; y++)
            if (fOutputArray[putInThisBucket][y] == NULL)
                return putInThisBucket;
    }
    Assert(0);
    return 0;
}

// Clears the slot holding inOutput. Asserts if the output is not present.
void ReflectorStream::RemoveOutput(ReflectorOutput* inOutput)
{
    OSMutexLocker locker(&fBucketMutex);
    Assert(fNumElements > 0);

    //look at all the indexes in the array
    for (UInt32 x = 0; x < fNumBuckets; x++)
    {
        for (UInt32 y = 0; y < sBucketSize; y++)
        {
            //The array may have blank spaces!
            if (fOutputArray[x][y] == inOutput)
            {
                fOutputArray[x][y] = NULL;//just clear out the pointer

#if REFLECTOR_STREAM_DEBUGGING
                qtss_printf("Removing output %x from bucket %"_S32BITARG_", index %"_S32BITARG_"\n",inOutput,x,y);
#endif
                fNumElements--;
                return;
            }
        }
    }
    Assert(0);
}

// Calls TearDown() on every output currently stored in the bucket array.
// The slots themselves are left untouched.
void ReflectorStream::TearDownAllOutputs()
{
    OSMutexLocker locker(&fBucketMutex);

    //look at all the indexes in the array
    for (UInt32 x = 0; x < fNumBuckets; x++)
    {
        for (UInt32 y = 0; y < sBucketSize; y++)
        {
            ReflectorOutput* theOutputPtr= fOutputArray[x][y];
            //The array may have blank spaces!
            if (theOutputPtr != NULL)
            {
                theOutputPtr->TearDown();
#if REFLECTOR_STREAM_DEBUGGING
                qtss_printf("TearDownAllOutputs Removing output from bucket %"_S32BITARG_", index %"_S32BITARG_"\n",x,y);
#endif
            }
        }
    }
}

// Obtains a UDP socket pair for this stream from the shared pool, attaches
// the RTP/RTCP senders to it, joins the multicast group when the destination
// is multicast, requests read events and sets up fRef with the source ID.
//
// inParams may be NULL; it is only used to look up the RTSP request (for
// error responses and the transport type) and the client session.
// Returns QTSS_NoErr on success, otherwise the result of SendErrorResponse.
QTSS_Error ReflectorStream::BindSockets(QTSS_StandardRTSP_Params* inParams, UInt32 inReflectorSessionFlags, Bool16 filterState, UInt32 timeout)
{
    // If the incoming data is RTSP interleaved, we don't need to do anything here
    if (inReflectorSessionFlags & ReflectorSession::kIsPushSession)
        fStreamInfo.fSetupToReceive = true;

    QTSS_RTSPRequestObject inRequest = NULL;
    if (inParams != NULL)
        inRequest = inParams->inRTSPRequest;

    // Set the transport Type a Broadcaster
    QTSS_RTPTransportType transportType = qtssRTPTransportTypeUDP;
    if (inParams != NULL)
    {
        UInt32 theLen = sizeof(transportType);
        (void) QTSS_GetValue(inParams->inRTSPRequest, qtssRTSPReqTransportType, 0, (void*)&transportType, &theLen);
    }

    // get a pair of sockets. The socket must be bound on INADDR_ANY because we don't know
    // which interface has access to this broadcast. If there is a source IP address
    // specified by the source info, we can use that to demultiplex separate broadcasts on
    // the same port. If the src IP addr is 0, we cannot do this and must dedicate 1 port per
    // broadcast

    // changing INADDR_ANY to fStreamInfo.fDestIPAddr to deal with NATs (need to track this change though)
    // change submitted by denis@berlin.ccc.de
    Bool16 isMulticastDest = (SocketUtils::IsMulticastIPAddr(fStreamInfo.fDestIPAddr));
    if (isMulticastDest)
    {
        fSockets = sSocketPool.GetUDPSocketPair(INADDR_ANY, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
    }
    else
    {
        fSockets = sSocketPool.GetUDPSocketPair(fStreamInfo.fDestIPAddr, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
    }

    // Second attempt for streams set up to receive: retry with port 0
    if ((fSockets == NULL) && fStreamInfo.fSetupToReceive)
    {
        fStreamInfo.fPort = 0;
        if (isMulticastDest)
        {
            fSockets = sSocketPool.GetUDPSocketPair(INADDR_ANY, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
        }
        else
        {
            fSockets = sSocketPool.GetUDPSocketPair(fStreamInfo.fDestIPAddr, fStreamInfo.fPort, fStreamInfo.fSrcIPAddr, 0);
        }
    }
    if (fSockets == NULL)
        return QTSSModuleUtils::SendErrorResponse(inRequest, qtssServerInternal,
                                                    sCantBindReflectorSocketErr);

    // If we know the source IP address of this broadcast, we can demux incoming traffic
    // on the same port by that source IP address. If we don't know the source IP addr,
    // it is impossible for us to demux, and therefore we shouldn't allow multiple
    // broadcasts on the same port.
    if (((ReflectorSocket*)fSockets->GetSocketA())->HasSender() && (fStreamInfo.fSrcIPAddr == 0))
        return QTSSModuleUtils::SendErrorResponse(inRequest, qtssServerInternal,
                                                    sCantBindReflectorSocketErr);

    //also put this stream onto the socket's queue of streams
    ((ReflectorSocket*)fSockets->GetSocketA())->AddSender(&fRTPSender);
    ((ReflectorSocket*)fSockets->GetSocketB())->AddSender(&fRTCPSender);

    // A broadcaster is setting up a UDP session so let the sockets update the session
    if (fStreamInfo.fSetupToReceive && qtssRTPTransportTypeUDP == transportType && inParams != NULL)
    {
        ((ReflectorSocket*)fSockets->GetSocketA())->AddBroadcasterSession(inParams->inClientSession);
        ((ReflectorSocket*)fSockets->GetSocketB())->AddBroadcasterSession(inParams->inClientSession);
    }

    ((ReflectorSocket*)fSockets->GetSocketA())->SetSSRCFilter(filterState, timeout);
    ((ReflectorSocket*)fSockets->GetSocketB())->SetSSRCFilter(filterState, timeout);

#if 1
    // Always set the Rcv buf size for the sockets. This is important because the
    // server is going to be getting many packets on these sockets.
    fSockets->GetSocketA()->SetSocketRcvBufSize(512 * 1024);
    fSockets->GetSocketB()->SetSocketRcvBufSize(512 * 1024);
#endif

    //If the broadcaster is sending RTP directly to us, we don't
    //need to join a multicast group because we're not using multicast
    if (isMulticastDest)
    {
        QTSS_Error err = fSockets->GetSocketA()->JoinMulticast(fStreamInfo.fDestIPAddr);
        if (err == QTSS_NoErr)
            err = fSockets->GetSocketB()->JoinMulticast(fStreamInfo.fDestIPAddr);
        // If we get an error when setting the TTL, this isn't too important (TTL on
        // these sockets is only useful for RTCP RRs.
        if (err == QTSS_NoErr)
            (void)fSockets->GetSocketA()->SetTtl(fStreamInfo.fTimeToLive);
        if (err == QTSS_NoErr)
            (void)fSockets->GetSocketB()->SetTtl(fStreamInfo.fTimeToLive);
        if (err != QTSS_NoErr)
            return QTSSModuleUtils::SendErrorResponse(inRequest, qtssServerInternal,
                                                        sCantJoinMulticastGroupErr);
    }

    // If the port is 0, update the port to be the actual port value
    fStreamInfo.fPort = fSockets->GetSocketA()->GetLocalPort();

    //finally, register these sockets for events
    fSockets->GetSocketA()->RequestEvent(EV_RE);
    fSockets->GetSocketB()->RequestEvent(EV_RE);

    // Copy the source ID and setup the ref
    StrPtrLen theSourceID(fSourceIDBuf, kStreamIDSize);
    ReflectorStream::GenerateSourceID(&fStreamInfo, fSourceIDBuf);
    fRef.Set(theSourceID, this);
    return QTSS_NoErr;
}

// Fills in the three 32-bit counters at fEyeLocation (eye count, eye count,
// zero) and sends the pre-built receiver report to the stream's RTCP
// address and port over socket B.
void ReflectorStream::SendReceiverReport()
{
    // Check to see if our destination RTCP addr & port are setup. They may
    // not be if the source is unicast and we haven't gotten any incoming packets yet.
    // Without a socket pair there is nothing to send on either.
    if ((fDestRTCPAddr == 0) || (fSockets == NULL))
        return;

    // Clear the top bit while the value is still in host byte order. Masking
    // the result of htonl() instead clears bit 7 of the count on
    // little-endian hosts (e.g. 128 would be sent as 0).
    UInt32 theEyeCount = this->GetEyeCount() & 0x7fffffff;
    UInt32* theEyeWriter = fEyeLocation;
    *theEyeWriter = htonl(theEyeCount);
    theEyeWriter++;
    *theEyeWriter = htonl(theEyeCount);
    theEyeWriter++;
    *theEyeWriter = htonl(0);

    //send the packet to the multicast RTCP addr & port for this stream
    (void)fSockets->GetSocketB()->SendTo(fDestRTCPAddr, fDestRTCPPort, fReceiverReportBuffer, fReceiverReportSize);
}

// Injects a packet into the stream as if it had arrived on one of its
// sockets: socket B for RTCP, socket A for RTP. The data is copied into a
// packet obtained from that socket, processed with the current time, and the
// socket task is signalled with an idle event.
// Silently does nothing for an empty or NULL packet, when the stream has no
// sockets yet, or when the socket cannot supply a packet.
void ReflectorStream::PushPacket(char *packet, UInt32 packetLen, Bool16 isRTCP)
{
    if ((packetLen == 0) || (packet == NULL) || (fSockets == NULL))
        return;

    ReflectorSocket* theSocket = (ReflectorSocket*)(isRTCP ? fSockets->GetSocketB() : fSockets->GetSocketA());

    ReflectorPacket* thePacket = theSocket->GetPacket();
    if (thePacket == NULL)
        return;

    // Hold the demuxer mutex while the packet is filled in and processed
    OSMutexLocker locker(theSocket->GetDemuxer()->GetMutex());
    thePacket->SetPacketData(packet, packetLen);
    theSocket->ProcessPacket(OS::Milliseconds(), thePacket, 0, 0);
    theSocket->Signal(Task::kIdleEvent);
}

// inStream may be NULL; ReflectorStream's constructor sets fStream afterwards.
ReflectorSender::ReflectorSender(ReflectorStream* inStream, UInt32 inWriteFlag)
:   fStream(inStream),
    fWriteFlag(inWriteFlag),
    fFirstNewPacketInQueue(NULL),
    fFirstPacketInQueueForNewOutput(NULL),
    fHasNewPackets(false),
    fNextTimeToRun(0),
    fLastRRTime(0),
    fSocketQueueElem()
{
    fSocketQueueElem.SetEnclosingObject(this);
}

ReflectorSender::~ReflectorSender()
{
    //dequeue and delete every buffer
    while (fPacketQueue.GetLength() > 0)
    {
        ReflectorPacket* packet = (ReflectorPacket*)fPacketQueue.DeQueue()->GetEnclosingObject();
        delete packet;
    }
}

// Returns true when this sender has work to do now: either new packets have
// arrived or its scheduled run time has been reached.
//
// When it returns false and a run time is scheduled, *ioWakeupTime (a
// relative time in milliseconds) is lowered to the time remaining until that
// run if it is currently later. A *ioWakeupTime of 0 is left untouched.
Bool16 ReflectorSender::ShouldReflectNow(const SInt64& inCurrentTime, SInt64* ioWakeupTime)
{
    Assert(ioWakeupTime != NULL);
    //check to make sure there actually is work to do for this stream.
    if ((!fHasNewPackets) && ((fNextTimeToRun == 0) || (inCurrentTime < fNextTimeToRun)))
    {
        //We don't need to do work right now, but
        //this stream must still communicate when it needs to be woken up next

        // fNextTimeToRun is an absolute time at this point (the reflect
        // routines finish with fNextTimeToRun += currentTime), so the relative
        // wake-up time is the difference. Adding the two, as this code used to
        // do, gave a value that could never be smaller than *ioWakeupTime.
        SInt64 theWakeupTime = fNextTimeToRun - inCurrentTime;
        //qtss_printf("ReflectorSender::ShouldReflectNow theWakeupTime=%qd ioWakepTime=%qd\n", theWakeupTime, *ioWakeupTime);
        if ((fNextTimeToRun > 0) && (theWakeupTime < *ioWakeupTime))
            *ioWakeupTime = theWakeupTime;
        return false;
    }
    return true;
}

// Returns the RTP time of the packet given by GetClientBufferStartPacket().
// *foundPtr (optional) is set to true only when such a packet exists;
// otherwise the return value is 0.
UInt32 ReflectorSender::GetOldestPacketRTPTime(Bool16 *foundPtr)
{
    if (foundPtr != NULL)
        *foundPtr = false;
    OSMutexLocker locker(&fStream->fBucketMutex);
    OSQueueElem* packetElem = this->GetClientBufferStartPacket();
    if (packetElem == NULL)
        return 0;

    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
    if (thePacket == NULL)
        return 0;

    if (foundPtr != NULL)
        *foundPtr = true;

    return thePacket->GetPacketRTPTime();
}

// Returns the RTP sequence number of the packet given by
// GetClientBufferStartPacket(). *foundPtr (optional) is set to true only
// when such a packet exists; otherwise the return value is 0.
UInt16 ReflectorSender::GetFirstPacketRTPSeqNum(Bool16 *foundPtr)
{
    if (foundPtr != NULL)
        *foundPtr = false;

    UInt16 resultSeqNum = 0;
    OSMutexLocker locker(&fStream->fBucketMutex);
    OSQueueElem* packetElem = this->GetClientBufferStartPacket();

    if (packetElem == NULL)
        return 0;

    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
    if (thePacket == NULL)
        return 0;

    if (foundPtr != NULL)
        *foundPtr = true;

    resultSeqNum = thePacket->GetPacketRTPSeqNum();

    return resultSeqNum;
}

// Walks the packet queue from its first element and returns the first
// element whose RTP time is greater than inRTPTime. If no packet is later,
// the first element of the queue is returned; NULL for an empty queue.
// The comparison is a plain '>' with no handling of RTP timestamp wrap.
OSQueueElem* ReflectorSender::GetClientBufferNextPacketTime(UInt32 inRTPTime)
{
    OSQueueIter qIter(&fPacketQueue);// start at oldest packet in q
    OSQueueElem* requestedPacket = NULL;
    OSQueueElem* elem = NULL;

    while ( !qIter.IsDone() ) // start at oldest packet in q
    {
        elem = qIter.GetCurrent();

        if (requestedPacket == NULL)
            requestedPacket = elem;

        if (requestedPacket == NULL)
            break;

        ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject();
        Assert( thePacket );

        if (thePacket->GetPacketRTPTime() > inRTPTime)
        {
            requestedPacket = elem; // return the first packet we have that has a later time
            break; // found the packet we need: done processing
        }
        qIter.Next();
    }

    return requestedPacket;
}

// Starting from the packet given by
// GetClientBufferStartPacketOffset(sFirstPacketOffsetMsec), finds the packet
// returned by GetClientBufferNextPacketTime() for that packet's RTP time and
// reports its sequence number, RTP time and arrival time through whichever
// out pointers are non-NULL. Returns false when no such packet exists.
Bool16 ReflectorSender::GetFirstRTPTimePacket(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr)
{
    OSMutexLocker locker(&fStream->fBucketMutex);
    OSQueueElem* packetElem = this->GetClientBufferStartPacketOffset(ReflectorStream::sFirstPacketOffsetMsec);

    if (packetElem == NULL)
        return false;

    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
    if (thePacket == NULL)
        return false;

    packetElem = GetClientBufferNextPacketTime(thePacket->GetPacketRTPTime());
    if (packetElem == NULL)
        return false;

    thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
    if (thePacket == NULL)
        return false;

    if (outSeqNumPtr)
        *outSeqNumPtr = thePacket->GetPacketRTPSeqNum();

    if (outRTPTimePtr)
        *outRTPTimePtr = thePacket->GetPacketRTPTime();

    if (outArrivalTimePtr)
        *outArrivalTimePtr = thePacket->fTimeArrived;

    return true;
}

// Reports the sequence number, RTP time and arrival time of the packet given
// by GetClientBufferStartPacketOffset(sFirstPacketOffsetMsec) through
// whichever out pointers are non-NULL, and marks that packet as needed by an
// output. Returns false when no such packet exists.
Bool16 ReflectorSender::GetFirstPacketInfo(UInt16* outSeqNumPtr, UInt32* outRTPTimePtr, SInt64* outArrivalTimePtr)
{
    OSMutexLocker locker(&fStream->fBucketMutex);
    OSQueueElem* packetElem = this->GetClientBufferStartPacketOffset(ReflectorStream::sFirstPacketOffsetMsec);
//  OSQueueElem* packetElem = this->GetClientBufferStartPacket();

    if (packetElem == NULL)
        return false;

    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
    if (thePacket == NULL)
        return false;

    if (outSeqNumPtr)
        *outSeqNumPtr = thePacket->GetPacketRTPSeqNum();

    if (outRTPTimePtr)
        *outRTPTimePtr = thePacket->GetPacketRTPTime();

    if (outArrivalTimePtr)
        *outArrivalTimePtr = thePacket->fTimeArrived;

    thePacket->fNeededByOutput = true;

    return true;
}

#if REFLECTOR_STREAM_DEBUGGING
// Debug helper: returns the second 16-bit word of the packet converted with
// ntohs(), or 0 when the packet is shorter than 4 bytes.
static UInt16 DGetPacketSeqNumber(StrPtrLen* inPacket)
{
    if (inPacket->Len < 4)
        return 0;

    //The RTP seq number is the second short of the packet
    UInt16* seqNumPtr = (UInt16*)inPacket->Ptr;
    return ntohs(seqNumPtr[1]);
}
#endif

// Unbuffered reflection path, called by ReflectPackets() when the stream's
// buffer is not enabled.
//
// For every output it resumes from that output's bookmarked packet (or from
// the first new packet in the queue), writes packets until one would block,
// bookmarks that packet, and marks the remaining packets as still needed.
// Packets that no output needs are then moved to inFreeQueue.
//
// ioWakeupTime - in/out, relative time in msec at which to call again
// inFreeQueue  - queue that receives packets which are no longer needed
void ReflectorSender::ReflectRelayPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue)
{
    //Most of this code is useless i.e. buckets and bookmarks. This code will get cleaned up eventually

    //printf("ReflectorSender::ReflectPackets %qd %qd\n",*ioWakeupTime,fNextTimeToRun);
#if DEBUG
    Assert(ioWakeupTime != NULL);
#endif
#if REFLECTOR_STREAM_DEBUGGING > 2
    Bool16 printQueueLenOnExit = false;
#endif

    SInt64 currentTime = OS::Milliseconds();

    //make sure to reset these state variables
    fHasNewPackets = false;
    fNextTimeToRun = 1000; // init to 1 secs

    //determine if we need to send a receiver report to the multicast source
    if ((fWriteFlag == qtssWriteFlagsIsRTCP) && (currentTime > (fLastRRTime + kRRInterval)))
    {
        fLastRRTime = currentTime;
        fStream->SendReceiverReport();
#if REFLECTOR_STREAM_DEBUGGING > 2
        printQueueLenOnExit = true;
        printf( "fPacketQueue len %li\n", (SInt32)fPacketQueue.GetLength() );
#endif
    }

    //the rest of this function must be atomic wrt the ReflectorSession, because
    //it involves iterating through the RTPSession array, which isn't thread safe
    OSMutexLocker locker(&fStream->fBucketMutex);

    // Check to see if we should update the session's bitrate average
    if ((fStream->fLastBitRateSample + ReflectorStream::kBitRateAvgIntervalInMilSecs) < currentTime)
    {
        unsigned int intervalBytes = fStream->fBytesSentInThisInterval;
        (void)atomic_sub(&fStream->fBytesSentInThisInterval, intervalBytes);

        // Multiply by 1000 to convert from milliseconds to seconds, and by 8 to convert from bytes to bits
        Float32 bps = (Float32)(intervalBytes * 8) / (Float32)(currentTime - fStream->fLastBitRateSample);
        bps *= 1000;
        fStream->fCurrentBitRate = (UInt32)bps;

        // Don't check again for awhile!
        fStream->fLastBitRateSample = currentTime;
    }

    for (UInt32 bucketIndex = 0; bucketIndex < fStream->fNumBuckets; bucketIndex++)
    {
        for (UInt32 bucketMemberIndex = 0; bucketMemberIndex < fStream->sBucketSize; bucketMemberIndex++)
        {
            ReflectorOutput* theOutput = fStream->fOutputArray[bucketIndex][bucketMemberIndex];

            if (theOutput != NULL)
            {
                SInt32 availBookmarksPosition = -1; // -1 == invalid position
                OSQueueElem* packetElem = NULL;
                UInt32 curBookmark = 0;

                Assert( curBookmark < theOutput->fNumBookmarks );

                // see if we've bookmarked a held packet for this Sender in this Output
                while ( curBookmark < theOutput->fNumBookmarks )
                {
                    OSQueueElem* bookmarkedElem = theOutput->fBookmarkedPacketsElemsArray[curBookmark];

                    if ( bookmarkedElem ) // there may be holes in this array
                    {
                        if ( bookmarkedElem->IsMember( fPacketQueue ) )
                        {
                            // this packet was previously bookmarked for this specific queue
                            // remove if from the bookmark list and use it
                            // to jump ahead into the Sender's over all packet queue
                            theOutput->fBookmarkedPacketsElemsArray[curBookmark] = NULL;
                            availBookmarksPosition = curBookmark;
                            packetElem = bookmarkedElem;
                            break;
                        }
                    }
                    else
                    {
                        availBookmarksPosition = curBookmark;
                    }

                    curBookmark++;
                }

                Assert( availBookmarksPosition != -1 );

#if REFLECTOR_STREAM_DEBUGGING > 1
                if ( packetElem ) // show 'em what we got johnny
                {
                    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
                    printf("Bookmarked packet time: %li, packetSeq %i\n", (SInt32)thePacket->fTimeArrived, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
                }
#endif

                // the output did not have a bookmarked packet if it's own
                // so show it the first new packet we have in this sender.
                // ( since TCP flow control may delay the sending of packets, this may not
                // be the same as the first packet in the queue
                if ( packetElem == NULL )
                {
                    packetElem = fFirstNewPacketInQueue;

#if REFLECTOR_STREAM_DEBUGGING > 1
                    if ( packetElem ) // show 'em what we got johnny
                    {
                        ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
                        printf("1st NEW packet from Sender sess 0x%lx time: %li, packetSeq %i\n", (SInt32)theOutput, (SInt32)thePacket->fTimeArrived, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
                    }
                    else
                        printf("no new packets\n" );
#endif
                }

                OSQueueIter qIter(&fPacketQueue, packetElem); // starts from beginning if packetElem == NULL, else from packetElem

                Bool16 dodBookmarkPacket = false;

                while ( !qIter.IsDone() )
                {
                    packetElem = qIter.GetCurrent();

                    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
                    QTSS_Error err = QTSS_NoErr;

#if REFLECTOR_STREAM_DEBUGGING > 2
                    printf("packet time: %li, packetSeq %i\n", (SInt32)thePacket->fTimeArrived, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
#endif

                    // once we see a packet we cant' send, we need to stop trying
                    // during this pass mark remaining as still needed
                    if ( !dodBookmarkPacket )
                    {
                        SInt64 packetLateness = currentTime - thePacket->fTimeArrived - (ReflectorStream::sBucketDelayInMsec * (SInt64)bucketIndex);
                        // packetLateness measures how late this packet it after being corrected for the bucket delay

#if REFLECTOR_STREAM_DEBUGGING > 2
                        printf("packetLateness %li, seq# %li\n", (SInt32)packetLateness, (SInt32) DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
#endif

                        SInt64 timeToSendPacket = -1;
                        err = theOutput->WritePacket(&thePacket->fPacketPtr, fStream, fWriteFlag, packetLateness, &timeToSendPacket, NULL, NULL, false);

                        if ( err == QTSS_WouldBlock )
                        {
#if REFLECTOR_STREAM_DEBUGGING > 2
                            printf("EAGAIN bookmark: %li, packetSeq %i\n", (SInt32)packetLateness, DGetPacketSeqNumber( &thePacket->fPacketPtr ) );
#endif
                            // tag it and bookmark it
                            thePacket->fNeededByOutput = true;

                            Assert( availBookmarksPosition != -1 );
                            if ( availBookmarksPosition != -1 )
                                theOutput->fBookmarkedPacketsElemsArray[availBookmarksPosition] = packetElem;

                            dodBookmarkPacket = true;

                            // call us again in # ms to retry on an EAGAIN
                            if ((timeToSendPacket > 0) && (fNextTimeToRun > timeToSendPacket ))
                                fNextTimeToRun = timeToSendPacket;
                            if ( timeToSendPacket == -1 )
                                this->SetNextTimeToRun(5); // keep in synch with delay on would block for on-demand lower is better for high-bit rate movies.
                        }
                    }
                    else
                    {
                        if ( thePacket->fNeededByOutput ) // optimization: if the packet is already marked, another Output has been through this already
                            break;
                        thePacket->fNeededByOutput = true;
                    }

                    qIter.Next();
                }
            }
        }
    }

    // reset our first new packet bookmark
    fFirstNewPacketInQueue = NULL;

    // iterate one more through the senders queue to clear out
    // the unneeded packets
    OSQueueIter removeIter(&fPacketQueue);
    while ( !removeIter.IsDone() )
    {
        OSQueueElem* elem = removeIter.GetCurrent();
        Assert( elem );

        //at this point, move onto the next queue element, because we may be altering
        //the queue itself in the code below
        removeIter.Next();

        ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject();
        Assert( thePacket );

        if ( thePacket->fNeededByOutput == false )
        {
            thePacket->fNeededByOutput = true;
            fPacketQueue.Remove( elem );
            inFreeQueue->EnQueue( elem );
        }
        else // reset for next call to ReflectPackets
        {
            thePacket->fNeededByOutput = false;
        }
    }

    //Don't forget that the caller also wants to know when we next want to run
    if (*ioWakeupTime == 0)
        *ioWakeupTime = fNextTimeToRun;
    else if ((fNextTimeToRun > 0) && (*ioWakeupTime > fNextTimeToRun))
        *ioWakeupTime = fNextTimeToRun;
    // exit with fNextTimeToRun in real time, not relative time.
    fNextTimeToRun += currentTime;

#if REFLECTOR_STREAM_DEBUGGING > 2
    if ( printQueueLenOnExit )
        printf( "EXIT fPacketQueue len %li\n", (SInt32)fPacketQueue.GetLength() );
#endif
}

/***********************************************************************************************
/   ReflectorSender::ReflectPackets
/
/   There are n ReflectorSender's for n output streams per presentation.
/
/   Each sender is associated with an array of ReflectorOutput's.  Each
/   output represents a client connection.  Each output has # RTPStream's.
/
/   When we write a packet to the ReflectorOutput he matches its payload
/   to one of his streams and sends it there.
/
/   To smooth the bandwidth (server, not user) requirements of the reflected streams, the Sender
/   groups the ReflectorOutput's into buckets.  The input streams are reflected to
/   each bucket progressively later in time.  So rather than send a single packet
/   to say 1000 clients all at once, we send it to just the first 16, then the next 16
/   100 ms later and so on.
/
/
/   inputs      ioWakeupTime - relative time to call us again in MSec
/               inFreeQueue - queue of free packets.
*/
void ReflectorSender::ReflectPackets(SInt64* ioWakeupTime, OSQueue* inFreeQueue)
{
    if (!fStream->BufferEnabled()) // Call old routine for relays; they don't want buffering.
    {
        this->ReflectRelayPackets(ioWakeupTime,inFreeQueue);
        return;
    }

    SInt64 currentTime = OS::Milliseconds();

    //make sure to reset these state variables
    fHasNewPackets = false;
    fNextTimeToRun = 1000; // init to 1 secs

    //determine if we need to send a receiver report to the multicast source
    if ((fWriteFlag == qtssWriteFlagsIsRTCP) && (currentTime > (fLastRRTime + kRRInterval)))
    {
        fLastRRTime = currentTime;
        fStream->SendReceiverReport();
    }

    //the rest of this function must be atomic wrt the ReflectorSession, because
    //it involves iterating through the RTPSession array, which isn't thread safe
    OSMutexLocker locker(&fStream->fBucketMutex);

    // Check to see if we should update the session's bitrate average
    fStream->UpdateBitRate(currentTime);

    // where to start new clients in the q
    fFirstPacketInQueueForNewOutput = this->GetClientBufferStartPacketOffset(0);

#if (0) //test code
    if (NULL != fFirstPacketInQueueForNewOutput)
        printf("ReflectorSender::ReflectPackets SET first packet fFirstPacketInQueueForNewOutput %d \n", DGetPacketSeqNumber( &( (ReflectorPacket*) ( fFirstPacketInQueueForNewOutput->GetEnclosingObject()))->fPacketPtr ));

    ReflectorPacket* thePacket = NULL;
    if (fFirstPacketInQueueForNewOutput != NULL)
        thePacket = (ReflectorPacket*) fFirstPacketInQueueForNewOutput->GetEnclosingObject();
    if (thePacket == NULL)
    {
        printf("fFirstPacketInQueueForNewOutput is NULL \n");
    }
#endif

    for (UInt32 bucketIndex = 0; bucketIndex < fStream->fNumBuckets; bucketIndex++)
    {
        for (UInt32 bucketMemberIndex = 0; bucketMemberIndex < fStream->sBucketSize; bucketMemberIndex++)
        {
            ReflectorOutput* theOutput = fStream->fOutputArray[bucketIndex][bucketMemberIndex];
            if (theOutput != NULL)
            {
                if ( false == theOutput->IsPlaying() )
                    continue;

                // Per-output flag. It used to be declared outside both loops
                // and never reset, so after the first new output every later
                // output in the same pass was also flagged as a first packet.
                Bool16 firstPacket = false;

                OSQueueElem* packetElem = theOutput->GetBookMarkedPacket(&fPacketQueue);
                if ( packetElem == NULL ) // should only be a new output
                {
                    packetElem = fFirstPacketInQueueForNewOutput; // everybody starts at the oldest packet in the buffer delay or uses a bookmark
                    firstPacket = true;
                    theOutput->fNewOutput = false;
                    //if (packetElem) printf("ReflectorSender::ReflectPackets Sending first packet in Queue packetElem=fFirstPacketInQueueForNewOutput %d \n", ( (ReflectorPacket*) (packetElem->GetEnclosingObject() ) )->GetPacketRTPSeqNum());
                }

                SInt64 bucketDelay = ReflectorStream::sBucketDelayInMsec * (SInt64)bucketIndex;
                packetElem = this->SendPacketsToOutput(theOutput, packetElem,currentTime, bucketDelay, firstPacket);
                if (packetElem)
                {
                    ReflectorPacket* thePacket = (ReflectorPacket*)packetElem->GetEnclosingObject();
                    thePacket->fNeededByOutput = true; // flag to prevent removal in RemoveOldPackets
                    (void) theOutput->SetBookMarkPacket(packetElem); // store a reference to the packet
                }
            }
        }
    }

    this->RemoveOldPackets(inFreeQueue);
    fFirstNewPacketInQueue = NULL;

    //Don't forget that the caller also wants to know when we next want to run
    if (*ioWakeupTime == 0)
        *ioWakeupTime = fNextTimeToRun;
    else if ((fNextTimeToRun > 0) && (*ioWakeupTime > fNextTimeToRun))
        *ioWakeupTime = fNextTimeToRun;
    // exit with fNextTimeToRun in real time, not relative time.
    fNextTimeToRun += currentTime;

    // qtss_printf("SetNextTimeToRun fNextTimeToRun=%qd + currentTime=%qd\n", fNextTimeToRun, currentTime);
    // qtss_printf("ReflectorSender::ReflectPackets *ioWakeupTime = %qd\n", *ioWakeupTime);
}

OSQueueElem* ReflectorSender::SendPacketsToOutput(ReflectorOutput* theOutput, OSQueueElem* currentPacket, SInt64 currentTime, SInt64 bucketDelay, Bool16 firstPacket)
{
    OSQueueElem* lastPacket = currentPacket;
    OSQueueIter qIter(&fPacketQueue, currentPacket); // starts from beginning if currentPacket == NULL, else from currentPacket

    UInt32 count = 0;
    QTSS_Error err = QTSS_NoErr;
    while ( !qIter.IsDone() )
    {
        currentPacket = qIter.GetCurrent();
        lastPacket = currentPacket;

        ReflectorPacket* thePacket = (ReflectorPacket*)currentPacket->GetEnclosingObject();
        SInt64 packetLateness = bucketDelay;
        SInt64 timeToSendPacket = -1;

        //printf("packetLateness %qd, seq# %li\n", packetLateness, (SInt32) DGetPacketSeqNumber( &thePacket->fPacketPtr ) );

        err = theOutput->WritePacket(&thePacket->fPacketPtr, fStream, fWriteFlag, packetLateness, &timeToSendPacket,&thePacket->fStreamCountID,&thePacket->fTimeArrived, firstPacket );

        if (err == QTSS_WouldBlock)
        {
            // call us again in # ms to retry on an EAGAIN

            if ((timeToSendPacket > 0) && ( (fNextTimeToRun + currentTime) > timeToSendPacket )) // blocked but we are scheduled to wake up later
                fNextTimeToRun = timeToSendPacket - currentTime;

            if (theOutput->fLastIntervalMilliSec < 5 )
                theOutput->fLastIntervalMilliSec = 5;

            if ( timeToSendPacket < 0 ) // blocked and we are behind
            {
                //qtss_printf("fNextTimeToRun = theOutput->fLastIntervalMilliSec=%qd;\n", theOutput->fLastIntervalMilliSec); // Use the last packet interval
                this->SetNextTimeToRun(theOutput->fLastIntervalMilliSec);
            }

            if (fNextTimeToRun > 100) //don't wait that long
            {
                //qtss_printf("fNextTimeToRun = %qd now 100;\n", fNextTimeToRun);
                this->SetNextTimeToRun(100);
            }

            if (fNextTimeToRun < 5) //wait longer
            {
                //qtss_printf("fNextTimeToRun = 5;\n");
                this->SetNextTimeToRun(5);
            }
if (theOutput->fLastIntervalMilliSec >= 100) // allow up to 1 second max -- allow some time for the socket to clear and don't go into a tight loop if the client is gone. theOutput->fLastIntervalMilliSec = 100; else theOutput->fLastIntervalMilliSec *= 2; // scale upwards over time //qtss_printf ( "Blocked ReflectorSender::SendPacketsToOutput timeToSendPacket=%qd fLastIntervalMilliSec=%qd fNextTimeToRun=%qd \n", timeToSendPacket, theOutput->fLastIntervalMilliSec, fNextTimeToRun); break; } count++; qIter.Next(); } return lastPacket; } OSQueueElem* ReflectorSender::GetClientBufferStartPacketOffset(SInt64 offsetMsec) { OSQueueIter qIter(&fPacketQueue);// start at oldest packet in q SInt64 theCurrentTime = OS::Milliseconds(); SInt64 packetDelay = 0; OSQueueElem* oldestPacketInClientBufferTime = NULL; while ( !qIter.IsDone() ) // start at oldest packet in q { OSQueueElem* elem = qIter.GetCurrent(); Assert( elem ); qIter.Next(); ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject(); Assert( thePacket ); packetDelay = theCurrentTime - thePacket->fTimeArrived; if (offsetMsec > ReflectorStream::sOverBufferInMsec) offsetMsec = ReflectorStream::sOverBufferInMsec; if ( packetDelay <= (ReflectorStream::sOverBufferInMsec - offsetMsec) ) { oldestPacketInClientBufferTime = &thePacket->fQueueElem; break; // found the packet we need: done processing } } return oldestPacketInClientBufferTime; } void ReflectorSender::RemoveOldPackets(OSQueue* inFreeQueue) { // Iterate through the senders queue to clear out packets // Start at the oldest packet and walk forward to the newest packet // OSQueueIter removeIter(&fPacketQueue); SInt64 theCurrentTime = OS::Milliseconds(); SInt64 packetDelay = 0; SInt64 currentMaxPacketDelay = ReflectorStream::sMaxPacketAgeMSec; while ( !removeIter.IsDone() ) { OSQueueElem* elem = removeIter.GetCurrent(); Assert( elem ); //at this point, move onto the next queue element, because we may be altering //the queue itself in the code below 
removeIter.Next(); ReflectorPacket* thePacket = (ReflectorPacket*)elem->GetEnclosingObject(); Assert( thePacket ); //printf("ReflectorSender::RemoveOldPackets Packet %d in queue is %qd milliseconds old\n", DGetPacketSeqNumber( &thePacket->fPacketPtr ) ,theCurrentTime - thePacket->fTimeArrived); packetDelay = theCurrentTime - thePacket->fTimeArrived; // walk q and remove packets that are too old if ( !thePacket->fNeededByOutput && packetDelay > currentMaxPacketDelay) // delete based on late tolerance and whether a client is blocked on the packet { // not needed and older than our required buffer thePacket->Reset(); fPacketQueue.Remove( elem ); inFreeQueue->EnQueue( elem ); } else { // we want to keep all of these but we should reset the ones that should be aged out unless marked // as need the next time through reflect packets. thePacket->fNeededByOutput = false; //mark not needed.. will be set next time through reflect packets if (packetDelay <= currentMaxPacketDelay) // this packet is going to be kept around as well as the ones that follow. break; } } } void ReflectorSocketPool::SetUDPSocketOptions(UDPSocketPair* inPair) { // Fix add ReuseAddr for compatibility with MPEG4IP broadcaster which likes to use the same //sockets. //Make sure this works with PlaylistBroadcaster //inPair->GetSocketA()->ReuseAddr(); //inPair->GetSocketA()->ReuseAddr(); } UDPSocketPair* ReflectorSocketPool::ConstructUDPSocketPair() { return NEW UDPSocketPair (NEW ReflectorSocket(), NEW ReflectorSocket()); } void ReflectorSocketPool::DestructUDPSocket(ReflectorSocket* socket) { if (socket) // allocated { if (socket->GetLocalPort() > 0) // bound and active { //The socket's run function may be executing RIGHT NOW! So we can't //just delete the thing, we need to send the sockets kill events. 
//qtss_printf("ReflectorSocketPool::DestructUDPSocketPair Signal kKillEvent socket=%p\n",socket); socket->Signal(Task::kKillEvent); } else // not bound ok to delete { //qtss_printf("ReflectorSocketPool::DestructUDPSocketPair delete socket=%p\n",socket); delete socket; } } } void ReflectorSocketPool::DestructUDPSocketPair(UDPSocketPair *inPair) { //qtss_printf("ReflectorSocketPool::DestructUDPSocketPair inPair=%p socketA=%p\n", inPair,(ReflectorSocket*)inPair->GetSocketA()); this->DestructUDPSocket((ReflectorSocket*)inPair->GetSocketA()); //qtss_printf("ReflectorSocketPool::DestructUDPSocketPair inPair=%p socketB=%p\n", inPair,(ReflectorSocket*)inPair->GetSocketB()); this->DestructUDPSocket((ReflectorSocket*)inPair->GetSocketB()); delete inPair; } ReflectorSocket::ReflectorSocket() : IdleTask(), UDPSocket(NULL, Socket::kNonBlockingSocketType | UDPSocket::kWantsDemuxer), fBroadcasterClientSession(NULL), fLastBroadcasterTimeOutRefresh(0), fSleepTime(0), fValidSSRC(0), fLastValidSSRCTime(0), fFilterSSRCs(true), fTimeoutSecs(30), fHasReceiveTime(false), fFirstReceiveTime(0), fFirstArrivalTime(0), fCurrentSSRC(0) { //construct all the preallocated packets this->SetTaskName("ReflectorSocket"); this->SetTask(this); for (UInt32 numPackets = 0; numPackets < kNumPreallocatedPackets; numPackets++) { //If the local port # of this socket is odd, then all the packets //used for this socket are rtcp packets. 
ReflectorPacket* packet = NEW ReflectorPacket(); fFreeQueue.EnQueue(&packet->fQueueElem);//put this packet onto the free queue } } ReflectorSocket::~ReflectorSocket() { //printf("ReflectorSocket::~ReflectorSocket\n"); while (fFreeQueue.GetLength() > 0) { ReflectorPacket* packet = (ReflectorPacket*)fFreeQueue.DeQueue()->GetEnclosingObject(); delete packet; } } void ReflectorSocket::AddSender(ReflectorSender* inSender) { OSMutexLocker locker(this->GetDemuxer()->GetMutex()); QTSS_Error err = this->GetDemuxer()->RegisterTask(inSender->fStream->fStreamInfo.fSrcIPAddr, 0, inSender); Assert(err == QTSS_NoErr); fSenderQueue.EnQueue(&inSender->fSocketQueueElem); } void ReflectorSocket::RemoveSender(ReflectorSender* inSender) { OSMutexLocker locker(this->GetDemuxer()->GetMutex()); fSenderQueue.Remove(&inSender->fSocketQueueElem); QTSS_Error err = this->GetDemuxer()->UnregisterTask(inSender->fStream->fStreamInfo.fSrcIPAddr, 0, inSender); Assert(err == QTSS_NoErr); } SInt64 ReflectorSocket::Run() { //We want to make sure we can't get idle events WHILE we are inside //this function. That will cause us to run the queues unnecessarily //and just get all confused. this->CancelTimeout(); Task::EventFlags theEvents = this->GetEvents(); //if we have been told to delete ourselves, do so. if (theEvents & Task::kKillEvent) return -1; OSMutexLocker locker(this->GetDemuxer()->GetMutex()); SInt64 theMilliseconds = OS::Milliseconds(); //Only check for data on the socket if we've actually been notified to that effect if (theEvents & Task::kReadEvent) this->GetIncomingData(theMilliseconds); #if DEBUG //make sure that we haven't gotten here prematurely! This wouldn't mess //anything up, but it would waste CPU. 
if (theEvents & Task::kIdleEvent) { SInt32 temp = (SInt32)(fSleepTime - theMilliseconds); char tempBuf[20]; qtss_sprintf(tempBuf,"%"_S32BITARG_"",temp); WarnV(fSleepTime <= theMilliseconds, tempBuf); } #endif fSleepTime = 0; //Now that we've gotten all available packets, have the streams reflect for (OSQueueIter iter2(&fSenderQueue); !iter2.IsDone(); iter2.Next()) { ReflectorSender* theSender2 = (ReflectorSender*)iter2.GetCurrent()->GetEnclosingObject(); if (theSender2 != NULL && theSender2->ShouldReflectNow(theMilliseconds, &fSleepTime)) theSender2->ReflectPackets(&fSleepTime, &fFreeQueue); } #if DEBUG theMilliseconds = OS::Milliseconds(); #endif //For smoothing purposes, the streams can mark when they want to wakeup. if (fSleepTime > 0) this->SetIdleTimer(fSleepTime); #if DEBUG //The debugging check above expects real time. fSleepTime += theMilliseconds; #endif return 0; } void ReflectorSocket::FilterInvalidSSRCs(ReflectorPacket* thePacket,Bool16 isRTCP) { // assume the first SSRC we see is valid and all others are to be ignored. 
if ( thePacket->fPacketPtr.Len > 0) do { SInt64 currentTime = OS::Milliseconds() / 1000; if (0 == fValidSSRC) { fValidSSRC = thePacket->GetSSRC(isRTCP); // SSRC of 0 is allowed fLastValidSSRCTime = currentTime; //qtss_printf("socket=%"_U32BITARG_" FIRST PACKET fValidSSRC=%"_U32BITARG_" \n", (UInt32) this,fValidSSRC); break; } UInt32 packetSSRC = thePacket->GetSSRC(isRTCP); if (packetSSRC != 0) { if (packetSSRC == fValidSSRC) { fLastValidSSRCTime = currentTime; //qtss_printf("socket=%"_U32BITARG_" good packet\n", (UInt32) this ); break; } //qtss_printf("socket=%"_U32BITARG_" bad packet packetSSRC= %"_U32BITARG_" fValidSSRC=%"_U32BITARG_" \n", (UInt32) this,packetSSRC,fValidSSRC); thePacket->fPacketPtr.Len = 0; // ignore this packet wrong SSRC } // this executes whenever an invalid SSRC is found -- maybe the original stream ended and a new one is now active if ( (fLastValidSSRCTime + fTimeoutSecs) < currentTime) // fValidSSRC timed out --no packets with this SSRC seen for awhile { fValidSSRC = 0; // reset the valid SSRC with the next packet's SSRC //qtss_printf("RESET fValidSSRC\n"); } }while (false); } Bool16 ReflectorSocket::ProcessPacket(const SInt64& inMilliseconds,ReflectorPacket* thePacket,UInt32 theRemoteAddr,UInt16 theRemotePort) { Bool16 done = false; // stop when result is true if (thePacket != NULL) do { if (GetLocalPort() & 1) thePacket->fIsRTCP = true; else thePacket->fIsRTCP = false; if (fBroadcasterClientSession != NULL) // alway refresh timeout even if we are filtering. { if ( (inMilliseconds - fLastBroadcasterTimeOutRefresh) > kRefreshBroadcastSessionIntervalMilliSecs) { QTSS_RefreshTimeOut(fBroadcasterClientSession); fLastBroadcasterTimeOutRefresh = inMilliseconds; } } if (thePacket->fPacketPtr.Len == 0) { //put the packet back on the free queue, because we didn't actually //get any data here. 
fFreeQueue.EnQueue(&thePacket->fQueueElem); this->RequestEvent(EV_RE); done = true; //qtss_printf("ReflectorSocket::ProcessPacket no more packets on this socket!\n"); break;//no more packets on this socket! } if (thePacket->IsRTCP()) { //if this is a new RTCP packet, check to see if it is a sender report. //We should only reflect sender reports. Because RTCP packets can't have both //an SR & an RR, and because the SR & the RR must be the first packet in a //compound RTCP packet, all we have to do to determine this is look at the //packet type of the first packet in the compound packet. RTCPPacket theRTCPPacket; if ((!theRTCPPacket.ParsePacket((UInt8*)thePacket->fPacketPtr.Ptr, thePacket->fPacketPtr.Len)) || (theRTCPPacket.GetPacketType() != RTCPSRPacket::kSRPacketType)) { //pretend as if we never got this packet fFreeQueue.EnQueue(&thePacket->fQueueElem); done = true; break; } } // Only reflect one SSRC stream at a time. // Pass the packet and whether it is an RTCP or RTP packet based on the port number. if (fFilterSSRCs) this->FilterInvalidSSRCs(thePacket,GetLocalPort() & 1);// thePacket->fPacketPtr.Len is set to 0 for invalid SSRCs. // Find the appropriate ReflectorSender for this packet. ReflectorSender* theSender = (ReflectorSender*)this->GetDemuxer()->GetTask(theRemoteAddr, 0); // If there is a generic sender for this socket, use it. if (theSender == NULL) theSender = (ReflectorSender*)this->GetDemuxer()->GetTask(0, 0); if (theSender == NULL) { //UInt16* theSeqNumberP = (UInt16*)thePacket->fPacketPtr.Ptr; //qtss_printf("ReflectorSocket::ProcessPacket no sender found for packet! sequence number=%d\n",ntohs(theSeqNumberP[1])); fFreeQueue.EnQueue(&thePacket->fQueueElem); // don't process the packet done = true; break; } Assert(theSender != NULL); // at this point we have a sender const UInt32 maxQSize = 4000; // Check to see if we need to set the remote RTCP address // for this stream. This will be necessary if the source is unicast. 
#ifdef NAT_WORKAROUND if ((theRemoteAddr != 0) && ((theSender->fStream->fDestRTCPAddr == 0) || (thePacket->IsRTCP()))) // Submitted fix from denis@berlin.ccc.de { Assert(!SocketUtils::IsMulticastIPAddr(theSender->fStream->fStreamInfo.fDestIPAddr)); Assert(theRemotePort != 0); theSender->fStream->fDestRTCPAddr = theRemoteAddr; theSender->fStream->fDestRTCPPort = theRemotePort; // RTCPs are always on odd ports, so check to see if this port is an // RTP port, and if so, just add 1. if (!(thePacket->IsRTCP()) && !(theRemotePort & 1)) theSender->fStream->fDestRTCPPort++; } #else if ((theRemoteAddr != 0) && (theSender->fStream->fDestRTCPAddr == 0)) { // If the source is multicast, this shouldn't be necessary Assert(!SocketUtils::IsMulticastIPAddr(theSender->fStream->fStreamInfo.fDestIPAddr)); Assert(theRemotePort != 0); theSender->fStream->fDestRTCPAddr = theRemoteAddr; theSender->fStream->fDestRTCPPort = theRemotePort; // RTCPs are always on odd ports, so check to see if this port is an // RTP port, and if so, just add 1. if (!(theRemotePort & 1)) theSender->fStream->fDestRTCPPort++; } #endif //NAT_WORKAROUND thePacket->fStreamCountID = ++(theSender->fStream->fPacketCount); thePacket->fBucketsSeenThisPacket = 0; thePacket->fTimeArrived = inMilliseconds; theSender->fPacketQueue.EnQueue(&thePacket->fQueueElem); if ( theSender->fFirstNewPacketInQueue == NULL ) theSender->fFirstNewPacketInQueue = &thePacket->fQueueElem; theSender->fHasNewPackets = true; if (!(thePacket->IsRTCP())) { // don't check for duplicate packets, they may be needed to keep in sync. 
// Because this is an RTP packet make sure to atomic add this because // multiple sockets can be adding to this variable simultaneously (void)atomic_add(&theSender->fStream->fBytesSentInThisInterval, thePacket->fPacketPtr.Len); //printf("ReflectorSocket::ProcessPacket received RTP id=%qu\n", thePacket->fStreamCountID); theSender->fStream->SetHasFirstRTP(true); } else { //printf("ReflectorSocket::ProcessPacket received RTCP id=%qu\n", thePacket->fStreamCountID); theSender->fStream->SetHasFirstRTCP(true); theSender->fStream->SetFirst_RTCP_RTP_Time(thePacket->GetPacketRTPTime()); theSender->fStream->SetFirst_RTCP_Arrival_Time(thePacket->fTimeArrived); } if (ReflectorStream::sUsePacketReceiveTime && thePacket->fPacketPtr.Len > 12) { UInt32 offset = thePacket->fPacketPtr.Len; char* theTag = ((char*) thePacket->fPacketPtr.Ptr + offset) - 12; UInt64* theValue = (UInt64*) ((char*) ( (char*) thePacket->fPacketPtr.Ptr + offset) - 8); if (0 == ::strncmp(theTag,"aktt",4)) { UInt64 theReceiveTime = OS::NetworkToHostSInt64(*theValue); UInt32 theSSRC = thePacket->GetSSRC(theRemotePort & 1); // use to check if broadcast has restarted so we can reset if ( !this->fHasReceiveTime || (this->fCurrentSSRC != theSSRC) ) { this->fCurrentSSRC = theSSRC; this->fFirstArrivalTime = thePacket->fTimeArrived; this->fFirstReceiveTime = theReceiveTime; this->fHasReceiveTime = true; } SInt64 packetOffsetFromStart = theReceiveTime - this->fFirstReceiveTime; // packets arrive at time 0 and fill forward into the future thePacket->fTimeArrived = this->fFirstArrivalTime + packetOffsetFromStart; // offset starts negative by over buffer amount thePacket->fPacketPtr.Len -= 12; SInt64 arrivalTimeOffset = thePacket->fTimeArrived - inMilliseconds; if ( arrivalTimeOffset > ReflectorStream::sMaxFuturePacketMSec ) // way out in the future. 
thePacket->fTimeArrived = inMilliseconds + ReflectorStream::sMaxFuturePacketMSec; //keep it but only for sMaxFuturePacketMSec = (sMaxPacketAgeMSec <-- current --> sMaxFuturePacketMSec) // if it was in the past we leave it alone because it will be deleted after processing. //printf("ReflectorSocket::ProcessPacket packetOffsetFromStart=%f\n", (Float32) packetOffsetFromStart / 1000); } } //printf("ReflectorSocket::GetIncomingData has packet from time=%qd src addr=%"_U32BITARG_" src port=%u packetlen=%"_U32BITARG_"\n",inMilliseconds, theRemoteAddr,theRemotePort,thePacket->fPacketPtr.Len); if (0) //turn on / off buffer size checking -- pref can go here if we find we need to adjust this if (theSender->fPacketQueue.GetLength() > maxQSize) //don't grow memory too big { char outMessage[256]; sprintf(outMessage,"Packet Queue for port=%d qsize = %"_S32BITARG_" hit max qSize=%"_U32BITARG_"", theRemotePort,theSender->fPacketQueue.GetLength(), maxQSize); WarnV(false, outMessage); } } while(false); return done; } void ReflectorSocket::GetIncomingData(const SInt64& inMilliseconds) { OSMutexLocker locker(this->GetDemuxer()->GetMutex()); UInt32 theRemoteAddr = 0; UInt16 theRemotePort = 0; //get all the outstanding packets for this socket while (true) { //get a packet off the free queue. ReflectorPacket* thePacket = this->GetPacket(); thePacket->fPacketPtr.Len = 0; (void)this->RecvFrom(&theRemoteAddr, &theRemotePort, thePacket->fPacketPtr.Ptr, ReflectorPacket::kMaxReflectorPacketSize, &thePacket->fPacketPtr.Len); if (this->ProcessPacket(inMilliseconds,thePacket,theRemoteAddr, theRemotePort)) break; //printf("ReflectorSocket::GetIncomingData \n"); } } ReflectorPacket* ReflectorSocket::GetPacket() { OSMutexLocker locker(this->GetDemuxer()->GetMutex()); if (fFreeQueue.GetLength() == 0) //if the port number of this socket is odd, this packet is an RTCP packet. return NEW ReflectorPacket(); else return (ReflectorPacket*)fFreeQueue.DeQueue()->GetEnclosingObject(); }