/* * * @APPLE_LICENSE_HEADER_START@ * * Copyright (c) 1999-2008 Apple Inc. All Rights Reserved. * * This file contains Original Code and/or Modifications of Original Code * as defined in and that are subject to the Apple Public Source License * Version 2.0 (the 'License'). You may not use this file except in * compliance with the License. Please obtain a copy of the License at * http://www.opensource.apple.com/apsl/ and read it before using this * file. * * The Original Code and all software distributed under the License are * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. * Please see the License for the specific language governing rights and * limitations under the License. * * @APPLE_LICENSE_HEADER_END@ * */ /* File: ReflectorSession.cpp Contains: Implementation of object defined in ReflectorSession.h. */ #include "ReflectorSession.h" #include "RTCPPacket.h" #include "SocketUtils.h" #include "EventContext.h" #include "OSMemory.h" #include "OS.h" #include "atomic.h" #include "QTSSModuleUtils.h" #include #ifndef __Win32__ #include #endif #if DEBUG #define REFLECTOR_SESSION_DEBUGGING 0 #else #define REFLECTOR_SESSION_DEBUGGING 0 #endif FileDeleter::FileDeleter(StrPtrLen* inSDPPath) { Assert (inSDPPath); fFilePath.Len = inSDPPath->Len; fFilePath.Ptr = NEW char[inSDPPath->Len + 1]; Assert (fFilePath.Ptr); memcpy(fFilePath.Ptr, inSDPPath->Ptr,inSDPPath->Len); fFilePath.Ptr[inSDPPath->Len] = 0; } FileDeleter::~FileDeleter() { //qtss_printf("FileDeleter::~FileDeleter delete = %s \n",fFilePath.Ptr); ::unlink(fFilePath.Ptr); delete fFilePath.Ptr; fFilePath.Ptr = NULL; fFilePath.Len = 0; } static OSRefTable* sStreamMap = NULL; void ReflectorSession::Initialize() { if (sStreamMap == NULL) sStreamMap = NEW OSRefTable(); } ReflectorSession::ReflectorSession(StrPtrLen* inSourceID, SourceInfo* inInfo) : fIsSetup(false), fQueueElem(), fNumOutputs(0), fStreamArray(NULL), fFormatter(fHTMLBuf, kMaxHTMLSize), fSourceInfo(inInfo), fSocketStream(NULL), fBroadcasterSession(NULL), fInitTimeMS(OS::Milliseconds()), fHasBufferedStreams(false) { fQueueElem.SetEnclosingObject(this); if (inSourceID != NULL) { fSourceID.Ptr = NEW char[inSourceID->Len + 1]; ::memcpy(fSourceID.Ptr, inSourceID->Ptr, inSourceID->Len); fSourceID.Len = inSourceID->Len; fRef.Set(fSourceID, this); } } ReflectorSession::~ReflectorSession() { #if REFLECTOR_SESSION_DEBUGGING qtss_printf("Removing ReflectorSession: %s\n", fSourceInfoHTML.Ptr); #endif // For each stream, check to see if the ReflectorStream should be deleted OSMutexLocker locker (sStreamMap->GetMutex()); for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) { if (fStreamArray[x] == NULL) continue; UInt32 refCount = fStreamArray[x]->GetRef()->GetRefCount(); Bool16 unregisterNow = (refCount == 1) ? true : false; //qtss_printf("ReflectorSession::~ReflectorSession stream index=%"_U32BITARG_" refcount=%"_U32BITARG_"\n",x,refCount); //decrement the ref count if (refCount > 0) // Refcount may be 0 if there was some error setting up the stream sStreamMap->Release(fStreamArray[x]->GetRef()); // decrement the refcount refCount = fStreamArray[x]->GetRef()->GetRefCount(); if (refCount == 0) { // Delete this stream if the refcount has dropped to 0 if (unregisterNow) sStreamMap->UnRegister(fStreamArray[x]->GetRef()); // Refcount may be 0 if there was some error setting up the stream //qtss_printf("delete stream index=%"_U32BITARG_" refcount=%"_U32BITARG_"\n",x,refCount); delete fStreamArray[x]; fStreamArray[x] = NULL; } } // We own this object when it is given to us, so delete it now delete [] fStreamArray; delete fSourceInfo; fLocalSDP.Delete(); fSourceID.Delete(); } QTSS_Error ReflectorSession::SetupReflectorSession(SourceInfo* inInfo, QTSS_StandardRTSP_Params* inParams, UInt32 inFlags, Bool16 filterState, UInt32 filterTimeout) { if (inInfo == NULL) // use the current SourceInfo inInfo = fSourceInfo; // Store a reference to this sourceInfo permanently Assert((fSourceInfo == NULL) || (inInfo == fSourceInfo)); fSourceInfo = inInfo; fLocalSDP.Delete();// this must be set to the new SDP. fLocalSDP.Ptr = inInfo->GetLocalSDP(&fLocalSDP.Len); // Allocate all our ReflectorStreams, using the SourceInfo if (fStreamArray != NULL) { for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) if (fSourceInfo->GetStreamInfo(x)->fPort > 0 && fStreamArray[x] != NULL) sStreamMap->Release(fStreamArray[x]->GetRef()); } delete fStreamArray; // keep the array list synchronized with the source info. fStreamArray = NEW ReflectorStream*[fSourceInfo->GetNumStreams()]; ::memset(fStreamArray, 0, fSourceInfo->GetNumStreams() * sizeof(ReflectorStream*)); for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) { // For each ReflectorStream, check and see if there is one that matches // this stream ID char theStreamID[ReflectorStream::kStreamIDSize]; StrPtrLen theStreamIDPtr(theStreamID, ReflectorStream::kStreamIDSize); ReflectorStream::GenerateSourceID(fSourceInfo->GetStreamInfo(x), &theStreamID[0]); OSMutexLocker locker(sStreamMap->GetMutex()); OSRef* theStreamRef = NULL; if (false && (inFlags & kIsPushSession)) // always setup our own ports when pushed. fSourceInfo->GetStreamInfo(x)->fPort = 0; else // If the port # of this stream is 0, that means "any port". // We don't know what the dest port of this stream is yet (this // can happen while acting as an RTSP client). Never share these streams. // This can also happen if the incoming data is interleaved in the TCP connection or a dynamic UDP port is requested if (fSourceInfo->GetStreamInfo(x)->fPort > 0) { theStreamRef = sStreamMap->Resolve(&theStreamIDPtr); #if REFLECTOR_SESSION_DEBUGGING if (theStreamRef != NULL) { ReflectorStream* theRef = (ReflectorStream*)theStreamRef->GetObject(); UInt32 refCount = theRef->GetRef()->GetRefCount(); qtss_printf("stream has port stream index=%"_U32BITARG_" refcount=%"_U32BITARG_"\n",x,refCount); } #endif } if (theStreamRef == NULL) { fStreamArray[x] = NEW ReflectorStream(fSourceInfo->GetStreamInfo(x)); // Obviously, we may encounter an error binding the reflector sockets. // If that happens, we'll just abort here, which will leave the ReflectorStream // array in an inconsistent state, so we need to make sure in our cleanup // code to check for NULL. QTSS_Error theError = fStreamArray[x]->BindSockets(inParams,inFlags, filterState, filterTimeout); if (theError != QTSS_NoErr) { delete fStreamArray[x]; fStreamArray[x] = NULL; return theError; } fStreamArray[x]->SetEnableBuffer(this->fHasBufferedStreams);// buffering is done by the stream's sender // If the port was 0, update it to reflect what the actual RTP port is. fSourceInfo->GetStreamInfo(x)->fPort = fStreamArray[x]->GetStreamInfo()->fPort; //qtss_printf("ReflectorSession::SetupReflectorSession fSourceInfo->GetStreamInfo(x)->fPort= %u\n",fSourceInfo->GetStreamInfo(x)->fPort); ReflectorStream::GenerateSourceID(fSourceInfo->GetStreamInfo(x), &theStreamID[0]); theError = sStreamMap->Register(fStreamArray[x]->GetRef()); Assert(theError == QTSS_NoErr); //unless we do this, the refcount won't increment (and we'll delete the session prematurely OSRef* debug = sStreamMap->Resolve(&theStreamIDPtr); Assert(debug == fStreamArray[x]->GetRef()); //UInt32 refCount = fStreamArray[x]->GetRef()->GetRefCount(); //qtss_printf("stream index=%"_U32BITARG_" refcount=%"_U32BITARG_"\n",x,refCount); } else fStreamArray[x] = (ReflectorStream*)theStreamRef->GetObject(); } if (inFlags & kMarkSetup) fIsSetup = true; return QTSS_NoErr; } void ReflectorSession::AddBroadcasterClientSession(QTSS_StandardRTSP_Params* inParams) { if (NULL == fStreamArray || NULL == inParams) return; for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) { if (fStreamArray[x] != NULL) { //qtss_printf("AddBroadcasterSession=%"_U32BITARG_"\n",inParams->inClientSession); ((ReflectorSocket*)fStreamArray[x]->GetSocketPair()->GetSocketA())->AddBroadcasterSession(inParams->inClientSession); ((ReflectorSocket*)fStreamArray[x]->GetSocketPair()->GetSocketB())->AddBroadcasterSession(inParams->inClientSession); } } fBroadcasterSession = inParams->inClientSession; } void ReflectorSession::FormatHTML(StrPtrLen* inURL) { // Begin writing our source description HTML (used by the relay) // Line looks like: Relay Source: 17.221.98.239, Ports: 5430 5432 5434 static StrPtrLen sHTMLStart("

