/* * * @APPLE_LICENSE_HEADER_START@ * * Copyright (c) 1999-2008 Apple Inc. All Rights Reserved. * * This file contains Original Code and/or Modifications of Original Code * as defined in and that are subject to the Apple Public Source License * Version 2.0 (the 'License'). You may not use this file except in * compliance with the License. Please obtain a copy of the License at * http://www.opensource.apple.com/apsl/ and read it before using this * file. * * The Original Code and all software distributed under the License are * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. * Please see the License for the specific language governing rights and * limitations under the License. * * @APPLE_LICENSE_HEADER_END@ * */ /* File: RTPPacketResender.cpp Contains: RTPPacketResender class to buffer and track re-transmits of RTP packets. */ #include #include "RTPPacketResender.h" #include "RTPStream.h" #include "atomic.h" #include "OSMutex.h" #if RTP_PACKET_RESENDER_DEBUGGING #include "QTSSRollingLog.h" #include class MyAckListLog : public QTSSRollingLog { public: MyAckListLog(char * logFName) : QTSSRollingLog() {this->SetTaskName("MyAckListLog"); ::strcpy( fLogFName, logFName ); } virtual ~MyAckListLog() {} virtual char* GetLogName() { char *logFileNameStr = NEW char[80]; ::strcpy( logFileNameStr, fLogFName ); return logFileNameStr; } virtual char* GetLogDir() { char *logDirStr = NEW char[80]; ::strcpy( logDirStr, DEFAULTPATHS_LOG_DIR); return logDirStr; } virtual UInt32 GetRollIntervalInDays() { return 0; } virtual UInt32 GetMaxLogBytes() { return 0; } char fLogFName[128]; }; #endif static const UInt32 kPacketArrayIncreaseInterval = 32;// must be multiple of 2 static const UInt32 kInitialPacketArraySize = 64;// must be multiple of kPacketArrayIncreaseInterval (Turns out this is as big as we typically need) //static const UInt32 kMaxPacketArraySize = 512;// must be multiple of kPacketArrayIncreaseInterval it would have to be a 3 mbit or more static const UInt32 kMaxDataBufferSize = 1600; OSBufferPool RTPPacketResender::sBufferPool(kMaxDataBufferSize); unsigned int RTPPacketResender::sNumWastedBytes = 0; RTPPacketResender::RTPPacketResender() : fBandwidthTracker(NULL), fSocket(NULL), fDestAddr(0), fDestPort(0), fMaxPacketsInList(0), fPacketsInList(0), fNumResends(0), fNumExpired(0), fNumAcksForMissingPackets(0), fNumSent(0), fPacketArray(NULL), fPacketArraySize(kInitialPacketArraySize), fPacketArrayMask(0), fHighestSeqNum(0), fLastUsed(0), fPacketQMutex() { fPacketArray = (RTPResenderEntry*) NEW char[sizeof(RTPResenderEntry) * fPacketArraySize]; ::memset(fPacketArray,0,sizeof(RTPResenderEntry) * fPacketArraySize); } RTPPacketResender::~RTPPacketResender() { for (UInt32 x = 0; x < fPacketArraySize; x++) { if (fPacketArray[x].fPacketSize > 0) atomic_sub(&sNumWastedBytes, kMaxDataBufferSize - fPacketArray[x].fPacketSize); if (fPacketArray[x].fPacketData != NULL) { if (fPacketArray[x].fIsSpecialBuffer) delete [] (char*)fPacketArray[x].fPacketData; else sBufferPool.Put(fPacketArray[x].fPacketData); } } delete [] fPacketArray; } #if RTP_PACKET_RESENDER_DEBUGGING void RTPPacketResender::logprintf( const char * format, ... ) { /* WARNING - the logger is not multiple task thread safe. its OK when we run just one thread for all of the sending tasks though. each logger for a given session will open up access to the same log file. with one thread we're serialized on writing to the file, so it works. */ va_list argptr; char buff[1024]; va_start( argptr, format ); vsprintf( buff, format, argptr ); va_end(argptr); if ( fLogger ) { fLogger->WriteToLog(buff, false); qtss_printf( buff ); } } void RTPPacketResender::SetDebugInfo(UInt32 trackID, UInt16 remoteRTCPPort, UInt32 curPacketDelay) { fTrackID = trackID; fRemoteRTCPPort = remoteRTCPPort; fCurrentPacketDelay = curPacketDelay; } void RTPPacketResender::SetLog( StrPtrLen *logname ) { /* WARNING - see logprintf() */ char logFName[128]; memcpy( logFName, logname->Ptr, logname->Len ); logFName[logname->Len] = 0; if ( fLogger ) delete fLogger; fLogger = new MyAckListLog( logFName ); fLogger->EnableLog(); } void RTPPacketResender::LogClose(SInt64 inTimeSpentInFlowControl) { this->logprintf("Flow control duration msec: %"_64BITARG_"d. Max outstanding packets: %d\n", inTimeSpentInFlowControl, this->GetMaxPacketsInList()); } UInt32 RTPPacketResender::SpillGuts(UInt32 inBytesSentThisInterval) { if (fInfoDisplayTimer.DurationInMilliseconds() > 1000 ) { //fDisplayCount++; // spill our guts on the state of KRR char *isFlowed = "open"; if ( fBandwidthTracker->IsFlowControlled() ) isFlowed = "flowed"; SInt64 kiloBitperSecond = (( (SInt64)inBytesSentThisInterval * (SInt64)1000 * (SInt64)8 ) / fInfoDisplayTimer.DurationInMilliseconds() ) / (SInt64)1024; //fStreamCumDuration += fInfoDisplayTimer.DurationInMilliseconds(); fInfoDisplayTimer.Reset(); //this->logprintf( "\n[%li] info for track %li, cur bytes %li, cur kbit/s %li\n", /*(SInt32)fStreamCumDuration,*/ fTrackID, (SInt32)inBytesSentThisInterval, (SInt32)kiloBitperSecond); this->logprintf( "\nx info for track %li, cur bytes %li, cur kbit/s %li\n", /*(SInt32)fStreamCumDuration,*/ fTrackID, (SInt32)inBytesSentThisInterval, (SInt32)kiloBitperSecond); this->logprintf( "stream is %s, bytes pending ack %li, cwnd %li, ssth %li, wind %li \n", isFlowed, fBandwidthTracker->BytesInList(), fBandwidthTracker->CongestionWindow(), fBandwidthTracker->SlowStartThreshold(), fBandwidthTracker->ClientWindowSize() ); this->logprintf( "stats- resends: %li, expired: %li, dupe acks: %li, sent: %li\n", fNumResends, fNumExpired, fNumAcksForMissingPackets, fNumSent ); this->logprintf( "delays- cur: %li, srtt: %li , dev: %li, rto: %li, bw: %li\n\n", fCurrentPacketDelay, fBandwidthTracker->RunningAverageMSecs(), fBandwidthTracker->RunningMeanDevationMSecs(), fBandwidthTracker->CurRetransmitTimeout(), fBandwidthTracker->GetCurrentBandwidthInBps()); inBytesSentThisInterval = 0; } return inBytesSentThisInterval; } #endif void RTPPacketResender::SetDestination(UDPSocket* inOutputSocket, UInt32 inDestAddr, UInt16 inDestPort) { fSocket = inOutputSocket; fDestAddr = inDestAddr; fDestPort = inDestPort; } RTPResenderEntry* RTPPacketResender::GetEmptyEntry(UInt16 inSeqNum, UInt32 inPacketSize) { RTPResenderEntry* theEntry = NULL; for (UInt32 packetIndex = 0 ;packetIndex < fPacketsInList; packetIndex++) // see if packet is already in the array { if (inSeqNum == fPacketArray[packetIndex].fSeqNum) { return NULL; } } if (fPacketsInList == fPacketArraySize) // allocate a new array { fPacketArraySize += kPacketArrayIncreaseInterval; RTPResenderEntry* tempArray = (RTPResenderEntry*) NEW char[sizeof(RTPResenderEntry) * fPacketArraySize]; ::memset(tempArray,0,sizeof(RTPResenderEntry) * fPacketArraySize); ::memcpy(tempArray,fPacketArray,sizeof(RTPResenderEntry) * fPacketsInList); delete [] fPacketArray; fPacketArray = tempArray; //qtss_printf("NewArray size=%"_S32BITARG_" packetsInList=%"_S32BITARG_"\n",fPacketArraySize, fPacketsInList); } if (fPacketsInList < fPacketArraySize) // have an open spot { theEntry = &fPacketArray[fPacketsInList]; fPacketsInList++; if (fPacketsInList < fPacketArraySize) fLastUsed = fPacketsInList; else fLastUsed = fPacketArraySize; } else { // nothing open so re-use if (fLastUsed < fPacketArraySize - 1) fLastUsed ++; else fLastUsed = 0; //qtss_printf("array is full = %"_U32BITARG_" reusing index=%"_U32BITARG_"\n",fPacketsInList,fLastUsed); theEntry = &fPacketArray[fLastUsed]; RemovePacket(fLastUsed, false); // delete packet in place don't fill we will use the spot } // // Check to see if this packet is too big for the buffer. If it is, then // we need to specially allocate a special buffer if (inPacketSize > kMaxDataBufferSize) { //sBufferPool.Put(theEntry->fPacketData); theEntry->fIsSpecialBuffer = true; theEntry->fPacketData = NEW char[inPacketSize]; } else// It is not special, it's from the buffer pool { theEntry->fIsSpecialBuffer = false; theEntry->fPacketData = sBufferPool.Get(); } return theEntry; } void RTPPacketResender::ClearOutstandingPackets() { //OSMutexLocker packetQLocker(&fPacketQMutex); //for (UInt16 packetIndex = 0; packetIndex < fPacketArraySize; packetIndex++) //Testing purposes for (UInt16 packetIndex = 0; packetIndex < fPacketsInList; packetIndex++) { this->RemovePacket(packetIndex,false);// don't move packets delete in place Assert(fPacketArray[packetIndex].fPacketSize==0); } if (fBandwidthTracker != NULL) fBandwidthTracker->EmptyWindow(fBandwidthTracker->BytesInList()); //clean it out fPacketsInList = 0; // deleting in place doesn't decrement Assert(fPacketsInList == 0); } void RTPPacketResender::AddPacket( void * inRTPPacket, UInt32 packetSize, SInt32 ageLimit ) { //OSMutexLocker packetQLocker(&fPacketQMutex); // the caller needs to adjust the overall age limit by reducing it // by the current packet lateness. // we compute a re-transmit timeout based on the Karns RTT esmitate UInt16* theSeqNumP = (UInt16*)inRTPPacket; UInt16 theSeqNum = ntohs(theSeqNumP[1]); if ( ageLimit > 0 ) { RTPResenderEntry* theEntry = this->GetEmptyEntry(theSeqNum, packetSize); // // This may happen if this sequence number has already been added. // That may happen if we have repeat packets in the stream. if (theEntry == NULL || theEntry->fPacketSize > 0) return; // // Reset all the information in the RTPResenderEntry ::memcpy(theEntry->fPacketData, inRTPPacket, packetSize); theEntry->fPacketSize = packetSize; theEntry->fAddedTime = OS::Milliseconds(); theEntry->fOrigRetransTimeout = fBandwidthTracker->CurRetransmitTimeout(); theEntry->fExpireTime = theEntry->fAddedTime + ageLimit; theEntry->fNumResends = 0; theEntry->fSeqNum = theSeqNum; // // Track the number of wasted bytes we have atomic_add(&sNumWastedBytes, kMaxDataBufferSize - packetSize); //PLDoubleLinkedListNode * listNode = NEW PLDoubleLinkedListNode( new RTPResenderEntry(inRTPPacket, packetSize, ageLimit, fRTTEstimator.CurRetransmitTimeout() ) ); //fAckList.AddNodeToTail(listNode); fBandwidthTracker->FillWindow(packetSize); } else { #if RTP_PACKET_RESENDER_DEBUGGING this->logprintf( "packet too old to add: seq# %li, age limit %li, cur late %li, track id %li\n", (SInt32)ntohs( *((UInt16*)(((char*)inRTPPacket)+2)) ), (SInt32)ageLimit, fCurrentPacketDelay, fTrackID ); #endif fNumExpired++; } fNumSent++; } void RTPPacketResender::AckPacket( UInt16 inSeqNum, SInt64& inCurTimeInMsec ) { //OSMutexLocker packetQLocker(&fPacketQMutex); SInt32 foundIndex = -1; for (UInt32 packetIndex = 0 ;packetIndex < fPacketsInList; packetIndex++) { if (inSeqNum == fPacketArray[packetIndex].fSeqNum) { foundIndex = packetIndex; break; } } RTPResenderEntry* theEntry = NULL; if (foundIndex != -1) theEntry = &fPacketArray[foundIndex]; if (theEntry == NULL || theEntry->fPacketSize == 0 ) { /* we got an ack for a packet that has already expired or for a packet whose re-transmit crossed with it's original ack */ #if RTP_PACKET_RESENDER_DEBUGGING this->logprintf( "acked packet not found: %li, track id %li, OS::MSecs %li\n" , (SInt32)inSeqNum, fTrackID, (SInt32)OS::Milliseconds() ); #endif fNumAcksForMissingPackets++; //qtss_printf("Ack for missing packet: %d\n", inSeqNum); // hmm.. we -should not have- closed down the window in this case // so reopen it a bit as we normally would. // ?? who know's what it really was, just use kMaximumSegmentSize fBandwidthTracker->EmptyWindow( RTPBandwidthTracker::kMaximumSegmentSize, false ); // when we don't add an estimate from re-transmitted segments we're actually *underestimating* // both the variation and srtt since we're throwing away ALL estimates above the current RTO! // therefore it's difficult for us to rapidly adapt to increases in RTT, as well as RTT that // are higher than our original RTO estimate. // for duplicate acks, use 1.5x the cur RTO as the RTT sample //fRTTEstimator.AddToEstimate( fRTTEstimator.CurRetransmitTimeout() * 3 / 2 ); /// this results in some very very big RTO's since the dupes come in batches of maybe 10 or more! // qtss_printf("Got ack for expired packet %d\n", inSeqNum); } else { #if RTP_PACKET_RESENDER_DEBUGGING Assert(inSeqNum == theEntry->fSeqNum); this->logprintf( "Ack for packet: %li, track id %li, OS::MSecs %qd\n" , (SInt32)inSeqNum, fTrackID, OS::Milliseconds() ); #endif fBandwidthTracker->EmptyWindow(theEntry->fPacketSize); if ( theEntry->fNumResends == 0 ) { // add RTT sample... // only use rtt from packets acked after their initial send, do not use // estimates gatherered from re-trasnmitted packets. //fRTTEstimator.AddToEstimate( theEntry->fPacketRTTDuration.DurationInMilliseconds() ); fBandwidthTracker->AddToRTTEstimate( (SInt32) ( inCurTimeInMsec - theEntry->fAddedTime ) ); // qtss_printf("Got ack for packet %d RTT = %qd\n", inSeqNum, inCurTimeInMsec - theEntry->fAddedTime); } else { #if RTP_PACKET_RESENDER_DEBUGGING this->logprintf( "re-tx'd packet acked. ack num : %li, pack seq #: %li, num resends %li, track id %li, size %li, OS::MSecs %qd\n" \ , (SInt32)inSeqNum, (SInt32)ntohs( *((UInt16*)(((char*)theEntry->fPacketData)+2)) ), (SInt32)theEntry->fNumResends , (SInt32)fTrackID, theEntry->fPacketSize, OS::Milliseconds() ); #endif } this->RemovePacket(foundIndex); } } void RTPPacketResender::RemovePacket(UInt32 packetIndex, Bool16 reuseIndex) { //OSMutexLocker packetQLocker(&fPacketQMutex); Assert(packetIndex < fPacketArraySize); if (packetIndex >= fPacketArraySize) return; if (fPacketsInList == 0) return; RTPResenderEntry* theEntry = &fPacketArray[packetIndex]; if (theEntry->fPacketSize == 0) return; // // Track the number of wasted bytes we have atomic_sub(&sNumWastedBytes, kMaxDataBufferSize - theEntry->fPacketSize); Assert(theEntry->fPacketSize > 0); // // Update our list information Assert(fPacketsInList > 0); if (theEntry->fIsSpecialBuffer) { delete [] (char*)theEntry->fPacketData; } else if (theEntry->fPacketData != NULL) sBufferPool.Put(theEntry->fPacketData); if (reuseIndex) // we are re-using the space so keep array contiguous { fPacketArray[packetIndex] = fPacketArray[fPacketsInList -1]; ::memset(&fPacketArray[fPacketsInList -1],0,sizeof(RTPResenderEntry)); fPacketsInList--; } else // the array is full { fBandwidthTracker->EmptyWindow( theEntry->fPacketSize, false ); // keep window available ::memset(theEntry,0,sizeof(RTPResenderEntry)); } } void RTPPacketResender::ResendDueEntries() { if (fPacketsInList <= 0) return; //OSMutexLocker packetQLocker(&fPacketQMutex); // SInt32 numResends = 0; RTPResenderEntry* theEntry = NULL; SInt64 curTime = OS::Milliseconds(); for (SInt32 packetIndex = fPacketsInList -1; packetIndex >= 0; packetIndex--) // walk backwards because remove packet moves array members forward { theEntry = &fPacketArray[packetIndex]; if (theEntry->fPacketSize == 0) continue; if ((curTime - theEntry->fAddedTime) > fBandwidthTracker->CurRetransmitTimeout()) { // Change: Only expire packets after they were due to be resent. This gives the client // a chance to ack them and improves congestion avoidance and RTT calculation if (curTime > theEntry->fExpireTime) { #if RTP_PACKET_RESENDER_DEBUGGING unsigned char version; version = *((char*)theEntry->fPacketData); version &= 0x84; // grab most sig 2 bits version = version >> 6; // shift by 6 bits this->logprintf( "expired: seq number %li, track id %li (port: %li), vers # %li, pack seq # %li, size: %li, OS::Msecs: %qd\n", \ (SInt32)ntohs( *((UInt16*)(((char*)theEntry->fPacketData)+2)) ), fTrackID, (SInt32) ntohs(fDestPort), \ (SInt32)version, (SInt32)ntohs( *((UInt16*)(((char*)theEntry->fPacketData)+2))), theEntry->fPacketSize, OS::Milliseconds() ); #endif // // This packet is expired fNumExpired++; //qtss_printf("Packet expired: %d\n", ((UInt16*)thePacket)[1]); fBandwidthTracker->EmptyWindow(theEntry->fPacketSize); this->RemovePacket(packetIndex); // qtss_printf("Expired packet %d\n", theEntry->fSeqNum); continue; } // Resend this packet fSocket->SendTo(fDestAddr, fDestPort, theEntry->fPacketData, theEntry->fPacketSize); //qtss_printf("Packet resent: %d\n", ((UInt16*)theEntry->fPacketData)[1]); theEntry->fNumResends++; #if RTP_PACKET_RESENDER_DEBUGGING this->logprintf( "re-sent: %li RTO %li, track id %li (port %li), size: %li, OS::Ms %qd\n", (SInt32)ntohs( *((UInt16*)(((char*)theEntry->fPacketData)+2)) ), curTime - theEntry->fAddedTime, \ fTrackID, (SInt32) ntohs(fDestPort) \ , theEntry->fPacketSize, OS::Milliseconds()); #endif fNumResends++; numResends ++; //qtss_printf("resend loop numResends=%"_S32BITARG_" packet theEntry->fNumResends=%"_S32BITARG_" stream fNumResends=\n",numResends,theEntry->fNumResends++, fNumResends); // ok -- lets try this.. add 1.5x of the INITIAL duration since the last send to the rto estimator // since we won't get an ack on this packet // this should keep us from exponentially increasing due o a one time increase // in the actuall rtt, only AddToEstimate on the first resend ( assume that it's a dupe ) // if it's not a dupe, but rather an actual loss, the subseqnuent actuals wil bring down the average quickly if ( theEntry->fNumResends == 1 ) fBandwidthTracker->AddToRTTEstimate( (SInt32) ((theEntry->fOrigRetransTimeout * 3) / 2 )); // qtss_printf("Retransmitted packet %d\n", theEntry->fSeqNum); theEntry->fAddedTime = curTime; fBandwidthTracker->AdjustWindowForRetransmit(); continue; } } } void RTPPacketResender::RemovePacket(RTPResenderEntry* inEntry){ Assert(0); }