Relay Source: "); static StrPtrLen sPorts(", Ports: "); static StrPtrLen sHTMLEnd("


"); // Begin writing the HTML fFormatter.Put(sHTMLStart); if (inURL == NULL) { // If no URL is provided, format the source IP addr as a string. char theIPAddrBuf[20]; StrPtrLen theIPAddr(theIPAddrBuf, 20); struct in_addr theAddr; theAddr.s_addr = htonl(fSourceInfo->GetStreamInfo(0)->fSrcIPAddr); SocketUtils::ConvertAddrToString(theAddr, &theIPAddr); fFormatter.Put(theIPAddr); } else fFormatter.Put(*inURL); fFormatter.Put(sPorts); for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) { fFormatter.Put(fSourceInfo->GetStreamInfo(x)->fPort); fFormatter.PutSpace(); } fFormatter.Put(sHTMLEnd); // Setup the StrPtrLen to point to the right stuff fSourceInfoHTML.Ptr = fFormatter.GetBufPtr(); fSourceInfoHTML.Len = fFormatter.GetCurrentOffset(); fFormatter.PutTerminator(); } void ReflectorSession::AddOutput(ReflectorOutput* inOutput, Bool16 isClient) { Assert(fSourceInfo->GetNumStreams() > 0); // We need to make sure that this output goes into the same bucket for each ReflectorStream. SInt32 bucket = -1; SInt32 lastBucket = -1; while (true) { UInt32 x = 0; for ( ; x < fSourceInfo->GetNumStreams(); x++) { bucket = fStreamArray[x]->AddOutput(inOutput, bucket); if (bucket == -1) // If this output couldn't be added to this bucket, break; // break and try again else { lastBucket = bucket; // Remember the last successful bucket placement. if (isClient) fStreamArray[x]->IncEyeCount(); } } if (bucket == -1) { // If there was some kind of conflict adding this output to this bucket, // we need to remove it from the streams to which it was added. for (UInt32 y = 0; y < x; y++) { fStreamArray[y]->RemoveOutput(inOutput); if (isClient) fStreamArray[y]->DecEyeCount(); } // Because there was an error, we need to start the whole process over again, // this time starting from a higher bucket lastBucket = bucket = lastBucket + 1; } else break; } (void)atomic_add(&fNumOutputs, 1); } void ReflectorSession::RemoveOutput(ReflectorOutput* inOutput, Bool16 isClient) { (void)atomic_sub(&fNumOutputs, 1); for (UInt32 y = 0; y < fSourceInfo->GetNumStreams(); y++) { fStreamArray[y]->RemoveOutput(inOutput); if (isClient) fStreamArray[y]->DecEyeCount(); } } void ReflectorSession::TearDownAllOutputs() { for (UInt32 y = 0; y < fSourceInfo->GetNumStreams(); y++) fStreamArray[y]->TearDownAllOutputs(); } void ReflectorSession::RemoveSessionFromOutput(QTSS_ClientSessionObject inSession) { for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) { ((ReflectorSocket*)fStreamArray[x]->GetSocketPair()->GetSocketA())->RemoveBroadcasterSession(inSession); ((ReflectorSocket*)fStreamArray[x]->GetSocketPair()->GetSocketB())->RemoveBroadcasterSession(inSession); } fBroadcasterSession = NULL; } UInt32 ReflectorSession::GetBitRate() { UInt32 retval = 0; for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) retval += fStreamArray[x]->GetBitRate(); return retval; } Bool16 ReflectorSession::Equal(SourceInfo* inInfo) { return fSourceInfo->Equal(inInfo); } void* ReflectorSession::GetStreamCookie(UInt32 inStreamID) { for (UInt32 x = 0; x < fSourceInfo->GetNumStreams(); x++) { if (fSourceInfo->GetStreamInfo(x)->fTrackID == inStreamID) return fStreamArray[x]->GetStreamCookie(); } return NULL; }